Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaned up chaining of ChannelGroupFutures #991

Merged
merged 1 commit into from Jan 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -11,9 +11,9 @@ import com.typesafe.config.Config
import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException } import java.net.{ UnknownHostException, SocketAddress, InetAddress, InetSocketAddress, ConnectException }
import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException } import java.util.concurrent.{ ConcurrentHashMap, Executor, Executors, CancellationException }
import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap } import org.jboss.netty.bootstrap.{ ConnectionlessBootstrap, Bootstrap, ClientBootstrap, ServerBootstrap }
import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer} import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ ChannelGroupFuture, ChannelGroupFutureListener } import org.jboss.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelGroupFutureListener }
import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.channel.socket.nio.{ NioDatagramChannelFactory, NioServerSocketChannelFactory, NioClientSocketChannelFactory }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
Expand Down Expand Up @@ -41,6 +41,20 @@ object NettyFutureBridge {
}) })
p.future p.future
} }

def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
import scala.collection.JavaConverters._
val p = Promise[ChannelGroup]
nettyFuture.addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture): Unit = p complete Try(
if (future.isCompleteSuccess) future.getGroup
else throw future.iterator.asScala.collectFirst {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need feedback on this line. I only report the first failure in the future group, is this enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think so

case f if f.isCancelled ⇒ new CancellationException
case f if !f.isSuccess ⇒ f.getCause
} getOrElse new IllegalStateException("Error reported in ChannelGroupFuture, but no error found in individual futures."))
})
p.future
}
} }


class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace { class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
Expand Down Expand Up @@ -348,21 +362,13 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
} }


override def shutdown(): Unit = { override def shutdown(): Unit = {
// Force flush by trying to write an empty buffer and wait for success def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ ⇒ c.getGroup }
channelGroup.write(ChannelBuffers.buffer(0)).addListener(new ChannelGroupFutureListener { for {
def operationComplete(future: ChannelGroupFuture) { // Force flush by trying to write an empty buffer and wait for success
channelGroup.unbind() _ ← always(channelGroup.write(ChannelBuffers.buffer(0)))
channelGroup.disconnect().addListener(new ChannelGroupFutureListener { _ ← always({ channelGroup.unbind(); channelGroup.disconnect() })
def operationComplete(future: ChannelGroupFuture) { _ ← always(channelGroup.close())
channelGroup.close().addListener(new ChannelGroupFutureListener { } inboundBootstrap.releaseExternalResources()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

def operationComplete(future: ChannelGroupFuture) {
inboundBootstrap.releaseExternalResources()
}
})
}
})
}
})


} }


Expand Down
Expand Up @@ -35,7 +35,7 @@ private[remote] trait UdpHandlers extends CommonHandlers {
} else { } else {
val listener = transport.udpConnectionTable.get(inetSocketAddress) val listener = transport.udpConnectionTable.get(inetSocketAddress)
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
if (bytes.length > 0)listener notify InboundPayload(ByteString(bytes)) if (bytes.length > 0) listener notify InboundPayload(ByteString(bytes))
} }
case _ ⇒ case _ ⇒
} }
Expand Down