diff --git a/pom.xml b/pom.xml index 306f04af8519f..26be47d0cf537 100644 --- a/pom.xml +++ b/pom.xml @@ -301,7 +301,7 @@ org.apache.maven.plugins maven-shade-plugin - 1.5 + 1.6 package @@ -378,6 +378,17 @@ build.properties + + io.netty:netty + + ** + META-INF/** + LICENSE + NOTICE + /*.txt + build.properties + + diff --git a/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java b/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java new file mode 100644 index 0000000000000..26c923a1ebdc3 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java @@ -0,0 +1,40 @@ +package org.elasticsearch.common.netty; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.inject.Inject; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; + +public class PipelineFactories { + + @Inject() + public PipelineFactories() { + } + + public ChannelPipelineFactory serverPipelineFactory(final NettyTransport transport, final OpenChannelsHandler serverOpenChannels, final ESLogger logger) throws ElasticSearchException { + return new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("openChannels", serverOpenChannels); + pipeline.addLast("dispatcher", new MessageChannelHandler(transport, logger)); + return pipeline; + } + }; + } + + public ChannelPipelineFactory clientPipelineFactory(final NettyTransport transport, final ESLogger logger) throws ElasticSearchException { + return new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("dispatcher", new MessageChannelHandler(transport, logger)); + return pipeline; + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index cc00b37559315..a8d7856c65d89 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.NettyStaticSetup; import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.netty.PipelineFactories; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -119,6 +120,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile OpenChannelsHandler serverOpenChannels; + private PipelineFactories pipelineFactories; + private volatile ClientBootstrap clientBootstrap; private volatile ServerBootstrap serverBootstrap; @@ -126,7 +129,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem // node id to actual channel final ConcurrentMap connectedNodes = newConcurrentMap(); - private volatile Channel serverChannel; private volatile TransportServiceAdapter transportServiceAdapter; @@ -136,18 +138,19 @@ public class NettyTransport extends AbstractLifecycleComponent implem private final Object[] connectMutex; public NettyTransport(ThreadPool threadPool) { - this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS)); + this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS), new PipelineFactories()); } public NettyTransport(Settings settings, ThreadPool threadPool) { - this(settings, threadPool, new NetworkService(settings)); + this(settings, threadPool, new NetworkService(settings), new PipelineFactories()); } @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PipelineFactories pipelineFactories) { super(settings); this.threadPool = threadPool; this.networkService = networkService; + this.pipelineFactories = pipelineFactories; this.connectMutex = new Object[500]; for (int i = 0; i < connectMutex.length; i++) { @@ -202,14 +205,8 @@ protected void doStart() throws ElasticSearchException { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount)); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + ChannelPipelineFactory clientPipelineFactory = pipelineFactories.clientPipelineFactory(this, logger); + clientBootstrap.setPipelineFactory(clientPipelineFactory); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { @@ -244,15 +241,8 @@ public ChannelPipeline getPipeline() throws Exception { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", serverOpenChannels); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + ChannelPipelineFactory serverPipelineFactory = pipelineFactories.serverPipelineFactory(this, serverOpenChannels, logger); + serverBootstrap.setPipelineFactory(serverPipelineFactory); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);