Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
daggerrz committed Aug 24, 2012
1 parent bafe81a commit e7f6e38
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
6 changes: 3 additions & 3 deletions project/Build.scala
Expand Up @@ -19,9 +19,9 @@ object Scitrusleaf extends Build {
val dependencies = Seq(
"ch.qos.logback" % "logback-classic" % "0.9.24" % "runtime",
"org.slf4j" % "slf4j-api" % "1.6.1",
"com.twitter" % "finagle-core" % "5.1.0",
"com.twitter" % "finagle-stream" % "5.1.0",
"com.twitter" % "util-core" % "5.2.1-SNAPSHOT",
"com.twitter" % "finagle-core" % "5.3.7",
"com.twitter" % "finagle-stream" % "5.3.7",
"com.twitter" % "util-core" % "5.3.6",
"io.netty" % "netty" % "3.5.2.Final",
"com.typesafe.akka" % "akka-actor" % "2.0.2",
"org.specs2" %% "specs2" % "1.11" % "test"
Expand Down
20 changes: 9 additions & 11 deletions src/main/scala/com/tapad/scitrusleaf/akka/AkkaClient.scala
@@ -1,6 +1,6 @@
package com.tapad.scitrusleaf.akka

import java.util.concurrent.{ConcurrentSkipListMap, Executors}
import java.util.concurrent.Executors
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
Expand All @@ -10,7 +10,6 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.net.InetSocketAddress
import annotation.tailrec
import java.nio.channels.ClosedChannelException
import org.apache.commons.collections.list.SynchronizedList


object AkkaClient {
Expand All @@ -27,12 +26,11 @@ object AkkaClient {

(0 until nodeCount).foreach { _ =>
factory().connect().onComplete { _ fold (
_.printStackTrace(),
{ error => println(error.getMessage) },
n => nodes = n +: nodes
)}
}


private[this] def next(): Int = {
val c = nodeIndex.getAndIncrement()
if (c >= nodes.size - 1) {
Expand All @@ -49,9 +47,9 @@ object AkkaClient {
def connect() = null
}

class ClClient(private val host: String,
private val port: Int,
private val bs: ClientBootstrap,
class ClClient(host: String,
port: Int,
bs: ClientBootstrap,
private implicit val executionContext: ExecutionContext) extends Client[ClMessage, ClMessage] {


Expand All @@ -62,11 +60,11 @@ object AkkaClient {
private[this] var connector: Channel = _

@tailrec private def sendFailures(ex: Throwable) {
Option(requestQueue.poll()) match {
case Some(promise) =>
requestQueue.poll() match {
case null =>
case promise =>
promise.failure(ex)
sendFailures(ex)
case _ =>
}
}

Expand Down Expand Up @@ -123,7 +121,6 @@ object AkkaClient {
} else {
System.out.println("--- CLIENT - Failed to connect to server at " +
"%s:%d".format(host, port))
bs.releaseExternalResources();
res.failure(future.getCause)
}
}
Expand Down Expand Up @@ -155,6 +152,7 @@ object AkkaClient {
bs.setOption("tcpNoDelay", true)
bs.setOption("reuseAddress", true)
bs.setOption("receiveBufferSize", 1600)
bs.setOption("connectTimeoutMillis", 1)


def build(host: String, port: Int) = new ClClient(host, port, bs, executionContext)
Expand Down

0 comments on commit e7f6e38

Please sign in to comment.