Skip to content

Commit

Permalink
CC-9314: Allow disabling hostname verification (#410)
Browse files Browse the repository at this point in the history
* CC-9314: Allow disabling hostname verification

When `elastic.https.ssl.endpoint.identification.algorithm` is empty, instead
of default hostname verifier, use an all-trusting verifier.

Updated SecurityIT with testcase to verify
  • Loading branch information
ncliang committed May 27, 2020
1 parent 3f38c59 commit 9300e21
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 32 deletions.
13 changes: 9 additions & 4 deletions pom.xml
Expand Up @@ -179,9 +179,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M3</version>
<configuration>
<skipITs>true</skipITs>
</configuration>
</plugin>
<plugin>
<groupId>io.confluent</groupId>
Expand Down Expand Up @@ -332,7 +329,15 @@
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M3</version>
<configuration>
<skipITs>false</skipITs>
<excludes>
<!--
The jenkins container is only accessible through its DOCKER_HOST IP
address. This makes it hard to test both hostname verification
enabled and disabled cases with the same cert. So, disable this IT
in jenkins and only test the enabled case in SecurityIT.
-->
<exclude>**/HostnameVerificationDisabledIT.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
Expand Down
Expand Up @@ -25,6 +25,7 @@

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
Expand Down Expand Up @@ -456,6 +457,13 @@ public Map<String, Object> sslConfigs() {
return sslConfigDef.parse(originalsWithPrefix(CONNECTION_SSL_CONFIG_PREFIX));
}

public boolean shouldDisableHostnameVerification() {
String sslEndpointIdentificationAlgorithm = getString(
CONNECTION_SSL_CONFIG_PREFIX + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
return sslEndpointIdentificationAlgorithm != null
&& sslEndpointIdentificationAlgorithm.isEmpty();
}

public static void main(String[] args) {
System.out.println(CONFIG.toEnrichedRst());
}
Expand Down
Expand Up @@ -48,6 +48,7 @@
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.Refresh;
import io.searchbox.indices.mapping.PutMapping;
import javax.net.ssl.HostnameVerifier;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
Expand Down Expand Up @@ -194,14 +195,17 @@ private static void configureSslContext(HttpClientConfig.Builder builder,
kafkaSslFactory.configure(config.sslConfigs());
SSLContext sslContext = kafkaSslFactory.sslContext();

HostnameVerifier hostnameVerifier = config.shouldDisableHostnameVerification()
? (hostname, session) -> true
: SSLConnectionSocketFactory.getDefaultHostnameVerifier();

// Sync calls
SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext,
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
sslContext, hostnameVerifier);
builder.sslSocketFactory(sslSocketFactory);

// Async calls
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext,
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, hostnameVerifier);
builder.httpsIOSessionStrategy(sessionStrategy);
}

