From 92ec4a24be6749a48ef0cfad3e04b0096cbe85f2 Mon Sep 17 00:00:00 2001 From: smarthi Date: Tue, 29 Dec 2015 04:55:57 -0500 Subject: [PATCH] FLINK-3115: Update ElasticSearch connector to 2.x --- .../flink-connector-elasticsearch2/pom.xml | 112 +++++++ .../elasticsearch2/ElasticsearchSink.java | 302 ++++++++++++++++++ .../elasticsearch2/IndexRequestBuilder.java | 66 ++++ .../examples/ElasticsearchExample.java | 78 +++++ .../ElasticsearchSinkITCase.java | 210 ++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ .../src/test/resources/logback-test.xml | 30 ++ flink-streaming-connectors/pom.xml | 1 + 8 files changed, 826 insertions(+) create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/IndexRequestBuilder.java create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties create mode 100644 flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml new file mode 100644 index 0000000000000..2b6ca1efa48a4 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml @@ -0,0 +1,112 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors-parent + 1.0-SNAPSHOT + .. + + + flink-connector-elasticsearch2 + flink-connector-elasticsearch2 + + jar + + + + 2.1.1 + + + + + + org.apache.flink + flink-streaming-java + ${project.version} + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + com.google.guava + guava + ${guava.version} + + + + com.fasterxml.jackson.core + jackson-core + 2.6.4 + + + + org.apache.flink + flink-streaming-java + ${project.version} + test + test-jar + + + + org.apache.flink + flink-tests + ${project.version} + test + + + + org.apache.flink + flink-test-utils + ${project.version} + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 3 + + + + org.apache.maven.plugins + maven-failsafe-plugin + + 3 + + + + + + diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java new file mode 100644 index 0000000000000..82021f7177b9c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch2; + +import com.google.common.collect.ImmutableList; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Sink that emits its input elements to an Elasticsearch cluster. + * + *

+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)} + * the sink will create a local {@link Node} for communicating with the + * Elasticsearch cluster. When using the second constructor + * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will + * be used instead. + * + *
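+ * <p>
+ * A minimal usage sketch of both variants; {@code stream}, {@code MyIndexRequestBuilder},
+ * the host name and the port below are placeholders, not part of this connector:
+ * <pre>{@code
+ *     Map<String, String> config = new HashMap<>();
+ *
+ *     // first constructor: an embedded Node joins the Elasticsearch cluster
+ *     stream.addSink(new ElasticsearchSink<>(config, new MyIndexRequestBuilder()));
+ *
+ *     // second constructor: a TransportClient connects to the given addresses
+ *     List<TransportAddress> transports = new ArrayList<>();
+ *     transports.add(new InetSocketTransportAddress(InetAddress.getByName("es-host-1"), 9300));
+ *     stream.addSink(new ElasticsearchSink<>(config, transports, new MyIndexRequestBuilder()));
+ * }</pre>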

+ * <b>Attention:</b> When using the {@code TransportClient}, the sink will fail if it cannot + * connect to a cluster. With the {@code Node} client, the sink will block and wait for a cluster + * to come online. + *

+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating + * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch + * documentation. An important setting is {@code cluster.name}, which should be set to the name + * of the cluster that the sink should emit to. + *
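+ * <p>
+ * For example, to emit to a cluster named {@code my-cluster} (the name is a placeholder
+ * for your own cluster name):
+ * <pre>{@code
+ *     Map<String, String> config = new HashMap<>();
+ *     // must match the name of the target Elasticsearch cluster
+ *     config.put("cluster.name", "my-cluster");
+ * }</pre>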

+ * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + *

+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer before flushing
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of buffered data, in megabytes
+ *   <li> {@code bulk.flush.interval.ms}: Interval at which to flush buffered data, in milliseconds,
+ *   regardless of the other two settings
+ * </ul>
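+ *
+ * <p>
+ * For example, the following sketch (the values are placeholders) makes the sink flush a bulk
+ * request after 500 elements, after 5 MB of buffered data, or at the latest after one second,
+ * whichever comes first:
+ * <pre>{@code
+ *     // values are parsed as integers by the sink
+ *     config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "500");
+ *     config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
+ *     config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1000");
+ * }</pre>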

+ * You also have to provide an {@link IndexRequestBuilder}. This is used to create an + * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See + * {@link IndexRequestBuilder} for an example. + * + * @param Type of the elements emitted by this sink + */ +public class ElasticsearchSink extends RichSinkFunction { + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); + + /** + * The user specified config map that we forward to Elasticsearch when we create the Client. + */ + private final Map userConfig; + + /** + * The list of nodes that the TransportClient should connect to. This is null if we are using + * an embedded Node to get a Client. + */ + private transient List transportNodes; + + /** + * The builder that is used to construct an {@link IndexRequest} from the incoming element. + */ + private final IndexRequestBuilder indexRequestBuilder; + + /** + * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null + * if we are using a TransportClient. + */ + private transient Node node; + + /** + * The Client that was either retrieved from a Node or is a TransportClient. + */ + private transient Client client; + + /** + * Bulk processor that was created using the client + */ + private transient BulkProcessor bulkProcessor; + + /** + * This is set from inside the BulkProcessor listener if there where failures in processing. + */ + private final AtomicBoolean hasFailure = new AtomicBoolean(false); + + /** + * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. + */ + private final AtomicReference failureThrowable = new AtomicReference<>(); + + /** + * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node. + * + * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor + * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + */ + public ElasticsearchSink(Map userConfig, IndexRequestBuilder indexRequestBuilder) { + this.userConfig = userConfig; + this.indexRequestBuilder = indexRequestBuilder; + transportNodes = null; + } + + /** + * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. + * + * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor + * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient} + * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element + * + */ + public ElasticsearchSink(Map userConfig, List transportNodes, IndexRequestBuilder indexRequestBuilder) { + this.userConfig = userConfig; + this.indexRequestBuilder = indexRequestBuilder; + this.transportNodes = transportNodes; + } + + /** + * Initializes the connection to Elasticsearch by either creating an embedded + * {@link org.elasticsearch.node.Node} and retrieving the + * {@link org.elasticsearch.client.Client} from it or by creating a + * {@link org.elasticsearch.client.transport.TransportClient}. 
+ */ + @Override + public void open(Configuration configuration) { + if (transportNodes == null) { + // Make sure that we disable http access to our embedded node + Settings settings = + Settings.builder() + .put(userConfig) + .put("http.enabled", false) + .build(); + + node = NodeBuilder.nodeBuilder() + .settings(settings) + .client(true) + .data(false) + .node(); + + client = node.client(); + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch Client {} from embedded Node", client); + } + + } else { + Settings settings = Settings.settingsBuilder().put(userConfig).build(); + + TransportClient transportClient = TransportClient.builder().settings(settings).build(); + for (TransportAddress transport: transportNodes) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + ImmutableList nodes = ImmutableList.copyOf(transportClient.connectedNodes()); + if (nodes.isEmpty()) { + throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Connected to nodes: " + nodes.toString()); + } + } + + client = transportClient; + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient {}", client); + } + } + + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + for (BulkItemResponse itemResp : response.getItems()) { + if (itemResp.isFailed()) { + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); + failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + } + } + hasFailure.set(true); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error(failure.getMessage()); + failureThrowable.compareAndSet(null, failure); + hasFailure.set(true); + } + }); + + // This makes flush() blocking + bulkProcessorBuilder.setConcurrentRequests(0); + + ParameterTool params = ParameterTool.fromMap(userConfig); + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { + bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { + bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( + CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { + bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); + } + + bulkProcessor = bulkProcessorBuilder.build(); + } + + @Override + public void invoke(T element) { + IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Emitting IndexRequest: {}", indexRequest); + } + + bulkProcessor.add(indexRequest); + } + + @Override + public void close() { + if (bulkProcessor != null) { + bulkProcessor.close(); + bulkProcessor = null; + } + + if (client != null) { + client.close(); + } + + if (node != null) { + node.close(); + } + + if (hasFailure.get()) { + Throwable cause = failureThrowable.get(); + if (cause != null) { + throw new RuntimeException("An error occured in ElasticsearchSink.", cause); + } else { + throw new 
RuntimeException("An error occured in ElasticsearchSink."); + + } + } + } + +} diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/IndexRequestBuilder.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/IndexRequestBuilder.java new file mode 100644 index 0000000000000..5f5bd15e483fd --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/IndexRequestBuilder.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.elasticsearch.action.index.IndexRequest; + +import java.io.Serializable; + +/** + * Function that creates an {@link IndexRequest} from an element in a Stream. + * + *

+ * This is used by {@link ElasticsearchSink} + * to prepare elements for sending them to Elasticsearch. See + * Index API + * for information about how to format data for adding it to an Elasticsearch index. + * + *

+ * Example: + * + *

<pre>{@code
+ *     private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {
+ *
+ *         public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+ *             Map<String, Object> json = new HashMap<>();
+ *             json.put("data", element);
+ *
+ *             return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .source(json);
+ *         }
+ *     }
+ * }</pre>
+ * + * @param The type of the element handled by this {@code IndexRequestBuilder} + */ +public interface IndexRequestBuilder extends Function, Serializable { + + /** + * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element. + * + * @param element The element that needs to be turned in to an {@code IndexRequest} + * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink} + * + * @return The constructed {@code IndexRequest} + */ + IndexRequest createIndexRequest(T element, RuntimeContext ctx); +} diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java new file mode 100644 index 0000000000000..b5732975f0b55 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch2.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch2.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.HashMap; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster names "elasticsearch" running or change the cluster name in the config map. + */ +public class ElasticsearchExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator source = + env.generateSequence(0, 20).map(new MapFunction() { + /** + * The mapping method. Takes an element from the input data set and transforms + * it into exactly one element. + * + * @param value The input value. + * @return The transformed value + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. 
+ */ + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder() { + @Override + public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { + Map json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type").source(json); + } + })); + + + env.execute("Elasticsearch Example"); + } +} diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java new file mode 100644 index 0000000000000..8c7b805b725ff --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch2.IndexRequestBuilder; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { + + private static final int NUM_ELEMENTS = 20; + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testNodeClient() throws Exception{ + + File dataDir = tempFolder.newFolder(); + + Node node = NodeBuilder.nodeBuilder() + .settings(Settings.settingsBuilder() + .put("path.home", dataDir.getParent()) + .put("http.enabled", false) + .put("path.data", dataDir.getAbsolutePath())) + // set a custom cluster name to verify that user config works correctly + .clusterName("my-node-client-cluster") + .local(true) + .node(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource> source = env.addSource(new TestSourceFunction()); + + Map config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // connect to our local node + config.put("node.local", "true"); + + // need this with ElasticSearch v2.x + config.put("path.home", dataDir.getParent()); + + source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch Node Client Test"); + + + // verify the results + Client client = node.client(); + for (int i = 0; i < NUM_ELEMENTS; i++) { + GetResponse response = client.get(new GetRequest("my-index", + "my-type", Integer.toString(i))).actionGet(); + Assert.assertEquals("message #" + i, response.getSource().get("data")); + } + + node.close(); + } + + @Test + public void testTransportClient() throws Exception { + + File dataDir = tempFolder.newFolder(); + + Node node = NodeBuilder.nodeBuilder() + .settings(Settings.settingsBuilder() + .put("path.home", dataDir.getParent()) + .put("http.enabled", false) + .put("path.data", dataDir.getAbsolutePath())) + // set a custom cluster name to verify that user config works correctly + .clusterName("my-node-client-cluster") + 
.local(true) + .node(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource> source = env.addSource(new TestSourceFunction()); + + Map config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // need this with ElasticSearch v2.x + config.put("path.home", dataDir.getParent()); + + // connect to our local node + config.put("node.local", "true"); + + List transports = new ArrayList<>(); + transports.add(new LocalTransportAddress("1")); + + source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch TransportClient Test"); + + // verify the results + Client client = node.client(); + for (int i = 0; i < NUM_ELEMENTS; i++) { + GetResponse response = client.get(new GetRequest("my-index", + "my-type", Integer.toString(i))).actionGet(); + Assert.assertEquals("message #" + i, response.getSource().get("data")); + } + + node.close(); + } + + @Test(expected = JobExecutionException.class) + public void testTransportClientFails() throws Exception{ + // this checks whether the TransportClient fails early when there is no cluster to + // connect to. There isn't a similar test for the Node Client version since that + // one will block and wait for a cluster to come online + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource> source = env.addSource(new TestSourceFunction()); + + Map config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + config.put("cluster.name", "my-node-client-cluster"); + + // connect to our local node + config.put("node.local", "true"); + + List transports = new ArrayList<>(); + transports.add(new LocalTransportAddress("1")); + + source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder())); + + env.execute("Elasticsearch Node Client Test"); + } + + private static class TestSourceFunction implements SourceFunction> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(Tuple2.of(i, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class TestIndexRequestBuilder implements IndexRequestBuilder> { + private static final long serialVersionUID = 1L; + + @Override + public IndexRequest createIndexRequest(Tuple2 element, RuntimeContext ctx) { + Map json = new HashMap<>(); + json.put("data", element.f1); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element.f0.toString()) + .source(json); + } + } +} diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..dc207262c0760 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to 
the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml new file mode 100644 index 0000000000000..45b3b92f0cf65 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml index 1b829f29833b7..d2cceb5811e8c 100644 --- a/flink-streaming-connectors/pom.xml +++ b/flink-streaming-connectors/pom.xml @@ -39,6 +39,7 @@ under the License. flink-connector-flume flink-connector-kafka flink-connector-elasticsearch + flink-connector-elasticsearch2 flink-connector-rabbitmq flink-connector-twitter flink-connector-nifi