Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Hardened flushing by sending a final "null" message #990

Merged
merged 1 commit into from

4 participants

drewhk Roland Kuhn Patrik Nordwall Viktor Klang (√)
drewhk
Owner

To ensure proper flushing of the Netty transport a "null" message is written to the channel first, and we proceed with the release of the resources after that operation is finished.

Roland Kuhn
Owner

LGTM

Patrik Nordwall patriknw commented on the diff
...in/scala/akka/remote/transport/netty/TcpSupport.scala
@@ -32,7 +32,8 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
- notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())))
+ val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
+ if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes)))
Patrik Nordwall Owner
patriknw added a note

Does this imply that application messages can't be 0 bytes? Is that an issue?

Viktor Klang (√) Owner

That's not an application message tho? It's the inbound payload (Akka protocol), right?

Patrik Nordwall Owner
patriknw added a note

ok, I see

drewhk Owner
drewhk added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Patrik Nordwall
Owner

LGTM

Patrik Nordwall
Owner

I tried this patch in the stress test branch, locally, and it was success. That is a good start, and I would like to do a proper rebase and continue running the stress test on the build server. Please merge.

drewhk drewhk merged commit f2aa947 into from
Viktor Klang (√) viktorklang commented on the diff
...cala/akka/remote/transport/netty/NettyTransport.scala
((21 lines not shown))
}
})
+
Viktor Klang (√) Owner

I suggest adding the following to NettyFutureBridge:

def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
    val p = Promise[ChannelGroup]()
    nettyFuture.addListener(new ChannelFutureListener {
      def operationComplete(future: ChannelFuture): Unit = p complete Try(
        if (future.isSuccess) future.getGroup
        else if (future.isCancelled) throw new CancellationException
        else throw future.getCause)
    })
    p.future
  }

And then you can write shutdown as something like:

override def shutdown(): Unit = {
  import appropriate ExecutionContext here
  def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ => c.getGroup }
  for {
   _ <- always(channelGroup.write(ChannelBuffers.buffer(0)))
   _ <- always({ channelGroup.unbind(); channelGroup.disconnect() })
   _ <- always(channelGroup.close())
   } inboundBootstrap.releaseExternalResources()
}
drewhk Owner
drewhk added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 3, 2013
  1. Hardened flushing by sending a final "null" message

    Endre Sándor Varga authored
This page is out of date. Refresh to see the latest.
19 akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
View
@@ -11,7 +11,7 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
-import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
@@ -348,13 +348,22 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
}
override def shutdown(): Unit = {
- channelGroup.unbind()
- channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
+ // Force flush by trying to write an empty buffer and wait for success
+ channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture) {
- channelGroup.close()
- inboundBootstrap.releaseExternalResources()
+ channelGroup.unbind()
+ channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture) {
+ channelGroup.close().addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture) {
+ inboundBootstrap.releaseExternalResources()
+ }
+ })
+ }
+ })
}
})
+
Viktor Klang (√) Owner

I suggest adding the following to NettyFutureBridge:

def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
    val p = Promise[ChannelGroup]()
    nettyFuture.addListener(new ChannelFutureListener {
      def operationComplete(future: ChannelFuture): Unit = p complete Try(
        if (future.isSuccess) future.getGroup
        else if (future.isCancelled) throw new CancellationException
        else throw future.getCause)
    })
    p.future
  }

And then you can write shutdown as something like:

override def shutdown(): Unit = {
  import appropriate ExecutionContext here
  def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ => c.getGroup }
  for {
   _ <- always(channelGroup.write(ChannelBuffers.buffer(0)))
   _ <- always({ channelGroup.unbind(); channelGroup.disconnect() })
   _ <- always(channelGroup.close())
   } inboundBootstrap.releaseExternalResources()
}
drewhk Owner
drewhk added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
}
}
3  akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala
View
@@ -32,7 +32,8 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent) {
- notifyListener(e.getChannel, InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array())))
+ val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
+ if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes)))
Patrik Nordwall Owner
patriknw added a note

Does this imply that application messages can't be 0 bytes? Is that an issue?

Viktor Klang (√) Owner

That's not an application message tho? It's the inbound payload (Akka protocol), right?

Patrik Nordwall Owner
patriknw added a note

ok, I see

drewhk Owner
drewhk added a note
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
}
override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) {
3  akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
View
@@ -34,7 +34,8 @@ private[remote] trait UdpHandlers extends CommonHandlers {
initUdp(e.getChannel, e.getRemoteAddress, e.getMessage.asInstanceOf[ChannelBuffer])
} else {
val listener = transport.udpConnectionTable.get(inetSocketAddress)
- listener notify InboundPayload(ByteString(e.getMessage.asInstanceOf[ChannelBuffer].array()))
+ val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
+ if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
}
case _
}
Something went wrong with that request. Please try again.