Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #111 from sbuzzard/bump-rev-min-threads-in-pool
Browse files Browse the repository at this point in the history
Updated revisions of libs: scodec 1.10, scalaz-stream 0.8.1, scalaz 7…
  • Loading branch information
timperrett committed Jun 2, 2016
2 parents f2e158e + 885f805 commit dccd7b0
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 24 deletions.
10 changes: 5 additions & 5 deletions core/build.sbt
Expand Up @@ -17,15 +17,15 @@ scalacOptions ++= Seq(
)

libraryDependencies ++= Seq(
"org.scodec" %% "scodec-core" % "1.8.2",
"org.scodec" %% "scodec-scalaz" % "1.1.0",
"org.scalaz" %% "scalaz-core" % "7.1.3",
"org.scalaz.stream" %% "scalaz-stream" % "0.7.3a",
"org.scodec" %% "scodec-core" % "1.10.0",
"org.scodec" %% "scodec-scalaz" % "1.3.0",
"org.scalaz" %% "scalaz-core" % "7.1.8",
"org.scalaz.stream" %% "scalaz-stream" % "0.8.1",
"org.apache.commons" % "commons-pool2" % "2.4.2",
"io.netty" % "netty-handler" % "4.1.0.Final",
"io.netty" % "netty-codec" % "4.1.0.Final"
)

common.macrosSettings

common.settings
common.settings
2 changes: 1 addition & 1 deletion core/src/main/scala/Endpoint.scala
Expand Up @@ -62,7 +62,7 @@ object Endpoint {
def failoverChain(timeout: Duration, es: Process[Task, Endpoint]): Endpoint =
Endpoint(transpose(es.map(_.connections)).flatMap { cs =>
cs.reduce((c1, c2) => bs => c1(bs) match {
case w@Await(a, k) =>
case w@Await(a, k, _) =>
await(time(a.attempt))((p: (Duration, Throwable \/ Any)) => p match {
case (d, -\/(e)) =>
if (timeout - d > 0.milliseconds) c2(bs)
Expand Down
6 changes: 0 additions & 6 deletions core/src/main/scala/Utils.scala
Expand Up @@ -31,10 +31,4 @@ package object utils {
}
}
implicit def errToE(err: Err) = new DecodingFailure(err)
implicit class AugmentedAttempt[A](a: Attempt[A]) {
def toTask(implicit conv: Err => Throwable): Task[A] = a match {
case Failure(err) => Task.fail(conv(err))
case Successful(a) => Task.now(a)
}
}
}
4 changes: 1 addition & 3 deletions core/src/main/scala/package.scala
Expand Up @@ -26,6 +26,7 @@ package object remotely {
import scalaz.\/.{left,right}
import scalaz.Monoid
import scodec.bits.{BitVector,ByteVector}
import scodec.interop.scalaz._
// import scodec.Decoder
import utils._

Expand Down Expand Up @@ -91,9 +92,6 @@ package object remotely {
}
}}

implicit val BitVectorMonoid = Monoid.instance[BitVector]((a,b) => a ++ b, BitVector.empty)
implicit val ByteVectorMonoid = Monoid.instance[ByteVector]((a,b) => a ++ b, ByteVector.empty)

private[remotely] def fullyRead(s: Process[Task,BitVector]): Task[BitVector] = s.runFoldMap(x => x)

private[remotely] def fixedNamedThreadPool(name: String): ExecutorService =
Expand Down
Expand Up @@ -62,7 +62,8 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
M: Monitoring,
sslContext: Option[SslContext]) extends BasePooledObjectFactory[Channel] {

val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors.max(4)

val workerThreadPool = new NioEventLoopGroup(numWorkerThreads, namedThreadFactory("nettyWorker"))

val validateCapabilities: ((Capabilities,Channel)) => Task[Channel] = {
Expand Down Expand Up @@ -97,7 +98,6 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
)

} flatMap(_ => Task.fail(error))

}
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/transport/netty/Server.scala
Expand Up @@ -16,7 +16,7 @@
//: ----------------------------------------------------------------------------

package remotely
package transport.netty
package transport.netty

import java.util.concurrent.Executors
import io.netty.channel._, socket.SocketChannel, nio.NioEventLoopGroup
Expand Down Expand Up @@ -65,9 +65,9 @@ private[remotely] class NettyServer(handler: Handler,
/**
* once a connection is negotiated, we send our capabilities string
* to the client, which might look something like:
*
*
* OK: [Remotely 1.0]
*/
*/
@ChannelHandler.Sharable
object ChannelInitialize extends ChannelInboundHandlerAdapter {
override def channelRegistered(ctx: ChannelHandlerContext): Unit = {
Expand Down Expand Up @@ -105,7 +105,7 @@ private[remotely] class NettyServer(handler: Handler,
object NettyServer {
/**
* start a netty server listening to the given address
*
*
* @param addr the address to bind to
* @param handler the request handler
* @param strategy the strategy used for processing incoming requests
Expand All @@ -128,7 +128,7 @@ object NettyServer {
SslParameters.toServerContext(sslParameters) map { ssl =>
logger.negotiating(Some(addr), s"got ssl parameters: $ssl", None)
val numBossThreads = bossThreads getOrElse 2
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors
val numWorkerThreads = workerThreads getOrElse Runtime.getRuntime.availableProcessors.max(4)

val server = new NettyServer(handler, strategy, numBossThreads, numWorkerThreads, capabilities, logger, ssl.map(_ -> sslParameters.fold(true)(p => p.requireClientAuth)))
val b = server.bootstrap
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/transport/netty/Transport.scala
Expand Up @@ -177,9 +177,9 @@ object Enframe extends ChannelOutboundHandlerAdapter {
obj match {
case Bits(bv) =>
val byv = bv.toByteVector
val _ = ctx.write(Unpooled.wrappedBuffer((codecs.int32.encode(byv.size).require ++ bv).toByteBuffer), cp)
val _ = ctx.writeAndFlush(Unpooled.wrappedBuffer((codecs.int32.encode(byv.size.toInt).require ++ bv).toByteBuffer), cp)
case EOS =>
val _ = ctx.write(Unpooled.wrappedBuffer(codecs.int32.encode(0).require.toByteBuffer), cp)
val _ = ctx.writeAndFlush(Unpooled.wrappedBuffer(codecs.int32.encode(0).require.toByteBuffer), cp)
case x => throw new IllegalArgumentException("was expecting Framed, got: " + x)
}
}
Expand Down

0 comments on commit dccd7b0

Please sign in to comment.