Skip to content

Commit

Permalink
Merge 209cc63 into 0c7faba
Browse files Browse the repository at this point in the history
  • Loading branch information
kardapoltsev committed Feb 24, 2019
2 parents 0c7faba + 209cc63 commit 820899f
Show file tree
Hide file tree
Showing 37 changed files with 1,553 additions and 1,544 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
env:
TEST_TIME_FACTOR=100
dist: xenial
language: scala

Expand Down
123 changes: 64 additions & 59 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,70 +1,75 @@
lazy val common = Seq(
organization := "com.github.Ma27",
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
},

scalaVersion := "2.12.8",
crossScalaVersions := Seq(scalaVersion.value, "2.11.12"),
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html")),
homepage := Some(url("https://github.com/Ma27/rediscala")),
scmInfo := Some(ScmInfo(url("https://github.com/Ma27/rediscala"), "scm:git:git@github.com:Ma27/rediscala.git")),
apiURL := Some(url("http://etaty.github.io/rediscala/latest/api/")),
pomExtra :=
<developers>
organization := "com.github.Ma27",
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
},
scalaVersion := "2.12.8",
crossScalaVersions := Seq(scalaVersion.value, "2.11.12"),
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html")),
homepage := Some(url("https://github.com/Ma27/rediscala")),
scmInfo := Some(ScmInfo(url("https://github.com/Ma27/rediscala"), "scm:git:git@github.com:Ma27/rediscala.git")),
apiURL := Some(url("http://etaty.github.io/rediscala/latest/api/")),
pomExtra :=
<developers>
<developer>
<id>Ma27</id>
<name>Valerian Barbot, The Rediscala community</name>
<url>http://github.com/Ma27/</url>
</developer>
</developers>,
resolvers ++= Seq(
"Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
"Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/"
),
publishMavenStyle := true,
scalacOptions ++= Seq(
"-encoding", "UTF-8",
"-Xlint",
"-deprecation",
"-Xfatal-warnings",
"-feature",
"-language:postfixOps",
"-unchecked"
),


libraryDependencies ++= {
val akkaVersion = "2.5.19"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"org.specs2" %% "specs2-core" % "4.3.6" % Test,
"org.scala-stm" %% "scala-stm" % "0.9",
"org.scalacheck" %% "scalacheck" % "1.14.0" % Test
)
},

autoAPIMappings := true,
resolvers ++= Seq(
"Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
"Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/"
),
publishMavenStyle := true,
scalacOptions ++= Seq(
"-encoding",
"UTF-8",
"-Xlint",
"-deprecation",
"-Xfatal-warnings",
"-feature",
"-language:postfixOps",
"-unchecked"
),
libraryDependencies ++= {
val akkaVersion = "2.5.19"
Seq(
"org.scala-stm" %% "scala-stm" % "0.9",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"de.heikoseeberger" %% "akka-log4j" % "1.6.1" % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"org.scalacheck" %% "scalacheck" % "1.14.0" % Test,
"org.apache.logging.log4j" % "log4j-api" % "2.11.1" % Test,
"org.apache.logging.log4j" % "log4j-core" % "2.11.1" % Test,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.11.1" % Test,
"org.apache.logging.log4j" %% "log4j-api-scala" % "11.0" % Test,
)
},
autoAPIMappings := true,
// TODO create new github pages target
apiURL := Some(url("http://etaty.github.io/rediscala/"))
)

// TODO create new github pages target
apiURL := Some(url("http://etaty.github.io/rediscala/"))
)

lazy val root = (project in file(".")).settings(common, name := "rediscala")
lazy val root = (project in file(".")).settings(
common,
name := "rediscala",
logBuffered in Test := false
)

lazy val bench = (project in file("src/bench"))
.settings(
name := "rediscala-bench",
testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"),
parallelExecution in Test := false,
logBuffered := false,

libraryDependencies ++= Seq(
"com.storm-enroute" %% "scalameter" % "0.9"
)
).dependsOn(root)
name := "rediscala-bench",
testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"),
parallelExecution in Test := false,
logBuffered := false,
libraryDependencies ++= Seq(
"com.storm-enroute" %% "scalameter" % "0.9"
)
)
.dependsOn(root)
106 changes: 56 additions & 50 deletions src/main/scala/redis/RedisPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ case class RedisServer(host: String = "localhost",
password: Option[String] = None,
db: Option[Int] = None)


case class RedisConnection(actor: ActorRef, active: Ref[Boolean] = Ref(false))

abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDispatcher) {
abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDispatcher) {

def redisServerConnections: scala.collection.Map[RedisServer, RedisConnection]

val name: String
implicit val executionContext = system.dispatchers.lookup(redisDispatcher.name)

private val redisConnectionRef: Ref[Seq[ActorRef]] = Ref(Seq.empty)

/**
*
* @param redisCommand
Expand Down Expand Up @@ -55,12 +55,12 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
server.db.foreach(redis.select)
}

def onConnectStatus(server: RedisServer, active: Ref[Boolean]): (Boolean) => Unit = {
(status: Boolean) => {
if (active.single.compareAndSet(!status, status)) {
refreshConnections()
}
def onConnectStatus(server: RedisServer, active: Ref[Boolean]): (Boolean) => Unit = { (status: Boolean) =>
{
if (active.single.compareAndSet(!status, status)) {
refreshConnections()
}
}
}

def refreshConnections() = {
Expand All @@ -78,8 +78,8 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
}

/**
* Disconnect from the server (stop the actor)
*/
* Disconnect from the server (stop the actor)
*/
def stop() {
redisConnectionPool.foreach { redisConnection =>
system stop redisConnection
Expand All @@ -92,23 +92,31 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
}

def makeRedisClientActor(server: RedisServer, active: Ref[Boolean]): ActorRef = {
system.actorOf(RedisClientActor.props(new InetSocketAddress(server.host, server.port),
getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
system.actorOf(
RedisClientActor
.props(
new InetSocketAddress(server.host, server.port),
getConnectOperations(server),
onConnectStatus(server, active),
redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
name + '-' + Redis.tempName()
)
}

}

case class RedisClientMutablePool(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
(implicit system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike (system, redisDispatcher) with RoundRobinPoolRequest with RedisCommands {
case class RedisClientMutablePool(redisServers: Seq[RedisServer], name: String = "RedisClientPool")(
implicit system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisClientPoolLike(system, redisDispatcher)
with RoundRobinPoolRequest
with RedisCommands {

override val redisServerConnections = {
val m = redisServers map { server => makeRedisConnection(server) }
val m = redisServers map { server =>
makeRedisConnection(server)
}
collection.mutable.Map(m: _*)
}

Expand All @@ -134,14 +142,14 @@ case class RedisClientMutablePool(redisServers: Seq[RedisServer],
}
}


}

case class RedisClientPool(redisServers: Seq[RedisServer],
name: String = "RedisClientPool")
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientPoolLike(_system, redisDispatcher) with RoundRobinPoolRequest with RedisCommands {
case class RedisClientPool(redisServers: Seq[RedisServer], name: String = "RedisClientPool")(
implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisClientPoolLike(_system, redisDispatcher)
with RoundRobinPoolRequest
with RedisCommands {

override val redisServerConnections = {
redisServers.map { server =>
Expand All @@ -153,11 +161,11 @@ case class RedisClientPool(redisServers: Seq[RedisServer],

}

case class RedisClientMasterSlaves(master: RedisServer,
slaves: Seq[RedisServer])
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends RedisCommands with Transactions {
case class RedisClientMasterSlaves(master: RedisServer, slaves: Seq[RedisServer])(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher =
Redis.dispatcher)
extends RedisCommands
with Transactions {
implicit val executionContext = _system.dispatchers.lookup(redisDispatcher.name)

val masterClient = RedisClient(master.host, master.port, master.password, master.db)
Expand All @@ -175,26 +183,24 @@ case class RedisClientMasterSlaves(master: RedisServer,
def redisConnection: ActorRef = masterClient.redisConnection
}


case class SentinelMonitoredRedisClientMasterSlaves(
sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)), master: String)
(implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends SentinelMonitored(_system, redisDispatcher) with ActorRequest with RedisCommands with Transactions {

val masterClient: RedisClient = withMasterAddr(
(ip, port) => {
new RedisClient(ip, port, name = "SMRedisClient")
})

val slavesClients: RedisClientMutablePool = withSlavesAddr(
slavesHostPort => {
val slaves = slavesHostPort.map {
case (ip, port) =>
new RedisServer(ip, port)
}
new RedisClientMutablePool(slaves, name = "SMRedisClient")
})

sentinels: Seq[(String, Int)] = Seq(("localhost", 26379)),
master: String)(implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher)
extends SentinelMonitored(_system, redisDispatcher)
with ActorRequest
with RedisCommands
with Transactions {

val masterClient: RedisClient = withMasterAddr((ip, port) => {
new RedisClient(ip, port, name = "SMRedisClient")
})

val slavesClients: RedisClientMutablePool = withSlavesAddr(slavesHostPort => {
val slaves = slavesHostPort.map {
case (ip, port) => RedisServer(ip, port)
}
RedisClientMutablePool(slaves, name = "SMRedisClient")
})

val onNewSlave = (ip: String, port: Int) => {
log.info(s"onNewSlave $ip:$port")
Expand All @@ -212,8 +218,8 @@ case class SentinelMonitoredRedisClientMasterSlaves(
}

/**
* Disconnect from the server (stop the actors)
*/
* Disconnect from the server (stop the actors)
*/
def stop() = {
masterClient.stop()
slavesClients.stop()
Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
akka {
loggers = ["de.heikoseeberger.akkalog4j.Log4jLogger"]
loglevel = "INFO"
test {
timefactor = 10
timefactor = ${?TEST_TIME_FACTOR}
}
}
14 changes: 14 additions & 0 deletions src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
<Console name="STDOUT">
<PatternLayout pattern="%date %highlight{%-5level} %logger{1} %msg%n"/>
</Console>
</Appenders>

<Loggers>
<Root level="DEBUG">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>

0 comments on commit 820899f

Please sign in to comment.