Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #98 from ahjohannessen/exp-travis-bugger
Browse files Browse the repository at this point in the history
ok Travis, smartass
  • Loading branch information
timperrett committed May 31, 2016
2 parents dc793f2 + 3bf6ee7 commit 813733b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 14 deletions.
43 changes: 33 additions & 10 deletions core/src/main/scala/transport/netty/ClientConnectionPool.scala
Expand Up @@ -20,6 +20,7 @@ package transport.netty

import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ThreadFactory}
import io.netty.util.concurrent.{Future, GenericFutureListener}
import org.apache.commons.pool2.BasePooledObjectFactory
import org.apache.commons.pool2.PooledObject
import org.apache.commons.pool2.impl.DefaultPooledObject
Expand Down Expand Up @@ -251,23 +252,45 @@ class NettyConnectionPool(hosts: Process[Task,InetSocketAddress],
addr <- addrMaybe.fold[Task[InetSocketAddress]](Task.fail(new Exception("out of connections")))(Task.now(_))
_ = M.negotiating(Some(addr), "address selected", None)
fut <- {

Task.delay {

// assign this to a val so we can throw it away later, wreckx-n-effect
val bootstrap = new Bootstrap()

val s = bootstrap.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
val i = bootstrap.group(workerThreadPool)
val d = bootstrap.channel(classOf[NioSocketChannel])
val e = bootstrap.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
val pipe = ch.pipeline
// add an SSL layer first iff we were constructed with an SslContext, foreach like a unit boss
sslContext.foreach{s =>
pipe.addLast(s.newHandler(ch.alloc(), addr.getAddress.getHostAddress, addr.getPort))
}
val effect = pipe.addLast(negotiateCapable)
}
})

val init = new ChannelInitializer[SocketChannel] {

def initChannel(ch: SocketChannel): Unit = {

val pipe = ch.pipeline

// add an SSL layer first iff we were constructed with an SslContext, foreach like a unit boss

sslContext.foreach { s =>

val sh = s.newHandler(ch.alloc(), addr.getAddress.getHostAddress, addr.getPort)

sh.handshakeFuture().addListener(new GenericFutureListener[Future[Channel]] {
def operationComplete(future: Future[Channel]): Unit = {
// avoid negotiation when ssl fails
if(!future.isSuccess) { pipe.remove(negotiateCapable) }
}
})

pipe.addLast(sh)
}

val effect = pipe.addLast(negotiateCapable)
}
}


val e = bootstrap.handler(init)

bootstrap.connect(addr)
}}
chan <- {
Expand Down
15 changes: 11 additions & 4 deletions core/src/test/scala/ServerErrors.scala
Expand Up @@ -36,12 +36,16 @@ class ServerErrors extends FlatSpec with Matchers {

val call = Remote.local(true).runWithoutContext(endpoint)

val thrown = the [ServerException] thrownBy call.run
val expectedMsg = (s"[decoding] server does not have response serializer for: ${Remote.toTag[Boolean]}")

thrown.getMessage should startWith (s"[decoding] server does not have response serializer for: ${Remote.toTag[Boolean]}")
try(call.run) catch {
case se: ServerException se.getMessage should startWith(expectedMsg)
case huh: Exception huh.printStackTrace(); fail(huh)
}

shutdown.run
}

behavior of "incompatible reference on server"
it should "throw the appropriate error if there is some kind of reference mismatch" in {
val address = new java.net.InetSocketAddress("localhost", 9077)
Expand All @@ -56,9 +60,12 @@ class ServerErrors extends FlatSpec with Matchers {

val call = wrongRef(1,2).runWithoutContext(endpoint)

val thrown = the [ServerException] thrownBy call.run
val expectedMsg = ("[validation] server values: <Set(ping: Int => Int, describe: List[remotely.Signature])> does not have referenced values:\n ping: (Int, Int) => Int")

thrown.getMessage should startWith ("[validation] server values: <Set(ping: Int => Int, describe: List[remotely.Signature])> does not have referenced values:\n ping: (Int, Int) => Int")
try(call.run) catch {
case se: ServerException se.getMessage should startWith(expectedMsg)
case huh: Exception huh.printStackTrace(); fail(huh)
}

shutdown.run
}
Expand Down

0 comments on commit 813733b

Please sign in to comment.