[split] topo: a minimal appserver topology for profiling and testing

commit 6b761a98f22352f0f9c40da0d7f974a58d5bc622 1 parent 2d37e7b
marius a. eriksen authored February 08, 2012
38  finagle-stress/README
@@ -0,0 +1,38 @@
+com.twitter.finagle.topo
+
+  Consists of two servers, Backendserver and Appserver, set up in a
+  typical application-service topology. The application server exposes
+  an HTTP service and fans out requests to thrift backends.
+
+  This is the minimal topology for useful isolated testing and
+  examination -- appserver is both a server and a client (with
+  different protocols), and can be configured to talk to large pools
+  of backends using a large number of connections. backend simulates
+  latencies and response sizes (both are parameterized by the
+  request). backend does not require configuration; request behavior
+  is modelled entirely by the appserver configuration, specified via
+  the command line:
+
+  	bin/appserver port [size:milliseconds,..] [n*host:port..]
+
+  For example,
+
+  	bin/appserver 1500 424300:88,339400:14,894300:81,695900:94 \
+  	  100*localhost:2000 10*localhost:3000
+
+  will serve HTTP on port 1500, dispatching requests to a virtual
+  cluster of two shards (served by bin/backend 2000 and bin/backend
+  3000), the first of which is striped over 100 finagle clients, the
+  second over 10. Responses are sampled from the given list. It's
+  simple to generate random samples:
+
+  	jot -r 160 1000 10000 | rs 0 2 | awk '{print 100*$1 ":" int($2/100)}' | tr '\n' ','
+
+  bin/client drives load with a specified concurrency:
+
+  	bin/client statsport targethostport concurrency
+
+  so to drive 100 concurrent requests to the above:
+
+  	bin/client 8000 localhost:1500 100
+
+  where the client exports stats with ostrich on port 8000.
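
A minimal end-to-end sketch of the topology above, assuming the example
arguments; each server binds its ostrich admin service to its base port
plus one (see Appserver.scala and Backend.scala below), and /stats.json
is the usual ostrich stats endpoint:

	bin/backend 2000 &
	bin/backend 3000 &
	bin/appserver 1500 424300:88,339400:14,894300:81,695900:94 \
	  100*localhost:2000 10*localhost:3000 &
	bin/client 8000 localhost:1500 100 &

	curl localhost:8000/stats.json    # load-client stats (ostrich)
	curl localhost:1501/stats.json    # appserver stats
	curl localhost:2001/stats.json    # first backend's stats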
1  finagle-stress/bin/appserver
1  finagle-stress/bin/backend
1  finagle-stress/bin/client
28  finagle-stress/bin/run
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+fatal() {
+  echo "$@" 1>&2
+  exit 1
+}
+
+case $(basename "$0") in
+  appserver)
+    class=com.twitter.finagle.topo.Appserver
+    ;;
+  backend)
+    class=com.twitter.finagle.topo.Backendserver
+    ;;
+  client)
+    class=com.twitter.finagle.topo.Client
+    ;;
+  *)
+    fatal "invalid target"
+esac
+
+root=$(
+  cd "$(dirname "$0")"
+  cd ../../../
+  echo "$PWD"
+)
+
+"$root"/bin/scala -i finagle/finagle-stress "$class" "$@"
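
The dispatch on $(basename $0) suggests that the one-line bin/appserver,
bin/backend, and bin/client files above are symlinks (or one-line
trampolines) into this script. A sketch, assuming symlinks:

	cd finagle-stress/bin
	for tool in appserver backend client; do ln -sf run "$tool"; done
	./backend 2000    # runs com.twitter.finagle.topo.Backendserver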
110  finagle-stress/src/main/scala/com/twitter/finagle/topo/Appserver.scala
@@ -0,0 +1,110 @@
+package com.twitter.finagle.topo
+
+import com.twitter.conversions.storage._
+import com.twitter.conversions.time._
+import com.twitter.finagle.Service
+import com.twitter.finagle.builder.{
+  ServerBuilder, Cluster, StaticCluster, ClientBuilder}
+import com.twitter.finagle.http.Http
+import com.twitter.finagle.stats.OstrichStatsReceiver
+import com.twitter.finagle.thrift.ThriftClientFramedCodec
+import com.twitter.logging.Logger
+import com.twitter.ostrich.admin.{RuntimeEnvironment, AdminHttpService}
+import com.twitter.util.{Future, Duration, Time, StorageUnit}
+import java.net.{SocketAddress, InetSocketAddress}
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.jboss.netty.buffer.ChannelBuffers
+import org.jboss.netty.handler.codec.http._
+import scala.util.Random
+
+class AppService(clients: Seq[thrift.Backend.ServiceIface], responseSample: Seq[(Duration, StorageUnit)])
+  extends Service[HttpRequest, HttpResponse]
+{
+  private[this] val rng = new Random
+
+  private[this] def nextResponse() =
+    responseSample(rng.nextInt(responseSample.size))
+
+  def apply(req: HttpRequest) = {
+    // Fan out one request per backend client; each backend response's
+    // latency and size are drawn from the response sample.
+    val responses = for (client <- clients) yield {
+      val (latency, size) = nextResponse()
+      client.request(size.inBytes.toInt, latency.inMilliseconds.toInt)
+    }
+
+    val begin = Time.now
+
+    Future.collect(responses) map { bodies =>
+      val response = new DefaultHttpResponse(req.getProtocolVersion, HttpResponseStatus.OK)
+      val bytes = (bodies mkString "").getBytes
+      response.setContent(ChannelBuffers.wrappedBuffer(bytes))
+      response.setHeader("Content-Length", "%d".format(bytes.size))
+      response.setHeader("X-Finagle-Latency-Ms", "%d".format(begin.untilNow.inMilliseconds))
+      response
+    }
+  }
+}
+
+object Appserver {
+  private[this] lazy val log = Logger(getClass)
+
+  private[this] def mkClient(name: String, cluster: Cluster[SocketAddress]) = {
+    val transport = ClientBuilder()
+      .name(name)
+      .cluster(cluster)
+      .codec(ThriftClientFramedCodec())
+      .reportTo(new OstrichStatsReceiver)
+      .hostConnectionLimit(10)
+      .build()
+
+    new thrift.Backend.ServiceToClient(
+      transport, new TBinaryProtocol.Factory())
+  }
+
+  private[this] def usage() {
+    System.err.println("Server basePort responseSample n*hostport [k*hostport..]")
+    System.exit(1)
+  }
+
+  def main(args: Array[String]) = {
+    {
+      import com.twitter.logging.config._
+      val config = new LoggerConfig {
+        node = ""
+        level = Level.INFO
+        handlers = new ConsoleHandlerConfig
+      }
+      config()
+    }
+
+    if (args.size < 3)
+      usage()
+
+    val basePort = args(0).toInt
+
+    // eg. generate with jot -r 160 1000 10000 | rs 0 2 | awk '{print 100*$1 ":" int($2/100)}' | tr '\n' ','
+    val responseSample = for {
+      sample <- args(1) split ","
+      Array(size, latency) = sample split ":"
+    } yield (latency.toInt.milliseconds, size.toInt.bytes)
+
+    val clients = for {
+      (spec, i) <- (args drop 2).zipWithIndex
+      Array(n, hostport) = spec split "\\*"
+      Array(host, port) = hostport split ":"
+      addr = new InetSocketAddress(host, port.toInt)
+      // n copies of the same address stripe the shard over n clients.
+    } yield mkClient("client%d".format(i), new StaticCluster[SocketAddress]((0 until n.toInt) map { _ => addr }))
+
+    val runtime = RuntimeEnvironment(this, Array()/*no args for you*/)
+    val adminService = new AdminHttpService(basePort+1, 100/*backlog*/, runtime)
+    adminService.start()
+
+    val service = new AppService(clients.toSeq, responseSample)
+
+    ServerBuilder()
+      .name("appserver")
+      .codec(Http())
+      .reportTo(new OstrichStatsReceiver)
+      .bindTo(new InetSocketAddress(basePort))
+      .build(service)
+  }
+}
57  finagle-stress/src/main/scala/com/twitter/finagle/topo/Backend.scala
@@ -0,0 +1,57 @@
+package com.twitter.finagle.topo
+
+import com.twitter.conversions.time._
+import com.twitter.finagle.builder.ServerBuilder
+import com.twitter.finagle.stats.OstrichStatsReceiver
+import com.twitter.finagle.thrift.ThriftServerFramedCodec
+import com.twitter.logging.Logger
+import com.twitter.ostrich.admin.{RuntimeEnvironment, AdminHttpService}
+import com.twitter.util.{Future, JavaTimer}
+import java.net.InetSocketAddress
+import org.apache.thrift.protocol.TBinaryProtocol
+
+object BackendService extends thrift.Backend.ServiceIface {
+  private[this] val timer = new JavaTimer
+
+  private[this] def makeResponse(size: Int) = "."*size
+
+  // Respond with `size` bytes after `latencyMs` milliseconds.
+  def request(size: Int, latencyMs: Int) =
+    if (latencyMs <= 0)
+      Future.value(makeResponse(size))
+    else
+      timer.doLater(latencyMs.milliseconds) { makeResponse(size) }
+}
+
+object Backendserver {
+  private[this] val log = Logger(getClass)
+
+  def main(args: Array[String]) = {
+    {
+      import com.twitter.logging.config._
+      val config = new LoggerConfig {
+        node = ""
+        level = Level.INFO
+        handlers = new ConsoleHandlerConfig
+      }
+      config()
+    }
+
+    if (args.size != 1) {
+      log.fatal("usage: Backendserver port")
+      System.exit(1)
+    }
+
+    val basePort = args(0).toInt
+
+    val runtime = RuntimeEnvironment(this, Array()/*no args for you*/)
+    val adminService = new AdminHttpService(basePort+1, 100/*backlog*/, runtime)
+    adminService.start()
+
+    ServerBuilder()
+      .name("backend")
+      .codec(ThriftServerFramedCodec())
+      .reportTo(new OstrichStatsReceiver)
+      .bindTo(new InetSocketAddress(basePort))
+      .build(new thrift.Backend.Service(BackendService, new TBinaryProtocol.Factory()))
+  }
+}
49  finagle-stress/src/main/scala/com/twitter/finagle/topo/Client.scala
@@ -0,0 +1,49 @@
+package com.twitter.finagle.topo
+
+import com.twitter.finagle.Service
+import com.twitter.finagle.builder.ClientBuilder
+import com.twitter.finagle.http.Http
+import com.twitter.finagle.stats.OstrichStatsReceiver
+import com.twitter.ostrich.admin.{RuntimeEnvironment, AdminHttpService}
+import org.jboss.netty.handler.codec.http._
+
+object Client {
+  // Issue requests back-to-back, forever: each completed (or failed)
+  // request immediately triggers the next.
+  private[this] def go(svc: Service[HttpRequest, HttpResponse]) {
+    val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")
+    svc(req) ensure { go(svc) }
+  }
+
+  def main(args: Array[String]) {
+    {
+      import com.twitter.logging.config._
+      val config = new LoggerConfig {
+        node = ""
+        level = Level.INFO
+        handlers = new ConsoleHandlerConfig
+      }
+      config()
+    }
+
+    if (args.size != 3) {
+      System.err.printf("usage: Client statsport host:port concurrency\n")
+      System.exit(1)
+    }
+
+    val statsPort = args(0).toInt
+    val hostport = args(1)
+    val n = args(2).toInt
+
+    val runtime = RuntimeEnvironment(this, Array()/*no args for you*/)
+    val adminService = new AdminHttpService(statsPort, 100/*backlog*/, runtime)
+    adminService.start()
+
+    val builder = ClientBuilder()
+      .reportTo(new OstrichStatsReceiver)
+      .hosts(hostport)
+      .codec(Http())
+      .hostConnectionLimit(1)
+
+    for (which <- 0 until n)
+      go(builder.name("client%d".format(which)).build())
+  }
+}
5  finagle-stress/src/main/thrift/backend.thrift
@@ -0,0 +1,5 @@
+namespace java com.twitter.finagle.topo.thrift
+
+service Backend {
+  string request(1: i32 responseSize, 2: i32 responseLatencyMs);
+}
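
The build presumably generates com.twitter.finagle.topo.thrift.Backend
from this IDL; note that the Future-returning ServiceIface and
ServiceToClient wrappers used above come from finagle's thrift code
generation, not the stock Apache Thrift compiler, whose invocation
would be along the lines of:

	thrift --gen java finagle-stress/src/main/thrift/backend.thrift

with the namespace directive placing the generated classes under
com.twitter.finagle.topo.thrift.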
5  project/build/Project.scala
@@ -273,7 +273,10 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
     override def compileOrder = CompileOrder.JavaThenScala
     val thrift   = "thrift"      % "libthrift" % "0.5.0"
     val slf4jNop = "org.slf4j"   % "slf4j-nop" % "1.5.8" % "provided"
-    projectDependencies("ostrich")
+    projectDependencies(
+      "ostrich",
+      "util" ~ "util-logging"
+    )
   }
 
   class B3Project(info: ProjectInfo) extends StandardProject(info)
