From d8acabb12b0d2ebeaf9e286788a119b487f44ad4 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 10:46:18 +0200 Subject: [PATCH 1/9] SecurityOptions#SSL_ENABLED --- .../flink/configuration/ConfigConstants.java | 7 ++++++- .../configuration/HistoryServerOptions.java | 2 +- .../flink/configuration/SecurityOptions.java | 11 +++++++++++ .../apache/flink/runtime/net/SSLUtils.java | 4 ++-- .../flink/runtime/blob/BlobClientSslTest.java | 10 +++++----- .../netty/NettyClientServerSslTest.java | 3 ++- .../flink/runtime/net/SSLUtilsTest.java | 19 ++++++++++--------- .../flink/runtime/akka/AkkaSslITCase.scala | 14 +++++++------- 8 files changed, 44 insertions(+), 26 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 476797e52af05..703e2f763281d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -837,8 +837,9 @@ public final class ConfigConstants { // ----------------------------- Transport SSL Settings-------------------- /** - * Enable SSL support + * @deprecated use {@link SecurityOptions#SSL_ENABLED} instead */ + @Deprecated public static final String SECURITY_SSL_ENABLED = "security.ssl.enabled"; /** The Java keystore file containing the flink endpoint key and certificate */ @@ -1541,6 +1542,10 @@ public final class ConfigConstants { // ----------------------------- SSL Values -------------------------------- + /** + * @deprecated use {@link SecurityOptions#SSL_ENABLED} instead + */ + @Deprecated public static boolean DEFAULT_SECURITY_SSL_ENABLED = false; public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index ebe4f2b7b5806..27c56d46b5611 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -71,7 +71,7 @@ public class HistoryServerOptions { /** * Enables/Disables SSL support for the HistoryServer web-frontend. Only relevant if - * {@link ConfigConstants#SECURITY_SSL_ENABLED} is enabled. + * {@link SecurityOptions#SSL_ENABLED} is enabled. */ public static final ConfigOption HISTORY_SERVER_WEB_SSL_ENABLED = key("historyserver.web.ssl.enabled") diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 37631986d43c2..70391668472d5 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -66,4 +66,15 @@ public class SecurityOptions { public static final ConfigOption ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME = key("zookeeper.sasl.login-context-name") .defaultValue("Client"); + + // ------------------------------------------------------------------------ + // SSL Security Options + // ------------------------------------------------------------------------ + + /** + * Enable SSL support. + */ + public static final ConfigOption SSL_ENABLED = + key("security.ssl.enabled") + .defaultValue(false); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 2267eacc8b13b..9166444034fea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -54,8 +55,7 @@ public static boolean getSSLEnabled(Configuration sslConfig) { Preconditions.checkNotNull(sslConfig); - return sslConfig.getBoolean( ConfigConstants.SECURITY_SSL_ENABLED, - ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED); + return sslConfig.getBoolean(SecurityOptions.SSL_ENABLED); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index f9052e13ee8dc..0d68d646767a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -67,15 +68,14 @@ public class BlobClientSslTest extends TestLogger { @BeforeClass public static void startSSLServer() throws IOException { Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); - sslClientConfig = new Configuration(); - sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); } @@ -86,7 +86,7 @@ public static void startSSLServer() throws IOException { @BeforeClass public static void startNonSSLServer() throws IOException { Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); @@ -94,7 +94,7 @@ public static void startNonSSLServer() throws IOException { BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); - clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index da678bdb8478a..d4f67262372f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.string.StringEncoder; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.util.NetUtils; import org.junit.Assert; import org.junit.Test; @@ -149,7 +150,7 @@ public ChannelHandler[] getServerChannelHandlers() { private Configuration createSslConfig() throws Exception { Configuration flinkConfig = new Configuration(); - flinkConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index d28d693896976..5d259fa721f23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -19,6 +19,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.junit.Assert; import org.junit.Test; @@ -40,7 +41,7 @@ public class SSLUtilsTest { public void testCreateSSLClientContext() throws Exception { Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); @@ -55,7 +56,7 @@ public void testCreateSSLClientContext() throws Exception { public void testCreateSSLClientContextWithSSLDisabled() throws Exception { Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false); + clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false); SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); Assert.assertNull(clientContext); @@ -68,7 +69,7 @@ public void testCreateSSLClientContextWithSSLDisabled() throws Exception { public void testCreateSSLClientContextMisconfiguration() { Configuration clientConfig = new Configuration(); - clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "badpassword"); @@ -87,7 +88,7 @@ public void testCreateSSLClientContextMisconfiguration() { public void testCreateSSLServerContext() throws Exception { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -103,7 +104,7 @@ public void testCreateSSLServerContext() throws Exception { public void testCreateSSLServerContextWithSSLDisabled() throws Exception { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); Assert.assertNull(serverContext); @@ -116,7 +117,7 @@ public void testCreateSSLServerContextWithSSLDisabled() throws Exception { public void testCreateSSLServerContextMisconfiguration() { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "badpassword"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword"); @@ -136,7 +137,7 @@ public void testCreateSSLServerContextMisconfiguration() { public void testCreateSSLServerContextWithMultiProtocols() { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -157,7 +158,7 @@ public void testCreateSSLServerContextWithMultiProtocols() { public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -198,7 +199,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { Configuration serverConfig = new Configuration(); - serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index daf0f4734701d..aa98b38934e1a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -20,8 +20,8 @@ package org.apache.flink.runtime.akka import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} -import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions} +import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import org.scalatest.junit.JUnitRunner @@ -54,7 +54,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") @@ -81,7 +81,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") @@ -103,7 +103,7 @@ class AkkaSslITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false) + config.setBoolean(SecurityOptions.SSL_ENABLED, false) val cluster = new TestingCluster(config, false) @@ -121,7 +121,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore") config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") @@ -143,7 +143,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setBoolean(SecurityOptions.SSL_ENABLED, true) val cluster = new TestingCluster(config, false) From 1ccde414c1aa71640ec8ec7bd36ce9160431fd2c Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 10:51:54 +0200 Subject: [PATCH 2/9] SecurityOptions#SSL_KEYSTORE --- .../apache/flink/configuration/ConfigConstants.java | 5 ++++- .../apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../clusterframework/overlays/SSLStoreOverlay.java | 7 ++++--- .../java/org/apache/flink/runtime/net/SSLUtils.java | 6 ++---- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 6 ++---- .../apache/flink/runtime/blob/BlobClientSslTest.java | 4 ++-- .../clusterframework/overlays/SSLStoreOverlayTest.java | 5 +++-- .../io/network/netty/NettyClientServerSslTest.java | 4 ++-- .../org/apache/flink/runtime/net/SSLUtilsTest.java | 10 +++++----- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 6 +++--- 10 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 703e2f763281d..07de77ec574a1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -842,7 +842,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_ENABLED = "security.ssl.enabled"; - /** The Java keystore file containing the flink endpoint key and certificate */ + /** + * @deprecated use {@link SecurityOptions#SSL_KEYSTORE} instead + */ + @Deprecated public static final String SECURITY_SSL_KEYSTORE = "security.ssl.keystore"; /** secret to decrypt the keystore file */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 70391668472d5..bb30f626ea88c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -77,4 +77,11 @@ public class SecurityOptions { public static final ConfigOption SSL_ENABLED = key("security.ssl.enabled") .defaultValue(false); + + /** + * The Java keystore file containing the flink endpoint key and certificate. + */ + public static final ConfigOption SSL_KEYSTORE = + key("security.ssl.keystore") + .noDefaultValue(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java index dd79ca1e10890..2ab181aba7452 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.slf4j.Logger; @@ -64,7 +65,7 @@ public void configure(ContainerSpecification container) throws IOException { .setDest(TARGET_KEYSTORE_PATH) .setCachable(false) .build()); - container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath()); + container.getDynamicConfiguration().setString(SecurityOptions.SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath()); } if(truststore != null) { container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() @@ -98,11 +99,11 @@ public static class Builder { */ public Builder fromEnvironment(Configuration globalConfiguration) { - String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null); + String keystore = globalConfiguration.getString(SecurityOptions.SSL_KEYSTORE); if(keystore != null) { keystorePath = new File(keystore); if(!keystorePath.exists()) { - throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_KEYSTORE); + throw new IllegalStateException("Invalid configuration for " + SecurityOptions.SSL_KEYSTORE.key()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 9166444034fea..ee69a33051363 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -193,9 +193,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws if (getSSLEnabled(sslConfig)) { LOG.debug("Creating server SSL context from configuration"); - String keystoreFilePath = sslConfig.getString( - ConfigConstants.SECURITY_SSL_KEYSTORE, - null); + String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE); String keystorePassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, @@ -209,7 +207,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws ConfigConstants.SECURITY_SSL_PROTOCOL, ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); - Preconditions.checkNotNull(keystoreFilePath, ConfigConstants.SECURITY_SSL_KEYSTORE + " was not configured."); + Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured."); Preconditions.checkNotNull(keystorePassword, ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD + " was not configured."); Preconditions.checkNotNull(certPassword, ConfigConstants.SECURITY_SSL_KEY_PASSWORD + " was not configured."); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 60a33ba5f9e6b..d0f5f6d2480c3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -26,7 +26,7 @@ import akka.actor._ import akka.pattern.{ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions} import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.{ConfigurationException, NetUtils, Preconditions} import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory} @@ -301,9 +301,7 @@ object AkkaUtils { val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" - val akkaSSLKeyStore = configuration.getString( - ConfigConstants.SECURITY_SSL_KEYSTORE, - null) + val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE) val akkaSSLKeyStorePassword = configuration.getString( ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 0d68d646767a1..a894c0dd2c1ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -69,7 +69,7 @@ public class BlobClientSslTest extends TestLogger { public static void startSSLServer() throws IOException { Configuration config = new Configuration(); config.setBoolean(SecurityOptions.SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); @@ -88,7 +88,7 @@ public static void startNonSSLServer() throws IOException { Configuration config = new Configuration(); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java index 0894ce61cac90..4f57cc1a351b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.junit.Rule; import org.junit.Test; @@ -46,7 +47,7 @@ public void testConfigure() throws Exception { ContainerSpecification spec = new ContainerSpecification(); overlay.configure(spec); - assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null)); + assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_KEYSTORE)); checkArtifact(spec, TARGET_KEYSTORE_PATH); assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null)); @@ -68,7 +69,7 @@ public void testBuilderFromEnvironment() throws Exception { File keystore = tempFolder.newFile(); File truststore = tempFolder.newFile(); - conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath()); + conf.setString(SecurityOptions.SSL_KEYSTORE, keystore.getAbsolutePath()); conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath()); SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index d4f67262372f3..d8eaab94d5db5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -127,7 +127,7 @@ public ChannelHandler[] getServerChannelHandlers() { Configuration config = createSslConfig(); // Use a server certificate which is not present in the truststore - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/untrusted.keystore"); + config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore"); NettyConfig nettyConfig = new NettyConfig( InetAddress.getLoopbackAddress(), @@ -151,7 +151,7 @@ private Configuration createSslConfig() throws Exception { Configuration flinkConfig = new Configuration(); flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 5d259fa721f23..e85dc92bf2361 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -89,7 +89,7 @@ public void testCreateSSLServerContext() throws Exception { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -118,7 +118,7 @@ public void testCreateSSLServerContextMisconfiguration() { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "badpassword"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword"); @@ -138,7 +138,7 @@ public void testCreateSSLServerContextWithMultiProtocols() { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); @@ -159,7 +159,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); @@ -200,7 +200,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index aa98b38934e1a..26242996c2b1e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -55,7 +55,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setBoolean(SecurityOptions.SSL_ENABLED, true) - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, + config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") @@ -82,7 +82,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setBoolean(SecurityOptions.SSL_ENABLED, true) - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, + config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") @@ -122,7 +122,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(AkkaOptions.ASK_TIMEOUT, "2 s") config.setBoolean(SecurityOptions.SSL_ENABLED, true) - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore") + config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore") config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore") From d6a3c15f0cf0dd68e60db421d4cd7528dfc5a211 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 10:58:36 +0200 Subject: [PATCH 3/9] SecurityOptions#SSL_KEYSTORE_PASSWORD --- .../apache/flink/configuration/ConfigConstants.java | 5 ++++- .../apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../java/org/apache/flink/runtime/net/SSLUtils.java | 6 ++---- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../apache/flink/runtime/blob/BlobClientSslTest.java | 4 ++-- .../io/network/netty/NettyClientServerSslTest.java | 4 ++-- .../org/apache/flink/runtime/net/SSLUtilsTest.java | 10 +++++----- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 6 +++--- 8 files changed, 26 insertions(+), 20 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 07de77ec574a1..1195282c1f0cb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -848,7 +848,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_KEYSTORE = "security.ssl.keystore"; - /** secret to decrypt the keystore file */ + /** + * @deprecated use {@link SecurityOptions#SSL_KEYSTORE_PASSWORD} instead + */ + @Deprecated public static final String SECURITY_SSL_KEYSTORE_PASSWORD = "security.ssl.keystore-password"; /** secret to decrypt the server key */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index bb30f626ea88c..4de621ceeb31a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -84,4 +84,11 @@ public class SecurityOptions { public static final ConfigOption SSL_KEYSTORE = key("security.ssl.keystore") .noDefaultValue(); + + /** + * Secret to decrypt the keystore file. + */ + public static final ConfigOption SSL_KEYSTORE_PASSWORD = + key("security.ssl.keystore-password") + .noDefaultValue(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index ee69a33051363..1f39cef207746 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -195,9 +195,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws String keystoreFilePath = sslConfig.getString(SecurityOptions.SSL_KEYSTORE); - String keystorePassword = sslConfig.getString( - ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, - null); + String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD); String certPassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEY_PASSWORD, @@ -208,7 +206,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured."); - Preconditions.checkNotNull(keystorePassword, ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD + " was not configured."); + Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured."); Preconditions.checkNotNull(certPassword, ConfigConstants.SECURITY_SSL_KEY_PASSWORD + " was not configured."); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index d0f5f6d2480c3..1d92bdac884a9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -303,9 +303,7 @@ object AkkaUtils { val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE) - val akkaSSLKeyStorePassword = configuration.getString( - ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, - null) + val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD) val akkaSSLKeyPassword = configuration.getString( ConfigConstants.SECURITY_SSL_KEY_PASSWORD, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index a894c0dd2c1ff..700d4ace72253 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -70,7 +70,7 @@ public static void startSSLServer() throws IOException { Configuration config = new Configuration(); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); @@ -89,7 +89,7 @@ public static void startNonSSLServer() throws IOException { config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index d8eaab94d5db5..584c13c610e58 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -88,7 +88,7 @@ public ChannelHandler[] getServerChannelHandlers() { Configuration config = createSslConfig(); // Modify the keystore password to an incorrect one - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "invalidpassword"); + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword"); NettyConfig nettyConfig = new NettyConfig( InetAddress.getLoopbackAddress(), @@ -152,7 +152,7 @@ private Configuration createSslConfig() throws Exception { Configuration flinkConfig = new Configuration(); flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index e85dc92bf2361..cca14d6b89ffb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -90,7 +90,7 @@ public void testCreateSSLServerContext() throws Exception { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); @@ -119,7 +119,7 @@ public void testCreateSSLServerContextMisconfiguration() { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "badpassword"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "badpassword"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword"); try { @@ -139,7 +139,7 @@ public void testCreateSSLServerContextWithMultiProtocols() { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); @@ -160,7 +160,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); @@ -201,7 +201,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { Configuration serverConfig = new Configuration(); serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 26242996c2b1e..ad04aa3059eb4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -57,7 +57,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) @@ -84,7 +84,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) @@ -123,7 +123,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore") - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") From c69fbd84da216b3b57d0f6d9263d5a6a8e0aa350 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:29:06 +0200 Subject: [PATCH 4/9] SecurityOptions#SSL_KEY_PASSWORD --- .../apache/flink/configuration/ConfigConstants.java | 5 ++++- .../apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../java/org/apache/flink/runtime/net/SSLUtils.java | 6 ++---- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../apache/flink/runtime/blob/BlobClientSslTest.java | 4 ++-- .../io/network/netty/NettyClientServerSslTest.java | 2 +- .../org/apache/flink/runtime/net/SSLUtilsTest.java | 10 +++++----- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 6 +++--- 8 files changed, 25 insertions(+), 19 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 1195282c1f0cb..2a6039f95a1c6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -854,7 +854,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_KEYSTORE_PASSWORD = "security.ssl.keystore-password"; - /** secret to decrypt the server key */ + /** + * @deprecated use {@link SecurityOptions#SSL_KEY_PASSWORD} instead + */ + @Deprecated public static final String SECURITY_SSL_KEY_PASSWORD = "security.ssl.key-password"; /** The truststore file containing the public CA certificates to verify the ssl peers */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 4de621ceeb31a..815a612f25bcc 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -91,4 +91,11 @@ public class SecurityOptions { public static final ConfigOption SSL_KEYSTORE_PASSWORD = key("security.ssl.keystore-password") .noDefaultValue(); + + /** + * Secret to decrypt the server key. + */ + public static final ConfigOption SSL_KEY_PASSWORD = + key("security.ssl.key-password") + .noDefaultValue(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 1f39cef207746..509522c892b81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -197,9 +197,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws String keystorePassword = sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD); - String certPassword = sslConfig.getString( - ConfigConstants.SECURITY_SSL_KEY_PASSWORD, - null); + String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD); String sslProtocolVersion = sslConfig.getString( ConfigConstants.SECURITY_SSL_PROTOCOL, @@ -207,7 +205,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured."); Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured."); - Preconditions.checkNotNull(certPassword, ConfigConstants.SECURITY_SSL_KEY_PASSWORD + " was not configured."); + Preconditions.checkNotNull(certPassword, SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured."); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); FileInputStream keyStoreFile = null; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 1d92bdac884a9..9cf1f129df2f1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -305,9 +305,7 @@ object AkkaUtils { val akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD) - val akkaSSLKeyPassword = configuration.getString( - ConfigConstants.SECURITY_SSL_KEY_PASSWORD, - null) + val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD) val akkaSSLTrustStore = configuration.getString( ConfigConstants.SECURITY_SSL_TRUSTSTORE, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 700d4ace72253..bdc20005ccb4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -71,7 +71,7 @@ public static void startSSLServer() throws IOException { config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); sslClientConfig = new Configuration(); @@ -90,7 +90,7 @@ public static void startNonSSLServer() throws IOException { config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 584c13c610e58..74e47dc61a50e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -153,7 +153,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); return flinkConfig; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index cca14d6b89ffb..8c2ba6ed91915 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -91,7 +91,7 @@ public void testCreateSSLServerContext() throws Exception { serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); Assert.assertNotNull(serverContext); @@ -120,7 +120,7 @@ public void testCreateSSLServerContextMisconfiguration() { serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "badpassword"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword"); + serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "badpassword"); try { SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); @@ -140,7 +140,7 @@ public void testCreateSSLServerContextWithMultiProtocols() { serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); try { @@ -161,7 +161,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); @@ -202,7 +202,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index ad04aa3059eb4..448f06c63e6e7 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -58,7 +58,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) @@ -85,7 +85,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_KEYSTORE, getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) @@ -124,7 +124,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setBoolean(SecurityOptions.SSL_ENABLED, true) config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore") config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") + config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") From 19a43f53813e03ae1ec76e32b18b3270a4a9236e Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:32:40 +0200 Subject: [PATCH 5/9] SecurityOptions#SSL_TRUSTSTORE --- .../org/apache/flink/configuration/ConfigConstants.java | 5 ++++- .../org/apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../runtime/clusterframework/overlays/SSLStoreOverlay.java | 7 +++---- .../main/java/org/apache/flink/runtime/net/SSLUtils.java | 6 ++---- .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../org/apache/flink/runtime/blob/BlobClientSslTest.java | 4 ++-- .../clusterframework/overlays/SSLStoreOverlayTest.java | 5 ++--- .../runtime/io/network/netty/NettyClientServerSslTest.java | 2 +- .../java/org/apache/flink/runtime/net/SSLUtilsTest.java | 4 ++-- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 6 +++--- 10 files changed, 27 insertions(+), 23 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 2a6039f95a1c6..3627649713098 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -860,7 +860,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_KEY_PASSWORD = "security.ssl.key-password"; - /** The truststore file containing the public CA certificates to verify the ssl peers */ + /** + * @deprecated use {@link SecurityOptions#SSL_TRUSTSTORE} instead + */ + @Deprecated public static final String SECURITY_SSL_TRUSTSTORE = "security.ssl.truststore"; /** Secret to decrypt the truststore */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 815a612f25bcc..38d7738bbb160 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -98,4 +98,11 @@ public class SecurityOptions { public static final ConfigOption SSL_KEY_PASSWORD = key("security.ssl.key-password") .noDefaultValue(); + + /** + * The truststore file containing the public CA certificates to verify the ssl peers. + */ + public static final ConfigOption SSL_TRUSTSTORE = + key("security.ssl.truststore") + .noDefaultValue(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java index 2ab181aba7452..84d407b5687e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.clusterframework.overlays; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; @@ -73,7 +72,7 @@ public void configure(ContainerSpecification container) throws IOException { .setDest(TARGET_TRUSTSTORE_PATH) .setCachable(false) .build()); - container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath()); + container.getDynamicConfiguration().setString(SecurityOptions.SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath()); } } @@ -107,11 +106,11 @@ public Builder fromEnvironment(Configuration globalConfiguration) { } } - String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null); + String truststore = globalConfiguration.getString(SecurityOptions.SSL_TRUSTSTORE); if(truststore != null) { truststorePath = new File(truststore); if(!truststorePath.exists()) { - throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_TRUSTSTORE); + throw new IllegalStateException("Invalid configuration for " + SecurityOptions.SSL_TRUSTSTORE.key()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 509522c892b81..efba694da4ccf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -139,9 +139,7 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws if (getSSLEnabled(sslConfig)) { LOG.debug("Creating client SSL context from configuration"); - String trustStoreFilePath = sslConfig.getString( - ConfigConstants.SECURITY_SSL_TRUSTSTORE, - null); + String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString( ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, null); @@ -149,7 +147,7 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws ConfigConstants.SECURITY_SSL_PROTOCOL, ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); - Preconditions.checkNotNull(trustStoreFilePath, ConfigConstants.SECURITY_SSL_TRUSTSTORE + " was not configured."); + Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 9cf1f129df2f1..efed1a4703efa 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -307,9 +307,7 @@ object AkkaUtils { val akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_KEY_PASSWORD) - val akkaSSLTrustStore = configuration.getString( - ConfigConstants.SECURITY_SSL_TRUSTSTORE, - null) + val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE) val akkaSSLTrustStorePassword = configuration.getString( ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index bdc20005ccb4c..97ff3e54bd7d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -76,7 +76,7 @@ public static void startSSLServer() throws IOException { sslClientConfig = new Configuration(); sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); } @@ -96,7 +96,7 @@ public static void startNonSSLServer() throws IOException { clientConfig = new Configuration(); clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java index 4f57cc1a351b8..ce48ce442c680 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.clusterframework.overlays; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.clusterframework.ContainerSpecification; @@ -50,7 +49,7 @@ public void testConfigure() throws Exception { assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_KEYSTORE)); checkArtifact(spec, TARGET_KEYSTORE_PATH); - assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null)); + assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.SSL_TRUSTSTORE)); checkArtifact(spec, TARGET_TRUSTSTORE_PATH); } @@ -70,7 +69,7 @@ public void testBuilderFromEnvironment() throws Exception { File truststore = tempFolder.newFile(); conf.setString(SecurityOptions.SSL_KEYSTORE, keystore.getAbsolutePath()); - conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath()); + conf.setString(SecurityOptions.SSL_TRUSTSTORE, truststore.getAbsolutePath()); SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf); assertEquals(builder.keystorePath, keystore); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 74e47dc61a50e..a3588bab4d85c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -154,7 +154,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); return flinkConfig; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 8c2ba6ed91915..895bb1e9eb8fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -42,7 +42,7 @@ public void testCreateSSLClientContext() throws Exception { Configuration clientConfig = new Configuration(); clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); @@ -70,7 +70,7 @@ public void testCreateSSLClientContextMisconfiguration() { Configuration clientConfig = new Configuration(); clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "badpassword"); try { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 448f06c63e6e7..a38bde3420fc2 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -59,7 +59,7 @@ class AkkaSslITCase(_system: ActorSystem) getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, + config.setString(SecurityOptions.SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") @@ -86,7 +86,7 @@ class AkkaSslITCase(_system: ActorSystem) getClass.getResource("/local127.keystore").getPath) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, + config.setString(SecurityOptions.SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") @@ -125,7 +125,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore") config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore") + config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore") config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") val cluster = new TestingCluster(config, false) From da992486b2068d926779ddcf606c6c678cb98fd0 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:37:15 +0200 Subject: [PATCH 6/9] SecurityOptions#SSL_TRUSTSTORE_PASSWORD --- .../org/apache/flink/configuration/ConfigConstants.java | 5 ++++- .../org/apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../main/java/org/apache/flink/runtime/net/SSLUtils.java | 6 ++---- .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../org/apache/flink/runtime/blob/BlobClientSslTest.java | 4 ++-- .../runtime/io/network/netty/NettyClientServerSslTest.java | 3 +-- .../java/org/apache/flink/runtime/net/SSLUtilsTest.java | 4 ++-- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 6 +++--- 8 files changed, 22 insertions(+), 17 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 3627649713098..fe27593865501 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -866,7 +866,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_TRUSTSTORE = "security.ssl.truststore"; - /** Secret to decrypt the truststore */ + /** + * @deprecated use {@link SecurityOptions#SSL_TRUSTSTORE_PASSWORD} instead + */ + @Deprecated public static final String SECURITY_SSL_TRUSTSTORE_PASSWORD = "security.ssl.truststore-password"; /** SSL protocol version to be supported */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 38d7738bbb160..3d94356aa522d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -105,4 +105,11 @@ public class SecurityOptions { public static final ConfigOption SSL_TRUSTSTORE = key("security.ssl.truststore") .noDefaultValue(); + + /** + * Secret to decrypt the truststore. + */ + public static final ConfigOption SSL_TRUSTSTORE_PASSWORD = + key("security.ssl.truststore-password") + .noDefaultValue(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index efba694da4ccf..1879c4efe6df7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -140,15 +140,13 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws LOG.debug("Creating client SSL context from configuration"); String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); - String trustStorePassword = sslConfig.getString( - ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, - null); + String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); String sslProtocolVersion = sslConfig.getString( ConfigConstants.SECURITY_SSL_PROTOCOL, ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); - Preconditions.checkNotNull(trustStorePassword, ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD + " was not configured."); + Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index efed1a4703efa..e07f7a6b8b0e8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -309,9 +309,7 @@ object AkkaUtils { val akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_TRUSTSTORE) - val akkaSSLTrustStorePassword = configuration.getString( - ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, - null) + val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD) val akkaSSLProtocol = configuration.getString( ConfigConstants.SECURITY_SSL_PROTOCOL, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 97ff3e54bd7d8..c02768db77b4b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -77,7 +77,7 @@ public static void startSSLServer() throws IOException { sslClientConfig = new Configuration(); sslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } /** @@ -97,7 +97,7 @@ public static void startNonSSLServer() throws IOException { clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index a3588bab4d85c..d0e875b341c13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandler; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.util.NetUtils; @@ -155,7 +154,7 @@ private Configuration createSslConfig() throws Exception { flinkConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); return flinkConfig; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 895bb1e9eb8fa..2af11ba7d3bed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -43,7 +43,7 @@ public void testCreateSSLClientContext() throws Exception { Configuration clientConfig = new Configuration(); clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); Assert.assertNotNull(clientContext); @@ -71,7 +71,7 @@ public void testCreateSSLClientContextMisconfiguration() { Configuration clientConfig = new Configuration(); clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "badpassword"); + clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "badpassword"); try { SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index a38bde3420fc2..f34e768d2ca46 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -62,7 +62,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password") val cluster = new TestingCluster(config, false) @@ -89,7 +89,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_TRUSTSTORE, getClass.getResource("/local127.truststore").getPath) - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password") config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1") val cluster = new TestingCluster(config, false) @@ -126,7 +126,7 @@ class AkkaSslITCase(_system: ActorSystem) config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password") config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password") config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore") - config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") + config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password") val cluster = new TestingCluster(config, false) From bcc430c4bbb8a53f9ab9b8271662763c8b18d32c Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:40:49 +0200 Subject: [PATCH 7/9] SecurityOptions#SSL_PROTOCOL --- .../flink/configuration/ConfigConstants.java | 9 ++++++++- .../flink/configuration/SecurityOptions.java | 7 +++++++ .../org/apache/flink/runtime/net/SSLUtils.java | 16 ++++------------ .../apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../apache/flink/runtime/net/SSLUtilsTest.java | 6 +++--- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index fe27593865501..1c4ae84bec3ac 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -872,7 +872,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_TRUSTSTORE_PASSWORD = "security.ssl.truststore-password"; - /** SSL protocol version to be supported */ + /** + * @deprecated use {@link SecurityOptions#SSL_PROTOCOL} instead + */ + @Deprecated public static final String SECURITY_SSL_PROTOCOL = "security.ssl.protocol"; /** @@ -1563,6 +1566,10 @@ public final class ConfigConstants { @Deprecated public static boolean DEFAULT_SECURITY_SSL_ENABLED = false; + /** + * @deprecated use {@link SecurityOptions#SSL_PROTOCOL} instead + */ + @Deprecated public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2"; public static String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 3d94356aa522d..2bc773f286512 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -112,4 +112,11 @@ public class SecurityOptions { public static final ConfigOption SSL_TRUSTSTORE_PASSWORD = key("security.ssl.truststore-password") .noDefaultValue(); + + /** + * SSL protocol version to be supported. + */ + public static final ConfigOption SSL_PROTOCOL = + key("security.ssl.protocol") + .defaultValue("TLSv1.2"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 1879c4efe6df7..1aa14a194f0fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -67,9 +67,7 @@ public static boolean getSSLEnabled(Configuration sslConfig) { */ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) { if (socket instanceof SSLServerSocket) { - final String[] protocols = config.getString( - ConfigConstants.SECURITY_SSL_PROTOCOL, - ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","); + final String[] protocols = config.getString(SecurityOptions.SSL_PROTOCOL).split(","); final String[] cipherSuites = config.getString( ConfigConstants.SECURITY_SSL_ALGORITHMS, @@ -93,9 +91,7 @@ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration c * The application configuration */ public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { - engine.setEnabledProtocols(config.getString( - ConfigConstants.SECURITY_SSL_PROTOCOL, - ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",")); + engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(",")); engine.setEnabledCipherSuites(config.getString( ConfigConstants.SECURITY_SSL_ALGORITHMS, ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); @@ -141,9 +137,7 @@ public static SSLContext createSSLClientContext(Configuration sslConfig) throws String trustStoreFilePath = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE); String trustStorePassword = sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); - String sslProtocolVersion = sslConfig.getString( - ConfigConstants.SECURITY_SSL_PROTOCOL, - ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); + String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); Preconditions.checkNotNull(trustStoreFilePath, SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured."); Preconditions.checkNotNull(trustStorePassword, SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured."); @@ -195,9 +189,7 @@ public static SSLContext createSSLServerContext(Configuration sslConfig) throws String certPassword = sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD); - String sslProtocolVersion = sslConfig.getString( - ConfigConstants.SECURITY_SSL_PROTOCOL, - ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); + String sslProtocolVersion = sslConfig.getString(SecurityOptions.SSL_PROTOCOL); Preconditions.checkNotNull(keystoreFilePath, SecurityOptions.SSL_KEYSTORE.key() + " was not configured."); Preconditions.checkNotNull(keystorePassword, SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured."); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index e07f7a6b8b0e8..f3ce00bdaf7b5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -311,9 +311,7 @@ object AkkaUtils { val akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD) - val akkaSSLProtocol = configuration.getString( - ConfigConstants.SECURITY_SSL_PROTOCOL, - ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL) + val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL) val akkaSSLAlgorithmsString = configuration.getString( ConfigConstants.SECURITY_SSL_ALGORITHMS, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 2af11ba7d3bed..bee97af6ce5b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -141,7 +141,7 @@ public void testCreateSSLServerContextWithMultiProtocols() { serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); + serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1,TLSv1.2"); try { SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); @@ -162,7 +162,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); + serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); @@ -203,7 +203,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { serverConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); + serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1"); serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); From a874694077ac54792c27643768e01d770d89a5f7 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:43:44 +0200 Subject: [PATCH 8/9] SecurityOptions#SSL_ALGORITHMS --- .../apache/flink/configuration/ConfigConstants.java | 10 +++++++--- .../apache/flink/configuration/SecurityOptions.java | 9 +++++++++ .../java/org/apache/flink/runtime/net/SSLUtils.java | 8 ++------ .../org/apache/flink/runtime/akka/AkkaUtils.scala | 4 +--- .../org/apache/flink/runtime/net/SSLUtilsTest.java | 5 ++--- .../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 2 +- 6 files changed, 22 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 1c4ae84bec3ac..30d65d11dd432 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -879,9 +879,9 @@ public final class ConfigConstants { public static final String SECURITY_SSL_PROTOCOL = "security.ssl.protocol"; /** - * The standard SSL algorithms to be supported - * More options here - http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites - * */ + * @deprecated use {@link SecurityOptions#SSL_ALGORITHMS} instead + */ + @Deprecated public static final String SECURITY_SSL_ALGORITHMS = "security.ssl.algorithms"; /** Flag to enable/disable hostname verification for the ssl connections */ @@ -1572,6 +1572,10 @@ public final class ConfigConstants { @Deprecated public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2"; + /** + * @deprecated use {@link SecurityOptions#SSL_ALGORITHMS} instead + */ + @Deprecated public static String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA"; public static boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 2bc773f286512..04417e649dd6c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -119,4 +119,13 @@ public class SecurityOptions { public static final ConfigOption SSL_PROTOCOL = key("security.ssl.protocol") .defaultValue("TLSv1.2"); + + /** + * The standard SSL algorithms to be supported. + * + *

