Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Improve the benchmark

  • Loading branch information...
commit 83e2f5b323aa0f8bf818139954a8d4913169b157 1 parent e7f6e38
Dag Liodden authored
2  src/main/scala/com/tapad/scitrusleaf/akka/AkkaClient.scala
View
@@ -152,7 +152,7 @@ object AkkaClient {
bs.setOption("tcpNoDelay", true)
bs.setOption("reuseAddress", true)
bs.setOption("receiveBufferSize", 1600)
- bs.setOption("connectTimeoutMillis", 1)
+ bs.setOption("connectTimeoutMillis", 100)
def build(host: String, port: Int) = new ClClient(host, port, bs, executionContext)
129 src/test/scala/com/tapad/scitrusleaf/protocol/AkkaBench.scala
View
@@ -1,89 +1,74 @@
package com.tapad.scitrusleaf.protocol
-import java.util.concurrent.atomic.AtomicInteger
-import org.jboss.netty.buffer.ChannelBuffers
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import com.tapad.scitrusleaf.akka.AkkaClient
-
-
-object Debug {
- val rvc = new AtomicInteger()
- def receive() { println("R:" + rvc.incrementAndGet())}
- val tx = new AtomicInteger()
- def write() { println("W:" + tx.incrementAndGet())}
-}
-
-
+import com.twitter.finagle.stats.SummarizingStatsReceiver
+import com.twitter.util.{Time}
+import akka.dispatch._
+import akka.util.Duration
+import akka.actor.ActorSystem
+import java.util.concurrent.TimeUnit
+import org.jboss.netty.buffer.ChannelBuffers
object AkkaBench {
def main(args: Array[String]) {
import AkkaClient._
+ implicit val system = ActorSystem("foo")
+ implicit val executor = ExecutionContext.defaultExecutionContext
+
+
+ val concurrency = 150
+ val totalRequests = 50000
+
+ val key = new AtomicInteger(0)
+ val statsReceiver = new SummarizingStatsReceiver
+ val responses = new AtomicInteger(0)
+ val completedRequests = new AtomicInteger()
+ val errors = new AtomicInteger(0)
+ val bytesReceived = new AtomicLong(0)
+
+ val client = new LoadBalancer(concurrency, () => ClientBuilder.build("192.168.0.16", 3000))
+
+ val start = Time.now
+
+ val blanks = Array.ofDim[Byte](500)
+ val requests = (0 until concurrency).map { _ =>
+ (0 until (totalRequests / concurrency)).map { _ =>
+ val request = Get(namespace = "test", key = key.incrementAndGet().toString)
+// val data = ChannelBuffers.copiedBuffer(blanks)
+// val request = Set(namespace = "test", key = key.incrementAndGet().toString, value = data)
+ client(request) onSuccess { case response =>
+ response.asInstanceOf[Response].ops.headOption.map(_.value.readableBytes()).foreach(c => bytesReceived.addAndGet(c))
+ responses.incrementAndGet()
+ } onFailure {
+ case e =>
+ errors.incrementAndGet()
+ } onComplete {
+ case _ =>
+ completedRequests.incrementAndGet()
+ }
+ }
+ }.flatten
- val clientCount = 150
- val client = new LoadBalancer(clientCount, () => ClientBuilder.build("192.168.0.10", 3000))
- Thread.sleep(1000)
+ val f: Future[Seq[ClMessage]] = Future.sequence(requests)
- val started = new AtomicInteger()
- val completed = new AtomicInteger()
- val startedTime = System.currentTimeMillis()
- val bytesRead = new AtomicInteger()
+ Await.result(f, Duration.apply(10, TimeUnit.MINUTES))
- def success(m: ClMessage) : Unit = {
- m match {
- case r : Response => r.ops.headOption.map(_.value.readableBytes()).foreach(c => bytesRead.addAndGet(c))
- case _ =>
- }
- val c = completed.incrementAndGet()
- if (c % 1000 == 0) {
- println(completed + " of %d started, %.2f requests / s, %(,d bytes read".format(started.get(), c.toDouble * 1000 / (System.currentTimeMillis() - startedTime), bytesRead.get()))
- }
- }
-
-
-
- def runBench() {
- def data = ChannelBuffers.wrappedBuffer(Array.fill[Byte](1500)(47))
-
- var i = 0
-
- while (i < 1000000) {
- i += 1
- val key = started.incrementAndGet()
-// client(Set(namespace = "test", key = key.toString, value = data)) map (success _)
- client(Get(namespace = "test", key = key.toString)).onComplete(_ fold(
- { e =>
- println(e.getMessage)
- e.printStackTrace()
- },
- r => success(r)
- ))
-// while ((started.get() - completed.get()) > 10000) {
-// Thread.sleep(100)
-// }
-// Thread.sleep(1000)
- }
- while (started.get() > completed.get()) {
- println("Started %d, completed %d".format(started.get(), completed.get()))
- Thread.sleep(1000)
- }
- println("Done submitting")
-
-
- Thread.sleep(200000)
-
- }
-
- def spawn() {
- new Thread() {
- override def run() {
- runBench()
- }
- }.start()
- }
- spawn()
+ val duration = start.untilNow
+ println("%20s\t%s".format("Status", "Count"))
+ println("%20s\t%d".format(0, responses.get()))
+ println("================")
+ println("%d requests completed in %dms (%f requests per second)".format(
+ completedRequests.get, duration.inMilliseconds,
+ totalRequests.toFloat / duration.inMillis.toFloat * 1000))
+ println("%d errors".format(errors.get))
+ println("%2f MBytes received".format(bytesReceived.get() / 1000000.0))
+
+ System.exit(0)
}
}
149 src/test/scala/com/tapad/scitrusleaf/protocol/Bench.scala
View
@@ -2,8 +2,11 @@ package com.tapad.scitrusleaf.protocol
import com.twitter.finagle.Service
import org.jboss.netty.buffer.ChannelBuffers
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import com.tapad.scitrusleaf.finagle.CitrusleafCodec
+import com.twitter.finagle.tracing.NullTracer
+import com.twitter.util.{Time, Future}
+import com.twitter.finagle.stats.SummarizingStatsReceiver
/**
*
@@ -14,71 +17,111 @@ object Bench {
def main(args: Array[String]) {
+ val concurrency = 150
+ val totalRequests = 50000
+
+ val key = new AtomicInteger(0)
+ val statsReceiver = new SummarizingStatsReceiver
+ val responses = new AtomicInteger(0)
+ val completedRequests = new AtomicInteger()
+ val errors = new AtomicInteger(0)
+ val bytesReceived = new AtomicLong(0)
+
val client: Service[ClMessage, ClMessage] = com.twitter.finagle.builder.ClientBuilder()
.codec(CitrusleafCodec)
- .hosts("192.168.0.12:3000")
- .hostConnectionLimit(1000)
+ .hosts("192.168.0.16:3000")
+ .hostConnectionLimit(concurrency)
+ .hostConnectionCoresize(concurrency)
+ .reportTo(statsReceiver)
.build()
- class Citrusleaf(client: Service[ClMessage, ClMessage], ns: String) {
- def getString(key: String) = client(Get(ns, key = key)) map {
- _ match {
- case m: Response => m.ops.headOption.map(_.value)
- case m@_ => throw new IllegalArgumentException("Unable to handle mssage: " + m)
+ val start = Time.now
+
+ val requests = Future.parallel(concurrency) {
+ Future.times(totalRequests / concurrency) {
+ val request = Get(namespace = "test", key = key.incrementAndGet().toString)
+ client(request) onSuccess { response =>
+ response.asInstanceOf[Response].ops.headOption.map(_.value.readableBytes()).foreach(c => bytesReceived.addAndGet(c))
+ responses.incrementAndGet()
+ } handle { case e =>
+ errors.incrementAndGet()
+ } ensure {
+ completedRequests.incrementAndGet()
}
}
}
- val started = new AtomicInteger()
- val completed = new AtomicInteger()
- val startedTime = System.currentTimeMillis()
- val bytesRead = new AtomicInteger()
-
-
- def success(m: ClMessage) : Unit = {
- m match {
- case r : Response => r.ops.headOption.map(_.value.readableBytes()).foreach(c => bytesRead.addAndGet(c))
- case _ =>
- }
- val c = completed.incrementAndGet()
- if (c % 1000 == 0) {
- println(completed + " of %d started, %.2f requests / s, %(,d bytes read".format(started.get(), c.toDouble * 1000 / (System.currentTimeMillis() - startedTime), bytesRead.get()))
- }
- }
-
-
+ Future.join(requests) ensure {
+ client.release()
- def runBench() {
- def data = ChannelBuffers.wrappedBuffer(Array.fill[Byte](1500)(47))
+ val duration = start.untilNow
+ println("%20s\t%s".format("Status", "Count"))
+ println("%20s\t%d".format(0, responses.get()))
+ println("================")
+ println("%d requests completed in %dms (%f requests per second)".format(
+ completedRequests.get, duration.inMilliseconds,
+ totalRequests.toFloat / duration.inMillis.toFloat * 1000))
+ println("%d errors".format(errors.get))
- var i = 0
-
- while (i < 100000) {
- i += 1
- val key = started.incrementAndGet()
-// client(Set(namespace = "test", key = key.toString, value = data)) map (success _)
- client(Get(namespace = "test", key = key.toString)).onSuccess(success _ )
- }
- while (started.get() > completed.get()) {
- println("Started %d, completed %d".format(started.get(), completed.get()))
- Thread.sleep(1000)
- }
- println("Done submitting")
-
-
- Thread.sleep(200000)
+ println("%2f MBytes received".format(bytesReceived.get() / 1000000.0))
+ println("stats")
+ println("=====")
+ statsReceiver.print()
}
-
- def spawn() {
- new Thread() {
- override def run() {
- runBench()
- }
- }.start()
- }
- spawn()
}
+
+// val started = new AtomicInteger()
+// val completed = new AtomicInteger()
+// val startedTime = System.currentTimeMillis()
+// val bytesRead = new AtomicInteger()
+//
+//
+// def success(m: ClMessage) : Unit = {
+// m match {
+// case r : Response => r.ops.headOption.map(_.value.readableBytes()).foreach(c => bytesRead.addAndGet(c))
+// case _ =>
+// }
+// val c = completed.incrementAndGet()
+// if (c % 1000 == 0) {
+// println(completed + " of %d started, %.2f requests / s, %(,d bytes read".format(started.get(), c.toDouble * 1000 / (System.currentTimeMillis() - startedTime), bytesRead.get()))
+// }
+// }
+//
+// def runBench() {
+// def data = ChannelBuffers.wrappedBuffer(Array.fill[Byte](1500)(47))
+//
+// var i = 0
+//
+// while (i < 100000) {
+// i += 1
+// val key = started.incrementAndGet()
+//// client(Set(namespace = "test", key = key.toString, value = data)) map (success _)
+// client(Get(namespace = "test", key = key.toString))
+// .onSuccess(success _ )
+// .onFailure(_ => completed.decrementAndGet())
+// }
+// while (started.get() > completed.get()) {
+// println("Started %d, completed %d".format(started.get(), completed.get()))
+// Thread.sleep(1000)
+// }
+// println("Done submitting")
+//
+//
+// Thread.sleep(200000)
+//
+// }
+//
+// def spawn() {
+// new Thread() {
+// override def run() {
+// runBench()
+// }
+// }.start()
+// }
+// spawn()
+//
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.