Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hardened flushing by sending a final "null" message #990

Merged
merged 1 commit into from
Jan 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException } import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException } import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap } import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener } import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
Expand Down Expand Up @@ -348,13 +348,22 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
} }


override def shutdown(): Unit = { override def shutdown(): Unit = {
channelGroup.unbind() // Force flush by trying to write an empty buffer and wait for success
channelGroup.disconnect().addListener(new ChannelGroupFutureListener { channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) { def operationComplete(future: ChannelGroupFuture) {
channelGroup.close() channelGroup.unbind()
inboundBootstrap.releaseExternalResources() channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
channelGroup.close().addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
inboundBootstrap.releaseExternalResources()
}
})
}
})
} }
}) })

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding the following to NettyFutureBridge:

def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
    val p = Promise[ChannelGroup]()
    nettyFuture.addListener(new ChannelFutureListener {
      def operationComplete(future: ChannelFuture): Unit = p complete Try(
        if (future.isSuccess) future.getGroup
        else if (future.isCancelled) throw new CancellationException
        else throw future.getCause)
    })
    p.future
  }

And then you can write shutdown as something like:

override def shutdown(): Unit = {
  import appropriate ExecutionContext here
  def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ => c.getGroup }
  for {
   _ <- always(channelGroup.write(ChannelBuffers.buffer(0)))
   _ <- always({ channelGroup.unbind(); channelGroup.disconnect() })
   _ <- always(channelGroup.close())
   } inboundBootstrap.releaseExternalResources()
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, Cool!

} }


} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ private[remote] trait TcpHandlers extends CommonHandlers {
} }


override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) { override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))) val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply that application messages can't be 0 bytes? Is that an issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not an application message tho? It's the inbound payload (Akka protocol), right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's even lower layer. This is the payload of UDP packets, or length
encoded TCP frames. No upper layer protocol message can be contained in a
0 byte frame, so this is safe.

} }


override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) { override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private[remote] trait UdpHandlers extends CommonHandlers {
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer]) initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
} else { } else {
val listener = transport.udpConnectionTable.get(inetSocketAddress) val listener = transport.udpConnectionTable.get(inetSocketAddress)
listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())) val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
} }
case _ ⇒ case _ ⇒
} }
Expand Down