More options here - http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites + */ + public static final ConfigOption SSL_ALGORITHMS = + key("security.ssl.algorithms") + .defaultValue("TLS_RSA_WITH_AES_128_CBC_SHA"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 1aa14a194f0fb..5579d9dd4e4f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -69,9 +69,7 @@ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration c if (socket instanceof SSLServerSocket) { final String[] protocols = config.getString(SecurityOptions.SSL_PROTOCOL).split(","); - final String[] cipherSuites = config.getString( - ConfigConstants.SECURITY_SSL_ALGORITHMS, - ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","); + final String[] cipherSuites = config.getString(SecurityOptions.SSL_ALGORITHMS).split(","); if (LOG.isDebugEnabled()) { LOG.debug("Configuring TLS version and cipher suites on SSL socket {} / {}", @@ -92,9 +90,7 @@ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration c */ public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(",")); - engine.setEnabledCipherSuites(config.getString( - ConfigConstants.SECURITY_SSL_ALGORITHMS, - ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); + engine.setEnabledCipherSuites(config.getString(SecurityOptions.SSL_ALGORITHMS).split(",")); } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index f3ce00bdaf7b5..ea0821c563fb8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -313,9 +313,7 @@ object AkkaUtils { val akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL) - val akkaSSLAlgorithmsString = configuration.getString( - ConfigConstants.SECURITY_SSL_ALGORITHMS, - ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS) + val akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS) val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]") val configString = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index bee97af6ce5b6..a3c2b7b7a91fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -17,7 +17,6 @@ */ package org.apache.flink.runtime.net; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.junit.Assert; @@ -163,7 +162,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); + serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); ServerSocket socket = null; @@ -204,7 +203,7 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1"); - serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); + serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); SSLEngine engine = serverContext.createSSLEngine(); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index f34e768d2ca46..4671981b29f29 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -90,7 +90,7 @@ class AkkaSslITCase(_system: ActorSystem) getClass.getResource("/local127.truststore").getPath) config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password") - config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1") + config.setString(SecurityOptions.SSL_ALGORITHMS, "TLSv1,TLSv1.1") val cluster = new TestingCluster(config, false) From 92ad5951803569ac9a9eade4bb6d6a94fb22a976 Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 9 May 2017 11:46:18 +0200 Subject: [PATCH 9/9] SecurityOptions#SSL_VERIFY_HOSTNAME --- .../org/apache/flink/configuration/ConfigConstants.java | 9 ++++++++- .../org/apache/flink/configuration/SecurityOptions.java | 7 +++++++ .../main/java/org/apache/flink/runtime/net/SSLUtils.java | 4 +--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 30d65d11dd432..35d3d139766ac 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -884,7 +884,10 @@ public final class ConfigConstants { @Deprecated public static final String SECURITY_SSL_ALGORITHMS = "security.ssl.algorithms"; - /** Flag to enable/disable hostname verification for the ssl connections */ + /** + * @deprecated use {@link SecurityOptions#SSL_VERIFY_HOSTNAME} instead + */ + @Deprecated public static final String SECURITY_SSL_VERIFY_HOSTNAME = "security.ssl.verify-hostname"; // ----------------------------- Streaming -------------------------------- @@ -1578,6 +1581,10 @@ public final class ConfigConstants { @Deprecated public static String DEFAULT_SECURITY_SSL_ALGORITHMS = "TLS_RSA_WITH_AES_128_CBC_SHA"; + /** + * @deprecated use {@link SecurityOptions#SSL_VERIFY_HOSTNAME} instead + */ + @Deprecated public static boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true; // ----------------------------- Streaming Values -------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 04417e649dd6c..2c353d8f6d1f8 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -128,4 +128,11 @@ public class SecurityOptions { public static final ConfigOption SSL_ALGORITHMS = key("security.ssl.algorithms") .defaultValue("TLS_RSA_WITH_AES_128_CBC_SHA"); + + /** + * Flag to enable/disable hostname verification for the ssl connections. + */ + public static final ConfigOption SSL_VERIFY_HOSTNAME = + key("security.ssl.verify-hostname") + .defaultValue(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 5579d9dd4e4f6..015b3d6e157be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.net; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.util.Preconditions; @@ -106,8 +105,7 @@ public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters s Preconditions.checkNotNull(sslConfig); Preconditions.checkNotNull(sslParams); - boolean verifyHostname = sslConfig.getBoolean(ConfigConstants.SECURITY_SSL_VERIFY_HOSTNAME, - ConfigConstants.DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME); + boolean verifyHostname = sslConfig.getBoolean(SecurityOptions.SSL_VERIFY_HOSTNAME); if (verifyHostname) { sslParams.setEndpointIdentificationAlgorithm("HTTPS"); }