Skip to content

Commit

Permalink
Switching to using a ChannelLocal to keep tabs on the inbound clients…
Browse files Browse the repository at this point in the history
… address
  • Loading branch information
viktorklang committed Feb 20, 2012
1 parent d6ca3c9 commit 127b2a3
Showing 1 changed file with 23 additions and 25 deletions.
48 changes: 23 additions & 25 deletions akka-remote/src/main/scala/akka/remote/netty/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.jboss.netty.channel.group.ChannelGroup
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.{ StaticChannelPipeline, SimpleChannelUpstreamHandler, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelHandler, Channel }
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.execution.ExecutionHandler
import akka.event.Logging
Expand All @@ -19,8 +18,7 @@ import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClient
import akka.actor.Address
import java.net.InetAddress
import akka.actor.ActorSystemImpl
import org.jboss.netty.channel.ChannelLocal
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel._

class NettyRemoteServer(val netty: NettyRemoteTransport) {

Expand Down Expand Up @@ -135,6 +133,10 @@ class RemoteServerHandler(
val openChannels: ChannelGroup,
val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler {

val channelAddress = new ChannelLocal[Option[Address]](false) {
override def initialValue(channel: Channel) = None
}

import netty.settings

private var addressToSet = true
Expand All @@ -154,23 +156,20 @@ class RemoteServerHandler(
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)

override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx.getChannel)
netty.notifyListeners(RemoteServerClientConnected(netty, clientAddress))
}
// TODO might want to log or otherwise signal that a TCP connection has been established here.
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = ()

override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx.getChannel)
netty.notifyListeners(RemoteServerClientDisconnected(netty, clientAddress))
netty.notifyListeners(RemoteServerClientDisconnected(netty, channelAddress.get(ctx.getChannel)))
}

override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = getClientAddress(ctx.getChannel) match {
case s @ Some(address)
if (settings.UsePassiveConnections)
netty.unbindClient(address)
netty.notifyListeners(RemoteServerClientClosed(netty, s))
case None
netty.notifyListeners(RemoteServerClientClosed(netty, None))
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val address = channelAddress.get(ctx.getChannel)
if (address.isDefined && settings.UsePassiveConnections)
netty.unbindClient(address.get)

netty.notifyListeners(RemoteServerClientClosed(netty, address))
channelAddress.remove(ctx.getChannel)
}

override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
Expand All @@ -181,11 +180,16 @@ class RemoteServerHandler(
case remote: AkkaRemoteProtocol if remote.hasInstruction
val instruction = remote.getInstruction
instruction.getCommandType match {
case CommandType.CONNECT if settings.UsePassiveConnections
case CommandType.CONNECT
val origin = instruction.getOrigin
val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, netty, inbound)
netty.bindClient(inbound, client)
channelAddress.set(event.getChannel, Option(inbound))

//If we want to reuse the inbound connections as outbound we need to get busy
if (settings.UsePassiveConnections)
netty.bindClient(inbound, new PassiveRemoteClient(event.getChannel, netty, inbound))

netty.notifyListeners(RemoteServerClientConnected(netty, Option(inbound)))
case CommandType.SHUTDOWN //Will be unbound in channelClosed
case CommandType.HEARTBEAT //Other guy is still alive
case _ //Unknown command
Expand All @@ -200,11 +204,5 @@ class RemoteServerHandler(
netty.notifyListeners(RemoteServerError(event.getCause, netty))
event.getChannel.close()
}

private def getClientAddress(c: Channel): Option[Address] =
c.getRemoteAddress match {
case inet: InetSocketAddress Some(Address("akka", "unknown(yet)", inet.getAddress.toString, inet.getPort))
case _ None
}
}

0 comments on commit 127b2a3

Please sign in to comment.