Skip to content

Commit

Permalink
Add Functionality for Older Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport committed Oct 17, 2021
1 parent f43d9ee commit 98e8e15
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ lazy val rediculous = crossProject(JVMPlatform, JSPlatform)

lazy val examples = project.in(file("examples"))
.disablePlugins(MimaPlugin)
.dependsOn(core.jvm)
.dependsOn(core.jvm, rediculous.jvm)


lazy val site = project.in(file("site"))
Expand Down
29 changes: 28 additions & 1 deletion examples/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,38 @@
package io.chrisdavenport.ratelimit

import cats.effect._
import io.chrisdavenport.rediculous._
import io.chrisdavenport.ratelimit.rediculous.RedisRateLimiter
import com.comcast.ip4s._
import fs2.io.net._

object Main extends IOApp {

implicit class LogOps[A](a: IO[A]){
def flatPrint: IO[A] = a.flatTap(a => IO(println(a)))
}


def run(args: List[String]): IO[ExitCode] = {
IO(println("I am a new project!")).as(ExitCode.Success)
val r = for {
// maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs.
// Default 1000 is good for small servers. But can easily take 100,000.
// workers: How many threads will process pipelined messages.
connection <- RedisConnection.queued[IO](Network[IO], host"localhost", port"6379", maxQueued = 10000, workers = 2)
} yield connection
r.use{ connection =>

val rl = RedisRateLimiter.fixedWindow(connection, Function.const(5), 60)

rl.get("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint >>
rl.getAndDecrement("foo").flatPrint

}.as(ExitCode.Success)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ object RedisRateLimiter {
periodSeconds: Long,
namespace: String = "rediculous-rate-limiter",
useRedisTime: Boolean = false,
redisSupportsExpireNX: Boolean = false ,
): RateLimiter[F, String] =
new SlidingWindow[F](connection, maxRate, periodSeconds, namespace).mapK{
new SlidingWindow[F](connection, maxRate, periodSeconds, namespace, redisSupportsExpireNX).mapK{
if (useRedisTime) redisTime(connection) else temporalTime
}

Expand All @@ -30,6 +31,8 @@ object RedisRateLimiter {
maxRate: String => Long,
periodSeconds: Long,
namespace : String,
// As of Redis 7.0 the new expire supports nx which is much more space efficient
redisSupportsExpireNX: Boolean = false ,
) extends RateLimiter[Kleisli[F, FiniteDuration, *], String]{

val comment = RateLimiter.QuotaComment("comment", Either.right("sliding window"))
Expand Down Expand Up @@ -87,10 +90,13 @@ object RedisRateLimiter {
val periodEndSeconds = (periodNumber + 1) * periodSeconds
val secondsLeftInPeriod = periodEndSeconds - secondsSinceEpoch
val key = s"$namespace$id$periodNumber"
val expireCommand = if (redisSupportsExpireNX) {
io.chrisdavenport.rediculous.RedisCtx[RedisPipeline].keyed[Int](key, NonEmptyList.of("EXPIRE", key, (secondsLeftInPeriod + 1).toString(), "NX"))
} else RedisCommands.expire[RedisPipeline](key, secondsLeftInPeriod + 1)
(
RedisCommands.get[RedisPipeline](s"$namespace$id${periodNumber - 1}").map(_.map(_.toLong).getOrElse(0L)),
RedisCommands.incr[RedisPipeline](key),
io.chrisdavenport.rediculous.RedisCtx[RedisPipeline].keyed[Int](key, NonEmptyList.of("EXPIRE", key, (secondsLeftInPeriod + 1).toString(), "NX"))
expireCommand
).mapN{ case(last, current, _) =>
createRateLimit(secondsLeftInPeriod, id, last, current)
}.pipeline.run(connection)
Expand All @@ -110,8 +116,9 @@ object RedisRateLimiter {
periodSeconds: Long,
namespace: String = "rediculous-rate-limiter",
useRedisTime: Boolean = false,
redisSupportsExpireNX: Boolean = false,
): RateLimiter[F, String] =
new FixedWindow[F](connection, maxRate, periodSeconds, namespace).mapK{
new FixedWindow[F](connection, maxRate, periodSeconds, namespace, redisSupportsExpireNX).mapK{
if (useRedisTime) redisTime(connection) else temporalTime
}

Expand All @@ -120,6 +127,7 @@ object RedisRateLimiter {
maxRate: String => Long,
periodSeconds: Long,
namespace : String,
redisSupportsExpireNX: Boolean,
) extends RateLimiter[Kleisli[F, FiniteDuration, *], String]{

val comment = RateLimiter.QuotaComment("comment", Either.right("fixed window"))
Expand Down Expand Up @@ -157,9 +165,12 @@ object RedisRateLimiter {
val periodEndSeconds = (periodNumber + 1) * periodSeconds
val secondsLeftInPeriod = periodEndSeconds - secondsSinceEpoch
val key = s"$namespace$id$periodNumber"
val expireCommand = if (redisSupportsExpireNX) {
io.chrisdavenport.rediculous.RedisCtx[RedisPipeline].keyed[Int](key, NonEmptyList.of("EXPIRE", key, (secondsLeftInPeriod + 1).toString(), "NX"))
} else RedisCommands.expire[RedisPipeline](key, secondsLeftInPeriod + 1)
val pipeline = (
RedisCommands.incr[RedisPipeline](key),
io.chrisdavenport.rediculous.RedisCtx[RedisPipeline].keyed[Int](key, NonEmptyList.of("EXPIRE", key, (secondsLeftInPeriod + 1).toString(), "NX"))
expireCommand
).mapN{ case(count, _) => count}
pipeline.pipeline[F].run(connection).map(
createRateLimit(secondsLeftInPeriod, id, _)
Expand Down

0 comments on commit 98e8e15

Please sign in to comment.