diff --git a/config/elasticsearch.yml b/config/elasticsearch.yml index 6ef5d5ea7b099..1f3e16e3f5725 100644 --- a/config/elasticsearch.yml +++ b/config/elasticsearch.yml @@ -217,6 +217,20 @@ # # transport.tcp.compress: true +# Enable SSL/TLS encryption for all communication between nodes (disabled by default): +# +# transport.tcp.ssl: true + +# Settings for SSL/TLS encryption, used when transport.tcp.ssl is set to true +# +# transport.tcp.ssl.keystore: /path/to/the/keystore +# transport.tcp.ssl.keystore_password: password +# transport.tcp.ssl.keystore_algorithm: SunX509 +# +# transport.tcp.ssl.truststore: /path/to/the/truststore +# transport.tcp.ssl.truststore_password: password +# transport.tcp.ssl.truststore_algorithm: PKIX + # Set a custom port to listen for HTTP traffic: # # http.port: 9200 diff --git a/src/main/java/org/elasticsearch/transport/SSLTransportException.java b/src/main/java/org/elasticsearch/transport/SSLTransportException.java new file mode 100644 index 0000000000000..c22b9df514ca9 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/SSLTransportException.java @@ -0,0 +1,34 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +/** + * @author Tanguy Leroux - tlrx.dev@gmail.com + */ +public class SSLTransportException extends TransportException { + + public SSLTransportException(String message) { + super(message); + } + + public SSLTransportException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 46b040fdbf2b6..fcef6fdad3ca0 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import org.elasticsearch.transport.netty.ssl.*; import org.elasticsearch.transport.support.TransportStreams; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -107,6 +108,14 @@ public class NettyTransport extends AbstractLifecycleComponent implem final Boolean tcpKeepAlive; final Boolean reuseAddress; + + final boolean ssl; + final String sslKeyStore; + final String sslKeyStorePassword; + final String sslKeyStoreAlgorithm; + final String sslTrustStore; + final String sslTrustStorePassword; + final String sslTrustStoreAlgorithm; final ByteSizeValue tcpSendBufferSize; final ByteSizeValue tcpReceiveBufferSize; @@ -177,8 +186,16 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); - logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}]", - workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh); + this.ssl = settings.getAsBoolean("transport.tcp.ssl", false); + this.sslKeyStore = settings.get("transport.tcp.ssl.keystore", System.getProperty("javax.net.ssl.keyStore")); + this.sslKeyStorePassword = settings.get("transport.tcp.ssl.keystore_password", System.getProperty("javax.net.ssl.keyStorePassword")); + this.sslKeyStoreAlgorithm = settings.get("transport.tcp.ssl.keystore_algorithm", System.getProperty("ssl.KeyManagerFactory.algorithm")); + this.sslTrustStore = settings.get("transport.tcp.ssl.truststore", System.getProperty("javax.net.ssl.trustStore")); + this.sslTrustStorePassword = settings.get("transport.tcp.ssl.truststore_password", System.getProperty("javax.net.ssl.trustStorePassword")); + this.sslTrustStoreAlgorithm = settings.get("ransport.tcp.ssl.truststore_algorithm", System.getProperty("ssl.TrustManagerFactory.algorithm")); + + logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}], ssl[{}]", + workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, ssl); } public Settings settings() { @@ -208,26 +225,34 @@ protected void doStart() throws ElasticSearchException { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount)); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + ChannelPipelineFactory clientPipelineFactory = null; + if (ssl) { + clientPipelineFactory = new SecureClientChannelPipelineFactory(new SecureMessageChannelHandler(NettyTransport.this, logger), + sslKeyStore, sslKeyStorePassword, sslKeyStoreAlgorithm, + sslTrustStore, sslTrustStorePassword, sslTrustStoreAlgorithm, + maxCumulationBufferCapacity, maxCompositeBufferComponents); + } else { + clientPipelineFactory = new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + pipeline.addLast("size", sizeHeader); + pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); + return pipeline; } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + }; + } clientBootstrap.setPipelineFactory(clientPipelineFactory); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { @@ -262,27 +287,35 @@ public ChannelPipeline getPipeline() throws Exception { Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", serverOpenChannels); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; + ChannelPipelineFactory serverPipelineFactory = null; + if (ssl) { + serverPipelineFactory = serverPipelineFactory = new SecureServerChannelPipelineFactory(new SecureMessageChannelHandler(NettyTransport.this, logger), serverOpenChannels, + sslKeyStore, sslKeyStorePassword, sslKeyStoreAlgorithm, + sslTrustStore, sslTrustStorePassword, sslTrustStoreAlgorithm, + maxCumulationBufferCapacity, maxCompositeBufferComponents); + } else { + serverPipelineFactory = new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("openChannels", serverOpenChannels); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } + } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + pipeline.addLast("size", sizeHeader); + pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); + return pipeline; + } + }; + } serverBootstrap.setPipelineFactory(serverPipelineFactory); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); diff --git a/src/main/java/org/elasticsearch/transport/netty/ssl/SSLChannelPipelineFactory.java b/src/main/java/org/elasticsearch/transport/netty/ssl/SSLChannelPipelineFactory.java new file mode 100644 index 0000000000000..8aa7eaf505669 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/netty/ssl/SSLChannelPipelineFactory.java @@ -0,0 +1,131 @@ +package org.elasticsearch.transport.netty.ssl; + +import java.io.FileInputStream; +import java.security.KeyStore; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.jboss.netty.channel.ChannelPipelineFactory; + +/** + * ChannelPipelineFactory used for Client/Server SSL channel pipelines + * + * @author Tanguy Leroux + * + */ +public abstract class SSLChannelPipelineFactory implements ChannelPipelineFactory { + + private static final ESLogger logger = Loggers.getLogger(SSLChannelPipelineFactory.class); + + private SSLContext sslContext; + + SecureMessageChannelHandler messageChannelHandler; + + private String keyStore; + + private String keyStorePassword; + + private String keyStoreAlgorithm; + + private String trustStore; + + private String trustStorePassword; + + private String trustStoreAlgorithm; + + ByteSizeValue maxCumulationBufferCapacity; + + int maxCompositeBufferComponents; + + public SSLChannelPipelineFactory(SecureMessageChannelHandler channelHandler, + String sslKeyStore, String sslKeyStorePassword, String sslKeyStoreAlgorithm, + String sslTrustStore, String sslTrustStorePassword, String sslTrustStoreAlgorithm, + ByteSizeValue mCumulationBufferCapacity, int mCompositeBufferComponents) { + + messageChannelHandler = channelHandler; + maxCumulationBufferCapacity = mCumulationBufferCapacity; + maxCompositeBufferComponents = mCompositeBufferComponents; + + keyStore = sslKeyStore; + keyStorePassword = sslKeyStorePassword; + if (sslKeyStoreAlgorithm != null) { + keyStoreAlgorithm = sslKeyStoreAlgorithm; + } else { + keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); + } + + trustStore = sslTrustStore; + trustStorePassword = sslTrustStorePassword; + if (sslTrustStoreAlgorithm != null) { + trustStoreAlgorithm = sslTrustStoreAlgorithm; + } else { + trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm(); + } + + logger.debug("using keyStore[{}], keyAlgorithm[{}], trustStore[{}], trustAlgorithm[{}]", keyStore, keyStoreAlgorithm, trustStore, trustStoreAlgorithm); + + KeyStore ks = null; + KeyManagerFactory kmf = null; + FileInputStream in = null; + try { + // Load KeyStore + ks = KeyStore.getInstance("jks"); + in = new FileInputStream(keyStore); + ks.load(in, keyStorePassword.toCharArray()); + + // Initialize KeyManagerFactory + kmf = KeyManagerFactory.getInstance(keyStoreAlgorithm); + kmf.init(ks, keyStorePassword.toCharArray()); + } catch (Exception e) { + throw new Error("Failed to initialize a KeyManagerFactory", e); + } finally { + try { + in.close(); + } catch (Exception e2) { + } + } + + TrustManager[] trustManagers = null; + try { + // Load TrustStore + in = new FileInputStream(trustStore); + ks.load(in, trustStorePassword.toCharArray()); + + // Initialize a trust manager factory with the trusted store + TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(trustStoreAlgorithm); + trustFactory.init(ks); + + // Retrieve the trust managers from the factory + trustManagers = trustFactory.getTrustManagers(); + } catch (Exception e) { + throw new Error("Failed to initialize a TrustManagerFactory", e); + } finally { + try { + in.close(); + } catch (Exception e2) { + } + } + + // Initialize sslContext + try { + sslContext = SSLContext.getInstance("TLS"); + sslContext.init(kmf.getKeyManagers(), trustManagers, null); + } catch (Exception e) { + throw new Error("Failed to initialize the SSLContext", e); + } + } + + public SSLContext getSslContext() { + return sslContext; + } + + public void setSslContext(SSLContext sslContext) { + this.sslContext = sslContext; + } +} diff --git a/src/main/java/org/elasticsearch/transport/netty/ssl/SecureClientChannelPipelineFactory.java b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureClientChannelPipelineFactory.java new file mode 100644 index 0000000000000..4d2a8e8d63a2e --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureClientChannelPipelineFactory.java @@ -0,0 +1,53 @@ +package org.elasticsearch.transport.netty.ssl; + +import javax.net.ssl.SSLEngine; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.transport.netty.SizeHeaderFrameDecoder; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.ssl.SslHandler; + +/** + * ChannelPipelineFactory used for Client SSL channel pipelines + * + * @author Tanguy Leroux + * + */ +public class SecureClientChannelPipelineFactory extends SSLChannelPipelineFactory { + + public SecureClientChannelPipelineFactory(SecureMessageChannelHandler channelHandler, + String sslKeyStore, String sslKeyStorePassword, String sslKeyStoreAlgorithm, + String sslTrustStore, String sslTrustStorePassword, String sslTrustStoreAlgorithm, + ByteSizeValue maxCumulationBufferCapacity, int maxCompositeBufferComponents) { + super(channelHandler, + sslKeyStore, sslKeyStorePassword, sslKeyStoreAlgorithm, + sslTrustStore, sslTrustStorePassword, sslTrustStoreAlgorithm, + maxCumulationBufferCapacity, maxCompositeBufferComponents); + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + SSLEngine engine = getSslContext().createSSLEngine(); + engine.setUseClientMode(true); + + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("ssl", new SslHandler(engine)); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } + } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + pipeline.addLast("size", sizeHeader); + pipeline.addLast("dispatcher", messageChannelHandler); + + + return pipeline; + } +} diff --git a/src/main/java/org/elasticsearch/transport/netty/ssl/SecureMessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureMessageChannelHandler.java new file mode 100644 index 0000000000000..59bcf2a4a3e31 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureMessageChannelHandler.java @@ -0,0 +1,49 @@ +package org.elasticsearch.transport.netty.ssl; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.transport.SSLTransportException; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.transport.netty.NettyTransport; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.handler.ssl.SslHandler; + +/** + * SSL message channel handler + * + * @author Tanguy Leroux + * + */ +public class SecureMessageChannelHandler extends MessageChannelHandler { + + private static final ESLogger logger = Loggers.getLogger(SecureMessageChannelHandler.class); + + public SecureMessageChannelHandler(NettyTransport transport, ESLogger logger) { + super(transport, logger); + } + + @Override + public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception { + SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); + sslHandler.handshake(); + + // Get notified when SSL handshake is done. + final ChannelFuture handshakeFuture = sslHandler.handshake(); + handshakeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + logger.debug("SSL / TLS handshake completed for the channel."); + ctx.sendUpstream(e); + } else { + logger.error("SSL / TLS handshake failed, closing the channel"); + future.getChannel().close(); + throw new SSLTransportException("SSL / TLS handshake failed, closing the channel", future.getCause()); + } + } + }); + } +} diff --git a/src/main/java/org/elasticsearch/transport/netty/ssl/SecureServerChannelPipelineFactory.java b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureServerChannelPipelineFactory.java new file mode 100644 index 0000000000000..bc1cc791c59f1 --- /dev/null +++ b/src/main/java/org/elasticsearch/transport/netty/ssl/SecureServerChannelPipelineFactory.java @@ -0,0 +1,58 @@ +package org.elasticsearch.transport.netty.ssl; + +import javax.net.ssl.SSLEngine; + +import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.transport.netty.SizeHeaderFrameDecoder; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.handler.ssl.SslHandler; + +/** + * ChannelPipelineFactory used for Server SSL channel pipelines + * + * @author Tanguy Leroux + * + */ +public class SecureServerChannelPipelineFactory extends SSLChannelPipelineFactory { + + private OpenChannelsHandler serverOpenChannels; + + public SecureServerChannelPipelineFactory(SecureMessageChannelHandler channelHandler, OpenChannelsHandler openChannels, + String sslKeyStore, String sslKeyStorePassword, String sslKeyStoreAlgorithm, + String sslTrustStore, String sslTrustStorePassword, String sslTrustStoreAlgorithm, + ByteSizeValue maxCumulationBufferCapacity, int maxCompositeBufferComponents) { + + super(channelHandler, sslKeyStore, sslKeyStorePassword, sslKeyStoreAlgorithm, + sslTrustStore, sslTrustStorePassword, sslTrustStoreAlgorithm, + maxCumulationBufferCapacity, maxCompositeBufferComponents); + this.serverOpenChannels = openChannels; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + SSLEngine engine = getSslContext().createSSLEngine(); + engine.setUseClientMode(false); + engine.setNeedClientAuth(true); + + ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("ssl", new SslHandler(engine)); + pipeline.addLast("openChannels", serverOpenChannels); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } + } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + pipeline.addLast("size", sizeHeader); + pipeline.addLast("dispatcher", messageChannelHandler); + return pipeline; + + } +} diff --git a/src/test/java/org/elasticsearch/benchmark/transport/netty/ssl/NettySSLEchoBenchmark.java b/src/test/java/org/elasticsearch/benchmark/transport/netty/ssl/NettySSLEchoBenchmark.java new file mode 100644 index 0000000000000..4af0473b5ce16 --- /dev/null +++ b/src/test/java/org/elasticsearch/benchmark/transport/netty/ssl/NettySSLEchoBenchmark.java @@ -0,0 +1,216 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.transport.netty.ssl; + +import org.elasticsearch.transport.SSLTransportException; +import org.elasticsearch.transport.netty.ssl.SSLChannelPipelineFactory; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.ssl.SslHandler; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import javax.net.ssl.SSLEngine; + +public class NettySSLEchoBenchmark { + + private static final String CERTIFICATES_DIR = "./src/test/resources/certificates/"; + + public static void main(String[] args) { + final int payloadSize = 100; + int CYCLE_SIZE = 50000; + final long NUMBER_OF_ITERATIONS = 500000; + + ChannelBuffer message = ChannelBuffers.buffer(100); + for (int i = 0; i < message.capacity(); i++) { + message.writeByte((byte) i); + } + + // Configure the server. + ServerBootstrap serverBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + + // Set up the pipeline factory. + serverBootstrap.setPipelineFactory(new SSLChannelPipelineFactory(null, + CERTIFICATES_DIR + "esnode1.jks", "esnode1", null, + CERTIFICATES_DIR + "esnode1.jks", "esnode1", null, + null, 0) { + public ChannelPipeline getPipeline() throws Exception { + SSLEngine engine = getSslContext().createSSLEngine(); + engine.setUseClientMode(false); + engine.setNeedClientAuth(true); + return Channels.pipeline(new SslHandler(engine), new EchoServerHandler()); + } + }); + + // Bind and start to accept incoming connections. + serverBootstrap.bind(new InetSocketAddress(9000)); + + ClientBootstrap clientBootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + +// ClientBootstrap clientBootstrap = new ClientBootstrap( +// new OioClientSocketChannelFactory(Executors.newCachedThreadPool())); + + // Set up the pipeline factory. + final EchoClientHandler clientHandler = new EchoClientHandler(); + clientBootstrap.setPipelineFactory(new SSLChannelPipelineFactory(null, + CERTIFICATES_DIR + "esnode2.jks", "esnode2", null, + CERTIFICATES_DIR + "esnode2.jks", "esnode2", null, + null, 0) { + public ChannelPipeline getPipeline() throws Exception { + SSLEngine engine = getSslContext().createSSLEngine(); + engine.setUseClientMode(true); + return Channels.pipeline(new SslHandler(engine), clientHandler); + } + }); + + // Start the connection attempt. + ChannelFuture future = clientBootstrap.connect(new InetSocketAddress("localhost", 9000)); + future.awaitUninterruptibly(); + Channel clientChannel = future.getChannel(); + + System.out.println("Warming up..."); + for (long i = 0; i < 10000; i++) { + clientHandler.latch = new CountDownLatch(1); + clientChannel.write(message); + try { + clientHandler.latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + System.out.println("Warmed up"); + + + long start = System.currentTimeMillis(); + long cycleStart = System.currentTimeMillis(); + for (long i = 1; i < NUMBER_OF_ITERATIONS; i++) { + clientHandler.latch = new CountDownLatch(1); + clientChannel.write(message); + try { + clientHandler.latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if ((i % CYCLE_SIZE) == 0) { + long cycleEnd = System.currentTimeMillis(); + System.out.println("Ran 50000, TPS " + (CYCLE_SIZE / ((double) (cycleEnd - cycleStart) / 1000))); + cycleStart = cycleEnd; + } + } + long end = System.currentTimeMillis(); + long seconds = (end - start) / 1000; + System.out.println("Ran [" + NUMBER_OF_ITERATIONS + "] iterations, payload [" + payloadSize + "]: took [" + seconds + "], TPS: " + ((double) NUMBER_OF_ITERATIONS) / seconds); + + clientChannel.close().awaitUninterruptibly(); + clientBootstrap.releaseExternalResources(); + serverBootstrap.releaseExternalResources(); + } + + public static class EchoClientHandler extends SimpleChannelUpstreamHandler { + + public volatile CountDownLatch latch; + + public EchoClientHandler() { + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + latch.countDown(); + } + + @Override + public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception { + SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); + sslHandler.handshake(); + + // Get notified when SSL handshake is done. + final ChannelFuture handshakeFuture = sslHandler.handshake(); + handshakeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + System.out.println("Handshake completed on client side"); + ctx.sendUpstream(e); + } else { + System.out.println("Handshake failed on client side"); + future.getChannel().close(); + throw new SSLTransportException("SSL / TLS handshake failed, closing the channel", future.getCause()); + } + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + e.getCause().printStackTrace(); + e.getChannel().close(); + } + } + + + public static class EchoServerHandler extends SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + e.getChannel().write(e.getMessage()); + } + + public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception { + SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); + sslHandler.handshake(); + + // Get notified when SSL handshake is done. + final ChannelFuture handshakeFuture = sslHandler.handshake(); + handshakeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + System.out.println("Handshake completed on server side"); + ctx.sendUpstream(e); + } else { + System.out.println("Handshake failed on server side"); + future.getChannel().close(); + throw new SSLTransportException("SSL / TLS handshake failed, closing the channel", future.getCause()); + } + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + // Close the connection when an exception is raised. + e.getCause().printStackTrace(); + e.getChannel().close(); + } + } +} \ No newline at end of file diff --git a/src/test/resources/certificates/esnode1.cert b/src/test/resources/certificates/esnode1.cert new file mode 100644 index 0000000000000..47a32701e2d61 --- /dev/null +++ b/src/test/resources/certificates/esnode1.cert @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIB/TCCAWagAwIBAgIEUAZ4+TANBgkqhkiG9w0BAQUFADBDMQwwCgYDVQQKEwNvcmcxFjAUBgNV +BAsTDWVsYXN0aWNzZWFyY2gxGzAZBgNVBAMTEkVsYXN0aWNzZWFyY2ggTm9kZTAeFw0xMjA3MTgw +ODUxMDVaFw0xMjEwMTYwODUxMDVaMEMxDDAKBgNVBAoTA29yZzEWMBQGA1UECxMNZWxhc3RpY3Nl +YXJjaDEbMBkGA1UEAxMSRWxhc3RpY3NlYXJjaCBOb2RlMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB +iQKBgQCVWUbN1yPz8Gy9j49vuatDROzjgkhFvHQikK4uH4WvZdVDt57uf8wzVkk+pWzQki6Xcchv +wINrjh7lqmQV2LLedhE4R5lE6NuPHDHovncx2VweL+Vi4KuTzPdg7GxgbKXERZ/LLpdQfQGAvkqn +EtWugWJXkY5fj63NCVX9VpwJDwIDAQABMA0GCSqGSIb3DQEBBQUAA4GBADYkAloVoHZ+ijZhwKGz +99uimzHy52ThJhVVFfMFFz5vCo8Neo63+RskxsBbmvWluTajNvxFfgY/6rfeqZen1JmP7kGdVBQ/ +Y9zvc6EZSkpOfegTZR6y0OrFiTGaQL0pgo8UUUjiB1yvYgFfz16E/sYHfWIDlojFY1jiWeaGq9bO +-----END CERTIFICATE----- diff --git a/src/test/resources/certificates/esnode1.jks b/src/test/resources/certificates/esnode1.jks new file mode 100644 index 0000000000000..c91e71ceb13ff Binary files /dev/null and b/src/test/resources/certificates/esnode1.jks differ diff --git a/src/test/resources/certificates/esnode2.cert b/src/test/resources/certificates/esnode2.cert new file mode 100644 index 0000000000000..f1a4a78359702 --- /dev/null +++ b/src/test/resources/certificates/esnode2.cert @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIB/TCCAWagAwIBAgIEUAZ4+jANBgkqhkiG9w0BAQUFADBDMQwwCgYDVQQKEwNvcmcxFjAUBgNV +BAsTDWVsYXN0aWNzZWFyY2gxGzAZBgNVBAMTEkVsYXN0aWNzZWFyY2ggTm9kZTAeFw0xMjA3MTgw +ODUxMDZaFw0xMjEwMTYwODUxMDZaMEMxDDAKBgNVBAoTA29yZzEWMBQGA1UECxMNZWxhc3RpY3Nl +YXJjaDEbMBkGA1UEAxMSRWxhc3RpY3NlYXJjaCBOb2RlMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB +iQKBgQCJxYHz7VW3v2qZ/4GWjMWCJKr9M8b9Cyub6LpMDoQ4ijW7r7PzjXZjFI9ougdMZ4PfIvvY +XbnOmB7BLqArSb9j9YMCHmxCjK2jhXWWYB1B4TLGazc93HeKT2bHSHdATZ6i/jijh7TaS3iBkIVI +aMUtQsUTus5qrXGV4LAU0lLfEwIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAIiCz949ZNW2e8EmkqGq +nV1eLzTXxGxCbh16aw8m9+O22S0T8VB/aHxPqQFpzJUc0/oudDvOSVIABXma84LIEdCrKxblBKGm +SNP+ld2DoGY4igruANqpu3DDUihueyFobr6fh99id6DsABtbTalZANO59S+4DimCFyhXesMYhjOZ +-----END CERTIFICATE----- diff --git a/src/test/resources/certificates/esnode2.jks b/src/test/resources/certificates/esnode2.jks new file mode 100644 index 0000000000000..052ceac548769 Binary files /dev/null and b/src/test/resources/certificates/esnode2.jks differ diff --git a/src/test/resources/certificates/esnode3.cert b/src/test/resources/certificates/esnode3.cert new file mode 100644 index 0000000000000..99a4ca0f53f02 --- /dev/null +++ b/src/test/resources/certificates/esnode3.cert @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIB/TCCAWagAwIBAgIEUAZ4+jANBgkqhkiG9w0BAQUFADBDMQwwCgYDVQQKEwNvcmcxFjAUBgNV +BAsTDWVsYXN0aWNzZWFyY2gxGzAZBgNVBAMTEkVsYXN0aWNzZWFyY2ggTm9kZTAeFw0xMjA3MTgw +ODUxMDZaFw0xMjEwMTYwODUxMDZaMEMxDDAKBgNVBAoTA29yZzEWMBQGA1UECxMNZWxhc3RpY3Nl +YXJjaDEbMBkGA1UEAxMSRWxhc3RpY3NlYXJjaCBOb2RlMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB +iQKBgQDWjYENgSfrrpY0ZcIyBGbCyjVJ1DBH2YKRptlv4+d4xDEoIy5eANFMhulGc4X6VjmswZgO +fP3ZBqneZpz+G0i7TQ/OczkvfGypiumzD3lgDhllPdMQDjnbJXBEb3/5whkaMZV+edJSSP8+FMKy +fwndrwNSa4bIuKKX73oYgUCE/wIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAM9tnhUlryCvq98PCw5h +DGzEEyfo6BFsRnwXkzeX7tb6jELpKsv/cm7EqwH1OXnN/QUNIxId/TP33XE3Pfh+VznprWlC/Kg3 +ahJ8cSLHepUQom7MQGkbV/vq3SGaPPT6cym53lPLZ+tNzrqYAT4HfHSr4OhbZvuXsGsYvIgdN9HN +-----END CERTIFICATE----- diff --git a/src/test/resources/certificates/esnode3.jks b/src/test/resources/certificates/esnode3.jks new file mode 100644 index 0000000000000..5fd294595a097 Binary files /dev/null and b/src/test/resources/certificates/esnode3.jks differ diff --git a/src/test/resources/certificates/generate.sh b/src/test/resources/certificates/generate.sh new file mode 100755 index 0000000000000..6c1ab2657e935 --- /dev/null +++ b/src/test/resources/certificates/generate.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +# Number of nodes key pair to create +nb=3 + +prefix="esnode" + +die () { + echo >&2 "$@" + exit 1 +} + +[ "$#" -eq 1 ] || die "1 argument required, $# provided" +echo $1 | grep -E -q '^[0-9]+$' || die "Numeric argument required, $1 provided" + +nb=$1 + +rm ./$prefix*.jks +rm ./$prefix*.cert + +# Create key pair and certificate for every node +nb_nodes=1 +while [ $nb_nodes -le $nb ] +do + node_name=$prefix$nb_nodes + + echo "Create" $node_name "key pair:" + keytool -genkeypair -alias $node_name -keystore $node_name.jks -keyalg RSA -storepass $node_name -keypass $node_name -dname "cn=Elasticsearch Node, ou=elasticsearch, o=org" + + echo "Generate" $node_name "certificate:" + keytool -export -alias $node_name -keystore $node_name.jks -rfc -file $node_name.cert -storepass $node_name + + nb_nodes=$(( $nb_nodes + 1 )) +done + +# Import certificates in nodes +current_node=1 +while [ $current_node -le $nb ] +do + node_name=$prefix$current_node + import_node=1 + + while [ $import_node -le $nb ] + do + import_node_name=$prefix$import_node + if [ "$import_node" -ne "$current_node" ] + then + echo "Importing" $node_name "certificate into" $import_node_name "keystore" + keytool -import -trustcacerts -alias $node_name -file $node_name.cert -keystore $import_node_name.jks -storepass $import_node_name -noprompt + fi + import_node=$(( $import_node + 1 )) + done + + current_node=$(( $current_node + 1 )) +done + +exit; +