Skip to content

Commit

Permalink
Modify MIGRATE to support COPY, REPLACE and AUTH options
Browse files Browse the repository at this point in the history
  • Loading branch information
drmontgomery authored and etaty committed Oct 16, 2019
1 parent fc3db1e commit 5e0483d
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 13 deletions.
33 changes: 24 additions & 9 deletions src/main/scala/redis/api/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,32 @@ case class Keys(pattern: String) extends RedisCommandMultiBulk[Seq[String]] {
def decodeReply(mb: MultiBulk) = MultiBulkConverter.toSeqString(mb)
}

case class Migrate[K](host: String, port: Int, key: K, destinationDB: Int, timeout: FiniteDuration)(implicit redisKey: ByteStringSerializer[K])
case class Migrate[K](host: String, port: Int, keys: Seq[K], destinationDB: Int, timeout: FiniteDuration, copy: Boolean = false, replace: Boolean = false, password: Option[String])(implicit redisKey: ByteStringSerializer[K])
extends RedisCommandStatusBoolean {
val isMasterOnly = true
val encodedRequest: ByteString =
encode("MIGRATE",
Seq(ByteString(host),
ByteString(port.toString),
redisKey.serialize(key),
ByteString(destinationDB.toString),
ByteString(timeout.toMillis.toString)
))
val encodedRequest: ByteString = {
val builder = Seq.newBuilder[ByteString]

builder += ByteString(host)
builder += ByteString(port.toString)
builder += ByteString("")
builder += ByteString(destinationDB.toString)
builder += ByteString(timeout.toMillis.toString)

if (copy)
builder += ByteString("COPY")
if (replace)
builder += ByteString("REPLACE")
if (password.isDefined) {
builder += ByteString("AUTH")
builder += ByteString(password.get)
}

builder += ByteString("KEYS")
builder ++= keys.map(redisKey.serialize)

RedisProtocolRequest.multiBulk("MIGRATE", builder.result())
}
}

case class Move[K](key: K, db: Int)(implicit redisKey: ByteStringSerializer[K]) extends SimpleClusterKey[K] with RedisCommandIntegerBoolean {
Expand Down
8 changes: 6 additions & 2 deletions src/main/scala/redis/commands/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ trait Keys extends Request {
def keys(pattern: String): Future[Seq[String]] =
send(Keys(pattern))

def migrate(host: String, port: Int, key: String, destinationDB: Int, timeout: FiniteDuration): Future[Boolean] = {
send(Migrate(host, port, key, destinationDB, timeout))
def migrate(host: String, port: Int, key: String, destinationDB: Int, timeout: FiniteDuration, copy: Boolean = false, replace: Boolean = false, password: Option[String] = None): Future[Boolean] = {
send(Migrate(host, port, Seq(key), destinationDB, timeout, copy, replace, password))
}

def migrateMany(host: String, port: Int, keys: Seq[String], destinationDB: Int, timeout: FiniteDuration, copy: Boolean = false, replace: Boolean = false, password: Option[String] = None): Future[Boolean] = {
send(Migrate(host, port, keys, destinationDB, timeout, copy, replace, password))
}

def move(key: String, db: Int): Future[Boolean] =
Expand Down
67 changes: 65 additions & 2 deletions src/test/scala/redis/commands/KeysSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,73 @@ class KeysSpec extends RedisStandaloneServer {
val r = for {
_ <- redis.set(key, "value")
m <- redis.migrate("localhost", port, key, 0, 10 seconds)
get <- redisMigrate.get(key)
getSource <- redis.get(key)
getTarget <- redisMigrate.get(key)
} yield {
m must beTrue
getSource mustEqual None
getTarget mustEqual Some(ByteString("value"))
}
Await.result(r, timeOut)
})
}

"MIGRATE COPY" in {
import scala.concurrent.duration._

withRedisServer(port => {
val redisMigrate = RedisClient("localhost", port)
val key = "migrateCopyKey-" + System.currentTimeMillis()
val r = for {
_ <- redis.set(key, "value")
m <- redis.migrate("localhost", port, key, 0, 10 seconds, copy = true)
getSource <- redis.get(key)
getTarget <- redisMigrate.get(key)
} yield {
m must beTrue
getSource mustEqual Some(ByteString("value"))
getTarget mustEqual Some(ByteString("value"))
}
Await.result(r, timeOut)
})
}

"MIGRATE KEYS" in {
import scala.concurrent.duration._

withRedisServer(port => {
val redisMigrate = RedisClient("localhost", port)
val key1 = "migrateKey1-" + System.currentTimeMillis()
val key2 = "migrateKey2" + System.currentTimeMillis()
val r = for {
_ <- redis.set(key1, "value")
_ <- redis.set(key2, "value")
m <- redis.migrateMany("localhost", port, Seq(key1, key2), 0, 10 seconds)
get1 <- redisMigrate.get(key1)
get2 <- redisMigrate.get(key2)
} yield {
m must beTrue
get mustEqual Some(ByteString("value"))
get1 mustEqual Some(ByteString("value"))
get2 mustEqual Some(ByteString("value"))
}
Await.result(r, timeOut)
})
}

"MIGRATE REPLACE" in {
import scala.concurrent.duration._

withRedisServer(port => {
val redisMigrate = RedisClient("localhost", port)
val key = "migrateReplaceKey-" + System.currentTimeMillis()
val r = for {
_ <- redis.set(key, "value1")
_ <- redisMigrate.set(key, "value2")
m2 <- redis.migrate("localhost", port, key, 0, 10 seconds, replace = true)
get <- redisMigrate.get(key)
} yield {
m2 must beTrue
get mustEqual Some(ByteString("value1"))
}
Await.result(r, timeOut)
})
Expand Down

0 comments on commit 5e0483d

Please sign in to comment.