Skip to content

Commit

Permalink
Introduce optional external.version.header config to use for ES exter…
Browse files Browse the repository at this point in the history
…nal version instead of kafka offset for non-datastream indices
  • Loading branch information
rjosal-indeed committed Dec 4, 2023
1 parent 4d5c9d7 commit 6efa59d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 1 deletion.
Expand Up @@ -25,10 +25,14 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -52,6 +56,7 @@ public class DataConverter {
private static final Logger log = LoggerFactory.getLogger(DataConverter.class);

private static final Converter JSON_CONVERTER;
private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter();
protected static final String MAP_KEY = "key";
protected static final String MAP_VALUE = "value";

Expand Down Expand Up @@ -202,7 +207,24 @@ private DocWriteRequest<?> maybeAddExternalVersioning(
) {
if (!config.shouldIgnoreKey(record.topic())) {
request.versionType(VersionType.EXTERNAL);
request.version(record.kafkaOffset());
if (config.hasExternalVersionHeader()) {
final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader());
final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(
record.topic(),
versionHeader.key(),
versionHeader.schema(),
versionHeader.value()
);
try {
//fromConnectHeader byte output is UTF_8
request.version(Long.parseLong(new String(versionValue, StandardCharsets.UTF_8)));
} catch (NumberFormatException e) {
throw new ConnectException("Error converting to long: "
+ new String(versionValue, StandardCharsets.UTF_8), e);
}
} else {
request.version(record.kafkaOffset());
}
}

return request;
Expand Down
Expand Up @@ -226,6 +226,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final BehaviorOnMalformedDoc BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT =
BehaviorOnMalformedDoc.FAIL;

public static final String EXTERNAL_VERSION_HEADER_CONFIG = "external.version.header";
private static final String EXTERNAL_VERSION_HEADER_DOC = "Header name to pull value for"
+ " external versioning, defaults to using the kafka record offset. Must have a numeric"
+ " value.";
private static final String EXTERNAL_VERSION_HEADER_DISPLAY = "External Version Header Name";
private static final String EXTERNAL_VERSION_HEADER_DEFAULT = "";

public static final String WRITE_METHOD_CONFIG = "write.method";
private static final String WRITE_METHOD_DOC = String.format(
"Method used for writing data to Elasticsearch, and one of %s or %s. The default method is"
Expand Down Expand Up @@ -592,6 +599,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
).define(
EXTERNAL_VERSION_HEADER_CONFIG,
Type.STRING,
EXTERNAL_VERSION_HEADER_DEFAULT,
Importance.LOW,
EXTERNAL_VERSION_HEADER_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
EXTERNAL_VERSION_HEADER_DISPLAY
).define(
WRITE_METHOD_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -874,6 +891,14 @@ public boolean useCompactMapEntries() {
return getBoolean(COMPACT_MAP_ENTRIES_CONFIG);
}

public boolean hasExternalVersionHeader() {
return !getString(EXTERNAL_VERSION_HEADER_CONFIG).isEmpty();
}

public String externalVersionHeader() {
return getString(EXTERNAL_VERSION_HEADER_CONFIG);
}

public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}
Expand Down
Expand Up @@ -325,6 +325,54 @@ public void deleteOnNullValue() {

assertEquals(key, actualRecord.id());
assertEquals(index, actualRecord.index());
assertEquals(sinkRecord.kafkaOffset(), actualRecord.version());
}

@Test
public void externalVersionHeaderOnDelete() {
String externalVersionHeader = "version";
long expectedExternalVersion = 123l;

props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));

SinkRecord sinkRecord = createSinkRecordWithValue(null);
sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);

DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index);

assertEquals(key, actualRecord.id());
assertEquals(index, actualRecord.index());
assertEquals(expectedExternalVersion, actualRecord.version());
}

@Test
public void externalVersionHeaderOnIndex() {
String externalVersionHeader = "version";
long expectedExternalVersion = 123l;

props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));


Schema preProcessedSchema = converter.preProcessSchema(schema);
Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
SinkRecord sinkRecord = createSinkRecordWithValue(struct);
sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);

IndexRequest actualRecord = (IndexRequest) converter.convertRecord(sinkRecord, index);

assertEquals(key, actualRecord.id());
assertEquals(index, actualRecord.index());
assertEquals(expectedExternalVersion, actualRecord.version());
}

@Test
Expand Down

0 comments on commit 6efa59d

Please sign in to comment.