Skip to content
Browse files

Brought back the blocking version of the Scenario class. Benchmark ca…

…n now choose which type of client to use. Just using the non blocking version for the connection scale scenarios.
  • Loading branch information...
1 parent a9c46a1 commit 3e89d14ec1a05cfaa472d79ff6bd3baf8c5f8328 @chirino committed Nov 21, 2010
View
2 readme.md
@@ -40,7 +40,7 @@ running `sbt console`
Then at the console you execute:
- scala> val scenario = new com.github.stomp.benchmark.Scenario
+ scala> val scenario = new com.github.stomp.benchmark.BlockingScenario
scenario: com.github.stomp.benchmark.Scenario =
--------------------------------------
Scenario Settings
View
10 src/main/scala/com/github/stomp/benchmark/Benchmark.scala
@@ -160,15 +160,15 @@ class Benchmark extends Action {
null
}
- protected def create_scenario = new Scenario
+ protected def create_scenario = new BlockingScenario
- private def benchmark(name:String, drain:Boolean=true, sc:Int=sample_count, is_done: (List[Scenario])=>Boolean = null)(init_func: (Scenario)=>Unit ):Unit = {
- multi_benchmark(List(name), drain, sc, is_done) { scenarios =>
+ private def benchmark(name:String, drain:Boolean=true, sc:Int=sample_count, is_done: (List[Scenario])=>Boolean = null, blocking:Boolean=true)(init_func: (Scenario)=>Unit ):Unit = {
+ multi_benchmark(List(name), drain, sc, is_done, blocking) { scenarios =>
init_func(scenarios.head)
}
}
- private def multi_benchmark(names:List[String], drain:Boolean=true, sc:Int=sample_count, is_done: (List[Scenario])=>Boolean = null)(init_func: (List[Scenario])=>Unit ):Unit = {
+ private def multi_benchmark(names:List[String], drain:Boolean=true, sc:Int=sample_count, is_done: (List[Scenario])=>Boolean = null, blocking:Boolean=true)(init_func: (List[Scenario])=>Unit ):Unit = {
val scenarios:List[Scenario] = names.map { name=>
val scenario = create_scenario
scenario.name = name
@@ -286,7 +286,7 @@ class Benchmark extends Action {
return errors >= scenario_connection_scale_rate || remaining <= 0
}
- benchmark("20b_Xa%s_1queue_1".format(if(disconnecting) "d" else ""), true, 0, is_done) { scenario=>
+ benchmark("20b_Xa%s_1queue_1".format(if(disconnecting) "d" else ""), true, 0, is_done, false) { scenario=>
scenario.message_size = 20
scenario.producers = 0
scenario.producers_disconnect = disconnecting
View
571 src/main/scala/com/github/stomp/benchmark/Scenario.scala
@@ -27,30 +27,28 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.ByteBuffer
import java.util.concurrent.{CountDownLatch, TimeUnit}
-//object Scenario {
+object Scenario {
+
+ val MESSAGE_ID:Array[Byte] = "message-id"
+ val NEWLINE = '\n'.toByte
+ val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS)
+
+ implicit def toBytes(value: String):Array[Byte] = value.getBytes("UTF-8")
+
+ def o[T](value:T):Option[T] = value match {
+ case null => None
+ case x => Some(x)
+ }
+
// def main(args:Array[String]):Unit = {
// val s = new Scenario()
// s.message_size = 20
// s.run
// }
-//}
-//
-
-/**
- * Simulates load on the a stomp broker.
- */
-class Scenario {
-
- implicit def toBytes(value: String):Array[Byte] = value.getBytes("UTF-8")
-
- object Constants {
- val MESSAGE_ID:Array[Byte] = "message-id"
- val NEWLINE = '\n'.toByte
- val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS)
- }
-
- import Constants._
+}
+trait Scenario {
+ import Scenario._
var login:String = _
var passcode:String = _
@@ -88,43 +86,178 @@ class Scenario {
var topic_prefix = "/topic/"
var name = "custom"
- private def destination(i:Int) = destination_type match {
+ def run() = {
+ print(toString)
+ println("--------------------------------------")
+ println(" Running: Press ENTER to stop")
+ println("--------------------------------------")
+ println("")
+
+ with_load {
+
+ // start a sampling client...
+ val sample_thread = new Thread() {
+ override def run() = {
+
+ def print_rate(name: String, periodCount:Long, totalCount:Long, nanos: Long) = {
+
+ val rate_per_second: java.lang.Float = ((1.0f * periodCount / nanos) * NANOS_PER_SECOND)
+ println("%s total: %,d, rate: %,.3f per second".format(name, totalCount, rate_per_second))
+ }
+
+ try {
+ var start = System.nanoTime
+ var total_producer_count = 0L
+ var total_consumer_count = 0L
+ var total_error_count = 0L
+ collection_start
+ while( !done.get ) {
+ Thread.sleep(sample_interval)
+ val end = System.nanoTime
+ collection_sample
+ val samples = collection_end
+ samples.get("p_custom").foreach { case List(count:Long) =>
+ total_producer_count += count
+ print_rate("Producer", count, total_producer_count, end - start)
+ }
+ samples.get("c_custom").foreach { case List(count:Long) =>
+ total_consumer_count += count
+ print_rate("Consumer", count, total_producer_count, end - start)
+ }
+ samples.get("e_custom").foreach { case List(count:Long) =>
+ if( count!= 0 ) {
+ total_error_count += count
+ print_rate("Error", count, total_error_count, end - start)
+ }
+ }
+ start = end
+ }
+ } catch {
+ case e:InterruptedException =>
+ }
+ }
+ }
+ sample_thread.start()
+
+ System.in.read()
+ done.set(true)
+
+ sample_thread.interrupt
+ sample_thread.join
+ }
+
+ }
+
+ override def toString() = {
+ "--------------------------------------\n"+
+ "Scenario Settings\n"+
+ "--------------------------------------\n"+
+ " host = "+host+"\n"+
+ " port = "+port+"\n"+
+ " destination_type = "+destination_type+"\n"+
+ " queue_prefix = "+queue_prefix+"\n"+
+ " topic_prefix = "+topic_prefix+"\n"+
+ " destination_count = "+destination_count+"\n" +
+ " destination_name = "+destination_name+"\n" +
+ " sample_interval (ms) = "+sample_interval+"\n" +
+ " \n"+
+ " --- Producer Properties ---\n"+
+ " producers = "+producers+"\n"+
+ " message_size = "+message_size+"\n"+
+ " persistent = "+persistent+"\n"+
+ " sync_send = "+sync_send+"\n"+
+ " content_length = "+content_length+"\n"+
+ " producer_sleep (ms) = "+producer_sleep+"\n"+
+ " headers = "+headers+"\n"+
+ " \n"+
+ " --- Consumer Properties ---\n"+
+ " consumers = "+consumers+"\n"+
+ " consumer_sleep (ms) = "+consumer_sleep+"\n"+
+ " ack = "+ack+"\n"+
+ " selector = "+selector+"\n"+
+ " durable = "+durable+"\n"+
+ ""
+
+ }
+
+ protected def destination(i:Int) = destination_type match {
case "queue" => queue_prefix+destination_name+"-"+(i%destination_count)
case "topic" => topic_prefix+destination_name+"-"+(i%destination_count)
case _ => throw new Exception("Unsuported destination type: "+destination_type)
}
- private def headers_for(i:Int) = {
+ protected def headers_for(i:Int) = {
if ( headers.isEmpty ) {
Array[String]()
} else {
headers(i%headers.size)
}
}
- var producer_clients = List[ProducerClient]()
- var consumer_clients = List[ConsumerClient]()
+ var producer_samples:Option[ListBuffer[Long]] = None
+ var consumer_samples:Option[ListBuffer[Long]] = None
+ var error_samples = ListBuffer[Long]()
+
+ def collection_start: Unit = {
+ producer_counter.set(0)
+ consumer_counter.set(0)
+ error_counter.set(0)
+
+ producer_samples = if (producers > 0 || producers_per_sample>0 ) {
+ Some(ListBuffer[Long]())
+ } else {
+ None
+ }
+ consumer_samples = if (consumers > 0 || consumers_per_sample>0 ) {
+ Some(ListBuffer[Long]())
+ } else {
+ None
+ }
+ }
+
+ def collection_end: Map[String, scala.List[Long]] = {
+ var rc = Map[String, List[Long]]()
+ producer_samples.foreach{ samples =>
+ rc += "p_"+name -> samples.toList
+ samples.clear
+ }
+ consumer_samples.foreach{ samples =>
+ rc += "c_"+name -> samples.toList
+ samples.clear
+ }
+ rc += "e_"+name -> error_samples.toList
+ error_samples.clear
+ rc
+ }
+
+ trait Client {
+ def start():Unit
+ def shutdown():Unit
+ }
+
+ var producer_clients = List[Client]()
+ var consumer_clients = List[Client]()
def with_load[T](func: =>T ):T = {
done.set(false)
- var producer_clients = List[ProducerClient]()
+
for (i <- 0 until producers) {
- val client = new ProducerClient(i)
+ val client = createProducer(i)
producer_clients ::= client
- client.start
+ client.start()
}
for (i <- 0 until consumers) {
- val client = new ConsumerClient(i)
+ val client = createConsumer(i)
consumer_clients ::= client
- client.start
+ client.start()
}
try {
func
} finally {
done.set(true)
- // wait for the clients to finish..
+ // wait for the threads to finish..
for( client <- consumer_clients ) {
client.shutdown
}
@@ -141,18 +274,17 @@ class Scenario {
if( destination_type=="queue" || durable==true ) {
print("draining")
consumer_counter.set(0)
- var consumer_clients = List[ConsumerClient]()
+ var consumer_clients = List[Client]()
for (i <- 0 until destination_count) {
- val client = new ConsumerClient(i)
+ val client = createConsumer(i)
consumer_clients ::= client
- client.start
+ client.start()
}
// Keep sleeping until we stop draining messages.
var drained = 0L
try {
Thread.sleep(1000);
-
def done() = {
val c = consumer_counter.getAndSet(0)
drained += c
@@ -172,26 +304,6 @@ class Scenario {
}
}
- var producer_samples:Option[ListBuffer[Long]] = None
- var consumer_samples:Option[ListBuffer[Long]] = None
- var error_samples = ListBuffer[Long]()
-
- def collection_start: Unit = {
- producer_counter.set(0)
- consumer_counter.set(0)
- error_counter.set(0)
-
- producer_samples = if (producers > 0 || producers_per_sample>0 ) {
- Some(ListBuffer[Long]())
- } else {
- None
- }
- consumer_samples = if (consumers > 0 || consumers_per_sample>0 ) {
- Some(ListBuffer[Long]())
- } else {
- None
- }
- }
def collection_sample: Unit = {
@@ -201,41 +313,261 @@ class Scenario {
// we might need to increment number the producers..
for (i <- 0 until producers_per_sample) {
- val client = new ProducerClient(producer_clients.length)
+ val client = createProducer(producer_clients.length)
producer_clients ::= client
- client.start
+ client.start()
}
// we might need to increment number the consumers..
for (i <- 0 until consumers_per_sample) {
- val client = new ConsumerClient(consumer_clients.length)
+ val client = createConsumer(consumer_clients.length)
consumer_clients ::= client
- client.start
+ client.start()
}
}
+
+ def createProducer(i:Int):Client
+ def createConsumer(i:Int):Client
- def collection_end: Map[String, scala.List[Long]] = {
- var rc = Map[String, List[Long]]()
- producer_samples.foreach{ samples =>
- rc += "p_"+name -> samples.toList
- samples.clear
+}
+
+
+/**
+ * Simulates load on the a stomp broker using standard blocking IO
+ */
+class BlockingScenario extends Scenario {
+
+ import Scenario._
+
+ var client_stack_size = 1024*500;
+
+ def createProducer(i:Int) = {
+ new ProducerClient(i)
+ }
+
+ def createConsumer(i:Int) = {
+ new ConsumerClient(i)
+ }
+
+ class BlockingClient extends Thread(Thread.currentThread.getThreadGroup, null, "client", client_stack_size) with Client {
+
+ var socket:Socket = new Socket
+ var out:OutputStream = null
+ var in:InputStream = null
+ val buffer_size = 64*1204
+
+ def open(host: String, port: Int) = {
+ socket = new Socket
+ socket.connect(new InetSocketAddress(host, port))
+ socket.setSoLinger(true, 0)
+ out = new BufferedOutputStream(socket.getOutputStream, buffer_size)
+ in = new BufferedInputStream(socket.getInputStream, buffer_size)
}
- consumer_samples.foreach{ samples =>
- rc += "c_"+name -> samples.toList
- samples.clear
+
+ def close() = {
+ socket.close
}
- rc += "e_"+name -> error_samples.toList
- error_samples.clear
- rc
+
+ def write(data:Array[Byte]*) = {
+ data.foreach(out.write(_))
+ out.write(0)
+ out.write('\n')
+ out.flush
+ }
+
+ def skip():Unit = {
+ var c = in.read
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return
+ }
+ c = in.read()
+ }
+ throw new EOFException()
+ }
+
+ def receive():Array[Byte] = {
+ var start = true;
+ val buffer = new ByteArrayOutputStream()
+ var c = in.read
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return buffer.toByteArray
+ }
+ if( !start || c!= NEWLINE) {
+ start = false
+ buffer.write(c)
+ }
+ c = in.read()
+ }
+ throw new EOFException()
+ }
+
+ def receive(expect:Array[Byte]):Array[Byte] = {
+ val rc = receive()
+ if( !rc.startsWith(expect) ) {
+ throw new Exception("Expected "+expect)
+ }
+ rc
+ }
+
+ def connect(proc: =>Unit ) = {
+ try {
+ open(host, port)
+ write("CONNECT\n%s%s\n".format(
+ o(login).map("login:%s\n".format(_)).getOrElse(""),
+ o(passcode).map("passcode:%s\n".format(_)).getOrElse("")
+ ))
+ receive ("CONNECTED")
+ proc
+ } catch {
+ case e: Throwable =>
+ if(!done.get) {
+ println("failure occured: "+e)
+ error_counter.incrementAndGet
+ try {
+ Thread.sleep(1000)
+ } catch {
+ case _ => // ignore
+ }
+ }
+ } finally {
+ try {
+ close()
+ } catch {
+ case ignore: Throwable =>
+ }
+ }
+ }
+
+ def shutdown = {
+ interrupt
+ close
+ join
+ }
+
}
- private def o[T](value:T):Option[T] = value match {
- case null => None
- case x => Some(x)
+ class ProducerClient(val id: Int) extends BlockingClient {
+ val name: String = "producer " + id
+ val content = ("SEND\n" +
+ "destination:"+destination(id)+"\n"+
+ { if(persistent) "persistent:true\n" else "" } +
+ { if(sync_send) "receipt:xxx\n" else "" } +
+ { headers_for(id).foldLeft("") { case (sum, v)=> sum+v+"\n" } } +
+ { if(content_length) "content-length:"+message_size+"\n" else "" } +
+ "\n"+message(name)).getBytes("UTF-8")
+
+
+ override def run() {
+ while (!done.get) {
+ connect {
+ var i =0
+ while (!done.get) {
+ write(content)
+ if( sync_send ) {
+ // waits for the reply..
+ skip
+ }
+ producer_counter.incrementAndGet()
+ if(producer_sleep > 0) {
+ Thread.sleep(producer_sleep)
+ }
+ i += 1
+ }
+ }
+ }
+ }
+ }
+
+ def message(name:String) = {
+ val buffer = new StringBuffer(message_size)
+ buffer.append("Message from " + name+"\n")
+ for( i <- buffer.length to message_size ) {
+ buffer.append(('a'+(i%26)).toChar)
+ }
+ var rc = buffer.toString
+ if( rc.length > message_size ) {
+ rc.substring(0, message_size)
+ } else {
+ rc
+ }
}
- trait ClientTait {
+ class ConsumerClient(val id: Int) extends BlockingClient {
+ val name: String = "producer " + id
+
+ override def run() {
+ while (!done.get) {
+ connect {
+ write(
+ "SUBSCRIBE\n" +
+ (if(!durable) {""} else {"id:durable:mysub-"+id+"\n"}) +
+ (if(selector==null) {""} else {"selector: "+selector+"\n"}) +
+ "ack:"+ack+"\n"+
+ "destination:"+destination(id)+"\n"+
+ "\n")
+
+ receive_loop
+ }
+ }
+ }
+
+
+ def index_of(haystack:Array[Byte], needle:Array[Byte]):Int = {
+ var i = 0
+ while( haystack.length >= i+needle.length ) {
+ if( haystack.startsWith(needle, i) ) {
+ return i
+ }
+ i += 1
+ }
+ return -1
+ }
+
+ def receive_loop() = {
+ val clientAck = ack == "client"
+ while (!done.get) {
+ if( clientAck ) {
+ val msg = receive()
+ val start = index_of(msg, MESSAGE_ID)
+ assert( start >= 0 )
+ val end = msg.indexOf("\n", start)
+ val msgId = msg.slice(start+MESSAGE_ID.length+1, end)
+ write("""
+ACK
+message-id:""", msgId,"""
+
+""")
+
+ } else {
+ skip
+ }
+ consumer_counter.incrementAndGet()
+ Thread.sleep(consumer_sleep)
+ }
+ }
+ }
+
+}
+
+
+/**
+ * Simulates load on the a stomp broker using non blocking io.
+ */
+class NonBlockingScenario extends Scenario {
+
+ import Scenario._
+
+ def createProducer(i:Int) = {
+ new ProducerClient(i)
+ }
+ def createConsumer(i:Int) = {
+ new ConsumerClient(i)
+ }
+
+ trait NonBlockingClient extends Client {
protected var queue = createQueue("client")
protected var channel:SocketChannel = _
@@ -523,7 +855,7 @@ class Scenario {
}
- class ProducerClient(val id: Int) extends ClientTait {
+ class ProducerClient(val id: Int) extends NonBlockingClient {
val name: String = "producer " + id
queue.setLabel(name)
val message_frame:Array[Byte] = "SEND\n" +
@@ -604,7 +936,7 @@ class Scenario {
}
}
- class ConsumerClient(val id: Int) extends ClientTait {
+ class ConsumerClient(val id: Int) extends NonBlockingClient {
val name: String = "consumer " + id
queue.setLabel(name)
val clientAck = ack == "client"
@@ -677,97 +1009,4 @@ class Scenario {
}
}
- def run() = {
- print(toString)
- println("--------------------------------------")
- println(" Running: Press ENTER to stop")
- println("--------------------------------------")
- println("")
-
- with_load {
-
- // start a sampling client...
- val sample_thread = new Thread() {
- override def run() = {
-
- def print_rate(name: String, periodCount:Long, totalCount:Long, nanos: Long) = {
- val rate_per_second: java.lang.Float = ((1.0f * periodCount / nanos) * NANOS_PER_SECOND)
- println("%s total: %,d, rate: %,.3f per second".format(name, totalCount, rate_per_second))
- }
-
- try {
- var start = System.nanoTime
- var total_producer_count = 0L
- var total_consumer_count = 0L
- var total_error_count = 0L
- collection_start
- while( !done.get ) {
- Thread.sleep(sample_interval)
- val end = System.nanoTime
- collection_sample
- val samples = collection_end
- samples.get("p_custom").foreach { case List(count:Long) =>
- total_producer_count += count
- print_rate("Producer", count, total_producer_count, end - start)
- }
- samples.get("c_custom").foreach { case List(count:Long) =>
- total_consumer_count += count
- print_rate("Consumer", count, total_producer_count, end - start)
- }
- samples.get("e_custom").foreach { case List(count:Long) =>
- if( count!= 0 ) {
- total_error_count += count
- print_rate("Error", count, total_error_count, end - start)
- }
- }
- start = end
- }
- } catch {
- case e:InterruptedException =>
- }
- }
- }
- sample_thread.start()
-
- System.in.read()
- done.set(true)
-
- sample_thread.interrupt
- sample_thread.join
- }
-
- }
-
- override def toString() = {
- "--------------------------------------\n"+
- "Scenario Settings\n"+
- "--------------------------------------\n"+
- " host = "+host+"\n"+
- " port = "+port+"\n"+
- " destination_type = "+destination_type+"\n"+
- " queue_prefix = "+queue_prefix+"\n"+
- " topic_prefix = "+topic_prefix+"\n"+
- " destination_count = "+destination_count+"\n" +
- " destination_name = "+destination_name+"\n" +
- " sample_interval (ms) = "+sample_interval+"\n" +
- " \n"+
- " --- Producer Properties ---\n"+
- " producers = "+producers+"\n"+
- " message_size = "+message_size+"\n"+
- " persistent = "+persistent+"\n"+
- " sync_send = "+sync_send+"\n"+
- " content_length = "+content_length+"\n"+
- " producer_sleep (ms) = "+producer_sleep+"\n"+
- " headers = "+headers+"\n"+
- " \n"+
- " --- Consumer Properties ---\n"+
- " consumers = "+consumers+"\n"+
- " consumer_sleep (ms) = "+consumer_sleep+"\n"+
- " ack = "+ack+"\n"+
- " selector = "+selector+"\n"+
- " durable = "+durable+"\n"+
- ""
-
- }
-
}

0 comments on commit 3e89d14

Please sign in to comment.
Something went wrong with that request. Please try again.