From afaf81600fc14356366a36910a320c5d3977defc Mon Sep 17 00:00:00 2001 From: Florian Gilcher Date: Fri, 20 Jul 2012 12:25:11 +0200 Subject: [PATCH 1/2] Allow plugins to replace pipeline factories This patch allows plugins to replace NettyTransports pipeline factories. This allows things like SSL support to be implemented in plugins instead of core elasticsearch. This patch assumes that PipelineFactories will be replaced by a subclass. --- .../common/netty/PipelineFactories.java | 40 +++++++++++++++++++ .../transport/netty/NettyTransport.java | 32 +++++---------- 2 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/netty/PipelineFactories.java diff --git a/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java b/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java new file mode 100644 index 0000000000000..26c923a1ebdc3 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/netty/PipelineFactories.java @@ -0,0 +1,40 @@ +package org.elasticsearch.common.netty; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.inject.Inject; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; + +public class PipelineFactories { + + @Inject() + public PipelineFactories() { + } + + public ChannelPipelineFactory serverPipelineFactory(final NettyTransport transport, final OpenChannelsHandler serverOpenChannels, final ESLogger logger) throws ElasticSearchException { + return new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("openChannels", serverOpenChannels); + pipeline.addLast("dispatcher", new MessageChannelHandler(transport, logger)); + return pipeline; + } + }; + } + + public ChannelPipelineFactory clientPipelineFactory(final NettyTransport transport, final ESLogger logger) throws ElasticSearchException { + return new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("dispatcher", new MessageChannelHandler(transport, logger)); + return pipeline; + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index cc00b37559315..a8d7856c65d89 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.NettyStaticSetup; import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.netty.PipelineFactories; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -119,6 +120,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile OpenChannelsHandler serverOpenChannels; + private PipelineFactories pipelineFactories; + private volatile ClientBootstrap clientBootstrap; private volatile ServerBootstrap serverBootstrap; @@ -126,7 +129,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem // node id to actual channel final ConcurrentMap connectedNodes = newConcurrentMap(); - private volatile Channel serverChannel; private volatile TransportServiceAdapter transportServiceAdapter; @@ -136,18 +138,19 @@ public class NettyTransport extends AbstractLifecycleComponent implem private final Object[] connectMutex; public NettyTransport(ThreadPool threadPool) { - this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS)); + this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS), new PipelineFactories()); } public NettyTransport(Settings settings, ThreadPool threadPool) { - this(settings, threadPool, new NetworkService(settings)); + this(settings, threadPool, new NetworkService(settings), new PipelineFactories()); } @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PipelineFactories pipelineFactories) { super(settings); this.threadPool = threadPool; this.networkService = networkService; + this.pipelineFactories = pipelineFactories; this.connectMutex = new Object[500]; for (int i = 0; i < connectMutex.length; i++) { @@ -202,14 +205,8 @@ protected void doStart() throws ElasticSearchException { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount)); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + ChannelPipelineFactory clientPipelineFactory = pipelineFactories.clientPipelineFactory(this, logger); + clientBootstrap.setPipelineFactory(clientPipelineFactory); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { @@ -244,15 +241,8 @@ public ChannelPipeline getPipeline() throws Exception { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", serverOpenChannels); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + ChannelPipelineFactory serverPipelineFactory = pipelineFactories.serverPipelineFactory(this, serverOpenChannels, logger); + serverBootstrap.setPipelineFactory(serverPipelineFactory); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); From 7626f62f6a1fee9de3af72562a8852683d905b2c Mon Sep 17 00:00:00 2001 From: Florian Gilcher Date: Fri, 20 Jul 2012 15:53:33 +0200 Subject: [PATCH 2/2] Don't minimimize netty This makes life a lot harder for plugins wanting to use parts of the vendored api. E.g. all SSL classes would be stripped from netty, but at the same time be relocated into a new package, making it hard to use those parts from the "real netty" later on as the package differs. This requires version 1.6 of the shade plugin to work. --- pom.xml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 306f04af8519f..26be47d0cf537 100644 --- a/pom.xml +++ b/pom.xml @@ -301,7 +301,7 @@ org.apache.maven.plugins maven-shade-plugin - 1.5 + 1.6 package @@ -378,6 +378,17 @@ build.properties + + io.netty:netty + + ** + META-INF/** + LICENSE + NOTICE + /*.txt + build.properties + +