Expand Down
@@ -0,0 +1,36 @@
package io.confluent.connect.elasticsearch.integration;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class HostnameVerificationDisabledIT extends SecurityIT {
private static final Logger log = LoggerFactory.getLogger(HostnameVerificationDisabledIT.class);

@Test
public void testSecureConnectionHostnameVerificationDisabled() throws Throwable {
// Use 'localhost' here that is not in self-signed cert
String address = container.getConnectionUrl();
address = address.replace(container.getContainerIpAddress(), "localhost");
log.info("Creating connector for {}", address);

connect.kafka().createTopic(KAFKA_TOPIC, 1);

Map<String, String> props = getProps();
props.put("connection.url", address);
props.put("elastic.security.protocol", "SSL");
props.put("elastic.https.ssl.keystore.location", container.getKeystorePath());
props.put("elastic.https.ssl.keystore.password", container.getKeystorePassword());
props.put("elastic.https.ssl.key.password", container.getKeyPassword());
props.put("elastic.https.ssl.truststore.location", container.getTruststorePath());
props.put("elastic.https.ssl.truststore.password", container.getTruststorePassword());

// disable hostname verification
props.put("elastic.https.ssl.endpoint.identification.algorithm", "");

// Start connector
testSecureConnection(props);
}
}
Expand Up @@ -8,10 +8,9 @@
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
Expand All @@ -35,44 +34,36 @@ public class SecurityIT {

protected static ElasticsearchContainer container;

private EmbeddedConnectCluster connect;
EmbeddedConnectCluster connect;

private static final String MESSAGE_KEY = "message-key";
private static final String MESSAGE_VAL = "{ \"schema\": { \"type\": \"map\", \"keys\": "
+ "{ \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, "
+ "\"payload\": { \"key1\": 12, \"key2\": 15} }";
private static final String CONNECTOR_NAME = "elastic-sink";
private static final String KAFKA_TOPIC = "test-elasticsearch-sink";
static final String KAFKA_TOPIC = "test-elasticsearch-sink";
private static final String TYPE_NAME = "kafka-connect";
private static final int TASKS_MAX = 1;
private static final int NUM_MSG = 200;
private static final long VERIFY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(2);

@BeforeClass
public static void setupBeforeAll() {

@Before
public void setup() throws IOException {
// Relevant and available docker images for elastic can be found at https://www.docker.elastic.co
container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
container.start();
}

@AfterClass
public static void teardownAfterAll() {
container.close();
}

@Before
public void setup() throws IOException {
connect = new EmbeddedConnectCluster.Builder().name("elastic-sink-cluster").build();
connect.start();
}

@After
public void close() {
connect.stop();
container.close();
}

private Map<String, String> getProps () {
Map<String, String> getProps () {
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getName());
props.put(SinkConnectorConfig.TOPICS_CONFIG, KAFKA_TOPIC);
Expand All @@ -85,17 +76,17 @@ private Map<String, String> getProps () {

/**
* Run test against docker image running Elasticsearch.
* Certificates are generated with src/test/resources/certs/generate_certificates.sh
* Certificates are generated in src/test/resources/ssl/start-elasticsearch.sh
*/
@Test
public void testSecureConnection() throws Throwable {
// Use 'localhost' here because that's the IP address the certificates allow
final String address = container.getConnectionUrl();
public void testSecureConnectionVerifiedHostname() throws Throwable {
// Use IP address here because that's what the certificates allow
String address = container.getConnectionUrl();
address = address.replace(container.getContainerIpAddress(), container.hostMachineIpAddress());
log.info("Creating connector for {}", address);

connect.kafka().createTopic(KAFKA_TOPIC, 1);

// Start connector
Map<String, String> props = getProps();
props.put("connection.url", address);
props.put("elastic.security.protocol", "SSL");
Expand All @@ -104,10 +95,20 @@ public void testSecureConnection() throws Throwable {
props.put("elastic.https.ssl.key.password", container.getKeyPassword());
props.put("elastic.https.ssl.truststore.location", container.getTruststorePath());
props.put("elastic.https.ssl.truststore.password", container.getTruststorePassword());

// Start connector
testSecureConnection(props);
}

void testSecureConnection(Map<String, String> props) throws Throwable {
connect.configureConnector(CONNECTOR_NAME, props);
waitForCondition(() -> {
ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME);
return info != null && info.tasks() != null && info.tasks().size() == 1;
try {
ConnectorStateInfo info = connect.connectorStatus(CONNECTOR_NAME);
return info != null && info.tasks() != null && info.tasks().size() == 1;
} catch (ConnectRestException e) {
return false;
}
}, "Timed out waiting for connector task to start");

for (int i=0; i<NUM_MSG; i++){
Expand Down
1 change: 0 additions & 1 deletion src/test/resources/ssl/instances.yml
Expand Up @@ -2,7 +2,6 @@ instances:
- name: elasticsearch
dns:
- elasticsearch
- localhost
ip:
- ipAddress
- name: kibana
Expand Down

0 comments on commit 9300e21

Please sign in to comment.