diff --git a/src/main/scala/redis/RedisCommand.scala b/src/main/scala/redis/RedisCommand.scala index 4fc3da97..37d771fd 100644 --- a/src/main/scala/redis/RedisCommand.scala +++ b/src/main/scala/redis/RedisCommand.scala @@ -79,6 +79,30 @@ trait RedisCommandMultiBulkSeqByteStringDouble[R] extends RedisCommandMultiBulk[ def decodeReply(mb: MultiBulk) = MultiBulkConverter.toSeqTuple2ByteStringDouble(mb)(deserializer) } +case class Cursor[T](index: Int, data: T) + +trait RedisCommandMultiBulkCursor[R] extends RedisCommandMultiBulk[Cursor[R]] { + def decodeReply(mb: MultiBulk) = { + mb.responses.map { responses => + val cursor = ParseNumber.parseInt(responses.head.toByteString) + val remainder = responses(1).asInstanceOf[MultiBulk] + + Cursor(cursor, remainder.responses.map(decodeResponses).getOrElse(empty)) + }.getOrElse(Cursor(0, empty)) + } + + def decodeResponses(responses: Seq[RedisReply]): R + + val empty: R + val count: Option[Int] + val matchGlob: Option[String] + + def withOptionalParams(params: Seq[ByteString]): Seq[ByteString] = { + val withCount = count.fold(params)(c => params ++ Seq(ByteString("COUNT"), ByteString(c.toString))) + matchGlob.fold(withCount)(m => withCount ++ Seq(ByteString("MATCH"), ByteString(m))) + } +} + trait RedisCommandRedisReplyOptionLong extends RedisCommandRedisReply[Option[Long]] { def decodeReply(redisReply: RedisReply): Option[Long] = redisReply match { case i: Integer => Some(i.toLong) diff --git a/src/main/scala/redis/api/Hashes.scala b/src/main/scala/redis/api/Hashes.scala index 4a05750a..1b346ddb 100644 --- a/src/main/scala/redis/api/Hashes.scala +++ b/src/main/scala/redis/api/Hashes.scala @@ -4,7 +4,7 @@ import redis._ import akka.util.ByteString import scala.collection.mutable import scala.annotation.tailrec -import redis.protocol.MultiBulk +import redis.protocol.{RedisReply, MultiBulk} case class Hdel[K, KK](key: K, fields: Seq[KK])(implicit redisKey: ByteStringSerializer[K], redisFields: ByteStringSerializer[KK]) extends RedisCommandIntegerLong { val isMasterOnly = true @@ -99,4 +99,20 @@ case class Hvals[K, R](key: K)(implicit redisKey: ByteStringSerializer[K], deser val isMasterOnly = false val encodedRequest: ByteString = encode("HVALS", Seq(redisKey.serialize(key))) val deserializer: ByteStringDeserializer[R] = deserializerR +} + +case class HScan[K, C, R](key: K, cursor: C, count: Option[Int], matchGlob: Option[String])(implicit redisKey: ByteStringSerializer[K], deserializer: ByteStringDeserializer[R], cursorConverter: ByteStringSerializer[C]) extends RedisCommandMultiBulkCursor[Map[String, R]] { + val isMasterOnly: Boolean = false + + val encodedRequest: ByteString = encode("HSCAN", withOptionalParams(Seq(redisKey.serialize(key), cursorConverter.serialize(cursor)))) + + def decodeResponses(responses: Seq[RedisReply]) = + responses.grouped(2).map { xs => + val k = xs.head + val v = xs(1) + + k.toByteString.utf8String -> deserializer.deserialize(v.toByteString) + }.toMap + + val empty: Map[String, R] = Map.empty } \ No newline at end of file diff --git a/src/main/scala/redis/api/Keys.scala b/src/main/scala/redis/api/Keys.scala index 2c8f19bd..a92b82a7 100644 --- a/src/main/scala/redis/api/Keys.scala +++ b/src/main/scala/redis/api/Keys.scala @@ -2,11 +2,10 @@ package redis.api.keys import redis._ import akka.util.ByteString -import redis.protocol.{Bulk, Status, MultiBulk} +import redis.protocol._ import scala.concurrent.duration.FiniteDuration import redis.api.Order import redis.api.LimitOffsetCount -import scala.Some case class Del[K](keys: Seq[K])(implicit redisKey: ByteStringSerializer[K]) extends RedisCommandIntegerLong { @@ -173,4 +172,15 @@ case class Ttl[K](key: K)(implicit redisKey: ByteStringSerializer[K]) extends Re case class Type[K](key: K)(implicit redisKey: ByteStringSerializer[K]) extends RedisCommandStatusString { val isMasterOnly = false val encodedRequest: ByteString = encode("TYPE", Seq(redisKey.serialize(key))) -} \ No newline at end of file +} + +case class Scan[C](cursor: C, count: Option[Int], matchGlob: Option[String])(implicit redisCursor: ByteStringSerializer[C], deserializer: ByteStringDeserializer[String]) extends RedisCommandMultiBulkCursor[Seq[String]] { + val encodedRequest: ByteString = encode("SCAN", withOptionalParams(Seq(redisCursor.serialize(cursor)))) + + val isMasterOnly = false + + def decodeResponses(responses: Seq[RedisReply]) = + responses.map(response => deserializer.deserialize(response.toByteString)) + + val empty: Seq[String] = Seq.empty +} diff --git a/src/main/scala/redis/api/Sets.scala b/src/main/scala/redis/api/Sets.scala index d7124070..2dfe7e68 100644 --- a/src/main/scala/redis/api/Sets.scala +++ b/src/main/scala/redis/api/Sets.scala @@ -2,6 +2,7 @@ package redis.api.sets import redis._ import akka.util.ByteString +import redis.protocol.RedisReply case class Sadd[K, V](key: K, members: Seq[V])(implicit redisKey: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends RedisCommandIntegerLong { val isMasterOnly = true @@ -94,4 +95,14 @@ case class Sunionstore[KD, K, KK](destination: KD, key: K, keys: Seq[KK]) extends RedisCommandIntegerLong { val isMasterOnly = true val encodedRequest: ByteString = encode("SUNIONSTORE", redisDest.serialize(destination) +: redisKey.serialize(key) +: keys.map(redisKeys.serialize)) +} + +case class Sscan[K, C, R](key: K, cursor: C, count: Option[Int], matchGlob: Option[String])(implicit redisKey: ByteStringSerializer[K], redisCursor: ByteStringSerializer[C], deserializerR: ByteStringDeserializer[R]) extends RedisCommandMultiBulkCursor[Seq[R]] { + val isMasterOnly = false + val encodedRequest = encode("SSCAN", withOptionalParams(Seq(redisKey.serialize(key), redisCursor.serialize(cursor)))) + + val empty = Seq.empty + + def decodeResponses(responses: Seq[RedisReply]) = + responses.map(response => deserializerR.deserialize(response.toByteString)) } \ No newline at end of file diff --git a/src/main/scala/redis/api/SortedSets.scala b/src/main/scala/redis/api/SortedSets.scala index 4fe4d87d..f2e21310 100644 --- a/src/main/scala/redis/api/SortedSets.scala +++ b/src/main/scala/redis/api/SortedSets.scala @@ -3,6 +3,7 @@ package redis.api.sortedsets import redis._ import akka.util.ByteString import redis.api.{SUM, Aggregate, Limit} +import redis.protocol.RedisReply case class Zadd[K, V](key: K, scoreMembers: Seq[(Double, V)])(implicit keySeria: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends RedisCommandIntegerLong { @@ -205,3 +206,16 @@ case class Zrevrangebylex[K, R](key: K, max: String, min: String, limit: Option[ val deserializer: ByteStringDeserializer[R] = deserializerR } +case class Zscan[K, C, R](key: K, cursor: C, count: Option[Int], matchGlob: Option[String])(implicit redisKey: ByteStringSerializer[K], redisCursor: ByteStringSerializer[C], deserializerR: ByteStringDeserializer[R], scoreDeserializer: ByteStringDeserializer[Double]) extends RedisCommandMultiBulkCursor[Seq[(Double, R)]] with ByteStringDeserializerDefault { + val isMasterOnly: Boolean = false + val encodedRequest: ByteString = encode("ZSCAN", withOptionalParams(Seq(redisKey.serialize(key), redisCursor.serialize(cursor)))) + + val empty: Seq[(Double, R)] = Seq.empty + + def decodeResponses(responses: Seq[RedisReply]) = + responses.grouped(2).map { xs => + val data = xs.head + val score = scoreDeserializer.deserialize(xs(1).toByteString) + score -> deserializerR.deserialize(data.toByteString) + }.toSeq +} diff --git a/src/main/scala/redis/commands/Hashes.scala b/src/main/scala/redis/commands/Hashes.scala index 1f5ce436..88d4b5ab 100644 --- a/src/main/scala/redis/commands/Hashes.scala +++ b/src/main/scala/redis/commands/Hashes.scala @@ -1,6 +1,7 @@ package redis.commands -import redis.{ByteStringDeserializer, ByteStringSerializer, Request} +import redis.protocol.MultiBulk +import redis.{Cursor, ByteStringDeserializer, ByteStringSerializer, Request} import scala.concurrent.Future import redis.api.hashes._ @@ -45,4 +46,7 @@ trait Hashes extends Request { def hvals[R: ByteStringDeserializer](key: String): Future[Seq[R]] = send(Hvals(key)) + def hscan[R: ByteStringDeserializer](key: String, cursor: Int = 0, count: Option[Int] = None, matchGlob: Option[String] = None): Future[Cursor[Map[String, R]]] = + send(HScan(key, cursor, count, matchGlob)) + } diff --git a/src/main/scala/redis/commands/Keys.scala b/src/main/scala/redis/commands/Keys.scala index 11ac0ca0..ffed798a 100644 --- a/src/main/scala/redis/commands/Keys.scala +++ b/src/main/scala/redis/commands/Keys.scala @@ -91,4 +91,7 @@ trait Keys extends Request { def `type`(key: String): Future[String] = send(Type(key)) + def scan(cursor: Int = 0, count: Option[Int] = None, matchGlob: Option[String] = None): Future[Cursor[Seq[String]]] = + send(Scan(cursor, count, matchGlob)) + } diff --git a/src/main/scala/redis/commands/Sets.scala b/src/main/scala/redis/commands/Sets.scala index e330862a..4b06c854 100644 --- a/src/main/scala/redis/commands/Sets.scala +++ b/src/main/scala/redis/commands/Sets.scala @@ -1,6 +1,6 @@ package redis.commands -import redis.{ByteStringDeserializer, ByteStringSerializer, Request} +import redis.{Cursor, ByteStringDeserializer, ByteStringSerializer, Request} import scala.concurrent.Future import redis.api.sets._ @@ -51,4 +51,6 @@ trait Sets extends Request { def sunionstore(destination: String, key: String, keys: String*): Future[Long] = send(Sunionstore(destination, key, keys)) + def sscan[R: ByteStringDeserializer](key: String, cursor: Int = 0, count: Option[Int] = None, matchGlob: Option[String] = None): Future[Cursor[Seq[R]]] = + send(Sscan(key, cursor, count, matchGlob)) } diff --git a/src/main/scala/redis/commands/SortedSets.scala b/src/main/scala/redis/commands/SortedSets.scala index 27b522ea..59243cfc 100644 --- a/src/main/scala/redis/commands/SortedSets.scala +++ b/src/main/scala/redis/commands/SortedSets.scala @@ -1,6 +1,6 @@ package redis.commands -import redis.{ByteStringDeserializer, ByteStringSerializer, Request} +import redis.{Cursor, ByteStringDeserializer, ByteStringSerializer, Request} import scala.concurrent.Future import redis.api._ import redis.api.sortedsets._ @@ -84,5 +84,9 @@ trait SortedSets extends Request { def zrevrangebylex[R: ByteStringDeserializer](key: String, max: Option[String], min: Option[String], limit: Option[(Long, Long)] = None): Future[Seq[R]] = send(Zrevrangebylex(key, max.getOrElse("+"), max.getOrElse("-"), limit)) + + def zscan[R: ByteStringDeserializer](key: String, cursor: Int = 0, count: Option[Int] = None, matchGlob: Option[String] = None): Future[Cursor[Seq[(Double, R)]]] = + send(Zscan(key, cursor, count, matchGlob)) + } diff --git a/src/test/scala/redis/commands/HashesSpec.scala b/src/test/scala/redis/commands/HashesSpec.scala index 1179addb..d01df7bd 100644 --- a/src/test/scala/redis/commands/HashesSpec.scala +++ b/src/test/scala/redis/commands/HashesSpec.scala @@ -147,6 +147,19 @@ class HashesSpec extends RedisSpec { Await.result(r, timeOut) } + "HSCAN" in { + val initialData = (1 to 20).grouped(2).map(x => x.head.toString -> x.tail.head.toString).toMap + val r = for { + _ <- redis.del("hscan") + _ <- redis.hmset("hscan", initialData) + scanResult <- redis.hscan[String]("hscan", count = Some(300)) + } yield { + scanResult.data.values.toList.map(_.toInt).sorted mustEqual (2 to 20 by 2) + scanResult.index mustEqual 0 + } + Await.result(r, timeOut) + } + "HVALS" in { val r = for { _ <- redis.hdel("hvalsKey", "field") diff --git a/src/test/scala/redis/commands/KeysSpec.scala b/src/test/scala/redis/commands/KeysSpec.scala index 03ed4c36..c9d77c23 100644 --- a/src/test/scala/redis/commands/KeysSpec.scala +++ b/src/test/scala/redis/commands/KeysSpec.scala @@ -281,6 +281,24 @@ class KeysSpec extends RedisSpec { Await.result(r, timeOut) } + "SCAN" in { + + withRedisServer(port => { + val scanRedis = RedisClient("localhost", port) + + val r = for { + _ <- scanRedis.set("scanKey1", "value1") + _ <- scanRedis.set("scanKey2", "value2") + _ <- scanRedis.set("scanKey3", "value3") + result <- scanRedis.scan(count = Some(1000)) + } yield { + result.index mustEqual 0 + result.data.sorted mustEqual Seq("scanKey1", "scanKey2", "scanKey3") + } + Await.result(r, timeOut) + }) + } + // @see https://gist.github.com/jacqui/983051 "SORT" in { val init = Future.sequence(Seq( diff --git a/src/test/scala/redis/commands/SetsSpec.scala b/src/test/scala/redis/commands/SetsSpec.scala index 93bb69ba..b6fdf031 100644 --- a/src/test/scala/redis/commands/SetsSpec.scala +++ b/src/test/scala/redis/commands/SetsSpec.scala @@ -179,6 +179,18 @@ class SetsSpec extends RedisSpec { Await.result(r, timeOut) } + "SSCAN" in { + val r = for { + _ <- redis.sadd("sscan", (1 to 20).map(_.toString):_*) + scanResult <- redis.sscan[String]("sscan", count = Some(100)) + } yield { + scanResult.index mustEqual 0 + scanResult.data.map(_.toInt).sorted mustEqual (1 to 20) + } + + Await.result(r, timeOut) + } + "SUNION" in { val r = for { _ <- redis.del("sunionKey1") diff --git a/src/test/scala/redis/commands/SortedSetsSpec.scala b/src/test/scala/redis/commands/SortedSetsSpec.scala index 413beea9..d73ee240 100644 --- a/src/test/scala/redis/commands/SortedSetsSpec.scala +++ b/src/test/scala/redis/commands/SortedSetsSpec.scala @@ -248,6 +248,19 @@ class SortedSetsSpec extends RedisSpec { Await.result(r, timeOut) } + "ZSCAN" in { + val r = for { + _ <- redis.del("zscan") + _ <- redis.zadd("zscan", (1 to 20).map(x => x.toDouble -> x.toString):_*) + scanResult <- redis.zscan[String]("zscan", count = Some(100)) + } yield { + scanResult.index mustEqual 0 + scanResult.data mustEqual (1 to 20).map(x => x.toDouble -> x.toString) + } + + Await.result(r, timeOut) + } + "ZSCORE" in { val r = for { _ <- redis.del("zscoreKey")