Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
Updated revisions of libs: scodec 1.10, scalaz-stream 0.8.1, scalaz 7…
Browse files Browse the repository at this point in the history
….1.8, commons-pool-2.4.2, netty 4.1.0, flush client request in pipeline to avoid ~1 sec lags, minimum of 4 *default* endpoint and worker threads to avoid 100 to 200ms lags with single CPU machines awaiting for free connection
  • Loading branch information
sbuzzard committed May 30, 2016
1 parent d9eb5c9 commit a3cb9cd
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 40 deletions.
16 changes: 8 additions & 8 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ scalacOptions ++= Seq(
)

libraryDependencies ++= Seq(
"org.scodec" %% "scodec-core" % "1.8.2",
"org.scodec" %% "scodec-scalaz" % "1.1.0",
"org.scalaz" %% "scalaz-core" % "7.1.3",
"org.scalaz.stream" %% "scalaz-stream" % "0.7.3a",
"org.apache.commons" % "commons-pool2" % "2.2",
"io.netty" % "netty-handler" % "4.0.32.Final",
"io.netty" % "netty-codec" % "4.0.32.Final"
"org.scodec" %% "scodec-core" % "1.10.0",
"org.scodec" %% "scodec-scalaz" % "1.3.0",
"org.scalaz" %% "scalaz-core" % "7.1.8",
"org.scalaz.stream" %% "scalaz-stream" % "0.8.1",
"org.apache.commons" % "commons-pool2" % "2.4.2",
"io.netty" % "netty-handler" % "4.1.0.Final",
"io.netty" % "netty-codec" % "4.1.0.Final"
)

common.macrosSettings

common.settings
common.settings
2 changes: 1 addition & 1 deletion core/src/main/scala/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object Endpoint {
def failoverChain(timeout: Duration, es: Process[Task, Endpoint]): Endpoint =
Endpoint(transpose(es.map(_.connections)).flatMap { cs =>
cs.reduce((c1, c2) => bs => c1(bs) match {
case w@Await(a, k) =>
case w@Await(a, k, _) =>
await(time(a.attempt))((p: (Duration, Throwable \/ Any)) => p match {
case (d, -\/(e)) =>
if (timeout - d > 0.milliseconds) c2(bs)
Expand Down
6 changes: 0 additions & 6 deletions core/src/main/scala/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,4 @@ package object utils {
}
}
implicit def errToE(err: Err) = new DecodingFailure(err)
implicit class AugmentedAttempt[A](a: Attempt[A]) {
def toTask(implicit conv: Err => Throwable): Task[A] = a match {
case Failure(err) => Task.fail(conv(err))
case Successful(a) => Task.now(a)
}
}
}
6 changes: 2 additions & 4 deletions core/src/main/scala/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package object remotely {
import scalaz.\/.{left,right}
import scalaz.Monoid
import scodec.bits.{BitVector,ByteVector}
import scodec.interop.scalaz._
// import scodec.Decoder
import utils._

Expand Down Expand Up @@ -91,13 +92,10 @@ package object remotely {
}
}}

implicit val BitVectorMonoid = Monoid.instance[BitVector]((a,b) => a ++ b, BitVector.empty)
implicit val ByteVectorMonoid = Monoid.instance[ByteVector]((a,b) => a ++ b, ByteVector.empty)

private[remotely] def fullyRead(s: Process[Task,BitVector]): Task[BitVector] = s.runFoldMap(x => x)

private[remotely] def fixedNamedThreadPool(name: String): ExecutorService =
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors, namedThreadFactory(name))
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors.max(4), namedThreadFactory(name))

private[remotely] def namedThreadFactory(name: String): ThreadFactory = new ThreadFactory {
val num = new AtomicInteger(1)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/transport/netty/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object NettyTransport {
})
}

def write(c: Channel)(frame: Framed): Task[Unit] = evalCF(c.write(frame))
def write(c: Channel)(frame: Framed): Task[Unit] = evalCF(c.writeAndFlush(frame))

def single(host: InetSocketAddress,
expectedSigs: Set[Signature] = Set.empty,
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/transport/netty/ClientConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
M: Monitoring,
sslContext: Option[SslContext]) extends BasePooledObjectFactory[Channel] {

val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors.max(4)

val workerThreadPool = new NioEventLoopGroup(numWorkerThreads, namedThreadFactory("nettyWorker"))

val validateCapabilities: ((Capabilities,Channel)) => Task[Channel] = {
Expand Down Expand Up @@ -96,20 +97,19 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
)

} flatMap(_ => Task.fail(error))

}
}

/**
* This is an upstream handler that sits in the client's pipeline
* during connection negotiation.
*
* during connection negotiation.
*
* It is put into the pipeline initially, we then make a call to
* the server to ask for the descriptions of all the functions that
* the server provides. when we receive the response, this handler
* checks that all of the expectedSigs passed to the constructor
* are present in the server response.
*
*
* The state of this negotiation is captured by the `valid` Task,
* which is asynchronously updated when the response is recieved
*/
Expand All @@ -122,7 +122,7 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],


