Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTPS support to Elasticsearch connector #278

Merged
merged 15 commits into from Feb 15, 2019
Expand Up @@ -20,17 +20,22 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.List;
import java.util.Map;

import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;
import static org.apache.kafka.common.config.SslConfigs.addClientSslSupport;

public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String SSL_GROUP = "Security";

public static final String CONNECTION_URL_CONFIG = "connection.url";
private static final String CONNECTION_URL_DOC =
"List of Elasticsearch HTTP connection URLs e.g. ``http://eshost1:9200,"
+ "http://eshost2:9200``.";
"The comma-separated list of one or more Elasticsearch URLs, such as ``http://eshost1:9200,"
+ "http://eshost2:9200`` or ``https://eshost3:9200``. HTTPS is used for all connections "
+ "if any of the URLs starts with ``https:``. A URL without a protocol is treated as "
+ "``http``.";
public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
private static final String CONNECTION_USERNAME_DOC =
"The username used to authenticate with Elasticsearch. "
Expand Down Expand Up @@ -107,7 +112,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
"Whether to ignore schemas during indexing. When this is set to ``true``, the record "
+ "schema will be ignored for the purpose of registering an Elasticsearch mapping. "
+ "Elasticsearch will infer the mapping from the data (dynamic mapping needs to be enabled "
+ "by the user).\n Note that this is a global config that applies to all topics, use ``"
+ "by the user).\n Note that this is a global config that applies to all topics. Use ``"
+ TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
private static final String TOPIC_SCHEMA_IGNORE_DOC =
"List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
Expand Down Expand Up @@ -147,10 +152,19 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
+ " mapping conflict or a field name containing illegal characters. Valid options are "
+ "'ignore', 'warn', and 'fail'.";

public static final String CONNECTION_SSL_CONFIG_PREFIX = "elastic.https.";

protected static ConfigDef baseConfigDef() {
final ConfigDef configDef = new ConfigDef();
addConnectorConfigs(configDef);
addConversionConfigs(configDef);
ConfigDef sslConfigDef = new ConfigDef();
addClientSslSupport(sslConfigDef);
configDef.embed(
CONNECTION_SSL_CONFIG_PREFIX, SSL_GROUP,
configDef.configKeys().size() + 1, sslConfigDef
);

return configDef;
}

Expand Down Expand Up @@ -275,7 +289,13 @@ private static void addConnectorConfigs(ConfigDef configDef) {
group,
++order,
Width.SHORT,
"Read Timeout");
"Read Timeout"
);
cyrusv marked this conversation as resolved.
Show resolved Hide resolved
}

public boolean secured() {
List<String> address = getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
return address.stream().anyMatch(a -> a.startsWith("https:"));
}

private static void addConversionConfigs(ConfigDef configDef) {
Expand Down Expand Up @@ -390,6 +410,12 @@ public ElasticsearchSinkConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
}

public Map<String, Object> sslConfigs() {
ConfigDef sslConfigDef = new ConfigDef();
addClientSslSupport(sslConfigDef);
return sslConfigDef.parse(originalsWithPrefix(CONNECTION_SSL_CONFIG_PREFIX));
}

public static void main(String[] args) {
System.out.println(CONFIG.toEnrichedRst());
}
Expand Down
Expand Up @@ -43,8 +43,12 @@
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
Expand All @@ -56,8 +60,10 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;

public class JestElasticsearchClient implements ElasticsearchClient {
private static final Logger log = LoggerFactory.getLogger(JestElasticsearchClient.class);

// visible for testing
protected static final String MAPPER_PARSE_EXCEPTION
Expand Down Expand Up @@ -125,9 +131,9 @@ protected JestElasticsearchClient(Map<String, String> props, JestClientFactory f
ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG);
final Password password = config.getPassword(
ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG);

List<String> address =
config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);

HttpClientConfig.Builder builder = new HttpClientConfig.Builder(address)
.connTimeout(connTimeout)
.readTimeout(readTimeout)
Expand All @@ -137,6 +143,26 @@ protected JestElasticsearchClient(Map<String, String> props, JestClientFactory f
.preemptiveAuthTargetHosts(address.stream()
.map(addr -> HttpHost.create(addr)).collect(Collectors.toSet()));
}

if (config.secured()) {
log.info("Using secured connection to {}", address);
SslFactory kafkaSslFactory = new SslFactory(Mode.CLIENT, null, false);
kafkaSslFactory.configure(config.sslConfigs());
SSLContext sslContext = kafkaSslFactory.sslContext();

// Sync calls
SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
sslContext, SSLConnectionSocketFactory.getDefaultHostnameVerifier());
builder.sslSocketFactory(sslSocketFactory);

// Async calls
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(
sslContext, SSLConnectionSocketFactory.getDefaultHostnameVerifier());
builder.httpsIOSessionStrategy(sessionStrategy);
} else {
log.info("Using unsecured connection to {}", address);
}

HttpClientConfig httpClientConfig = builder.build();
factory.setHttpClientConfig(httpClientConfig);
this.client = factory.getObject();
Expand Down
@@ -1,12 +1,16 @@
package io.confluent.connect.elasticsearch;

import org.apache.kafka.common.config.types.Password;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertFalse;

public class ElasticsearchSinkConnectorConfigTest {

private Map<String, String> props;
Expand Down Expand Up @@ -46,4 +50,41 @@ public void testSetHttpTimeoutsConfig() {
(Integer) 15000
);
}

@Test
public void testSslConfigs() {
cyrusv marked this conversation as resolved.
Show resolved Hide resolved
props.put("elastic.https.ssl.keystore.location", "/path");
props.put("elastic.https.ssl.keystore.password", "opensesame");
props.put("elastic.https.ssl.truststore.location", "/path2");
props.put("elastic.https.ssl.truststore.password", "opensesame2");
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
Map<String, Object> sslConfigs = config.sslConfigs();
Assert.assertTrue(sslConfigs.size() > 0);
Assert.assertEquals(
new Password("opensesame"),
sslConfigs.get("ssl.keystore.password")
);
Assert.assertEquals(
new Password("opensesame2"),
sslConfigs.get("ssl.truststore.password")
);
cyrusv marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertEquals("/path", sslConfigs.get("ssl.keystore.location"));
Assert.assertEquals("/path2", sslConfigs.get("ssl.truststore.location"));
}

@Test
public void testSecured() {
props.put("connection.url", "http://host:9999");
assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());

props.put("connection.url", "https://host:9999");
assertTrue(new ElasticsearchSinkConnectorConfig(props).secured());

props.put("connection.url", "http://host1:9992,https://host:9999");
assertTrue(new ElasticsearchSinkConnectorConfig(props).secured());

// Default behavior should be backwards compat
props.put("connection.url", "host1:9992");
assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
}
}