From 9300e21732774a8733625c721fbdf6a8ce431cb4 Mon Sep 17 00:00:00 2001 From: Nigel Liang Date: Wed, 27 May 2020 15:33:45 -0700 Subject: [PATCH] CC-9314: Allow disabling hostname verification (#410) * CC-9314: Allow disabling hostname verification When `elastic.https.ssl.endpoint.identification.algorithm` is empty, instead of default hostname verifier, use an all-trusting verifier. Updated SecurityIT with testcase to verify --- pom.xml | 13 +++-- .../ElasticsearchSinkConnectorConfig.java | 8 ++++ .../jest/JestElasticsearchClient.java | 12 +++-- .../HostnameVerificationDisabledIT.java | 36 ++++++++++++++ .../elasticsearch/integration/SecurityIT.java | 47 ++++++++++--------- src/test/resources/ssl/instances.yml | 1 - 6 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 src/test/java/io/confluent/connect/elasticsearch/integration/HostnameVerificationDisabledIT.java diff --git a/pom.xml b/pom.xml index 55484fcef..9af63c9af 100644 --- a/pom.xml +++ b/pom.xml @@ -179,9 +179,6 @@ org.apache.maven.plugins maven-failsafe-plugin 3.0.0-M3 - - true - io.confluent @@ -332,7 +329,15 @@ maven-failsafe-plugin 3.0.0-M3 - false + + + **/HostnameVerificationDisabledIT.java + diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 7bd325283..bab2478f9 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -25,6 +25,7 @@ import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues; import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport; public class ElasticsearchSinkConnectorConfig extends AbstractConfig { @@ -456,6 +457,13 @@ public Map sslConfigs() { return sslConfigDef.parse(originalsWithPrefix(CONNECTION_SSL_CONFIG_PREFIX)); } + public boolean shouldDisableHostnameVerification() { + String sslEndpointIdentificationAlgorithm = getString( + CONNECTION_SSL_CONFIG_PREFIX + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + return sslEndpointIdentificationAlgorithm != null + && sslEndpointIdentificationAlgorithm.isEmpty(); + } + public static void main(String[] args) { System.out.println(CONFIG.toEnrichedRst()); } diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java index 292cd552c..5ffc054ee 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java @@ -48,6 +48,7 @@ import io.searchbox.indices.IndicesExists; import io.searchbox.indices.Refresh; import io.searchbox.indices.mapping.PutMapping; +import javax.net.ssl.HostnameVerifier; import org.apache.http.HttpHost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; @@ -194,14 +195,17 @@ private static void configureSslContext(HttpClientConfig.Builder builder, kafkaSslFactory.configure(config.sslConfigs()); SSLContext sslContext = kafkaSslFactory.sslContext(); + HostnameVerifier hostnameVerifier = config.shouldDisableHostnameVerification() + ? (hostname, session) -> true + : SSLConnectionSocketFactory.getDefaultHostnameVerifier(); + // Sync calls - SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext, - SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory( + sslContext, hostnameVerifier); builder.sslSocketFactory(sslSocketFactory); // Async calls - SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, - SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, hostnameVerifier); builder.httpsIOSessionStrategy(sessionStrategy); } diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/HostnameVerificationDisabledIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/HostnameVerificationDisabledIT.java new file mode 100644 index 000000000..9f3b965d2 --- /dev/null +++ b/src/test/java/io/confluent/connect/elasticsearch/integration/HostnameVerificationDisabledIT.java @@ -0,0 +1,36 @@ +package io.confluent.connect.elasticsearch.integration; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class HostnameVerificationDisabledIT extends SecurityIT { + private static final Logger log = LoggerFactory.getLogger(HostnameVerificationDisabledIT.class); + + @Test + public void testSecureConnectionHostnameVerificationDisabled() throws Throwable { + // Use 'localhost' here that is not in self-signed cert + String address = container.getConnectionUrl(); + address = address.replace(container.getContainerIpAddress(), "localhost"); + log.info("Creating connector for {}", address); + + connect.kafka().createTopic(KAFKA_TOPIC, 1); + + Map props = getProps(); + props.put("connection.url", address); + props.put("elastic.security.protocol", "SSL"); + props.put("elastic.https.ssl.keystore.location", container.getKeystorePath()); + props.put("elastic.https.ssl.keystore.password", container.getKeystorePassword()); + props.put("elastic.https.ssl.key.password", container.getKeyPassword()); + props.put("elastic.https.ssl.truststore.location", container.getTruststorePath()); + props.put("elastic.https.ssl.truststore.password", container.getTruststorePassword()); + + // disable hostname verification + props.put("elastic.https.ssl.endpoint.identification.algorithm", ""); + + // Start connector + testSecureConnection(props); + } +} diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/SecurityIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/SecurityIT.java index 2e7835829..02eda74c0 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/integration/SecurityIT.java +++ b/src/test/java/io/confluent/connect/elasticsearch/integration/SecurityIT.java @@ -8,10 +8,9 @@ import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; @@ -35,34 +34,25 @@ public class SecurityIT { protected static ElasticsearchContainer container; - private EmbeddedConnectCluster connect; + EmbeddedConnectCluster connect; private static final String MESSAGE_KEY = "message-key"; private static final String MESSAGE_VAL = "{ \"schema\": { \"type\": \"map\", \"keys\": " + "{ \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, " + "\"payload\": { \"key1\": 12, \"key2\": 15} }"; private static final String CONNECTOR_NAME = "elastic-sink"; - private static final String KAFKA_TOPIC = "test-elasticsearch-sink"; + static final String KAFKA_TOPIC = "test-elasticsearch-sink"; private static final String TYPE_NAME = "kafka-connect"; private static final int TASKS_MAX = 1; private static final int NUM_MSG = 200; private static final long VERIFY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(2); - @BeforeClass - public static void setupBeforeAll() { - + @Before + public void setup() throws IOException { // Relevant and available docker images for elastic can be found at https://www.docker.elastic.co container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true); container.start(); - } - @AfterClass - public static void teardownAfterAll() { - container.close(); - } - - @Before - public void setup() throws IOException { connect = new EmbeddedConnectCluster.Builder().name("elastic-sink-cluster").build(); connect.start(); } @@ -70,9 +60,10 @@ public void setup() throws IOException { @After public void close() { connect.stop(); + container.close(); } - private Map getProps () { + Map getProps () { Map props = new HashMap<>(); props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getName()); props.put(SinkConnectorConfig.TOPICS_CONFIG, KAFKA_TOPIC); @@ -85,17 +76,17 @@ private Map getProps () { /** * Run test against docker image running Elasticsearch. - * Certificates are generated with src/test/resources/certs/generate_certificates.sh + * Certificates are generated in src/test/resources/ssl/start-elasticsearch.sh */ @Test - public void testSecureConnection() throws Throwable { - // Use 'localhost' here because that's the IP address the certificates allow - final String address = container.getConnectionUrl(); + public void testSecureConnectionVerifiedHostname() throws Throwable { + // Use IP address here because that's what the certificates allow + String address = container.getConnectionUrl(); + address = address.replace(container.getContainerIpAddress(), container.hostMachineIpAddress()); log.info("Creating connector for {}", address); connect.kafka().createTopic(KAFKA_TOPIC, 1); - // Start connector Map props = getProps(); props.put("connection.url", address); props.put("elastic.security.protocol", "SSL"); @@ -104,10 +95,20 @@ public void testSecureConnection() throws Throwable { props.put("elastic.https.ssl.key.password", container.getKeyPassword()); props.put("elastic.https.ssl.truststore.location", container.getTruststorePath()); props.put("elastic.https.ssl.truststore.password", container.getTruststorePassword()); + + // Start connector + testSecureConnection(props); + } + + void testSecureConnection(Map props) throws Throwable { connect.configureConnector(CONNECTOR_NAME, props); waitForCondition(() -> { - ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); - return info != null && info.tasks() != null && info.tasks().size() == 1; + try { + ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME); + return info != null && info.tasks() != null && info.tasks().size() == 1; + } catch (ConnectRestException e) { + return false; + } }, "Timed out waiting for connector task to start"); for (int i=0; i