Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce optional external.version.header config #697

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,10 +25,14 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -52,6 +56,7 @@ public class DataConverter {
private static final Logger log = LoggerFactory.getLogger(DataConverter.class);

private static final Converter JSON_CONVERTER;
// Shared converter used by maybeAddExternalVersioning to turn the external version header's
// value into bytes before it is parsed as a long.
private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter();
protected static final String MAP_KEY = "key";
protected static final String MAP_VALUE = "value";

Expand Down Expand Up @@ -202,7 +207,24 @@ private DocWriteRequest<?> maybeAddExternalVersioning(
) {
// External versioning is applied only when the key is not ignored for this record's topic.
if (!config.shouldIgnoreKey(record.topic())) {
request.versionType(VersionType.EXTERNAL);
if (config.hasExternalVersionHeader()) {
// Use the last header carrying the configured name as the version source.
// NOTE(review): Headers.lastWithName is documented to return null when no header has that
// name, so a record without the header would fail with a NullPointerException on
// versionHeader.key() below. Confirm whether a descriptive error or a fallback to the
// record offset is wanted here.
final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader());
final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(
record.topic(),
versionHeader.key(),
versionHeader.schema(),
versionHeader.value()
);
try {
//fromConnectHeader byte output is UTF_8
// Decode the bytes as text and parse them as the document version.
request.version(Long.parseLong(new String(versionValue, StandardCharsets.UTF_8)));
} catch (NumberFormatException e) {
// Surface the offending text and keep the parse failure as the cause.
throw new ConnectException("Error converting to long: "
+ new String(versionValue, StandardCharsets.UTF_8), e);
}
} else {
// No header configured: the Kafka record offset is the document version.
request.version(record.kafkaOffset());
}
}

return request;
Expand Down
Expand Up @@ -226,6 +226,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final BehaviorOnMalformedDoc BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT =
BehaviorOnMalformedDoc.FAIL;

// Optional setting naming the record header whose value is used as the external document
// version. The default is the empty string, which hasExternalVersionHeader() treats as
// "not configured".
public static final String EXTERNAL_VERSION_HEADER_CONFIG = "external.version.header";
// Documentation text registered with the setting's definition.
private static final String EXTERNAL_VERSION_HEADER_DOC = "Header name to pull value for"
+ " external versioning, defaults to using the kafka record offset. Must have a numeric"
+ " value.";
// Display name registered with the setting's definition.
private static final String EXTERNAL_VERSION_HEADER_DISPLAY = "External Version Header Name";
// Empty default: no header is consulted unless a name is supplied.
private static final String EXTERNAL_VERSION_HEADER_DEFAULT = "";

public static final String WRITE_METHOD_CONFIG = "write.method";
private static final String WRITE_METHOD_DOC = String.format(
"Method used for writing data to Elasticsearch, and one of %s or %s. The default method is"
Expand Down Expand Up @@ -592,6 +599,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
).define(
EXTERNAL_VERSION_HEADER_CONFIG,
Type.STRING,
EXTERNAL_VERSION_HEADER_DEFAULT,
Importance.LOW,
EXTERNAL_VERSION_HEADER_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
EXTERNAL_VERSION_HEADER_DISPLAY
).define(
WRITE_METHOD_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -874,6 +891,14 @@ public boolean useCompactMapEntries() {
return getBoolean(COMPACT_MAP_ENTRIES_CONFIG);
}

/**
 * Reports whether an external version header name has been configured.
 *
 * @return {@code true} when the {@code external.version.header} setting is a non-empty string
 */
public boolean hasExternalVersionHeader() {
  final String headerName = getString(EXTERNAL_VERSION_HEADER_CONFIG);
  return headerName.length() > 0;
}

/**
 * Returns the configured name of the header that carries the external version.
 *
 * @return the value of the {@code external.version.header} setting
 */
public String externalVersionHeader() {
  final String headerName = getString(EXTERNAL_VERSION_HEADER_CONFIG);
  return headerName;
}

/**
 * Returns the configured write method.
 *
 * <p>The configured text is upper-cased with {@link java.util.Locale#ROOT} before the lookup so
 * the result does not depend on the JVM's default locale (for example, a Turkish default locale
 * would otherwise map {@code i} to a dotted capital and the lookup would fail).
 *
 * @return the {@code WriteMethod} named by the {@code write.method} setting
 */
public WriteMethod writeMethod() {
  final String configured = getString(WRITE_METHOD_CONFIG);
  return WriteMethod.valueOf(configured.toUpperCase(java.util.Locale.ROOT));
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -325,6 +326,54 @@ public void deleteOnNullValue() {

assertEquals(key, actualRecord.id());
assertEquals(index, actualRecord.index());
assertEquals(sinkRecord.kafkaOffset(), actualRecord.version());
}

/**
 * Checks that the delete request built for a null-valued record carries the version supplied
 * in the configured external version header.
 */
@Test
public void externalVersionHeaderOnDelete() {
  String externalVersionHeader = "version";
  // Upper-case suffix: a lower-case 'l' is easily misread as the digit 1.
  long expectedExternalVersion = 123L;

  props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
  props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
  props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
  props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
  props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
  converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));

  // A null value combined with the DELETE behavior yields a delete request.
  SinkRecord sinkRecord = createSinkRecordWithValue(null);
  sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);

  DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index);

  assertEquals(key, actualRecord.id());
  assertEquals(index, actualRecord.index());
  assertEquals(expectedExternalVersion, actualRecord.version());
}

/**
 * Checks that the index request built for a record with a struct value carries the version
 * supplied in the configured external version header.
 */
@Test
public void externalVersionHeaderOnIndex() {
  String externalVersionHeader = "version";
  // Upper-case suffix: a lower-case 'l' is easily misread as the digit 1.
  long expectedExternalVersion = 123L;

  props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
  props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
  props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
  props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
  props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
  converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));

  // Build a non-null struct value so the converter produces an index request.
  Schema preProcessedSchema = converter.preProcessSchema(schema);
  Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
  SinkRecord sinkRecord = createSinkRecordWithValue(struct);
  sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);

  IndexRequest actualRecord = (IndexRequest) converter.convertRecord(sinkRecord, index);

  assertEquals(key, actualRecord.id());
  assertEquals(index, actualRecord.index());
  assertEquals(expectedExternalVersion, actualRecord.version());
}

@Test
Expand Down