Add option to use auto-generated IDs on indexing #679
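This patch adds a new boolean configuration, `use.autogenerated.ids` (default `false`), which lets Elasticsearch assign auto-generated document IDs on insertion instead of deriving the ID from the record key or from topic/partition/offset. The option only applies when `write.method` is `INSERT`; for `UPSERT` a record-derived ID is still required, so the option is ignored. Because redelivered records no longer overwrite the same document ID, enabling it relaxes delivery guarantees from exactly-once to at-least-once, and `ignore.key` no longer influences document IDs.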

Open: wants to merge 2 commits into master
@@ -116,8 +116,47 @@ private String convertKey(Schema keySchema, Object key) {
    }
  }

  private boolean shouldIgnoreNullValue(SinkRecord record) {
    switch (config.behaviorOnNullValues()) {
      case IGNORE:
        log.trace("Ignoring {} with null value.", recordString(record));
        return true;
      case DELETE:
        if (record.key() == null) {
          // Since the record key is used as the ID of the document to delete, and we don't
          // have a key for this record, there is nothing we could delete anyway, so we
          // ignore the record. We can also disregard the value of the ignoreKey parameter:
          // even if it's true, the ID we'd try to delete would be derived solely from the
          // topic/partition/offset of the SinkRecord, and since that information is
          // guaranteed to be unique per message, there can't be any corresponding
          // document in Elasticsearch to delete.
          log.trace(
              "Ignoring {} with null key, since the record key is used as the document ID",
              recordString(record)
          );
          return true;
        }
        // Will proceed as normal, ultimately creating a DeleteRequest
        log.trace("Deleting {} from Elasticsearch", recordString(record));
        return false;
      case FAIL:
      default:
        throw new DataException(
            String.format(
                "%s with key of %s and null value encountered (to ignore future records"
                    + " like this change the configuration property '%s' from '%s' to '%s')",
                recordString(record),
                record.key(),
                ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG,
                BehaviorOnNullValues.FAIL,
                BehaviorOnNullValues.IGNORE
            )
        );
    }
  }

  public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
    if (record.value() == null) {
    if (record.value() == null && shouldIgnoreNullValue(record)) {
      switch (config.behaviorOnNullValues()) {
        case IGNORE:
          log.trace("Ignoring {} with null value.", recordString(record));
@@ -176,10 +215,12 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
            .retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
      case INSERT:
        OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
        return maybeAddExternalVersioning(
            new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
            record
        );
        IndexRequest req =
            new IndexRequest(index).source(payload, XContentType.JSON).opType(opType);
        if (config.useAutogeneratedIds()) {
          return req;
        }
        return maybeAddExternalVersioning(req.id(id), record);
      default:
        return null; // shouldn't happen
    }
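The semantics of the change are easiest to see at the request level. A minimal sketch of the difference (the index name "logs", the ID, and the JSON payload are hypothetical placeholders, not code from this PR):

// With a record-derived ID, a redelivery overwrites the same document,
// so inserts stay idempotent (effectively exactly-once for readers).
IndexRequest keyed = new IndexRequest("logs")
    .id("my-topic+0+42")
    .source("{\"field\":1}", XContentType.JSON);

// With use.autogenerated.ids=true, id() is never called; Elasticsearch
// assigns a fresh _id per request, so a redelivered record is indexed
// again as a duplicate document (at-least-once).
IndexRequest autoId = new IndexRequest("logs")
    .source("{\"field\":1}", XContentType.JSON);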
@@ -232,6 +232,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String DROP_INVALID_MESSAGE_DISPLAY = "Drop invalid messages";
  private static final boolean DROP_INVALID_MESSAGE_DEFAULT = false;

  private static final String USE_AUTOGENERATED_IDS_DISPLAY = "Elasticsearch Generated IDs";
  private static final boolean USE_AUTOGENERATED_IDS_DEFAULT = false;

  public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
  private static final String BEHAVIOR_ON_NULL_VALUES_DOC = "How to handle records with a "
      + "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
@@ -330,6 +333,18 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
  private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
  private static final String DATA_STREAM_DATASET_DEFAULT = "";

  public static final String USE_AUTO_GENERATED_IDS_CONFIG = "use.autogenerated.ids";
  private static final String USE_AUTO_GENERATED_IDS_DOC = String.format(
      "Whether to use auto-generated Elasticsearch document IDs for insertion requests. "
          + "Note that this setting removes exactly-once guarantees: message "
          + "delivery will be at least once. Only applies if %s is set to %s. "
          + "When this is set to ``true``, the ``%s`` option will also be ignored when "
          + "sending data to Elasticsearch.",
      WRITE_METHOD_CONFIG,
      WriteMethod.INSERT,
      IGNORE_KEY_CONFIG
  );

  public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
  private static final String DATA_STREAM_TYPE_DOC = String.format(
      "Generic type describing the data to be written to data stream. "
@@ -685,6 +700,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
        Width.SHORT,
        WRITE_METHOD_DISPLAY,
        new EnumRecommender<>(WriteMethod.class)
    ).define(
        USE_AUTO_GENERATED_IDS_CONFIG,
        Type.BOOLEAN,
        USE_AUTOGENERATED_IDS_DEFAULT,
        Importance.MEDIUM,
        USE_AUTO_GENERATED_IDS_DOC,
        CONNECTOR_GROUP,
        ++order,
        Width.SHORT,
        USE_AUTOGENERATED_IDS_DISPLAY
    );
  }

@@ -940,6 +965,10 @@ public Set<String> ignoreSchemaTopics() {
    return new HashSet<>(getList(IGNORE_SCHEMA_TOPICS_CONFIG));
  }

  public boolean useAutogeneratedIds() {
    return getBoolean(USE_AUTO_GENERATED_IDS_CONFIG);
  }

  public String kerberosUserPrincipal() {
    return getString(KERBEROS_PRINCIPAL_CONFIG);
  }
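For reference, a sketch of how the new option would be enabled, mirroring the pattern the unit tests below use. The connection.url value and the surrounding class are illustrative, not part of this PR:

import java.util.HashMap;
import java.util.Map;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;

public class AutogenIdsExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "http://localhost:9200"); // illustrative endpoint
    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, "INSERT");
    props.put(ElasticsearchSinkConnectorConfig.USE_AUTO_GENERATED_IDS_CONFIG, "true");

    // With these settings, convertRecord() builds IndexRequests without
    // calling id(), and ignore.key no longer influences document IDs.
    ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
    System.out.println(config.useAutogeneratedIds()); // true
  }
}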
@@ -20,6 +20,7 @@
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.VersionType;
@@ -39,6 +40,7 @@
import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@@ -387,6 +389,30 @@ public void failOnNullValue() {
    }
  }

  @Test
  public void useAutogeneratedIds() {
    props.put(ElasticsearchSinkConnectorConfig.USE_AUTO_GENERATED_IDS_CONFIG, "true");
    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, "INSERT");

    converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));

    SinkRecord sinkRecord = createSinkRecordWithValue(new Struct(schema).put("string", "test"));
    DocWriteRequest<?> req = converter.convertRecord(sinkRecord, index);
    assertNull(req.id());
  }

  @Test
  public void ignoreAutogeneratedIdsForUpsert() {
    props.put(ElasticsearchSinkConnectorConfig.USE_AUTO_GENERATED_IDS_CONFIG, "true");
    props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, "UPSERT");

    converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));

    SinkRecord sinkRecord = createSinkRecordWithValue(new Struct(schema).put("string", "test"));
    DocWriteRequest<?> req = converter.convertRecord(sinkRecord, index);
    assertNotNull(req.id());
  }

  public SinkRecord createSinkRecordWithValue(Object value) {
    return new SinkRecord(
        topic,
@@ -299,7 +299,7 @@ public void testNullValue() throws JsonProcessingException {

    assertThatThrownBy(() -> task.put(records))
        .isInstanceOf(DataException.class)
        .hasMessageContaining("has a null value ");
        .hasMessageContaining("null value encountered ");
    currentOffsets = ImmutableMap.of(tp, new OffsetAndMetadata(0));
    assertThat(task.preCommit(currentOffsets).get(tp).offset())
        .isLessThanOrEqualTo(1);
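This assertion change tracks the refactor above: with behavior.on.null.values set to FAIL, the DataException is now thrown from shouldIgnoreNullValue with a message containing "null value encountered" instead of the previous "has a null value" text.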