From c4b0c02f8c791a85d1924fe4530e14aa839d9d19 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 09:10:33 -0700 Subject: [PATCH 01/11] Log server version on sink task startup. Revert client upgrade --- .../elasticsearch/ElasticsearchSinkTask.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 53552b3e6..34eab1235 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -16,6 +16,8 @@ package io.confluent.connect.elasticsearch; import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; + +import org.apache.http.HttpHost; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; @@ -23,14 +25,21 @@ import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.core.MainResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class ElasticsearchSinkTask extends SinkTask { @@ -38,6 +47,7 @@ public class ElasticsearchSinkTask extends SinkTask { private DataConverter converter; private ElasticsearchClient client; + private RestHighLevelClient highLevelClient; private ElasticsearchSinkConnectorConfig config; private ErrantRecordReporter reporter; private Set existingMappings; @@ -72,7 +82,7 @@ protected void start(Map props, ElasticsearchClient client) { this.client = client != null ? client : new ElasticsearchClient(config, reporter); - log.info("Started ElasticsearchSinkTask."); + log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}", getServerVersion()); } @Override @@ -125,6 +135,33 @@ private void checkMapping(SinkRecord record) { } } + private String getServerVersion() { + ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); + this.highLevelClient = highLevelClient != null ? highLevelClient : new RestHighLevelClient( + RestClient + .builder( + config.connectionUrls() + .stream() + .map(HttpHost::create) + .collect(Collectors.toList()) + .toArray(new HttpHost[config.connectionUrls().size()]) + ) + .setHttpClientConfigCallback(configCallbackHandler) + .setRequestConfigCallback(configCallbackHandler) + ); + MainResponse response; + String esVersionNumber = "Unknown"; + try { + response = highLevelClient.info(RequestOptions.DEFAULT); + esVersionNumber = response.getVersion().getNumber(); + } catch (IOException | ElasticsearchStatusException e) { + // Same error messages as from validating the connection for IOException. + // Insufficient privileges to validate the version number if caught + // ElasticsearchStatusException. + } + return esVersionNumber; + } + /** * Returns the converted index name from a given topic name. Elasticsearch accepts: *
    From b13ea6eff4d8e82ec83f1b9805d43fd67dab405f Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 09:21:12 -0700 Subject: [PATCH 02/11] Fix spacing --- .../connect/elasticsearch/ElasticsearchSinkTask.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 34eab1235..1a0e38adb 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -15,8 +15,6 @@ package io.confluent.connect.elasticsearch; -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; - import org.apache.http.HttpHost; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -41,6 +39,8 @@ import java.util.Set; import java.util.stream.Collectors; +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues; + public class ElasticsearchSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class); @@ -72,7 +72,6 @@ protected void start(Map props, ElasticsearchClient client) { if (context.errantRecordReporter() == null) { log.info("Errant record reporter not configured."); } - // may be null if DLQ not enabled reporter = context.errantRecordReporter(); } catch (NoClassDefFoundError | NoSuchMethodError e) { @@ -82,7 +81,8 @@ protected void start(Map props, ElasticsearchClient client) { this.client = client != null ? client : new ElasticsearchClient(config, reporter); - log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}", getServerVersion()); + log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}", + getServerVersion()); } @Override From fb85c05e528e58d49b0c0ba9712ced3b23107e63 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 09:34:46 -0700 Subject: [PATCH 03/11] Fix build by replacing Metadata packages --- .../elasticsearch/ElasticsearchClient.java | 40 ++++++++++--------- .../elasticsearch/ElasticsearchSinkTask.java | 4 +- .../helper/ElasticsearchHelperClient.java | 11 ++--- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java index 636549986..1a690bde5 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java @@ -15,25 +15,6 @@ package io.confluent.connect.elasticsearch; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; -import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; - -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.nio.conn.NHttpClientConnectionManager; import org.apache.kafka.common.utils.Time; @@ -62,6 +43,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc; + +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG; +import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG; + public class ElasticsearchClient { private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class); diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 1a0e38adb..57ba5d03f 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -25,10 +25,10 @@ import org.apache.kafka.connect.sink.SinkTask; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.core.MainResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +153,7 @@ private String getServerVersion() { String esVersionNumber = "Unknown"; try { response = highLevelClient.info(RequestOptions.DEFAULT); - esVersionNumber = response.getVersion().getNumber(); + esVersionNumber = response.getVersion().toString(); } catch (IOException | ElasticsearchStatusException e) { // Same error messages as from validating the connection for IOException. // Insufficient privileges to validate the version number if caught diff --git a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java index 93939c3a3..83a006419 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java +++ b/src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java @@ -15,11 +15,6 @@ package io.confluent.connect.elasticsearch.helper; -import io.confluent.connect.elasticsearch.ConfigCallbackHandler; -import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig; -import java.io.IOException; -import java.util.Map.Entry; - import org.apache.http.HttpHost; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -42,6 +37,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Map.Entry; + +import io.confluent.connect.elasticsearch.ConfigCallbackHandler; +import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig; + public class ElasticsearchHelperClient { private static final Logger log = LoggerFactory.getLogger(ElasticsearchHelperClient.class); From bd1fbd93b7441537b1b7a378db4f9f036193a01b Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 10:35:12 -0700 Subject: [PATCH 04/11] Catch all exceptions when getting ES server version. Close Client --- .../connect/elasticsearch/ElasticsearchSinkTask.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 57ba5d03f..ae997d308 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -117,6 +117,11 @@ public void flush(Map offsets) { public void stop() { log.debug("Stopping Elasticsearch client."); client.close(); + try { + highLevelClient.close(); + } catch (Exception e) { + log.error("Failed to close high level rest client", e); + } } @Override @@ -154,7 +159,8 @@ private String getServerVersion() { try { response = highLevelClient.info(RequestOptions.DEFAULT); esVersionNumber = response.getVersion().toString(); - } catch (IOException | ElasticsearchStatusException e) { + } catch (Exception e) { + log.info("Failed to get ES server version", e); // Same error messages as from validating the connection for IOException. // Insufficient privileges to validate the version number if caught // ElasticsearchStatusException. From 8a332fcb14e364819ce0639f15d08b226bf077c8 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 10:36:48 -0700 Subject: [PATCH 05/11] set high level client to null on close --- .../confluent/connect/elasticsearch/ElasticsearchSinkTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index ae997d308..904e90fd3 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -119,6 +119,7 @@ public void stop() { client.close(); try { highLevelClient.close(); + highLevelClient = null; } catch (Exception e) { log.error("Failed to close high level rest client", e); } From 84c99bf5877ebd7394150f4faf799bdf9f48d105 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 10:50:00 -0700 Subject: [PATCH 06/11] Unused Imports --- .../confluent/connect/elasticsearch/ElasticsearchSinkTask.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 904e90fd3..f1ca623e5 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -23,7 +23,6 @@ import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.client.RequestOptions; @@ -32,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collection; import java.util.HashSet; import java.util.Map; From 1a660b156a5984fa9c457d80d9964dc4c8f3b191 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 11:03:56 -0700 Subject: [PATCH 07/11] Move comments. change log level to warn --- .../confluent/connect/elasticsearch/ElasticsearchSinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index f1ca623e5..ed2cc39d6 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -159,10 +159,10 @@ private String getServerVersion() { response = highLevelClient.info(RequestOptions.DEFAULT); esVersionNumber = response.getVersion().toString(); } catch (Exception e) { - log.info("Failed to get ES server version", e); // Same error messages as from validating the connection for IOException. // Insufficient privileges to validate the version number if caught // ElasticsearchStatusException. + log.warn("Failed to get ES server version", e); } return esVersionNumber; } From 4a129ea435c998e36fa4e34b7928e8d7ec31ac77 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 12:36:39 -0700 Subject: [PATCH 08/11] Close high level client after use` --- .../elasticsearch/ElasticsearchSinkTask.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index ed2cc39d6..cfffbd276 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -45,7 +45,6 @@ public class ElasticsearchSinkTask extends SinkTask { private DataConverter converter; private ElasticsearchClient client; - private RestHighLevelClient highLevelClient; private ElasticsearchSinkConnectorConfig config; private ErrantRecordReporter reporter; private Set existingMappings; @@ -115,12 +114,6 @@ public void flush(Map offsets) { public void stop() { log.debug("Stopping Elasticsearch client."); client.close(); - try { - highLevelClient.close(); - highLevelClient = null; - } catch (Exception e) { - log.error("Failed to close high level rest client", e); - } } @Override @@ -141,7 +134,7 @@ private void checkMapping(SinkRecord record) { private String getServerVersion() { ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config); - this.highLevelClient = highLevelClient != null ? highLevelClient : new RestHighLevelClient( + RestHighLevelClient highLevelClient = new RestHighLevelClient( RestClient .builder( config.connectionUrls() @@ -164,6 +157,11 @@ private String getServerVersion() { // ElasticsearchStatusException. log.warn("Failed to get ES server version", e); } + try { + highLevelClient.close(); + } catch (Exception e) { + log.error("Failed to close high level client"); + } return esVersionNumber; } From d6ac411b9140afd25b43e482b6bc1fe89ec366d1 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 13:00:31 -0700 Subject: [PATCH 09/11] Wrap client close in try --- .../connect/elasticsearch/ElasticsearchSinkTask.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index cfffbd276..b3fd014ab 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -156,11 +156,12 @@ private String getServerVersion() { // Insufficient privileges to validate the version number if caught // ElasticsearchStatusException. log.warn("Failed to get ES server version", e); - } - try { - highLevelClient.close(); - } catch (Exception e) { - log.error("Failed to close high level client"); + } finally { + try { + highLevelClient.close(); + } catch (Exception e) { + log.error("Failed to close high level client"); + } } return esVersionNumber; } From 94c8aa6e61cf5e0859288c68627211b7afe679f7 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 13:01:11 -0700 Subject: [PATCH 10/11] Change log level to warn --- .../confluent/connect/elasticsearch/ElasticsearchSinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index b3fd014ab..b3da1ddef 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -160,7 +160,7 @@ private String getServerVersion() { try { highLevelClient.close(); } catch (Exception e) { - log.error("Failed to close high level client"); + log.warn("Failed to close high level client"); } } return esVersionNumber; From 06d5112ecc73b9cf36b7ab9c807c966de2a7e914 Mon Sep 17 00:00:00 2001 From: Ilanji Rajamanickam Date: Mon, 28 Jun 2021 13:21:51 -0700 Subject: [PATCH 11/11] Add error to log` --- .../confluent/connect/elasticsearch/ElasticsearchSinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index b3da1ddef..8ff7aa64d 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -160,7 +160,7 @@ private String getServerVersion() { try { highLevelClient.close(); } catch (Exception e) { - log.warn("Failed to close high level client"); + log.warn("Failed to close high level client", e); } } return esVersionNumber;