// the callback which will fulfil the valid task
private[this] var cb: Throwable\/Channel => Unit = _
@volatile private[this] var cb: Throwable\/Channel => Unit = Function.const(())

val valid: Task[Channel] = Task.async { cb =>
this.cb = cb
Expand All @@ -131,7 +131,7 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
// here we accumulate bits as they arrive, we keep accumulating
// until the handler below us signals the end of stream by
// emitting a EOS
private[this] var bits = BitVector.empty
@volatile private[this] var bits = BitVector.empty

// negotiation failed. fulfil the callback negatively, and remove
// ourselves from the pipeline
Expand Down Expand Up @@ -182,7 +182,7 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
}
}


private[this] val pipe = channel.pipeline()
pipe.addLast("negotiateDescription", this)

Expand Down Expand Up @@ -218,7 +218,7 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
class ClientNegotiateCapabilities extends DelimiterBasedFrameDecoder(1000,Delimiters.lineDelimiter():_*) {

// callback which fulfills the capabilities Task
private[this] var cb: Throwable\/(Capabilities,Channel) => Unit = _
@volatile private[this] var cb: Throwable\/(Capabilities,Channel) => Unit = Function.const(())


val capabilities: Task[(Capabilities,Channel)] = Task.async { cb =>
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/transport/netty/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//: ----------------------------------------------------------------------------

package remotely
package transport.netty
package transport.netty

import java.util.concurrent.Executors
import io.netty.channel._, socket.SocketChannel, nio.NioEventLoopGroup
Expand Down Expand Up @@ -65,9 +65,9 @@ private[remotely] class NettyServer(handler: Handler,
/**
* once a connection is negotiated, we send our capabilities string
* to the client, which might look something like:
*
*
* OK: [Remotely 1.0]
*/
*/
@ChannelHandler.Sharable
object ChannelInitialize extends ChannelInboundHandlerAdapter {
override def channelRegistered(ctx: ChannelHandlerContext): Unit = {
Expand Down Expand Up @@ -105,7 +105,7 @@ private[remotely] class NettyServer(handler: Handler,
object NettyServer {
/**
* start a netty server listening to the given address
*
*
* @param addr the address to bind to
* @param handler the request handler
* @param strategy the strategy used for processing incoming requests
Expand All @@ -128,7 +128,7 @@ object NettyServer {
SslParameters.toServerContext(sslParameters) map { ssl =>
logger.negotiating(Some(addr), s"got ssl parameters: $ssl", None)
val numBossThreads = bossThreads getOrElse 2
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors.max(4)

val server = new NettyServer(handler, strategy, numBossThreads, numWorkerThreads, capabilities, logger, ssl.map(_ -> sslParameters.fold(true)(p => p.requireClientAuth)))
val b = server.bootstrap
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/transport/netty/Transport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class Deframe extends ByteToMessageDecoder {
} else {
remaining = Some(rem)
}
}
}
case Some(rem) =>
// we are waiting for at least rem more bytes, as that is what
// is outstanding in the current frame
Expand Down Expand Up @@ -177,9 +177,9 @@ object Enframe extends ChannelOutboundHandlerAdapter {
obj match {
case Bits(bv) =>
val byv = bv.toByteVector
val _ = ctx.writeAndFlush(Unpooled.wrappedBuffer((codecs.int32.encode(byv.size).require ++ bv).toByteBuffer), cp)
val _ = ctx.write(Unpooled.wrappedBuffer((codecs.int32.encode(byv.size.toInt).require ++ bv).toByteBuffer), cp)
case EOS =>
val _ = ctx.writeAndFlush(Unpooled.wrappedBuffer(codecs.int32.encode(0).require.toByteBuffer), cp)
val _ = ctx.write(Unpooled.wrappedBuffer(codecs.int32.encode(0).require.toByteBuffer), cp)
case x => throw new IllegalArgumentException("was expecting Framed, got: " + x)
}
}
Expand Down
2 changes: 1 addition & 1 deletion project.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ organization in Global := "oncue.remotely"

scalaVersion in Global := "2.10.6"

crossScalaVersions in Global := Seq("2.10.6", "2.11.7")
crossScalaVersions in Global := Seq("2.10.6", "2.11.8")

resolvers += Resolver.sonatypeRepo("releases")

Expand Down
4 changes: 2 additions & 2 deletions project/common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ object common {
)

def macrosSettings = Seq(
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full),
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full),
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided"
) ++ (
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMajor)) if scalaMajor == 10 => Seq("org.scalamacros" %% "quasiquotes" % "2.1.0-M5")
case Some((2, scalaMajor)) if scalaMajor == 10 => Seq("org.scalamacros" %% "quasiquotes" % "2.1.0")
case _ => Nil
}
),
Expand Down

0 comments on commit a3cb9cd

Please sign in to comment.