Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Cleaned up chaining of ChannelGroupFutures #991

Merged
merged 1 commit into from

5 participants

@drewhk
Owner

Added support to NettyFutureBridge for ChannelGroupFutures following the recommendation of Viktor.

@viktorklang
Owner

LGTM! :-)

@drewhk drewhk commented on the diff
...cala/akka/remote/transport/netty/NettyTransport.scala
@@ -41,6 +41,20 @@ object NettyFutureBridge {
})
p.future
}
+
+ def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
+ import scala.collection.JavaConverters._
+ val p = Promise[ChannelGroup]
+ nettyFuture.addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture): Unit = p complete Try(
+ if (future.isCompleteSuccess) future.getGroup
+ else throw future.iterator.asScala.collectFirst {
@drewhk Owner
drewhk added a note

Need feedback on this line. I only report the first failure in the future group, is this enough?

@viktorklang Owner

Yeah, I think so

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff
...cala/akka/remote/transport/netty/NettyTransport.scala
((10 lines not shown))
- channelGroup.close().addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- inboundBootstrap.releaseExternalResources()
- }
- })
- }
- })
- }
- })
+ def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ c.getGroup }
+ for {
+ // Force flush by trying to write an empty buffer and wait for success
+ _ always(channelGroup.write(ChannelBuffers.buffer(0)))
+ _ always({ channelGroup.unbind(); channelGroup.disconnect() })
+ _ always(channelGroup.close())
+ } inboundBootstrap.releaseExternalResources()
@patriknw Owner
patriknw added a note

Nice!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw
Owner

LGTM

@rkuhn
Owner

LGTM

@viktorklang
Owner

You can merge this

@drewhk
Owner

No build kitteh?

@viktorklang
Owner

hmmm, the build kitteh hasn't run in 5h?? Sounds like a problem to me

@viktorklang
Owner

PLS REBUILD ALL

@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/268/

@akka-ci
Owner

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/268/

@drewhk drewhk merged commit 3ceb02f into akka:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 3, 2013
  1. Cleaned up chaining of ChannelGroupFutures

    Endre Sándor Varga authored
This page is out of date. Refresh to see the latest.
View
40 akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala
@@ -11,9 +11,9 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
-import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
+import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._
-import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener }
+import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
@@ -41,6 +41,20 @@ object NettyFutureBridge {
})
p.future
}
+
+ def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
+ import scala.collection.JavaConverters._
+ val p = Promise[ChannelGroup]
+ nettyFuture.addListener(new ChannelGroupFutureListener {
+ def operationComplete(future: ChannelGroupFuture): Unit = p complete Try(
+ if (future.isCompleteSuccess) future.getGroup
+ else throw future.iterator.asScala.collectFirst {
@drewhk Owner
drewhk added a note

Need feedback on this line. I only report the first failure in the future group, is this enough?

@viktorklang Owner

Yeah, I think so

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ case f if f.isCancelled new CancellationException
+ case f if !f.isSuccess f.getCause
+ } getOrElse new IllegalStateException("Error reported in ChannelGroupFuture, but no error found in individual futures."))
+ })
+ p.future
+ }
}
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
@@ -348,21 +362,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
}
override def shutdown(): Unit = {
- // Force flush by trying to write an empty buffer and wait for success
- channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- channelGroup.unbind()
- channelGroup.disconnect().addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- channelGroup.close().addListener(new ChannelGroupFutureListener {
- def operationComplete(future: ChannelGroupFuture) {
- inboundBootstrap.releaseExternalResources()
- }
- })
- }
- })
- }
- })
+ def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ c.getGroup }
+ for {
+ // Force flush by trying to write an empty buffer and wait for success
+ _ always(channelGroup.write(ChannelBuffers.buffer(0)))
+ _ always({ channelGroup.unbind(); channelGroup.disconnect() })
+ _ always(channelGroup.close())
+ } inboundBootstrap.releaseExternalResources()
@patriknw Owner
patriknw added a note

Nice!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
}
View
2  akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala
@@ -35,7 +35,7 @@ private[remote] trait UdpHandlers extends CommonHandlers {
} else {
val listener = transport.udpConnectionTable.get(inetSocketAddress)
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
- if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes))
+ if (bytes.length > 0) listener notify InboundPayload(ByteString(bytes))
}
case _
}
Something went wrong with that request. Please try again.