diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 3618573465b..f8cb687c84a 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -117,8 +117,6 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep actors.put(actor.getClass.getName, actor) actor.lifeCycle = Some(lifeCycle) startLink(actor) - remoteAddress.foreach(address => println("----- ADDING actor for " + address.get.hostname + " - " + address.get.port)) - remoteAddress.foreach(address => println("----- " + RemoteServer.Address(address.hostname, address.port).hashCode)) remoteAddress.foreach(address => RemoteServer.actorsFor(RemoteServer.Address(address.hostname, address.port)).actors.put(actor.id, actor)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 670bf521918..22e39276294 100755 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -146,9 +146,6 @@ class RemoteServer extends Logging { hostname = _hostname port = _port log.info("Starting remote server at [%s:%s]", hostname, port) - println("======= ADDING actor for " + hostname + " - " + port) - println("======= " + RemoteServer.Address(hostname, port).hashCode) - println("======= " + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.size) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) bootstrap.setPipelineFactory(pipelineFactory) @@ -172,6 +169,8 @@ class RemoteServer extends Logging { } } +case class Codec(encoder : ChannelHandler,decoder : ChannelHandler) + /** * @author Jonas Bonér */ @@ -184,23 +183,20 @@ class RemoteServerPipelineFactory( import RemoteServer._ def getPipeline: ChannelPipeline = { - val pipeline = Channels.pipeline() - RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder) - //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder) - case _ => {} // no compression - } - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) - pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteRequest.getDefaultInstance)) - RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => pipeline.addLast("zlibEncoder", new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)) - //case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder) - case _ => {} // no compression + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance) + val protobufEnc = new ProtobufEncoder + val zipCodec = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL),new ZlibDecoder)) + //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) + case _ => None } - pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) - pipeline.addLast("protobufEncoder", new ProtobufEncoder) - pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)) - pipeline + val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects) + + val stages: Array[ChannelHandler] = zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer)) + .getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer)) + new StaticChannelPipeline(stages: _*) } }