Skip to content

Commit

Permalink
small cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kardapoltsev committed Feb 24, 2019
1 parent 1e4572a commit 209cc63
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 91 deletions.
106 changes: 56 additions & 50 deletions src/main/scala/redis/RedisPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ case class RedisServer(host: String = "localhost",
password: Option[String] = None,
db: Option[Int] = None)


case class RedisConnection(actor: ActorRef, active: Ref[Boolean] = Ref(false))

abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDispatcher) {
abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDispatcher) {

def redisServerConnections: scala.collection.Map[RedisServer, RedisConnection]

val name: String
implicit val executionContext = system.dispatchers.lookup(redisDispatcher.name)

private val redisConnectionRef: Ref[Seq[ActorRef]] = Ref(Seq.empty)

/**
*
* @param redisCommand
Expand Down Expand Up @@ -55,12 +55,12 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
server.db.foreach(redis.select)
}

def onConnectStatus(server: RedisServer, active: Ref[Boolean]): (Boolean) => Unit = {
(status: Boolean) => {
if (active.single.compareAndSet(!status, status)) {
refreshConnections()
}
def onConnectStatus(server: RedisServer, active: Ref[Boolean]): (Boolean) => Unit = { (status: Boolean) =>
{
if (active.single.compareAndSet(!status, status)) {
refreshConnections()
}
}
}

def refreshConnections() = {
Expand All @@ -78,8 +78,8 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
}

/**
* Disconnect from the server (stop the actor)
*/
* Disconnect from the server (stop the actor)
*/
def stop() {
redisConnectionPool.foreach { redisConnection =>
system stop redisConnection
Expand All @@ -92,23 +92,31 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
}

def makeRedisClientActor(server: RedisServer, active: Ref[Boolean]): ActorRef = {
system.actorOf(RedisClientActor.props(new InetSocketAddress(server.host, server.port),
getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
system.actorOf(
RedisClientActor
.props(
new InetSocketAddress(server.host, server.port),
getConnectOperations(server),
onConnectStatus(server, active),
redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
name + '-' + Redis.tempName()
)
}

}

case class RedisClientMutablePool(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
(implicit system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike (system, redisDispatcher) with RoundRobinPoolRequest with RedisCommands {
case class RedisClientMutablePool(redisServers: Seq[RedisServer], name: String = "RedisClientPool")(
implicit system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisClientPoolLike(system, redisDispatcher)
with RoundRobinPoolRequest
with RedisCommands {

override val redisServerConnections = {
val m = redisServers map { server => makeRedisConnection(server) }
val m = redisServers map { server =>
makeRedisConnection(server)
}
collection.mutable.Map(m: _*)
}

Expand All @@ -134,14 +142,14 @@ case class RedisClientMutablePool(redisServers: Seq[RedisServer],
}
}


}

case class RedisClientPool(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike(_system, redisDispatcher) with RoundRobinPoolRequest with RedisCommands {
case class RedisClientPool(redisServers: Seq[RedisServer], name: String = "RedisClientPool")(
implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisClientPoolLike(_system, redisDispatcher)
with RoundRobinPoolRequest
with RedisCommands {

override val redisServerConnections = {
redisServers.map { server =>
Expand All @@ -153,11 +161,11 @@ case class RedisClientPool(redisServers: Seq[RedisServer],

}

case class RedisClientMasterSlaves(master: RedisServer,
slaves: Seq[RedisServer])
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisCommands with Transactions {
case class RedisClientMasterSlaves(master: RedisServer, slaves: Seq[RedisServer])(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher =
Redis.dispatcher)
extends RedisCommands
with Transactions {
implicit val executionContext = _system.dispatchers.lookup(redisDispatcher.name)

val masterClient = RedisClient(master.host, master.port, master.password, master.db)
Expand All @@ -175,26 +183,24 @@ case class RedisClientMasterSlaves(master: RedisServer,
def redisConnection: ActorRef = masterClient.redisConnection
}


case class SentinelMonitoredRedisClientMasterSlaves(
sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)), master: String)
(implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends SentinelMonitored(_system, redisDispatcher) with ActorRequest with RedisCommands with Transactions {

val masterClient: RedisClient = withMasterAddr(
(ip, port) => {
new RedisClient(ip, port, name = "SMRedisClient")
})

val slavesClients: RedisClientMutablePool = withSlavesAddr(
slavesHostPort => {
val slaves = slavesHostPort.map {
case (ip, port) =>
new RedisServer(ip, port)
}
new RedisClientMutablePool(slaves, name = "SMRedisClient")
})

sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)),
master: String)(implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends SentinelMonitored(_system, redisDispatcher)
with ActorRequest
with RedisCommands
with Transactions {

val masterClient: RedisClient = withMasterAddr((ip, port) => {
new RedisClient(ip, port, name = "SMRedisClient")
})

val slavesClients: RedisClientMutablePool = withSlavesAddr(slavesHostPort => {
val slaves = slavesHostPort.map {
case (ip, port) => RedisServer(ip, port)
}
RedisClientMutablePool(slaves, name = "SMRedisClient")
})

val onNewSlave = (ip: String, port: Int) => {
log.info(s"onNewSlave $ip:$port")
Expand All @@ -212,8 +218,8 @@ case class SentinelMonitoredRedisClientMasterSlaves(
}

/**
* Disconnect from the server (stop the actors)
*/
* Disconnect from the server (stop the actors)
*/
def stop() = {
masterClient.stop()
slavesClients.stop()
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/redis/RedisClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import redis.protocol._
class RedisClusterTest extends RedisClusterClients {

var redisCluster: RedisCluster = null
override def setup(): Unit = {
super.setup()
override def beforeAll(): Unit = {
super.beforeAll()
redisCluster = RedisCluster(nodePorts.map(p => RedisServer("127.0.0.1", p)))
}

Expand Down
54 changes: 18 additions & 36 deletions src/test/scala/redis/RedisSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ import scala.util.control.NonFatal

object RedisServerHelper {
val redisHost = "127.0.0.1"

// remove stacktrace when we stop the process
val redisServerCmd = "redis-server"
val redisCliCmd = "redis-cli"
val redisServerLogLevel = ""

val portNumber = new AtomicInteger(10500)
}

Expand All @@ -38,22 +35,10 @@ abstract class RedisHelper extends TestKit(ActorSystem()) with TestBase with Bef
testKitSettings.TestTimeFactor
}

// import scala.concurrent.duration._
// val timeOut = 10.seconds

override protected def beforeAll(): Unit = {
setup()
}

override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
cleanup()
}

def setup()

def cleanup()

class RedisManager {

import RedisServerHelper._
Expand Down Expand Up @@ -138,9 +123,8 @@ abstract class RedisStandaloneServer extends RedisHelper {
result.get
}

override def setup() = {}

override def cleanup() = {
override def afterAll() = {
super.afterAll()
redisManager.stopAll()
}
}
Expand Down Expand Up @@ -182,9 +166,8 @@ abstract class RedisSentinelClients(val masterName: String = "mymaster") extends
redisManager.newSentinelProcess(masterName, masterPort)
}

override def setup() = {}

override def cleanup() = {
override def afterAll() = {
super.afterAll()
redisManager.stopAll()
}

Expand All @@ -202,33 +185,32 @@ abstract class RedisClusterClients() extends RedisHelper {

val nodePorts = (0 to 5).map(_ => portNumber.incrementAndGet())

override def setup() = {
println("Setup")
override def beforeAll() = {
log.debug(s"Starting Redis cluster with $nodePorts")
fileDir.mkdirs()

processes = nodePorts.map(s => Process(newNode(s), fileDir).run(processLogger))
val nodes = nodePorts.map(s => redisHost + ":" + s).mkString(" ")

val createClusterCmd =
s"$redisCliCmd --cluster create --cluster-replicas 1 ${nodes}"
println(createClusterCmd)
val createClusterCmd = s"$redisCliCmd --cluster create --cluster-replicas 1 ${nodes}"
log.debug(createClusterCmd)
Process(createClusterCmd)
.run(
new ProcessIO(
(writeInput: OutputStream) => {
// Thread.sleep(2000)
println("yes")
writeInput.write("yes\n".getBytes)
writeInput.flush
writeInput.flush()
},
(processOutput: InputStream) => {
Source.fromInputStream(processOutput).getLines().foreach { l =>
println(l)
log.debug(l)
}
},
(processError: InputStream) => {
Source.fromInputStream(processError).getLines().foreach { l =>
println(l)
log.error(l)
}
},
daemonizeThreads = false
Expand All @@ -245,10 +227,12 @@ abstract class RedisClusterClients() extends RedisHelper {
clusterInfo("cluster_known_nodes") shouldBe nodePorts.length.toString
}
client.stop()
log.debug(s"RedisCluster started on $nodePorts")
}

override def cleanup() = {
println("Stop begin")
override def afterAll() = {
super.afterAll()
log.debug("Stop begin")

nodePorts foreach { port =>
val out = new Socket(redisHost, port).getOutputStream
Expand All @@ -260,7 +244,7 @@ abstract class RedisClusterClients() extends RedisHelper {

//deleteDirectory()

println("Stop end")
log.debug("Stop end")
}

def deleteDirectory(): Unit = {
Expand All @@ -278,13 +262,13 @@ class RedisProcess(val port: Int) {
protected val log = Logger(getClass)
protected val processLogger = ProcessLogger(line => log.debug(line), line => log.error(line))

def start() = {
def start(): Unit = {
log.debug(s"starting $this")
if (server == null)
server = Process(cmd).run(processLogger)
}

def stop() = {
def stop(): Unit = {
log.debug(s"stopping $this")
if (server != null) {
try {
Expand All @@ -305,7 +289,6 @@ class RedisProcess(val port: Int) {
}

class SentinelProcess(masterName: String, masterPort: Int, port: Int) extends RedisProcess(port) {
log.debug(s"starting sentinel process with master post $masterPort on $port")
val sentinelConfPath = {
val sentinelConf =
s"""
Expand All @@ -324,6 +307,5 @@ class SentinelProcess(masterName: String, masterPort: Int, port: Int) extends Re
}

class SlaveProcess(masterPort: Int, port: Int) extends RedisProcess(port) {
log.debug(s"starting slave process with master post $masterPort on $port")
override val cmd = s"$redisServerCmd --port $port --slaveof $redisHost $masterPort $redisServerLogLevel"
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class SentinelMonitoredRedisClientMasterSlavesSpec

lazy val redisMasterSlavesPool =
SentinelMonitoredRedisClientMasterSlaves(master = masterName, sentinels = sentinelPorts.map((redisHost, _)))

"sentinel slave pool" should {
"add and remove" in {
eventually {
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/redis/SentinelMutablePoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ class SentinelMutablePoolSpec extends RedisSentinelClients("SentinelMutablePoolS

var redisPool: RedisClientMutablePool = null

override def setup(): Unit = {
super.setup()
override def beforeAll(): Unit = {
super.beforeAll()
redisPool = RedisClientMutablePool(Seq(RedisServer(redisHost, slavePort1)), masterName)
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/redis/TestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.scalatest.{Matchers, WordSpecLike}
trait TestBase extends WordSpecLike with Matchers with ScalaFutures with Eventually {
import org.scalatest.time.{Millis, Seconds, Span}
implicit protected val defaultPatience =
PatienceConfig(timeout = scaled(Span(1, Seconds)), interval = scaled(Span(10, Millis)))
PatienceConfig(timeout = scaled(Span(1, Seconds)), interval = Span(100, Millis))

protected val log = Logger(getClass)

Expand Down

0 comments on commit 209cc63

Please sign in to comment.