Permalink
Browse files

Merge pull request #991 from drewhk/wip-updated-nettyfuturebridge-drewhk

Cleaned up chaining of ChannelGroupFutures
  • Loading branch information...
2 parents cad30df + b2a050b commit 3ceb02f84682ea386dbfb96aeca601e4f27b1fd3 @drewhk drewhk committed Jan 3, 2013
View
40 akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
@@ -11,9 +11,9 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
-import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
+import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._
-import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
+import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
@@ -41,6 +41,20 @@ object NettyFutureBridge {
})
p.future
}
+
+ def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
+ import scala.collection.JavaConverters._
+ val p = Promise[ChannelGroup]
+ nettyFuture.addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture): Unit = p complete Try(
+ if (future.isCompleteSuccess) future.getGroup
+ else throw future.iterator.asScala.collectFirst {
+ case f if f.isCancelled new CancellationException
+ case f if !f.isSuccess f.getCause
+ } getOrElse new IllegalStateException("Error reported in ChannelGroupFuture, but no error found in individual futures."))
+ })
+ p.future
+ }
}
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
@@ -348,21 +362,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
}
override def shutdown(): Unit = {
- // Force flush by trying to write an empty buffer and wait for success
- channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- channelGroup.unbind()
- channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- channelGroup.close().addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- inboundBootstrap.releaseExternalResources()
- }
- })
- }
- })
- }
- })
+ def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ c.getGroup }
+ for {
+ // Force flush by trying to write an empty buffer and wait for success
+ _ always(channelGroup.write(ChannelBuffers.buffer(0)))
+ _ always({ channelGroup.unbind(); channelGroup.disconnect() })
+ _ always(channelGroup.close())
+ } inboundBootstrap.releaseExternalResources()
}
View
2 akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
@@ -35,7 +35,7 @@ private[remote] trait UdpHandlers extends CommonHandlers {
} else {
val listener = transport.udpConnectionTable.get(inetSocketAddress)
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
- if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
+ if (bytes.length > 0) listener notify InboundPayload(ByteString(bytes))
}
case _
}

0 comments on commit 3ceb02f

Please sign in to comment.