Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace BulkProcessor with our own implementation #2

Merged
merged 1 commit on Jun 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -12,7 +12,7 @@
<confluent.version>3.0.0-SNAPSHOT</confluent.version>
<kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
<junit.version>4.12</junit.version>
<es.version>2.2.1</es.version>
<es.version>2.3.3</es.version>
<lucene.version>5.3.1</lucene.version>
<slf4j.version>1.7.5</slf4j.version>
<jna.version>4.2.1</jna.version>
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +40,8 @@
import java.util.Map;
import java.util.Set;

import io.confluent.connect.elasticsearch.internals.ESRequest;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;

Expand Down Expand Up @@ -96,7 +97,7 @@ public static String convertKey(Object key, Schema keySchema) {
* @return The converted IndexRequest.
*/

public static IndexRequest convertRecord(
public static ESRequest convertRecord(
SinkRecord record,
String type,
Client client,
Expand Down Expand Up @@ -160,7 +161,7 @@ public static IndexRequest convertRecord(
}

byte[] json = converter.fromConnectData(topic, newSchema, newValue);
return new IndexRequest(index, type, id).source(json);
return new ESRequest(index, type, id, json);
}

// We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
Expand Down
Expand Up @@ -72,9 +72,38 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch.";
private static final long BATCH_SIZE_DEFAULT = 10000;
private static final int BATCH_SIZE_DEFAULT = 10000;
private static final String BATCH_SIZE_DISPLAY = "Batch Size";

public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC =
"The task groups together any records that arrive in between request transmissions into a single batched request. "
+ "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the "
+ "tasks may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
+ "of artificial delay. Rather than immediately sending out a record the task will wait for up to the given delay to allow other "
+ "records to be sent so that the sends can be batched together.";
private static final long LINGER_MS_DEFAULT = 1;
private static final String LINGER_MS_DISPLAY = "Linger (ms)";

public static final String MAX_IN_FLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
private static final String MAX_IN_FLIGHT_REQUESTS_DOC =
"The maximum number of incomplete batches each task will send before blocking. Note that if this is set to be greater "
+ "than 1 and there are failed sends, there is a risk of message re-ordering due to retries";
private static final int MAX_IN_FLIGHT_REQUESTS_DEFAULT = 5;
private static final String MAX_IN_FLIGHT_REQUESTS_DISPLAY = "Max in Flight Requests";

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC =
"The amount of time to wait before attempting to retry a failed batch. "
+ "This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
private static final long RETRY_BACKOFF_MS_DEFAULT = 100L;
private static final String RETRY_BACKOFF_MS_DISPLAY = "Retry Backoff (ms)";

public static final String MAX_RETRY_CONFIG = "max.retry";
private static final String MAX_RETRY_DOC = "The max allowed number of retries. Allowing retries will potentially change the ordering of records.";
private static final int MAX_RETRY_DEFAULT = 5;
private static final String MAX_RETRY_DISPLAY = "Max Retry";

public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
private static final String SCHEMA_IGNORE_DOC =
"Whether to ignore schemas during indexing. When this is set to true, the schema in `SinkRecord` will be ignored and Elasticsearch will infer the mapping from data. "
Expand All @@ -92,15 +121,20 @@ public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(TRANSPORT_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, TRANSPORT_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, TRANSPORT_ADDRESSES_DISPLAY)
.define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
.define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 1, Width.SHORT, KEY_IGNORE_DISPLAY)
.define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.MEDIUM, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 2, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
.define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.MEDIUM, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 3, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY)
.define(BATCH_SIZE_CONFIG, Type.LONG, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
.define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW,
TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 5, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
.define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
.define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
.define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY);
.define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
.define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
.define(MAX_IN_FLIGHT_REQUESTS_CONFIG, Type.INT, MAX_IN_FLIGHT_REQUESTS_DEFAULT, Importance.MEDIUM,
MAX_IN_FLIGHT_REQUESTS_DOC, CONNECTOR_GROUP, 5, Width.SHORT,
MAX_IN_FLIGHT_REQUESTS_DISPLAY)
.define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW, TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
.define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
.define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
.define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 9, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY)
.define(LINGER_MS_CONFIG, Type.LONG, LINGER_MS_DEFAULT, Importance.LOW, LINGER_MS_DOC, CONNECTOR_GROUP, 10, Width.SHORT, LINGER_MS_DISPLAY)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
.define(MAX_RETRY_CONFIG, Type.INT, MAX_RETRY_DEFAULT, Importance.LOW, MAX_RETRY_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRY_DISPLAY)
.define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
.define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
}

static ConfigDef config = baseConfigDef();
Expand Down
Expand Up @@ -83,7 +83,11 @@ public void start(Map<String, String> props, Client client) {

long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
long maxBufferedRecords = config.getLong(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
long batchSize = config.getLong(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
int batchSize = config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRY_CONFIG);

if (client != null) {
this.client = client;
Expand All @@ -105,7 +109,11 @@ public void start(Map<String, String> props, Client client) {
.setTopicConfigs(topicConfigs)
.setFlushTimoutMs(flushTimeoutMs)
.setMaxBufferedRecords(maxBufferedRecords)
.setMaxInFlightRequests(maxInFlightRequests)
.setBatchSize(batchSize)
.setLingerMs(lingerMs)
.setRetryBackoffMs(retryBackoffMs)
.setMaxRetry(maxRetry)
.setContext(context)
.setConverter(converter);

Expand Down