diff --git a/docs/setup/config.md b/docs/setup/config.md index c8358827ab875..d9d5f15670829 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -327,7 +327,7 @@ The following parameters configure Flink's JobManager and TaskManagers. - `security.ssl.truststore-password`: The secret to decrypt the truststore. -- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). +- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). Note that it doesn't support comma separated list. - `security.ssl.algorithms`: The comma separated list of standard SSL algorithms to be supported. Read more [here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**). diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index 37cb2604bb3dc..ae826db8c9a50 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -130,6 +130,7 @@ public MesosArtifactServer(String prefix, String serverHostname, int configuredP router = new Router(); + final Configuration sslConfig = config; ChannelInitializer initializer = new ChannelInitializer() { @Override @@ -139,6 +140,7 @@ protected void initChannel(SocketChannel ch) { // SSL should be the first handler in the pipeline if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); sslEngine.setUseClientMode(false); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index e604ce830086b..d88fdcf218fba 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -380,6 +380,7 @@ public void run() { LOG.warn("Error while adding shutdown hook", t); } + final Configuration sslConfig = config; ChannelInitializer initializer = new ChannelInitializer() { @Override @@ -389,6 +390,7 @@ protected void initChannel(SocketChannel ch) { // SSL should be the first handler in the pipeline if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); sslEngine.setUseClientMode(false); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 5b00ae4acf7e7..8a70559b589ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -168,6 +168,7 @@ public ServerSocket createSocket(int port) throws IOException { if(socketAttempt == null) { throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange); } else { + SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config); this.serverSocket = socketAttempt; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 7b0da43fbd39e..b9a1b9013fc1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; import java.net.InetAddress; @@ -236,6 +237,10 @@ public boolean getSSLEnabled() { && SSLUtils.getSSLEnabled(config); } + public void setSSLVerAndCipherSuites(SSLEngine engine) { + SSLUtils.setSSLVerAndCipherSuites(engine, config); + } + public void setSSLVerifyHostname(SSLParameters sslParams) { SSLUtils.setSSLVerifyHostname(config, sslParams); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 55d2b181eb9a9..3cf14b8b15569 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -140,6 +140,7 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws public void initChannel(SocketChannel channel) throws Exception { if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + config.setSSLVerAndCipherSuites(sslEngine); sslEngine.setUseClientMode(false); channel.pipeline().addLast("ssl", new SslHandler(sslEngine)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index fc38b5d0fbe9f..c2d7a7b307cd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -27,10 +27,13 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocket; import javax.net.ssl.TrustManagerFactory; import java.io.File; import java.io.FileInputStream; +import java.net.ServerSocket; import java.security.KeyStore; /** @@ -54,6 +57,42 @@ public static boolean getSSLEnabled(Configuration sslConfig) { ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED); } + /** + * Sets SSl version and cipher suites for SSLServerSocket + * @param socket + * Socket to be handled + * @param config + * The application configuration + */ + public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) { + if (socket instanceof SSLServerSocket) { + ((SSLServerSocket) socket).setEnabledProtocols(config.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",")); + ((SSLServerSocket) socket).setEnabledCipherSuites(config.getString( + ConfigConstants.SECURITY_SSL_ALGORITHMS, + ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); + } else { + LOG.warn("Not a SSL socket, will skip setting tls version and cipher suites."); + } + } + + /** + * Sets SSL version and cipher suites for SSLEngine + * @param engine + * SSLEngine to be handled + * @param config + * The application configuration + */ + public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { + engine.setEnabledProtocols(config.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",")); + engine.setEnabledCipherSuites(config.getString( + ConfigConstants.SECURITY_SSL_ALGORITHMS, + ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); + } + /** * Sets SSL options to verify peer's hostname in the certificate * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 113734135e2fd..d28d693896976 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -23,6 +23,10 @@ import org.junit.Test; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLServerSocket; +import java.net.ServerSocket; +import java.util.Random; /* * Tests for the SSL utilities @@ -125,4 +129,100 @@ public void testCreateSSLServerContextMisconfiguration() { } } + /** + * Tests if SSL Server Context creation fails with bad SSL configuration + */ + @Test + public void testCreateSSLServerContextWithMultiProtocols() { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); + + try { + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + Assert.fail("SSL server context created even with multiple protocols set "); + } catch (Exception e) { + // Exception here is valid + } + } + + /** + * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket + */ + @Test + public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + ServerSocket socket = null; + try { + socket = serverContext.getServerSocketFactory().createServerSocket(0); + + String[] protocols = ((SSLServerSocket) socket).getEnabledProtocols(); + String[] algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites(); + + Assert.assertNotEquals(1, protocols.length); + Assert.assertNotEquals(2, algorithms.length); + + SSLUtils.setSSLVerAndCipherSuites(socket, serverConfig); + protocols = ((SSLServerSocket) socket).getEnabledProtocols(); + algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites(); + + Assert.assertEquals(1, protocols.length); + Assert.assertEquals("TLSv1.1", protocols[0]); + Assert.assertEquals(2, algorithms.length); + Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256")); + Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256")); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + /** + * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine + */ + @Test + public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + SSLEngine engine = serverContext.createSSLEngine(); + + String[] protocols = engine.getEnabledProtocols(); + String[] algorithms = engine.getEnabledCipherSuites(); + + Assert.assertNotEquals(1, protocols.length); + Assert.assertNotEquals(2, algorithms.length); + + SSLUtils.setSSLVerAndCipherSuites(engine, serverConfig); + protocols = engine.getEnabledProtocols(); + algorithms = engine.getEnabledCipherSuites(); + + Assert.assertEquals(1, protocols.length); + Assert.assertEquals("TLSv1", protocols[0]); + Assert.assertEquals(2, algorithms.length); + Assert.assertTrue(algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + } + } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 0f6509c94bfbd..9f8e3e193b532 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -71,6 +71,33 @@ class AkkaSslITCase(_system: ActorSystem) assert(cluster.running) } + "Failed to start ssl enabled akka with two protocols set" in { + + an[Exception] should be thrownBy { + + val config = new Configuration() + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1") + config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1") + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) + + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, + getClass.getResource("/local127.keystore").getPath) + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, + getClass.getResource("/local127.truststore").getPath) + + config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1") + + val cluster = new TestingCluster(config, false) + + cluster.start(true) + } + } + "start with akka ssl disabled" in { val config = new Configuration()