Merge pull request #91 from KL-WLCR/f/drop_invalid_record
Drop invalid messages if needed
ewencp committed Jul 19, 2017
2 parents f3068c9 + 733f9e2 commit 58ae0dd
Showing 4 changed files with 132 additions and 15 deletions.
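In short, this change adds one new boolean connector option, drop.invalid.message (default false). When it is enabled, a record that cannot be converted into an Elasticsearch document (for example, a record whose key is null while keys are used as document IDs) is logged and skipped instead of failing the task. Below is a minimal sketch of enabling the option when assembling a connector config programmatically; the other property names are the connector's standard settings, and the topic and URL values are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class DropInvalidMessageExample {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class",
            "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
        props.put("topics", "logs");
        props.put("connection.url", "http://localhost:9200");
        props.put("type.name", "kafka-connect");
        // New in this PR: log and skip records that fail conversion instead
        // of throwing; leaving it false keeps the old fail-fast behavior.
        props.put("drop.invalid.message", "true");
        System.out.println(props);
      }
    }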
ElasticsearchSinkConnectorConfig.java
@@ -76,6 +76,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
public static final String TOPIC_KEY_IGNORE_CONFIG = "topic.key.ignore";
public static final String SCHEMA_IGNORE_CONFIG = "schema.ignore";
public static final String TOPIC_SCHEMA_IGNORE_CONFIG = "topic.schema.ignore";
public static final String DROP_INVALID_MESSAGE_CONFIG = "drop.invalid.message";

private static final String KEY_IGNORE_DOC =
"Whether to ignore the record key for the purpose of forming the Elasticsearch document ID."
+ " When this is set to ``true``, document IDs will be generated as the record's "
@@ -92,6 +94,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
+ TOPIC_SCHEMA_IGNORE_CONFIG + "`` to override as ``true`` for specific topics.";
private static final String TOPIC_SCHEMA_IGNORE_DOC =
"List of topics for which ``" + SCHEMA_IGNORE_CONFIG + "`` should be ``true``.";
private static final String DROP_INVALID_MESSAGE_DOC =
"Whether to drop kafka message when it cannot be converted to output message.";


protected static ConfigDef baseConfigDef() {
final ConfigDef configDef = new ConfigDef();
@@ -247,7 +252,18 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.LONG,
"Topics for 'Ignore Schema' mode"
);
).define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
DROP_INVALID_MESSAGE_DOC,
group,
++order,
Width.LONG,
"Drop invalid messages");


}

public static final ConfigDef CONFIG = baseConfigDef();
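For quick reference, the option as registered by the define() call above:

    drop.invalid.message
      Type: boolean
      Default: false
      Importance: low
      Whether to drop a Kafka message when it cannot be converted to an output message.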
ElasticsearchSinkTask.java
@@ -64,6 +64,7 @@ public void start(Map<String, String> props, JestClient client) {
boolean ignoreSchema =
config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);


Map<String, String> topicToIndexMap =
parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
Set<String> topicIgnoreKey =
@@ -86,6 +87,8 @@ public void start(Map<String, String> props, JestClient client) {
config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
int maxRetry =
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
boolean dropInvalidMessage =
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);

if (client != null) {
this.client = client;
@@ -112,7 +115,8 @@ public void start(Map<String, String> props, JestClient client) {
.setBatchSize(batchSize)
.setLingerMs(lingerMs)
.setRetryBackoffMs(retryBackoffMs)
.setMaxRetry(maxRetry);
.setMaxRetry(maxRetry)
.setDropInvalidMessage(dropInvalidMessage);

writer = builder.build();
writer.start();
ElasticsearchWriter.java
@@ -49,6 +49,7 @@ public class ElasticsearchWriter {
private final Map<String, String> topicToIndexMap;
private final long flushTimeoutMs;
private final BulkProcessor<IndexableRecord, ?> bulkProcessor;
private final boolean dropInvalidMessage;

private final Set<String> existingMappings;

@@ -66,7 +67,8 @@ public class ElasticsearchWriter {
int batchSize,
long lingerMs,
int maxRetries,
long retryBackoffMs
long retryBackoffMs,
boolean dropInvalidMessage
) {
this.client = client;
this.type = type;
@@ -76,6 +78,7 @@ public class ElasticsearchWriter {
this.ignoreSchemaTopics = ignoreSchemaTopics;
this.topicToIndexMap = topicToIndexMap;
this.flushTimeoutMs = flushTimeoutMs;
this.dropInvalidMessage = dropInvalidMessage;

bulkProcessor = new BulkProcessor<>(
new SystemTime(),
@@ -106,6 +109,7 @@ public static class Builder {
private long lingerMs;
private int maxRetry;
private long retryBackoffMs;
private boolean dropInvalidMessage;

public Builder(JestClient client) {
this.client = client;
@@ -168,6 +172,11 @@ public Builder setRetryBackoffMs(long retryBackoffMs) {
return this;
}

public Builder setDropInvalidMessage(boolean dropInvalidMessage) {
this.dropInvalidMessage = dropInvalidMessage;
return this;
}

public ElasticsearchWriter build() {
return new ElasticsearchWriter(
client,
@@ -183,7 +192,8 @@ public ElasticsearchWriter build() {
batchSize,
lingerMs,
maxRetry,
retryBackoffMs
retryBackoffMs,
dropInvalidMessage
);
}
}
@@ -209,18 +219,49 @@ public void write(Collection<SinkRecord> records) {
existingMappings.add(index);
}

final IndexableRecord indexableRecord = DataConverter.convertRecord(
sinkRecord,
index,
type,
ignoreKey,
ignoreSchema
);
final IndexableRecord indexableRecord = tryGetIndexableRecord(
sinkRecord,
index,
ignoreKey,
ignoreSchema);

if (indexableRecord != null) {
bulkProcessor.add(indexableRecord, flushTimeoutMs);
}

bulkProcessor.add(indexableRecord, flushTimeoutMs);
}
}

private IndexableRecord tryGetIndexableRecord(
SinkRecord sinkRecord,
String index,
boolean ignoreKey,
boolean ignoreSchema) {

IndexableRecord indexableRecord = null;

try {
indexableRecord = DataConverter.convertRecord(
sinkRecord,
index,
type,
ignoreKey,
ignoreSchema);
} catch (ConnectException convertException) {
if (dropInvalidMessage) {
log.error("Can't convert record from topic {} with partition {} and offset {}."
+ " Error message: {}",
sinkRecord.topic(),
sinkRecord.kafkaPartition(),
sinkRecord.kafkaOffset(),
convertException.getMessage());
} else {
throw convertException;
}
}
return indexableRecord;
}

public void flush() {
bulkProcessor.flush(flushTimeoutMs);
}
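Two notes on the tryGetIndexableRecord() helper above. With drop.invalid.message left at its default of false, the ConnectException from DataConverter.convertRecord is rethrown and the task fails exactly as before. With the flag set to true, the failure is logged at ERROR level and the record is simply never handed to the bulk processor, so it is permanently skipped once offsets are committed. A dropped record leaves a line like the following in the task log (the topic, partition, and offset values here are illustrative; the error message is the one the test below expects):

    Can't convert record from topic test-topic with partition 0 and offset 12. Error message: Key is used as document id and can not be null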
ElasticsearchWriterTest.java
@@ -25,6 +25,7 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Rule;
import org.junit.Test;

import java.math.BigDecimal;
@@ -40,6 +41,7 @@
import java.util.Set;

import io.searchbox.client.JestClient;
import org.junit.rules.ExpectedException;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
@@ -50,6 +52,9 @@ public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
private final Schema otherSchema = createOtherSchema();
private final Struct otherRecord = createOtherRecord(otherSchema);

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testWriter() throws Exception {
final boolean ignoreKey = false;
@@ -93,7 +98,7 @@ public void testTopicIndexOverride() throws Exception {
final String indexOverride = "index";

Collection<SinkRecord> records = prepareData(2);
ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride));
ElasticsearchWriter writer = initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.singletonMap(TOPIC, indexOverride), false);
writeDataAndRefresh(writer, records);
verifySearchResults(records, indexOverride, ignoreKey, ignoreSchema);
}
@@ -298,6 +303,52 @@ public void testBytes() throws Exception {
verifySearchResults(records, ignoreKey, ignoreSchema);
}

@Test
public void testInvalidRecordException() throws Exception {
final boolean ignoreKey = false;
final boolean ignoreSchema = true;

Collection<SinkRecord> records = new ArrayList<>();

SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, null, new byte[]{42}, 0);
records.add(sinkRecord);

final ElasticsearchWriter strictWriter = initWriter(client, ignoreKey, ignoreSchema, false);

thrown.expect(ConnectException.class);
thrown.expectMessage("Key is used as document id and can not be null");
strictWriter.write(records);
}

@Test
public void testDropInvalidRecord() throws Exception {
final boolean ignoreKey = false;
final boolean ignoreSchema = true;
Collection<SinkRecord> inputRecords = new ArrayList<>();
Collection<SinkRecord> outputRecords = new ArrayList<>();

Schema structSchema = SchemaBuilder.struct().name("struct")
.field("bytes", SchemaBuilder.BYTES_SCHEMA)
.build();

Struct struct = new Struct(structSchema);
struct.put("bytes", new byte[]{42});


SinkRecord invalidRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null, structSchema, struct, 0);
SinkRecord validRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 1);

inputRecords.add(validRecord);
inputRecords.add(invalidRecord);

outputRecords.add(validRecord);

final ElasticsearchWriter nonStrictWriter = initWriter(client, ignoreKey, ignoreSchema, true);

writeDataAndRefresh(nonStrictWriter, inputRecords);
verifySearchResults(outputRecords, ignoreKey, ignoreSchema);
}

private Collection<SinkRecord> prepareData(int numRecords) {
Collection<SinkRecord> records = new ArrayList<>();
for (int i = 0; i < numRecords; ++i) {
@@ -308,10 +359,14 @@ private Collection<SinkRecord> prepareData(int numRecords) {
}

private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema) {
return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap());
return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), false);
}

private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, boolean ignoreSchema, boolean dropInvalidMessage) {
return initWriter(client, ignoreKey, Collections.<String>emptySet(), ignoreSchema, Collections.<String>emptySet(), Collections.<String, String>emptyMap(), dropInvalidMessage);
}

private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap) {
private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set<String> ignoreKeyTopics, boolean ignoreSchema, Set<String> ignoreSchemaTopics, Map<String, String> topicToIndexMap, boolean dropInvalidMessage) {
ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
.setType(TYPE)
.setIgnoreKey(ignoreKey, ignoreKeyTopics)
@@ -324,6 +379,7 @@ private ElasticsearchWriter initWriter(JestClient client, boolean ignoreKey, Set
.setLingerMs(1000)
.setRetryBackoffMs(1000)
.setMaxRetry(3)
.setDropInvalidMessage(dropInvalidMessage)
.build();
writer.start();
writer.createIndicesForTopics(Collections.singleton(TOPIC));
