Permalink
Browse files

Merge pull request #990 from drewhk/wip-hardened-flushing-drewhk

Hardened flushing by sending a final "null" message
  • Loading branch information...
2 parents 0dda2ad + fa49486 commit f2aa94727a5aa06f1a4cb176a54c68e738ee043e @drewhk drewhk committed Jan 3, 2013
@@ -11,7 +11,7 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
-import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
@@ -348,13 +348,22 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
}
override def shutdown(): Unit = {
- channelGroup.unbind()
- channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
+ // Force flush by trying to write an empty buffer and wait for success
+ channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
- channelGroup.close()
- inboundBootstrap.releaseExternalResources()
+ channelGroup.unbind()
+ channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture) {
+ channelGroup.close().addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture) {
+ inboundBootstrap.releaseExternalResources()
+ }
+ })
+ }
+ })
}
})
+
}
}
@@ -32,7 +32,8 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
- notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())))
+ val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
+ if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes)))
}
override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) {
@@ -34,7 +34,8 @@ private[remote] trait UdpHandlers extends CommonHandlers {
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
} else {
val listener = transport.udpConnectionTable.get(inetSocketAddress)
- listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
+ val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
+ if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
}
case _
}

0 comments on commit f2aa947

Please sign in to comment.