[FLINK-4491] Handle index.number_of_shards in the ES connector #2790

Closed · wants to merge 5 commits
@@ -50,38 +50,41 @@
*
* <p>
* When using the second constructor
* {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)}
* a {@link TransportClient} will be used.
*
* <p>
* <b>Attention: </b> When using the {@code TransportClient} the sink will fail
* if no cluster can be connected to.
*
* <p>
* The {@link Map} passed to the constructor is forwarded to Elasticsearch when
* creating {@link TransportClient}. The config keys can be found in the
* Elasticsearch documentation. An important setting is {@code cluster.name},
* this should be set to the name of the cluster that the sink should emit to.
*
* <p>
* Internally, the sink will use a {@link BulkProcessor} to send
* {@link IndexRequest IndexRequests}. This will buffer elements before sending
* a request to the cluster. The behaviour of the {@code BulkProcessor} can be
* configured using these config keys:
* <ul>
* <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
* <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to
* buffer
* <li>{@code bulk.flush.interval.ms}: Interval at which to flush data
* regardless of the other two settings in milliseconds
* </ul>
*
* <p>
* {@link IndexRequest} from an element that needs to be added to Elasticsearch.
* See {@link RequestIndexer} for an example.
*
* @param <T>
* Type of the elements emitted by this sink
*/
public class ElasticsearchSink<T> extends RichSinkFunction<T> {

public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
@@ -92,18 +95,20 @@ public class ElasticsearchSink<T> extends RichSinkFunction<T> {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);

/**
* The user specified config map that we forward to Elasticsearch when we
* create the Client.
*/
private final Map<String, String> userConfig;

/**
* The list of nodes that the TransportClient should connect to. This is
* null if we are using an embedded Node to get a Client.
*/
private final List<InetSocketAddress> transportAddresses;

/**
* The builder that is used to construct an {@link IndexRequest} from the
* incoming element.
*/
private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;

@@ -123,24 +128,34 @@ public class ElasticsearchSink<T> extends RichSinkFunction<T> {
private transient RequestIndexer requestIndexer;

/**
* This is set from inside the BulkProcessor listener if there were
* failures in processing.
*/
private final AtomicBoolean hasFailure = new AtomicBoolean(false);

/**
* This is set from inside the BulkProcessor listener if a Throwable was
* thrown during processing.
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

/**
* Creates a new ElasticsearchSink that connects to the cluster using a
* TransportClient.
*
* @param userConfig
* The map of user settings that are passed when constructing the
* TransportClient and BulkProcessor
* @param transportAddresses
* The Elasticsearch Nodes to which to connect using a
* {@code TransportClient}
* @param elasticsearchSinkFunction
* This is used to generate the ActionRequest from the incoming
* element
*
*/
public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
this.userConfig = userConfig;
this.elasticsearchSinkFunction = elasticsearchSinkFunction;
Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
Expand All @@ -162,7 +177,7 @@ public void open(Configuration configuration) {
Settings settings = Settings.settingsBuilder().put(userConfig).build();

TransportClient transportClient = TransportClient.builder().settings(settings).build();
for (TransportAddress transport : transportNodes) {
transportClient.addTransportAddress(transport);
}

@@ -215,12 +230,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}

if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
bulkProcessorBuilder
.setBulkSize(new ByteSizeValue(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
}

if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
bulkProcessorBuilder
.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
}

bulkProcessor = bulkProcessorBuilder.build();
@@ -244,12 +260,7 @@ public void close() {
}

if (hasFailure.get()) {
Throwable cause = failureThrowable.get();
if (cause != null) {
throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
} else {
throw new RuntimeException("An error occurred in ElasticsearchSink.");
}
LOG.error("Some documents failed while indexing to Elasticsearch: " + failureThrowable.get());
Contributor:
I would suggest adding a debug log statement as well, logging the full stack trace.
Also, in the other connectors we have a flag that allows the user to control whether an error should be logged or fail the connector. I would suggest adding this here as well.

Contributor:
Do you mean to substitute the current line 263 with a LOG.debug + stack trace, to leave the current line, or to add another LOG.debug line where we log only the stack trace? Currently the detail of the error is logged within afterBulk(). Should we modify that part?

How do you suggest to control the error behaviour? Is it okay to have something like

public static final String CONFIG_KEY_BULK_ERROR_POLICY = "bulk.error.policy";

that a user can set to "strict" or "lenient" in the userConfig object?
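A minimal sketch of how such a flag could be parsed from the userConfig map. Note this is only an illustration of the proposal in this thread: the key name, the accepted values, and the STRICT default are assumptions, not part of the connector's actual API.

```java
// Hypothetical sketch of a "bulk.error.policy" user-config flag, as
// discussed above. Nothing here exists in the actual connector; it only
// illustrates how such a setting could be parsed and defaulted.
public class BulkErrorPolicyExample {

    public static final String CONFIG_KEY_BULK_ERROR_POLICY = "bulk.error.policy";

    /** STRICT would fail the sink on bulk errors; LENIENT would only log them. */
    public enum ErrorPolicy { STRICT, LENIENT }

    /** Parses the user-supplied value, defaulting to STRICT when unset. */
    public static ErrorPolicy parsePolicy(String value) {
        if (value == null || value.isEmpty()) {
            return ErrorPolicy.STRICT;
        }
        switch (value.toLowerCase()) {
            case "strict":
                return ErrorPolicy.STRICT;
            case "lenient":
                return ErrorPolicy.LENIENT;
            default:
                throw new IllegalArgumentException(
                    "Unknown " + CONFIG_KEY_BULK_ERROR_POLICY + ": " + value);
        }
    }

    public static void main(String[] args) {
        // Unset -> STRICT (fail the connector); "lenient" -> only log, as in
        // the current version of this change.
        System.out.println(parsePolicy(null));      // STRICT
        System.out.println(parsePolicy("lenient")); // LENIENT
    }
}
```

In close(), the sink could then either rethrow the stored failureThrowable (strict) or log it and continue (lenient), depending on the parsed policy.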

}

}
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.flink.streaming.connectors.elasticsearch2.helper;

import com.google.common.collect.ImmutableList;

import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
*
* This class manages the creation of index templates and index mappings on Elasticsearch.
*
* <p>
* Example:
*
* <pre>{@code
* ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);
*
* //Create an Index Template given a name and the json structure
* esHelper.initTemplate(templateName, templateRequest);
*
* //Create an Index Mapping given the Index Name, DocType and the json structure
* esHelper.initIndexMapping(indexName, docType, mappingsRequest);
*
* }</pre>
*
* <p>
* The {@link Map} passed to the constructor is forwarded to Elasticsearch when
* creating {@link TransportClient}. The config keys can be found in the
* Elasticsearch documentation. An important setting is {@code cluster.name},
* this should be set to the name of the cluster that the sink should emit to.
*
*/
public class ElasticSearchHelper {
Contributor:

This class needs a javadoc that explains what it can be used for.


private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchHelper.class);

private Client client;

private static final int DEFAULT_INDEX_SHARDS = 2;
private static final int DEFAULT_INDEX_REPLICAS = 0;

/**
* Creates a new ElasticSearchHelper that connects to the cluster using a TransportClient.
*
* @param userConfig The map of user settings that are passed when constructing the TransportClients
* @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
*/
public ElasticSearchHelper(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses) {
client = buildElasticsearchClient(userConfig, transportAddresses);
}

/**
* Build a TransportClient to connect to the cluster.
*
* @param userConfig The map of user settings that are passed when constructing the TransportClients
* @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
* @return Initialized TransportClient
*/
public static Client buildElasticsearchClient(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses) {
List<TransportAddress> transportNodes;
transportNodes = new ArrayList<>(transportAddresses.size());
for (InetSocketAddress address : transportAddresses) {
transportNodes.add(new InetSocketTransportAddress(address));
}

Settings settings = Settings.settingsBuilder().put(userConfig).build();

TransportClient transportClient = TransportClient.builder().settings(settings).build();
for (TransportAddress transport : transportNodes) {
transportClient.addTransportAddress(transport);
}

// verify that we actually are connected to a cluster
ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
if (nodes.isEmpty()) {
throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
}
return transportClient;
}

/**
* Create a new index template.
*
* @param templateName Name of the template to create
* @param templateReq Json defining the index template
*/
public void initTemplate(String templateName, String templateReq) throws Exception {
// Check if the template is set
if (templateReq != null && !templateReq.isEmpty()) {
// Json deserialization
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(templateReq);
// Sending the request to Elasticsearch
sendIndexTemplateRequest(request);
}
}

/**
* Send the index template request to elasticsearch.
*
* @param indexTemplateRequest A valid index template request
*/
public void sendIndexTemplateRequest(PutIndexTemplateRequest indexTemplateRequest) throws Exception {
// Check if the template request is set
if (indexTemplateRequest != null) {
// Sending the request to Elasticsearch
client.admin().indices().putTemplate(indexTemplateRequest).get();
}
}

/**
* Create a new mapping for a document type for an index.
*
* @param indexName Index name where add the mapping
* @param docType Document type of the mapping
* @param mappingReq Json defining the index mapping
*/
public void initIndexMapping(String indexName, String docType, String mappingReq) throws Exception {
PutMappingRequest request = new PutMappingRequest(indexName).source(mappingReq).type(docType);

// Put the mapping to the index
sendIndexMappingRequest(request);
LOG.debug("Updating mappings...");
}

/**
* Send the index mapping request to elasticsearch.
*
* @param mappingRequest A valid index mapping request
*/
public void sendIndexMappingRequest(PutMappingRequest mappingRequest) throws Exception {
// Check if the mapping request is set
if (mappingRequest != null) {
try {
// Check if the index exists
SearchResponse response = client.prepareSearch(mappingRequest.indices())
.setTypes(mappingRequest.type()).get();
if (response != null) {
LOG.debug("Index found, no need to create it...");
}
} catch (IndexNotFoundException infe) {
for (String indexName : mappingRequest.indices()) {
// If the index does not exist, create it
client.admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, DEFAULT_INDEX_SHARDS)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, DEFAULT_INDEX_REPLICAS)).execute().actionGet();
LOG.info("Index " + indexName + " not found, creating it...");
}
}
// Sending the request to Elasticsearch
client.admin().indices().putMapping(mappingRequest).get();
}
}

}