From 6efa59d21df854c9d8dfcdfb261fbf05fa3abe32 Mon Sep 17 00:00:00 2001 From: Ryan Josal Date: Mon, 4 Dec 2023 10:48:41 -0800 Subject: [PATCH] Introduce optional external.version.header config to use for ES external version instead of kafka offset for non-datastream indices --- .../connect/elasticsearch/DataConverter.java | 24 +++++++++- .../ElasticsearchSinkConnectorConfig.java | 25 ++++++++++ .../elasticsearch/DataConverterTest.java | 48 +++++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java index 7f0dc9795..40f77f003 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java +++ b/src/main/java/io/confluent/connect/elasticsearch/DataConverter.java @@ -25,10 +25,14 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -52,6 +56,7 @@ public class DataConverter { private static final Logger log = LoggerFactory.getLogger(DataConverter.class); private static final Converter JSON_CONVERTER; + private static final HeaderConverter HEADER_CONVERTER = new SimpleHeaderConverter(); protected static final String MAP_KEY = "key"; protected static final String MAP_VALUE = "value"; @@ -202,7 +207,24 @@ private DocWriteRequest maybeAddExternalVersioning( ) { if (!config.shouldIgnoreKey(record.topic())) { request.versionType(VersionType.EXTERNAL); - request.version(record.kafkaOffset()); + if (config.hasExternalVersionHeader()) { + final Header versionHeader = record.headers().lastWithName(config.externalVersionHeader()); + final byte[] versionValue = HEADER_CONVERTER.fromConnectHeader( + record.topic(), + versionHeader.key(), + versionHeader.schema(), + versionHeader.value() + ); + try { + //fromConnectHeader byte output is UTF_8 + request.version(Long.parseLong(new String(versionValue, StandardCharsets.UTF_8))); + } catch (NumberFormatException e) { + throw new ConnectException("Error converting to long: " + + new String(versionValue, StandardCharsets.UTF_8), e); + } + } else { + request.version(record.kafkaOffset()); + } } return request; diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 092b41fb8..01df9dfea 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -226,6 +226,13 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final BehaviorOnMalformedDoc BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT = BehaviorOnMalformedDoc.FAIL; + public static final String EXTERNAL_VERSION_HEADER_CONFIG = "external.version.header"; + private static final String EXTERNAL_VERSION_HEADER_DOC = "Header name to pull value for" + + " external versioning, defaults to using the kafka record offset. Must have a numeric" + + " value."; + private static final String EXTERNAL_VERSION_HEADER_DISPLAY = "External Version Header Name"; + private static final String EXTERNAL_VERSION_HEADER_DEFAULT = ""; + public static final String WRITE_METHOD_CONFIG = "write.method"; private static final String WRITE_METHOD_DOC = String.format( "Method used for writing data to Elasticsearch, and one of %s or %s. The default method is" @@ -592,6 +599,16 @@ private static void addConversionConfigs(ConfigDef configDef) { Width.SHORT, BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY, new EnumRecommender<>(BehaviorOnMalformedDoc.class) + ).define( + EXTERNAL_VERSION_HEADER_CONFIG, + Type.STRING, + EXTERNAL_VERSION_HEADER_DEFAULT, + Importance.LOW, + EXTERNAL_VERSION_HEADER_DOC, + DATA_CONVERSION_GROUP, + ++order, + Width.SHORT, + EXTERNAL_VERSION_HEADER_DISPLAY ).define( WRITE_METHOD_CONFIG, Type.STRING, @@ -874,6 +891,14 @@ public boolean useCompactMapEntries() { return getBoolean(COMPACT_MAP_ENTRIES_CONFIG); } + public boolean hasExternalVersionHeader() { + return !getString(EXTERNAL_VERSION_HEADER_CONFIG).isEmpty(); + } + + public String externalVersionHeader() { + return getString(EXTERNAL_VERSION_HEADER_CONFIG); + } + public WriteMethod writeMethod() { return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); } diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java index 279e8ab51..f2f1b7580 100644 --- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java +++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java @@ -325,6 +325,54 @@ public void deleteOnNullValue() { assertEquals(key, actualRecord.id()); assertEquals(index, actualRecord.index()); + assertEquals(sinkRecord.kafkaOffset(), actualRecord.version()); + } + + @Test + public void externalVersionHeaderOnDelete() { + String externalVersionHeader = "version"; + long expectedExternalVersion = 123l; + + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader); + props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name()); + converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + + SinkRecord sinkRecord = createSinkRecordWithValue(null); + sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion); + + DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index); + + assertEquals(key, actualRecord.id()); + assertEquals(index, actualRecord.index()); + assertEquals(expectedExternalVersion, actualRecord.version()); + } + + @Test + public void externalVersionHeaderOnIndex() { + String externalVersionHeader = "version"; + long expectedExternalVersion = 123l; + + props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false"); + props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader); + props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name()); + converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props)); + + + Schema preProcessedSchema = converter.preProcessSchema(schema); + Struct struct = new Struct(preProcessedSchema).put("string", "myValue"); + SinkRecord sinkRecord = createSinkRecordWithValue(struct); + sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion); + + IndexRequest actualRecord = (IndexRequest) converter.convertRecord(sinkRecord, index); + + assertEquals(key, actualRecord.id()); + assertEquals(index, actualRecord.index()); + assertEquals(expectedExternalVersion, actualRecord.version()); } @Test