From 2815b0c9ca407838611ba60b9c35a0ac550005e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Sch=C3=A4fer?= Date: Wed, 17 May 2017 22:24:14 +0200 Subject: [PATCH] [FLINK-6065] Add initClient method to ElasticsearchApiCallBridge This adds the method `initClient` to `ElasticsearchApiCallBridge` in order to resolve FLINK-6065. This new method takes as argument a function that can create a `TransportClient`. This is required in order to not hardcode the `TransportClient` in the implementation of `createClient`. `createClient` continues to exist in order to allow backwards compatibility. No tests provided because I couldn't find any existing test class that tests the implementation of `ElasticsearchApiCallBridge`. --- .../ElasticsearchApiCallBridge.java | 18 ++++++++++++++++++ .../ElasticsearchSinkBaseTest.java | 8 ++++++++ .../Elasticsearch1ApiCallBridge.java | 13 ++++++++++++- .../Elasticsearch2ApiCallBridge.java | 13 ++++++++++++- .../Elasticsearch5ApiCallBridge.java | 13 ++++++++++++- 5 files changed, 62 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index b48243285aba4..20da025e95577 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -21,10 +21,13 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; import javax.annotation.Nullable; import java.io.Serializable; import java.util.Map; +import java.util.function.Function; /** * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions. @@ -40,11 +43,26 @@ public interface ElasticsearchApiCallBridge extends Serializable { /** * Creates an Elasticsearch {@link Client}. * + * In comparison to {@link initClient}, this method creates a default {@link Client}. + * * @param clientConfig The configuration to use when constructing the client. * @return The created client. */ Client createClient(Map clientConfig); + /** + * Initializes an Elasticsearch {@link Client}. + * + * A {@link Settings} object is created, which is passed to {@link mapper} in order to allow the creation of a + * {@link TransportClient}. {@link createClient} creates and initializes a default client for cases where the + * implementation doesn't matter. + * + * @param clientConfig The configuration to use when constructing the client. + * @param mapper Receives a {@link Settings} object that can be used to create a {@link TransportClient}. + * @return The initialized client that has been created by {@link mapper}. + */ + Client initClient(Map clientConfig, Function mapper); + /** * Extracts the cause of failure of a bulk item action. * diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java index b9df5c6f44fdb..ed1bde5b3757e 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -32,6 +32,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -44,6 +46,7 @@ import java.util.Map; import java.util.HashMap; import java.util.List; +import java.util.function.Function; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -518,6 +521,11 @@ public Client createClient(Map clientConfig) { return mock(Client.class); } + @Override + public Client initClient(Map clientConfig, Function mapper) { + return mock(Client.class); + } + @Nullable @Override public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java index 8a59da9750dd4..b192a50519d3d 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java @@ -31,6 +31,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; @@ -67,6 +68,16 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge { @Override public Client createClient(Map clientConfig) { + return initClient(clientConfig, new Function() { + @Override + public TransportClient apply(Settings settings) { + return new TransportClient(settings); + } + }); + } + + @Override + public Client initClient(Map clientConfig, Function mapper) { if (transportAddresses == null) { // Make sure that we disable http access to our embedded node @@ -93,7 +104,7 @@ public Client createClient(Map clientConfig) { .put(clientConfig) .build(); - TransportClient transportClient = new TransportClient(settings); + TransportClient transportClient = mapper.apply(settings); for (TransportAddress transport: transportAddresses) { transportClient.addTransportAddress(transport); } diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java index e85daf5af3ae8..5af557de4aebe 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java @@ -36,6 +36,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.function.Function; /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x. @@ -60,9 +61,19 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge { @Override public Client createClient(Map clientConfig) { + return initClient(clientConfig, new Function() { + @Override + public TransportClient apply(Settings settings) { + return TransportClient.builder().settings(settings).build(); + } + }); + } + + @Override + public Client initClient(Map clientConfig, Function mapper) { Settings settings = Settings.settingsBuilder().put(clientConfig).build(); - TransportClient transportClient = TransportClient.builder().settings(settings).build(); + TransportClient transportClient = mapper.apply(settings); for (TransportAddress address : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) { transportClient.addTransportAddress(address); } diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java index c7d81f551d6e1..d1270b8ee124d 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java @@ -39,6 +39,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.function.Function; /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x. @@ -63,12 +64,22 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge { @Override public Client createClient(Map clientConfig) { + return initClient(clientConfig, new Function() { + @Override + public TransportClient apply(Settings settings) { + return new PreBuiltTransportClient(settings); + } + }); + } + + @Override + public Client initClient(Map clientConfig, Function mapper) { Settings settings = Settings.builder().put(clientConfig) .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) .build(); - TransportClient transportClient = new PreBuiltTransportClient(settings); + TransportClient transportClient = mapper.apply(settings); for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) { transportClient.addTransportAddress(transport); }