Replace BulkProcessor
Ishiihara committed Jun 30, 2016
1 parent f67a013 commit 83be437
Showing 18 changed files with 1,160 additions and 236 deletions.
pom.xml: 2 changes (1 addition & 1 deletion)
@@ -12,7 +12,7 @@
<confluent.version>3.0.0-SNAPSHOT</confluent.version>
<kafka.version>0.10.1.0-SNAPSHOT</kafka.version>
<junit.version>4.12</junit.version>
- <es.version>2.2.1</es.version>
+ <es.version>2.3.3</es.version>
<lucene.version>5.3.1</lucene.version>
<slf4j.version>1.7.5</slf4j.version>
<jna.version>4.2.1</jna.version>
@@ -29,7 +29,6 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
- import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,8 @@
import java.util.Map;
import java.util.Set;

+ import io.confluent.connect.elasticsearch.internals.ESRequest;
+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;

@@ -96,7 +97,7 @@ public static String convertKey(Object key, Schema keySchema) {
* @return The converted IndexRequest.
*/

- public static IndexRequest convertRecord(
+ public static ESRequest convertRecord(
SinkRecord record,
String type,
Client client,
@@ -160,7 +161,7 @@ public static IndexRequest convertRecord(
}

byte[] json = converter.fromConnectData(topic, newSchema, newValue);
-     return new IndexRequest(index, type, id).source(json);
+     return new ESRequest(index, type, id, json);
}
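Note: convertRecord now returns the connector's own ESRequest type rather than Elasticsearch's IndexRequest, decoupling record conversion from the client API. A minimal sketch of what the new value class plausibly looks like, inferred only from the constructor call new ESRequest(index, type, id, json) above; the field and accessor names are assumptions, not the class's actual API:

package io.confluent.connect.elasticsearch.internals;

// Hypothetical reconstruction for illustration; the real class may differ.
public class ESRequest {
  private final String index;   // target Elasticsearch index
  private final String type;    // mapping type
  private final String id;      // document id (derived from the record key)
  private final byte[] payload; // JSON body produced by the converter

  public ESRequest(String index, String type, String id, byte[] payload) {
    this.index = index;
    this.type = type;
    this.id = id;
    this.payload = payload;
  }

  public String index() { return index; }
  public String type() { return type; }
  public String id() { return id; }
  public byte[] payload() { return payload; }
}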

// We need to pre process the Kafka Connect schema before converting to JSON as Elasticsearch
@@ -72,9 +72,38 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {

public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The number of requests to process as a batch when writing to Elasticsearch.";
- private static final long BATCH_SIZE_DEFAULT = 10000;
+ private static final int BATCH_SIZE_DEFAULT = 10000;
private static final String BATCH_SIZE_DISPLAY = "Batch Size";

+ public static final String LINGER_MS_CONFIG = "linger.ms";
+ private static final String LINGER_MS_DOC =
+     "The task groups together any records that arrive in between request transmissions into a single batched request. "
+     + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the "
+     + "tasks may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
+     + "of artificial delay. Rather than immediately sending out a record the task will wait for up to the given delay to allow other "
+     + "records to be sent so that the sends can be batched together.";
+ private static final long LINGER_MS_DEFAULT = 1;
+ private static final String LINGER_MS_DISPLAY = "Linger (ms)";
+
+ public static final String MAX_IN_FLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
+ private static final String MAX_IN_FLIGHT_REQUESTS_DOC =
+     "The maximum number of incomplete batches each task will send before blocking. Note that if this is set to be greater "
+     + "than 1 and there are failed sends, there is a risk of message re-ordering due to retries";
+ private static final int MAX_IN_FLIGHT_REQUESTS_DEFAULT = 5;
+ private static final String MAX_IN_FLIGHT_REQUESTS_DISPLAY = "Max in Flight Requests";
+
+ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+ private static final String RETRY_BACKOFF_MS_DOC =
+     "The amount of time to wait before attempting to retry a failed batch. "
+     + "This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
+ private static final long RETRY_BACKOFF_MS_DEFAULT = 100L;
+ private static final String RETRY_BACKOFF_MS_DISPLAY = "Retry Backoff (ms)";
+
+ public static final String MAX_RETRY_CONFIG = "max.retry";
+ private static final String MAX_RETRY_DOC = "The max allowed number of retries. Allowing retries will potentially change the ordering of records.";
+ private static final int MAX_RETRY_DEFAULT = 5;
+ private static final String MAX_RETRY_DISPLAY = "Max Retry";

public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
private static final String SCHEMA_IGNORE_DOC =
"Whether to ignore schemas during indexing. When this is set to true, the schema in `SinkRecord` will be ignored and Elasticsearch will infer the mapping from data. "
@@ -92,15 +121,20 @@ public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(TRANSPORT_ADDRESSES_CONFIG, Type.LIST, Importance.HIGH, TRANSPORT_ADDRESSES_DOC, ELASTICSEARCH_GROUP, 1, Width.LONG, TRANSPORT_ADDRESSES_DISPLAY)
.define(TYPE_NAME_CONFIG, Type.STRING, Importance.HIGH, TYPE_NAME_DOC, ELASTICSEARCH_GROUP, 2, Width.SHORT, TYPE_NAME_DISPLAY)
- .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 1, Width.SHORT, KEY_IGNORE_DISPLAY)
- .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.MEDIUM, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 2, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
- .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.MEDIUM, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 3, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY)
- .define(BATCH_SIZE_CONFIG, Type.LONG, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
- .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW,
-     TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 5, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
- .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
- .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
- .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY);
+ .define(KEY_IGNORE_CONFIG, Type.BOOLEAN, KEY_IGNORE_DEFAULT, Importance.HIGH, KEY_IGNORE_DOC, CONNECTOR_GROUP, 3, Width.SHORT, KEY_IGNORE_DISPLAY)
+ .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.SHORT, BATCH_SIZE_DISPLAY)
+ .define(MAX_IN_FLIGHT_REQUESTS_CONFIG, Type.INT, MAX_IN_FLIGHT_REQUESTS_DEFAULT, Importance.MEDIUM,
+     MAX_IN_FLIGHT_REQUESTS_DOC, CONNECTOR_GROUP, 5, Width.SHORT,
+     MAX_IN_FLIGHT_REQUESTS_DISPLAY)
+ .define(TOPIC_INDEX_MAP_CONFIG, Type.LIST, TOPIC_INDEX_MAP_DEFAULT, Importance.LOW, TOPIC_INDEX_MAP_DOC, CONNECTOR_GROUP, 6, Width.LONG, TOPIC_INDEX_MAP_DISPLAY)
+ .define(TOPIC_KEY_IGNORE_CONFIG, Type.LIST, TOPIC_KEY_IGNORE_DEFAULT, Importance.LOW, TOPIC_KEY_IGNORE_DOC, CONNECTOR_GROUP, 7, Width.LONG, TOPIC_KEY_IGNORE_DISPLAY)
+ .define(SCHEMA_IGNORE_CONFIG, Type.BOOLEAN, SCHEMA_IGNORE_DEFAULT, Importance.LOW, SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 8, Width.SHORT, SCHEMA_IGNORE_DISPLAY)
+ .define(TOPIC_SCHEMA_IGNORE_CONFIG, Type.LIST, TOPIC_SCHEMA_IGNORE_DEFAULT, Importance.LOW, TOPIC_SCHEMA_IGNORE_DOC, CONNECTOR_GROUP, 9, Width.LONG, TOPIC_SCHEMA_IGNORE_DISPLAY)
+ .define(LINGER_MS_CONFIG, Type.LONG, LINGER_MS_DEFAULT, Importance.LOW, LINGER_MS_DOC, CONNECTOR_GROUP, 10, Width.SHORT, LINGER_MS_DISPLAY)
+ .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, RETRY_BACKOFF_MS_DEFAULT, Importance.LOW, RETRY_BACKOFF_MS_DOC, CONNECTOR_GROUP, 11, Width.SHORT, RETRY_BACKOFF_MS_DISPLAY)
+ .define(MAX_RETRY_CONFIG, Type.INT, MAX_RETRY_DEFAULT, Importance.LOW, MAX_RETRY_DOC, CONNECTOR_GROUP, 12, Width.SHORT, MAX_RETRY_DISPLAY)
+ .define(FLUSH_TIMEOUT_MS_CONFIG, Type.LONG, FLUSH_TIMEOUT_MS_DEFAULT, Importance.LOW, FLUSH_TIMEOUT_MS_DOC, CONNECTOR_GROUP, 13, Width.SHORT, FLUSH_TIMEOUT_MS_DISPLAY)
+ .define(MAX_BUFFERED_RECORDS_CONFIG, Type.LONG, MAX_BUFFERED_RECORDS_DEFAULT, Importance.LOW, MAX_BUFFERED_RECORDS_DOC, CONNECTOR_GROUP, 14, Width.SHORT, MAX_BUFFERED_RECORDS_DISPLAY);
}

static ConfigDef config = baseConfigDef();
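Taken together, the new definitions move the writer's tuning knobs into the connector config. A hypothetical sketch of a task configuration using the new keys; the key strings and defaults come from the definitions above, while the connector class and topic name are illustrative placeholders:

import java.util.HashMap;
import java.util.Map;

public class ExampleSinkConfig {
  public static Map<String, String> exampleProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
    props.put("topics", "logs");               // illustrative topic
    props.put("type.name", "log");             // illustrative mapping type
    props.put("batch.size", "10000");          // now Type.INT (default 10000)
    props.put("linger.ms", "1");               // wait up to 1 ms for fuller batches
    props.put("max.in.flight.requests", "5");  // values > 1 risk reordering on retry
    props.put("retry.backoff.ms", "100");      // pause before retrying a failed batch
    props.put("max.retry", "5");               // retries may reorder records
    return props;
  }
}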
@@ -83,7 +83,11 @@ public void start(Map<String, String> props, Client client) {

long flushTimeoutMs = config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
long maxBufferedRecords = config.getLong(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
- long batchSize = config.getLong(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
+ int batchSize = config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
+ long lingerMs = config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
+ int maxInFlightRequests = config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
+ long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
+ int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRY_CONFIG);

if (client != null) {
this.client = client;
@@ -105,7 +109,11 @@ public void start(Map<String, String> props, Client client) {
.setTopicConfigs(topicConfigs)
.setFlushTimoutMs(flushTimeoutMs)
.setMaxBufferedRecords(maxBufferedRecords)
+ .setMaxInFlightRequests(maxInFlightRequests)
.setBatchSize(batchSize)
+ .setLingerMs(lingerMs)
+ .setRetryBackoffMs(retryBackoffMs)
+ .setMaxRetry(maxRetry)
.setContext(context)
.setConverter(converter);

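The linger.ms and batch.size semantics described in the new config docs amount to a bounded drain loop. A sketch of the documented behavior, assuming a blocking queue as the record buffer; this illustrates the semantics only and is not the connector's actual internals:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchDrainer<R> {
  private final BlockingQueue<R> buffer;
  private final int batchSize;
  private final long lingerMs;

  BatchDrainer(BlockingQueue<R> buffer, int batchSize, long lingerMs) {
    this.buffer = buffer;
    this.batchSize = batchSize;
    this.lingerMs = lingerMs;
  }

  // Fill a batch of up to batchSize records, waiting at most lingerMs overall.
  List<R> nextBatch() throws InterruptedException {
    List<R> batch = new ArrayList<>(batchSize);
    long deadline = System.currentTimeMillis() + lingerMs;
    while (batch.size() < batchSize) {
      long remainingMs = deadline - System.currentTimeMillis();
      if (remainingMs <= 0) {
        break; // linger window expired: send whatever has accumulated
      }
      R record = buffer.poll(remainingMs, TimeUnit.MILLISECONDS);
      if (record == null) {
        break; // timed out waiting for more records
      }
      batch.add(record);
    }
    return batch;
  }
}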
