diff --git a/README.md b/README.md
index 7d5c886..e5d6588 100644
--- a/README.md
+++ b/README.md
@@ -22,8 +22,12 @@ Exists in two variants:
 - `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.
 
 The transformation defines the following configurations:
-
 - `field.name` - The name of the field which should be used as the new timestamp. Cannot be `null` or empty.
+- `timestamp.resolution` - The timestamp resolution of the key or value field.
+  There are two possible values:
+  - `milliseconds` - the field contains the timestamp in milliseconds;
+  - `seconds` - the field contains the timestamp in seconds, which will be converted to milliseconds.
+  The default is `milliseconds`.
 
 Here's an example of this transformation configuration:
 
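A connector configuration using the new option would look something like this; the transform alias `extractTs` and the field name are illustrative, not taken from the README's example:

    transforms=extractTs
    transforms.extractTs.type=io.aiven.kafka.connect.transforms.ExtractTimestamp$Value
    transforms.extractTs.field.name=inner_timestamp
    transforms.extractTs.timestamp.resolution=seconds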
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
index f613732..bd4e5af 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
@@ -90,10 +90,10 @@ static void setUpAll() throws IOException, InterruptedException {
         final File integrationTestClassesPath = new File(System.getProperty("integration-test.classes.path"));
         assert integrationTestClassesPath.exists();
 
-        final Class[] testConnectorClasses = new Class[]{
+        final Class<?>[] testConnectorClasses = new Class<?>[]{
             TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
         };
-        for (final Class clazz : testConnectorClasses) {
+        for (final Class<?> clazz : testConnectorClasses) {
             final String packageName = clazz.getPackage().getName();
             final String packagePrefix = packageName + ".";
             assert clazz.getCanonicalName().startsWith(packagePrefix);
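The change here is limited to replacing the raw `Class` type with the `Class<?>` wildcard. A stand-alone sketch of the idiom (class names arbitrary):

    public class WildcardClassDemo {
        public static void main(final String[] args) {
            // Class<?> ("class of some unknown type") carries the same runtime
            // information as raw Class, but avoids raw-type warnings and keeps
            // the compiler's generic checks intact.
            final Class<?>[] classes = new Class<?>[]{String.class, Integer.class};
            for (final Class<?> clazz : classes) {
                System.out.println(clazz.getCanonicalName());
            }
        }
    }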
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
index 52e4341..a45ac0a 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
@@ -18,6 +18,7 @@
 
 import java.util.Date;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -70,9 +71,15 @@ public R apply(final R record) {
 
         final long newTimestamp;
         if (fieldValue instanceof Long) {
-            newTimestamp = (long) fieldValue;
+            final var longFieldValue = (long) fieldValue;
+            if (config.timestampResolution() == ExtractTimestampConfig.TimestampResolution.SECONDS) {
+                newTimestamp = TimeUnit.SECONDS.toMillis(longFieldValue);
+            } else {
+                newTimestamp = longFieldValue;
+            }
         } else if (fieldValue instanceof Date) {
-            newTimestamp = ((Date) fieldValue).getTime();
+            final var dateFieldValue = (Date) fieldValue;
+            newTimestamp = dateFieldValue.getTime();
         } else {
             throw new DataException(config.fieldName()
                 + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
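The new branch delegates to `TimeUnit.SECONDS.toMillis`, which multiplies by 1000 and saturates at `Long.MIN_VALUE`/`Long.MAX_VALUE` rather than overflowing. A quick stand-alone check of that behavior (the sample value is arbitrary):

    import java.util.concurrent.TimeUnit;

    public class ResolutionDemo {
        public static void main(final String[] args) {
            final long seconds = 1605402123L; // 2020-11-15T01:02:03Z in seconds since epoch
            System.out.println(TimeUnit.SECONDS.toMillis(seconds));        // 1605402123000
            // Out-of-range inputs saturate instead of silently overflowing:
            System.out.println(TimeUnit.SECONDS.toMillis(Long.MAX_VALUE)); // 9223372036854775807
        }
    }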
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
index 40d899a..e859798 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
@@ -16,34 +16,96 @@
 
 package io.aiven.kafka.connect.transforms;
 
+import java.util.Arrays;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 
 final class ExtractTimestampConfig extends AbstractConfig {
+
     public static final String FIELD_NAME_CONFIG = "field.name";
-    private static final String FIELD_NAME_DOC =
-        "The name of the field is to be used as the source of timestamp. "
+    private static final String FIELD_NAME_DOC = "The name of the field to be used as the source of the timestamp. "
         + "The field must have INT64 or org.apache.kafka.connect.data.Timestamp type "
         + "and must not be null.";
+    public static final String EPOCH_RESOLUTION_CONFIG = "timestamp.resolution";
+    private static final String EPOCH_RESOLUTION_DOC = "Time resolution used for an INT64 type field. "
+        + "Valid values are \"seconds\" for seconds since epoch and \"milliseconds\" for "
+        + "milliseconds since epoch. The default is \"milliseconds\"; the setting is "
+        + "ignored for org.apache.kafka.connect.data.Timestamp fields.";
+
+
+    public enum TimestampResolution {
+
+        MILLISECONDS("milliseconds"),
+        SECONDS("seconds");
+
+        final String resolution;
+
+        private static final String RESOLUTIONS =
+            Arrays.stream(values()).map(TimestampResolution::resolution).collect(Collectors.joining(", "));
+
+        private TimestampResolution(final String resolution) {
+            this.resolution = resolution;
+        }
+
+        public String resolution() {
+            return resolution;
+        }
+
+        public static TimestampResolution fromString(final String value) {
+            for (final var r : values()) {
+                if (r.resolution.equals(value)) {
+                    return r;
+                }
+            }
+            throw new IllegalArgumentException(
+                "Unsupported resolution type '" + value + "'. Supported are: " + RESOLUTIONS);
+        }
+
+    }
+
     ExtractTimestampConfig(final Map<?, ?> originals) {
         super(config(), originals);
     }
 
     static ConfigDef config() {
         return new ConfigDef()
-            .define(
-                FIELD_NAME_CONFIG,
-                ConfigDef.Type.STRING,
-                ConfigDef.NO_DEFAULT_VALUE,
-                new ConfigDef.NonEmptyString(),
-                ConfigDef.Importance.HIGH,
-                FIELD_NAME_DOC);
+            .define(
+                FIELD_NAME_CONFIG,
+                ConfigDef.Type.STRING,
+                ConfigDef.NO_DEFAULT_VALUE,
+                new ConfigDef.NonEmptyString(),
+                ConfigDef.Importance.HIGH,
+                FIELD_NAME_DOC)
+            .define(
+                EPOCH_RESOLUTION_CONFIG,
+                ConfigDef.Type.STRING,
+                TimestampResolution.MILLISECONDS.resolution,
+                new ConfigDef.Validator() {
+                    @Override
+                    public void ensureValid(final String name, final Object value) {
+                        assert value instanceof String;
+                        try {
+                            TimestampResolution.fromString((String) value);
+                        } catch (final IllegalArgumentException e) {
+                            throw new ConfigException(EPOCH_RESOLUTION_CONFIG, value, e.getMessage());
+                        }
+                    }
+                },
+                ConfigDef.Importance.LOW,
+                EPOCH_RESOLUTION_DOC);
     }
 
     final String fieldName() {
         return getString(FIELD_NAME_CONFIG);
     }
+
+    final TimestampResolution timestampResolution() {
+        return TimestampResolution.fromString(getString(EPOCH_RESOLUTION_CONFIG));
+    }
+
 }
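Both the `ConfigDef.Validator` and `timestampResolution()` go through `TimestampResolution.fromString`, so an unsupported value is rejected once, at configuration time. A minimal sketch of the parser's contract (the demo class is hypothetical and sits in the same package, since the config class is package-private):

    package io.aiven.kafka.connect.transforms;

    public class ResolutionParsingDemo {
        public static void main(final String[] args) {
            // Valid values map onto the enum constants.
            System.out.println(ExtractTimestampConfig.TimestampResolution.fromString("seconds")); // SECONDS

            try {
                ExtractTimestampConfig.TimestampResolution.fromString("minutes"); // unsupported
            } catch (final IllegalArgumentException e) {
                // Unsupported resolution type 'minutes'. Supported are: milliseconds, seconds
                System.out.println(e.getMessage());
            }
        }
    }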
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
index 8eb8bd1..08bd22f 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
@@ -51,4 +51,52 @@ void definedFieldName() {
         final ExtractTimestampConfig config = new ExtractTimestampConfig(props);
         assertEquals("test", config.fieldName());
     }
+
+    @Test
+    void emptyTimestampResolution() {
+        final var props = new HashMap<>();
+        props.put("field.name", "test");
+        final var config = new ExtractTimestampConfig(props);
+        assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
+    }
+
+    @Test
+    void definedTimestampResolutionInSeconds() {
+        final var props = new HashMap<>();
+        props.put("field.name", "test");
+        props.put(
+            ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
+            ExtractTimestampConfig.TimestampResolution.SECONDS.resolution
+        );
+        final var config = new ExtractTimestampConfig(props);
+        assertEquals(ExtractTimestampConfig.TimestampResolution.SECONDS, config.timestampResolution());
+    }
+
+    @Test
+    void definedTimestampResolutionInMillis() {
+        final var props = new HashMap<>();
+        props.put("field.name", "test");
+        props.put(
+            ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
+            ExtractTimestampConfig.TimestampResolution.MILLISECONDS.resolution
+        );
+        final var config = new ExtractTimestampConfig(props);
+        assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
+    }
+
+    @Test
+    void wrongTimestampResolution() {
+        final var props = new HashMap<>();
+        props.put("field.name", "test");
+        props.put(
+            ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
+            "foo"
+        );
+        final var e = assertThrows(ConfigException.class,
+            () -> new ExtractTimestampConfig(props));
+        assertEquals(
+            "Invalid value foo for configuration timestamp.resolution: "
+                + "Unsupported resolution type 'foo'. Supported are: milliseconds, seconds",
+            e.getMessage());
+    }
 }
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampTest.java
index 48d9e0a..4903b54 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampTest.java
@@ -16,6 +16,8 @@
 
 package io.aiven.kafka.connect.transforms;
 
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +32,7 @@
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,62 +44,56 @@ abstract class ExtractTimestampTest {
 
     @Test
     void recordNotStructOrMap() {
         final SinkRecord originalRecord = record(SchemaBuilder.INT8_SCHEMA, (byte) 123);
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(keyOrValue() + " type must be STRUCT or MAP: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void recordStructNull() {
         final Schema schema = SchemaBuilder.struct().schema();
         final SinkRecord originalRecord = record(schema, null);
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(keyOrValue() + " can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void recordMapNull() {
         final SinkRecord originalRecord = record(null, null);
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(keyOrValue() + " can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void structWithMissingField() {
         final Schema schema = SchemaBuilder.struct()
-            .field(FIELD, Schema.INT64_SCHEMA)
-            .build();
+                .field(FIELD, Schema.INT64_SCHEMA)
+                .build();
         final SinkRecord originalRecord = record(null, new Struct(schema));
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void mapWithMissingField() {
         final SinkRecord originalRecord = record(null, new HashMap<>());
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void structWithNullField() {
         final Schema schema = SchemaBuilder.struct()
-            .field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
-            .build();
+                .field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
+                .build();
         final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, null));
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
@@ -104,23 +101,21 @@ void mapWithNullField() {
         final HashMap<String, Object> valueMap = new HashMap<>();
         valueMap.put(FIELD, null);
         final SinkRecord originalRecord = record(null, valueMap);
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be present and its value can't be null: " + originalRecord,
-            e.getMessage());
+                e.getMessage());
     }
 
     @Test
     void structWithFieldOfIncorrectType() {
         final Schema schema = SchemaBuilder.struct()
-            .field(FIELD, Schema.STRING_SCHEMA)
-            .build();
+                .field(FIELD, Schema.STRING_SCHEMA)
+                .build();
         final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, "aaa"));
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
-            + originalRecord,
-            e.getMessage());
+                + originalRecord,
+                e.getMessage());
     }
 
     @Test
@@ -128,25 +123,24 @@ void mapWithFieldOfIncorrectType() {
         final HashMap<String, Object> valueMap = new HashMap<>();
         valueMap.put(FIELD, "aaa");
         final SinkRecord originalRecord = record(null, valueMap);
-        final Throwable e = assertThrows(DataException.class,
-            () -> transformation().apply(originalRecord));
+        final Throwable e = assertThrows(DataException.class, () -> transformation().apply(originalRecord));
         assertEquals(FIELD + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
-            + originalRecord,
-            e.getMessage());
+                + originalRecord,
+                e.getMessage());
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    void structWithIntField(final boolean optional) {
+    @ValueSource(booleans = {true, false})
+    void structWithOptionalIntField(final boolean optional) {
         final Schema schema;
         if (optional) {
             schema = SchemaBuilder.struct()
-                .field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
-                .build();
+                    .field(FIELD, Schema.OPTIONAL_INT64_SCHEMA)
+                    .build();
         } else {
             schema = SchemaBuilder.struct()
-                .field(FIELD, Schema.INT64_SCHEMA)
-                .build();
+                    .field(FIELD, Schema.INT64_SCHEMA)
+                    .build();
         }
         final long timestamp = 11363151277L;
         final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, timestamp));
@@ -154,39 +148,87 @@ void structWithIntField(final boolean optional) {
         assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
     }
 
-    @Test
-    void mapWithIntField() {
-        final long timestamp = 11363151277L;
-        final HashMap<String, Object> valueMap = new HashMap<>();
-        valueMap.put(FIELD, timestamp);
-        final SinkRecord originalRecord = record(null, valueMap);
-        final SinkRecord transformedRecord = transformation().apply(originalRecord);
-        assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
-    }
-
-    @Test
-    void structWithTimestampField() {
+    @ParameterizedTest
+    @EnumSource(value = ExtractTimestampConfig.TimestampResolution.class, names = {"MILLISECONDS", "SECONDS"})
+    void structWithIntField(final ExtractTimestampConfig.TimestampResolution tsResolution) {
+        final var schema = SchemaBuilder.struct().field(FIELD, Schema.INT64_SCHEMA).build();
+        final var datetime = ZonedDateTime.of(
+                2020, 11, 15, 1, 2, 3, 4,
+                ZoneId.of("UTC")
+        );
+        final var instant = datetime.toInstant();
+        final long timestamp;
+        if (tsResolution == ExtractTimestampConfig.TimestampResolution.SECONDS) {
+            timestamp = instant.getEpochSecond();
+        } else {
+            timestamp = instant.toEpochMilli();
+        }
+        final var props = new HashMap<String, String>();
+        props.put(ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG, tsResolution.resolution());
+        final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, timestamp));
+        final SinkRecord transformedRecord = transformation(props).apply(originalRecord);
+        assertEquals(setNewTimestamp(originalRecord, instant.toEpochMilli()), transformedRecord);
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = ExtractTimestampConfig.TimestampResolution.class, names = {"MILLISECONDS", "SECONDS"})
+    void mapWithIntField(final ExtractTimestampConfig.TimestampResolution tsResolution) {
+        final var datetime = ZonedDateTime.of(
+                2020, 11, 15, 1, 2, 3, 4,
+                ZoneId.of("UTC")
+        );
+        final var instant = datetime.toInstant();
+        final var props = new HashMap<String, String>();
+        props.put(ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG, tsResolution.resolution());
+        final long timestamp;
+        if (tsResolution == ExtractTimestampConfig.TimestampResolution.SECONDS) {
+            timestamp = instant.getEpochSecond();
+        } else {
+            timestamp = instant.toEpochMilli();
+        }
+        final SinkRecord originalRecord = record(null, Map.of(FIELD, timestamp));
+        final var transformedRecord = transformation(props).apply(originalRecord);
+        assertEquals(setNewTimestamp(originalRecord, instant.toEpochMilli()), transformedRecord);
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = ExtractTimestampConfig.TimestampResolution.class, names = {"MILLISECONDS", "SECONDS"})
+    void structWithTimestampField(final ExtractTimestampConfig.TimestampResolution tsResolution) {
         final Schema schema = SchemaBuilder.struct()
-            .field(FIELD, Timestamp.SCHEMA)
-            .build();
-        final long timestamp = 11363151277L;
-        final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, new Date(timestamp)));
-        final SinkRecord transformedRecord = transformation().apply(originalRecord);
-        assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
+                .field(FIELD, Timestamp.SCHEMA)
+                .build();
+        final var datetime = ZonedDateTime.of(
+                2020, 11, 15, 1, 2, 3, 4,
+                ZoneId.of("UTC")
+        );
+        final var instant = datetime.toInstant();
+        final var props = new HashMap<String, String>();
+        props.put(ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG, tsResolution.resolution());
+        final SinkRecord originalRecord = record(null, new Struct(schema).put(FIELD, Date.from(instant)));
+        final SinkRecord transformedRecord = transformation(props).apply(originalRecord);
+        assertEquals(setNewTimestamp(originalRecord, instant.toEpochMilli()), transformedRecord);
     }
 
-    @Test
-    void mapWithTimestampField() {
-        final long timestamp = 11363151277L;
-        final HashMap<String, Object> valueMap = new HashMap<>();
-        valueMap.put(FIELD, new Date(timestamp));
-        final SinkRecord originalRecord = record(null, valueMap);
-        final SinkRecord transformedRecord = transformation().apply(originalRecord);
-        assertEquals(setNewTimestamp(originalRecord, timestamp), transformedRecord);
+    @ParameterizedTest
+    @EnumSource(value = ExtractTimestampConfig.TimestampResolution.class, names = {"MILLISECONDS", "SECONDS"})
+    void mapWithTimestampField(final ExtractTimestampConfig.TimestampResolution tsResolution) {
+        final var datetime = ZonedDateTime.of(
+                2020, 11, 15, 1, 2, 3, 4,
+                ZoneId.of("UTC")
+        );
+        final var instant = datetime.toInstant();
+        final var props = new HashMap<String, String>();
+        props.put(ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG, tsResolution.resolution());
+        final SinkRecord originalRecord = record(null, Map.of(FIELD, Date.from(instant)));
+        final var transformedRecord = transformation(props).apply(originalRecord);
+        assertEquals(setNewTimestamp(originalRecord, instant.toEpochMilli()), transformedRecord);
     }
 
     private ExtractTimestamp transformation() {
-        final Map<String, String> props = new HashMap<>();
+        return transformation(new HashMap<>());
+    }
+
+    private ExtractTimestamp transformation(final Map<String, String> props) {
         props.put("field.name", FIELD);
         final ExtractTimestamp transform = createTransformationObject();
         transform.configure(props);
@@ -200,21 +242,21 @@ protected SinkRecord record(final Schema keySchema,
                                 final Object key,
                                 final Schema valueSchema,
                                 final Object value) {
         return new SinkRecord("original_topic", 0,
-            keySchema, key,
-            valueSchema, value,
-            123L,
-            456L, TimestampType.CREATE_TIME);
+                keySchema, key,
+                valueSchema, value,
+                123L,
+                456L, TimestampType.CREATE_TIME);
     }
 
     private SinkRecord setNewTimestamp(final SinkRecord record, final long newTimestamp) {
         return record.newRecord(record.topic(),
-            record.kafkaPartition(),
-            record.keySchema(),
-            record.key(),
-            record.valueSchema(),
-            record.value(),
-            newTimestamp,
-            record.headers()
+                record.kafkaPartition(),
+                record.keySchema(),
+                record.key(),
+                record.valueSchema(),
+                record.value(),
+                newTimestamp,
+                record.headers()
         );
    }
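Taken together: a record whose INT64 field carries seconds since epoch now ends up with a millisecond record timestamp. A compact end-to-end sketch in the spirit of the tests above (the topic, the field name, and the use of the `$Value` variant are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import io.aiven.kafka.connect.transforms.ExtractTimestamp;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class ExtractTimestampDemo {
        public static void main(final String[] args) {
            final Schema schema = SchemaBuilder.struct()
                    .field("ts", Schema.INT64_SCHEMA)
                    .build();
            // The value struct carries seconds since epoch (2020-11-15T01:02:03Z).
            final SinkRecord original = new SinkRecord("topic", 0, null, null,
                    schema, new Struct(schema).put("ts", 1605402123L), 42L);

            final Map<String, String> props = new HashMap<>();
            props.put("field.name", "ts");
            props.put("timestamp.resolution", "seconds");

            final ExtractTimestamp.Value<SinkRecord> transform = new ExtractTimestamp.Value<>();
            transform.configure(props);

            final SinkRecord transformed = transform.apply(original);
            System.out.println(transformed.timestamp()); // 1605402123000
        }
    }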