WIP

1 parent feb04f7 commit bafe81a4610066525a6297ea4ec43fcf57263c89 @daggerrz committed Aug 24, 2012
@@ -0,0 +1,163 @@
+package com.tapad.scitrusleaf.akka
+
+import java.util.concurrent.{ConcurrentSkipListMap, Executors}
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.bootstrap.ClientBootstrap
+import org.jboss.netty.channel._
+import com.tapad.scitrusleaf.protocol.{ClMessage, ClMessageDecoder, ClFrameDecoder, ClMessageEncoder}
+import akka.dispatch.{Future, Promise, ExecutionContext}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
+import java.net.InetSocketAddress
+import annotation.tailrec
+import java.nio.channels.ClosedChannelException
+import org.apache.commons.collections.list.SynchronizedList
+
+
+object AkkaClient {
+
+  trait Client[REQ, RESP] {
+    def apply(req: REQ): Future[RESP]
+    def connect(): Future[Client[REQ, RESP]]
+  }
+
+  /** Round-robins requests over nodeCount eagerly connected clients. */
+  class LoadBalancer[REQ, RESP](nodeCount: Int, factory: () => Client[REQ, RESP]) extends Client[REQ, RESP] {
+
+    private[this] val nodeIndex = new AtomicInteger(0)
+    // Connected clients; appended from connect callbacks on other threads, hence @volatile.
+    @volatile private[this] var nodes = Vector.empty[Client[REQ, RESP]]
+
+    // Connect all clients up front; failures are only logged, successes join the pool.
+    (0 until nodeCount).foreach { _ =>
+      factory().connect().onComplete { _ fold (
+        _.printStackTrace(),
+        n => nodes = n +: nodes
+      )}
+    }
+
+    // Round-robin over the connected nodes, wrapping the shared counter back to zero.
+    private[this] def next(): Int = {
+      val c = nodeIndex.getAndIncrement()
+      if (c >= nodes.size) {
+        if (nodeIndex.compareAndSet(c + 1, 0)) 0 else next()
+      } else c
+    }
+
+    def apply(req: REQ) = nodes(next()).apply(req)
+
+    // WIP: nodes connect eagerly in the constructor, so there is nothing to do here yet.
+    def connect() = null
+  }
+
+  /** A single Netty connection with at most one request in flight; responses resolve promises in FIFO order. */
+  class ClClient(private val host: String,
+                 private val port: Int,
+                 private val bs: ClientBootstrap,
+                 private implicit val executionContext: ExecutionContext) extends Client[ClMessage, ClMessage] {
+
+    private[this] val writeQueue = new java.util.concurrent.ArrayBlockingQueue[ClMessage](100)
+    private[this] val requestQueue = new java.util.concurrent.ArrayBlockingQueue[Promise[ClMessage]](100)
+    private[this] val requestPending = new AtomicBoolean()
+
+    private[this] var connector: Channel = _
+
+    // Fails every queued promise, e.g. when the channel closes.
+    @tailrec private def sendFailures(ex: Throwable) {
+      Option(requestQueue.poll()) match {
+        case Some(promise) =>
+          promise.failure(ex)
+          sendFailures(ex)
+        case _ =>
+      }
+    }
+
+    val handler = new SimpleChannelUpstreamHandler {
+      override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+        // Responses arrive in request order, so the head of the queue owns this message.
+        val promise = requestQueue.poll()
+        promise.success(e.getMessage.asInstanceOf[ClMessage])
+        requestPending.set(false)
+        sendNext()
+      }
+
+      override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
+        super.channelClosed(ctx, e)
+        sendFailures(new ClosedChannelException())
+      }
+
+      override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+        ctx.getChannel.close()
+        sendFailures(e.getCause())
+      }
+    }
+
+    // Writes the next queued request unless one is already in flight.
+    private def sendNext() {
+      if (!writeQueue.isEmpty && requestPending.compareAndSet(false, true)) {
+        try {
+          connector.write(writeQueue.poll())
+        } catch {
+          case e: Exception =>
+            requestPending.set(false)
+            sendFailures(e)
+        }
+      }
+    }
+
+    def apply(req: ClMessage): Future[ClMessage] = {
+      val f = Promise[ClMessage]()
+      requestQueue.put(f)
+      writeQueue.put(req)
+      sendNext()
+      f
+    }
+
+    def connect() = {
+      val res = Promise[ClClient]()
+      val cf = bs.connect(new InetSocketAddress(host, port))
+      cf.addListener(new ChannelFutureListener {
+        def operationComplete(future: ChannelFuture) {
+          if (future.isSuccess) {
+            println(this + " connected")
+            future.getChannel.getPipeline.addLast("client", handler)
+            connector = future.getChannel
+            res.success(ClClient.this)
+          } else {
+            System.out.println("--- CLIENT - Failed to connect to server at " +
+              "%s:%d".format(host, port))
+            bs.releaseExternalResources()
+            res.failure(future.getCause)
+          }
+        }
+      })
+      res
+    }
+  }
+
+  /** Shared Netty bootstrap and thread pools used to build ClClient instances. */
+  object ClientBuilder {
+    val resultPool = Executors.newFixedThreadPool(1)
+    val executionContext = ExecutionContext.fromExecutor(resultPool)
+
+    val bossPool = Executors.newFixedThreadPool(1)
+    val workerPool = Executors.newFixedThreadPool(4)
+
+    val factory = new NioClientSocketChannelFactory(bossPool, workerPool)
+    val bs = new ClientBootstrap(factory)
+
+    bs.setPipelineFactory(new ChannelPipelineFactory {
+      def getPipeline = {
+        Channels.pipeline(
+          new ClMessageEncoder,
+          new ClFrameDecoder,
+          new ClMessageDecoder
+        )
+      }
+    })
+    bs.setOption("tcpNoDelay", true)
+    bs.setOption("reuseAddress", true)
+    bs.setOption("receiveBufferSize", 1600)
+
+    def build(host: String, port: Int) = new ClClient(host, port, bs, executionContext)
+  }
+
+}
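
For orientation, a minimal way to exercise the client added above might look like the sketch below. It only uses names introduced in this commit (LoadBalancer, ClientBuilder, and the Get message used by the benchmark further down, assumed to live in com.tapad.scitrusleaf.protocol); the object name UsageSketch, the host/port, and the fixed sleep standing in for connection readiness are placeholders.

    import com.tapad.scitrusleaf.akka.AkkaClient._
    import com.tapad.scitrusleaf.protocol.Get

    object UsageSketch {
      def main(args: Array[String]) {
        // Round-robin pool of four connections to a placeholder host/port.
        val client = new LoadBalancer(4, () => ClientBuilder.build("127.0.0.1", 3000))
        Thread.sleep(1000) // WIP-style wait for the eager connects to finish

        client(Get(namespace = "test", key = "1")).onComplete(_ fold (
          e => e.printStackTrace(),
          r => println("Got: " + r)
        ))
      }
    }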
@@ -1,153 +1,30 @@
package com.tapad.scitrusleaf.protocol
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.bootstrap.ClientBootstrap
-import org.jboss.netty.channel._
-import java.net.InetSocketAddress
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.AtomicInteger
import org.jboss.netty.buffer.ChannelBuffers
-import java.util.concurrent.Executors
-import akka.dispatch._
-import akka.actor._
-import akka.actor._
-import akka.routing.RoundRobinRouter
-import akka.util.Duration
-import akka.util.duration._
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
-
-trait Client[REQ, RESP] {
-  def apply(req: REQ) : Future[RESP]
-}
-
-class LoadBalancer[REQ, RESP](targets: Array[Client[REQ, RESP]]) extends Client[REQ, RESP] {
-  private[this] val nodeIndex = new AtomicInteger(0)
-  private[this] val nodeCount = targets.length
-
-  private[this] def next() : Int = {
-    val c = nodeIndex.getAndIncrement()
-    if (c >= nodeCount -1) {
-      if (nodeIndex.compareAndSet(nodeCount, 0)) {
-        0
-      } else {
-        next()
-      }
-    } else c
-  }
+import com.tapad.scitrusleaf.akka.AkkaClient
-  def apply(req: REQ) = targets(next()).apply(req)
-}
object Debug {
  val rvc = new AtomicInteger()
  def receive() { println("R:" + rvc.incrementAndGet())}
  val tx = new AtomicInteger()
  def write() { println("W:" + tx.incrementAndGet())}
-
-}
-
-class ClClient(private val host: String,
-               private val port: Int,
-               private val bs: ClientBootstrap,
-               private implicit val executionContext : ExecutionContext) extends Client[ClMessage, ClMessage] {
-
-
-  private[this] val writeQueue = new java.util.concurrent.ArrayBlockingQueue[ClMessage](100)
-  private[this] val requestQueue = new java.util.concurrent.ArrayBlockingQueue[Promise[ClMessage]](100)
-  private[this] val requestPending = new AtomicBoolean()
-
-  private[this] var connector: Channel = _
-
-  val handler = new SimpleChannelUpstreamHandler {
-    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
-      val promise = requestQueue.poll()
-      promise.success(e.getMessage.asInstanceOf[ClMessage])
-      requestPending.set(false)
-      sendNext()
-    }
-
-    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      e.getCause.printStackTrace()
-    }
-  }
-
-  private def sendNext() {
-    if (!writeQueue.isEmpty && requestPending.compareAndSet(false, true)) {
-      try {
-        connector.write(writeQueue.poll())
-      } catch {
-        case e @ _ =>
-          requestPending.set(false)
-          requestQueue.poll().failure(e)
-      }
-    }
-  }
-
-  def apply(req: ClMessage) : Future[ClMessage] = {
-    val f = Promise[ClMessage]()
-    requestQueue.put(f)
-    writeQueue.put(req)
-    sendNext()
-    f
-  }
-
-
-  def start() {
-    val future = bs.connect(new InetSocketAddress(host, port))
-    if (!future.awaitUninterruptibly().isSuccess()) {
-      System.out.println("--- CLIENT - Failed to connect to server at " +
-        "%s:%d".format(host, port))
-      bs.releaseExternalResources();
-      false
-    } else {
-      println(this + " connected")
-      future.getChannel.getPipeline.addLast("client", handler)
-      connector = future.getChannel
-      connector.isConnected
-    }
-  }
}
-object ClientBuilder {
-
-  val resultPool = Executors.newFixedThreadPool(2)
-  val bossPool = Executors.newFixedThreadPool(1)
-  val workerPool = Executors.newFixedThreadPool(4)
-  val factory = new NioClientSocketChannelFactory(bossPool, workerPool)
-  val bs = new ClientBootstrap(factory)
-  bs.setPipelineFactory(new ChannelPipelineFactory {
-    def getPipeline = {
-      Channels.pipeline(
-        new ClMessageEncoder,
-        new ClFrameDecoder,
-        new ClMessageDecoder
-
-      )
-    }
-  })
-  bs.setOption("tcpNoDelay", true)
-  bs.setOption("reuseAddress", true)
-  bs.setOption("receiveBufferSize", 1600)
-  val executionContext = ExecutionContext.fromExecutor(resultPool)
-
-  def build(host: String, port: Int) = new ClClient(host, port, bs, executionContext)
-}
object AkkaBench {
  def main(args: Array[String]) {
+    import AkkaClient._
-    val clientCount = 32
-
-    val client = new LoadBalancer(Array.fill[Client[ClMessage, ClMessage]](clientCount) {
-      val c = ClientBuilder.build("192.168.0.12", 3000)
-      c.start()
-      c
-    })
+    val clientCount = 150
+    val client = new LoadBalancer(clientCount, () => ClientBuilder.build("192.168.0.10", 3000))
+    Thread.sleep(1000)
    val started = new AtomicInteger()
    val completed = new AtomicInteger()
@@ -178,12 +55,16 @@ object AkkaBench {
      val key = started.incrementAndGet()
//      client(Set(namespace = "test", key = key.toString, value = data)) map (success _)
      client(Get(namespace = "test", key = key.toString)).onComplete(_ fold(
-        e => e.printStackTrace(),
+        { e =>
+          println(e.getMessage)
+          e.printStackTrace()
+        },
        r => success(r)
      ))
//      while ((started.get() - completed.get()) > 10000) {
//        Thread.sleep(100)
//      }
+//      Thread.sleep(1000)
    }
    while (started.get() > completed.get()) {
      println("Started %d, completed %d".format(started.get(), completed.get()))