Skip to content

Commit

Permalink
Merge 6808200 into 8d766a5
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Aug 18, 2016
2 parents 8d766a5 + 6808200 commit 6e357f0
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 134 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cache:

script:
- sbt ++$TRAVIS_SCALA_VERSION clean coverage test integration:test coverageReport
- sbt ++$TRAVIS_SCALA_VERSION stress

after_success:
- sbt ++$TRAVIS_SCALA_VERSION coverageAggregate coveralls
Expand Down
8 changes: 6 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,19 @@ lazy val stressFrontend = project.in(file("./stress/frontend"))
)

lazy val stressGatling = project.in(file("./stress/gatling"))
.aggregate(stressFrontend)
.dependsOn(stressFrontend)
.enablePlugins(GatlingPlugin)
.settings(moduleName := "kanaloa-stress-gatling")
.settings(noPublishing:_*)
.settings(
resolvers += Resolver.sonatypeRepo("snapshots"),
libraryDependencies ++= Dependencies.gatling
)



addCommandAlias("validate", ";root;clean;compile;test;integration:test;multi-jvm:test")
addCommandAlias("root", ";project root")
addCommandAlias("stress", ";stressGatling/gatling:test-only kanaloa.stress.AutomatedKanaloaSimulation")
addCommandAlias("validate", ";root;clean;compile;test;integration:test;gatling:autoStress")
addCommandAlias("root", ";project root")

Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ trait Dispatcher extends Actor with ActorLogging {

context watch processor

private val autothrottler = settings.autothrottle.foreach { s
context.actorOf(Autothrottler.default(processor, s, metricsCollector), "auto-scaler")
private val _ = settings.autothrottle.foreach { s
context.actorOf(Autothrottler.default(processor, s, metricsCollector), "autothrottler")
}

def receive: Receive = ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class PullingDispatcherIntegration extends IntegrationSpec {

class PullingDispatcherSanityCheckIntegration extends IntegrationSpec {

"can remain sane when all workers are failing" in new TestScope with Backends {
"can remain sane when all workers are constantly failing" in new TestScope with Backends {
val backend = suicidal(1.milliseconds)
val iterator = Stream.continually("a").iterator

Expand Down Expand Up @@ -197,11 +197,11 @@ class PullingDispatcherSanityCheckIntegration extends IntegrationSpec {
val prob = TestProbe()
pd ! SubscribePerformanceMetrics(prob.ref)

var samples = List[Sample]()
var samples = List[Sample]() //collect 20 samples
val r = prob.fishForMessage(10.seconds) {
case s: Sample
samples = s :: samples
samples.length > 30
samples.length > 20
case p: Report
false
}
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ object Dependencies {
)

val gatling = Seq(
"io.gatling.highcharts" % "gatling-charts-highcharts" % "2.2.2" % Test,
"io.gatling" % "gatling-test-framework" % "2.2.2" % Test
"io.gatling.highcharts" % "gatling-charts-highcharts" % "2.2.3-SNAPSHOT" % Test,
"io.gatling" % "gatling-test-framework" % "2.2.3-SNAPSHOT" % Test
)

val (test, integration) = {
Expand All @@ -53,7 +53,7 @@ object Dependencies {
Resolver.typesafeRepo("releases"),
Resolver.jcenterRepo,
Resolver.bintrayRepo("scalaz", "releases"),
"Sonatype OSS Releases" at "http://oss.sonatype.org/content/repositories/releases/"
Resolver.sonatypeRepo("releases")
),

libraryDependencies ++= Dependencies.akka ++
Expand Down
4 changes: 2 additions & 2 deletions stress/backend/src/main/resources/backend.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
optimal-concurrency = 10 //optimal concurrent requests the backend can handle
optimal-throughput = 100 //the opitmal throughput (msg / second) the backend can handle
optimal-throughput = 200 //the opitmal throughput (msg / second) the backend can handle
buffer-size = 5000
overload-punish-factor = 0 //between 1 and 0, one gives the maximum punishment while 0 gives none

Expand All @@ -22,7 +22,7 @@ akka {
"akka.tcp://kanaloa-stress@127.0.0.1:2551"]
roles = [ backend ]
metrics.enabled = off

auto-down-unreachable-after = 3s
}

loglevel= "INFO"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object BackendApp extends App {
val throughput = args.headOption.map(_.toInt)
val props = MockBackend.props(maxThroughput = throughput)
val system = ActorSystem("kanaloa-stress", ConfigFactory.load("backend.conf"))
system.actorOf(MockBackend.props, "backend")

system.actorOf(props, "backend")
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ import akka.actor.Props
import akka.contrib.throttle.TimerBasedThrottler
import akka.contrib.throttle.Throttler._
import akka.actor._
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import scala.util.Random

object MockBackend {

lazy val props = {
val cfg = ConfigFactory.load("backend.conf")
def props(
throttle: Boolean = true,
maxThroughput: Option[Int] = None,
minLatency: Option[FiniteDuration] = None,
cfg: Config = ConfigFactory.load("backend.conf")
) = {

val optimalConcurrency = cfg.getInt("optimal-concurrency")
maxThroughput.foreach { op =>
assert(op / 10d / optimalConcurrency >= 1, s"throughput $op too low to manager should be at least ${10 * optimalConcurrency}")

}
Props(new BackendRouter(
cfg.getInt("optimal-concurrency"),
cfg.getInt("optimal-throughput"),
throttle,
optimalConcurrency,
maxThroughput.getOrElse(cfg.getInt("optimal-throughput")),
cfg.getInt("buffer-size"),
minLatency,
Some(cfg.getDouble("overload-punish-factor"))
))
}
Expand All @@ -26,17 +38,23 @@ object MockBackend {
* @param optimalThroughput maximum number of requests can handle per second
*/
class BackendRouter(
throttle: Boolean,
optimalConcurrency: Int,
optimalThroughput: Int,
bufferSize: Int = 10000,
minLatency: Option[FiniteDuration] = None,
overloadPunishmentFactor: Option[Double] = None
) extends Actor with ActorLogging {

var requestsHandling = 0

val rand = new Random(System.currentTimeMillis())
val perResponderRate = Math.round(optimalThroughput.toDouble / 10d / optimalConcurrency).toInt msgsPer 100.milliseconds
val baseLatency = perResponderRate.duration / perResponderRate.numberOfCalls

val baseLatency = {
val latencyFromThroughput = perResponderRate.duration / perResponderRate.numberOfCalls
if (minLatency.fold(false)(_ > latencyFromThroughput)) minLatency.get else latencyFromThroughput
}

val responders: Array[ActorRef] = {

Expand All @@ -49,17 +67,25 @@ object MockBackend {
}
}

val receive: Receive = {
def receive = if (throttle) throttled else direct

val throttled: Receive = {
case Request("overflow")
log.warning("Overflow command received. Switching to unresponsive mode.")
context become overflow
case Request(msg)
requestsHandling += 1

if (requestsHandling > bufferSize) {
log.error("!!!! Buffer overflow at, declare dead" + bufferSize)
context become bufferOverflow
context become overflow
}

val overloadPunishment: Double = if (requestsHandling > optimalConcurrency)
// the overload punishment is caped at 0.5 (50% of the throughput)
val overloadPunishment: Double = if (requestsHandling > optimalConcurrency) Math.min(
0.5,
overloadPunishmentFactor.fold(0d)(_ * (requestsHandling.toDouble - optimalConcurrency.toDouble) / requestsHandling.toDouble)
)
else 0

val index: Int = if (responders.length > 1)
Expand All @@ -80,9 +106,19 @@ object MockBackend {
case other log.error("unknown msg received " + other)
}

val bufferOverflow: Receive = {
val overflow: Receive = {
case Request("throttled")
log.info("Back command received. Switching back to normal mode.")
context become throttled
case _ => //just pretend to be dead
}

val direct: Receive = {
case Request("overflow")
log.warning("Overflow command received. Switching to unresponsive mode.")
context become overflow
case Request(m) => sender ! Respond(m)
}
}

class ResponderBehindThrottler extends Actor with ActorLogging {
Expand Down
13 changes: 3 additions & 10 deletions stress/frontend/src/main/resources/frontend.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ kanaloa {
updateInterval = 200ms

workerPool {
startingPoolSize = 50
startingPoolSize = 30
}

autothrottle {
Expand All @@ -19,17 +19,9 @@ kanaloa {

backPressure {
referenceDelay = 1s
durationOfBurstAllowed = 5s
durationOfBurstAllowed = 10s
}

metrics {
enabled = on // turn it off if you don't have a statsD server and hostname set as an env var KANALOA_STRESS_STATSD_HOST
statsd {
namespace = kanaloa-stress
host = ${?KANALOA_STRESS_STATSD_HOST}
eventSampleRate = 0.25
}
}
}

}
Expand Down Expand Up @@ -67,6 +59,7 @@ akka {
"akka.tcp://kanaloa-stress@127.0.0.1:2551"]
roles = [ frontend ]
metrics.enabled = off
auto-down-unreachable-after = 3s
}

loglevel= "INFO"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package kanaloa.stress.frontend

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.routing.FromConfig
import akka.stream.ActorMaterializer
import akka.pattern.{ AskTimeoutException, ask }
import akka.util.Timeout
import kanaloa.reactive.dispatcher.ApiProtocol.{ WorkRejected, WorkTimedOut, WorkFailed }
import kanaloa.stress.backend.BackendApp._
import kanaloa.stress.backend.MockBackend
import kanaloa.util.JavaDurationConverters._
import scala.util.{ Failure, Success }
import scala.io.StdIn._
import com.typesafe.config.ConfigFactory
import kanaloa.reactive.dispatcher.{ ResultChecker, ClusterAwareBackend, PushingDispatcher }
import scala.concurrent.duration._

class HttpService(inCluster: Boolean, maxThroughputRPS: Option[Int] = None) {

lazy val statsDHostO = sys.env.get("KANALOA_STRESS_STATSD_HOST") //hook with statsD if its available
lazy val metricsConfig = statsDHostO.map { host
s"""
metrics {
enabled = on // turn it off if you don't have a statsD server and hostname set as an env var
statsd {
namespace = kanaloa-stress
host = $host //todo do not commit this
eventSampleRate = 0.25
}
}
"""
}.getOrElse("")

val baseCfg = ConfigFactory.load("frontend.conf").withFallback(ConfigFactory.parseString(metricsConfig))
val cfg = if (inCluster) baseCfg else baseCfg.withoutPath("akka.actor.provider").withoutPath("akka.cluster").withoutPath("akka.remote")

implicit val system = ActorSystem("kanaloa-stress", cfg.resolve())
implicit val materializer = ActorMaterializer()
implicit val execCtx = system.dispatcher
implicit val timeout: Timeout = (cfg.getDuration("frontend-timeout").asScala).asInstanceOf[FiniteDuration]

case class Failed(msg: String)

lazy val localBackend = system.actorOf(MockBackend.props(
maxThroughput = maxThroughputRPS
), name = "local-backend")

lazy val localUnThrottledBackend = system.actorOf(MockBackend.props(false), name = "local-direct-backend")

lazy val remoteBackendRouter = system.actorOf(FromConfig.props(), "backendRouter")

val resultChecker: ResultChecker = {
case r: MockBackend.Respond
Right(r)
case other
Left("Dispatcher: MockBackend.Respond() acceptable only. Received: " + other)
}

lazy val dispatcher =
system.actorOf(PushingDispatcher.props(
name = "with-local-backend",
localBackend,
cfg
)(resultChecker), "local-dispatcher")

lazy val localUnThrottledBackendWithKanaloa = system.actorOf(PushingDispatcher.props(
name = "with-local-unthrottled-backend",
localUnThrottledBackend,
ConfigFactory.parseString(
"""
|kanaloa.default-dispatcher {
| workerPool {
| startingPoolSize = 1000
| maxPoolSize = 3000
| }
|}
""".stripMargin
).withFallback(cfg)
)(resultChecker), "local-unthrottled-dispatcher")

lazy val clusterDispatcher =
system.actorOf(PushingDispatcher.props(
name = "with-remote-backend",
ClusterAwareBackend("backend", "backend"),
cfg
)(resultChecker), "cluster-dispatcher")

def testRoute(rootPath: String, destination: ActorRef) =
get {
path(rootPath / (".+"r)) { msg

val f = (destination ? MockBackend.Request(msg)).recover {
case e: akka.pattern.AskTimeoutException => WorkTimedOut(s"ask timeout after $timeout")
}

onComplete(f) {
case Success(WorkRejected(msg)) complete(503, "service unavailable")
case Success(WorkFailed(msg)) failWith(new Exception(s"Failed: $msg"))
case Success(WorkTimedOut(msg)) complete(408, msg)
case Success(MockBackend.Respond(msg)) complete("Success! " + msg)
case Success(unknown) failWith(new Exception(s"unknown response: $unknown"))
case Failure(e) complete(408, e)
}
}
}

val localRoutes = testRoute("kanaloa", dispatcher) ~
testRoute("straight", localBackend) ~
testRoute("straight_unthrottled", localUnThrottledBackend) ~
testRoute("kanaloa_unthrottled", localUnThrottledBackendWithKanaloa)

lazy val clusterEnabledRoutes =
testRoute("round_robin", remoteBackendRouter) ~
testRoute("cluster_kanaloa", clusterDispatcher)

val routes = if (inCluster) clusterEnabledRoutes ~ localRoutes else localRoutes
val bindingFuture = Http().bindAndHandle(routes, "localhost", 8081)

def close(): Unit = {
bindingFuture.flatMap(_.unbind()).onComplete { _ system.terminate() }
}
}

object HttpService extends App {
val inCluster = args.headOption.map(_.toBoolean).getOrElse(true)
println("Starting http service " + (if (inCluster) " in cluster" else ""))

val service = new HttpService(inCluster, None)
println(s"Server online at http://localhost:8081/\nPress RETURN to stop...")

readLine()

service.close()

}

Loading

0 comments on commit 6e357f0

Please sign in to comment.