diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java index 636549986..1a690bde5 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java @@ -15,25 +15,6 @@ package io.confluent.connect.elasticsearch; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; - -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.nio.conn.NHttpClientConnectionManager; import org.apache.kafka.common.utils.Time; @@ -62,6 +43,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; + public class ElasticsearchClient { private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class); diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 53552b3e6..8ff7aa64d 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -15,7 +15,7 @@ package io.confluent.connect.elasticsearch; -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; +import org.apache.http.HttpHost; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; @@ -24,6 +24,10 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.main.MainResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +35,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; public class ElasticsearchSinkTask extends SinkTask { @@ -62,7 +69,6 @@ protected void start(Map props, ElasticsearchClient client) { if (context.errantRecordReporter() == null) { log.info("Errant record reporter not configured."); } - // may be null if DLQ not enabled reporter = context.errantRecordReporter(); } catch (NoClassDefFoundError | NoSuchMethodError e) { @@ -72,7 +78,8 @@ protected void start(Map props, ElasticsearchClient client) { this.client = client != null ? client : new ElasticsearchClient(config, reporter); - log.info("Started ElasticsearchSinkTask."); + log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}", + getServerVersion()); } @Override @@ -125,6 +132,40 @@ private void checkMapping(SinkRecord record) { } } + private String getServerVersion() { + ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); + RestHighLevelClient highLevelClient = new RestHighLevelClient( + RestClient + .builder( + config.connectionUrls() + .stream() + .map(HttpHost::create) + .collect(Collectors.toList()) + .toArray(new HttpHost[config.connectionUrls().size()]) + ) + .setHttpClientConfigCallback(configCallbackHandler) + .setRequestConfigCallback(configCallbackHandler) + ); + MainResponse response; + String esVersionNumber = "Unknown"; + try { + response = highLevelClient.info(RequestOptions.DEFAULT); + esVersionNumber = response.getVersion().toString(); + } catch (Exception e) { + // Same error messages as from validating the connection for IOException. + // Insufficient privileges to validate the version number if caught + // ElasticsearchStatusException. + log.warn("Failed to get ES server version", e); + } finally { + try { + highLevelClient.close(); + } catch (Exception e) { + log.warn("Failed to close high level client", e); + } + } + return esVersionNumber; + } + /** * Returns the converted index name from a given topic name. Elasticsearch accepts: *