Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ Exists in two variants:
- `io.aiven.kafka.connect.transforms.ExtractTimestamp$Value` - works on values.

The transformation defines the following configurations:

- `field.name` - The name of the field which should be used as the new timestamp. Cannot be `null` or empty.
- `timestamp.resolution` - The timestamp resolution for key or value
There are two possible values:
- `milliseconds` - key or value timestamp in milliseconds
- `seconds` - key or value timestamp in seconds and will be converted in milliseconds,
the default is `milliseconds`.

Here's an example of this transformation configuration:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ static void setUpAll() throws IOException, InterruptedException {
final File integrationTestClassesPath = new File(System.getProperty("integration-test.classes.path"));
assert integrationTestClassesPath.exists();

final Class[] testConnectorClasses = new Class[]{
final Class<?>[] testConnectorClasses = new Class[]{
TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
};
for (final Class clazz : testConnectorClasses) {
for (final Class<?> clazz : testConnectorClasses) {
final String packageName = clazz.getPackage().getName();
final String packagePrefix = packageName + ".";
assert clazz.getCanonicalName().startsWith(packagePrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
Expand Down Expand Up @@ -70,9 +71,15 @@ public R apply(final R record) {

final long newTimestamp;
if (fieldValue instanceof Long) {
newTimestamp = (long) fieldValue;
final var longFieldValue = (long) fieldValue;
if (config.timestampResolution() == ExtractTimestampConfig.TimestampResolution.SECONDS) {
newTimestamp = TimeUnit.SECONDS.toMillis(longFieldValue);
} else {
newTimestamp = longFieldValue;
}
} else if (fieldValue instanceof Date) {
newTimestamp = ((Date) fieldValue).getTime();
final var dateFieldValue = (Date) fieldValue;
newTimestamp = dateFieldValue.getTime();
} else {
throw new DataException(config.fieldName()
+ " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,96 @@

package io.aiven.kafka.connect.transforms;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

final class ExtractTimestampConfig extends AbstractConfig {

public static final String FIELD_NAME_CONFIG = "field.name";
private static final String FIELD_NAME_DOC =
"The name of the field is to be used as the source of timestamp. "
private static final String FIELD_NAME_DOC = "The name of the field is to be used as the source of timestamp. "
+ "The field must have INT64 or org.apache.kafka.connect.data.Timestamp type "
+ "and must mot be null.";

public static final String EPOCH_RESOLUTION_CONFIG = "timestamp.resolution";
private static final String EPOCH_RESOLUTION_DOC = "Time resolution used for INT64 type field. "
+ "Valid values are \"seconds\" for seconds since epoch and \"milliseconds\" for "
+ "milliseconds since epoch. Default is \"milliseconds\" and ignored for "
+ "org.apache.kafka.connect.data.Timestamp type.";


public enum TimestampResolution {

MILLISECONDS("milliseconds"),
SECONDS("seconds");

final String resolution;

private static final String RESOLUTIONS =
Arrays.stream(values()).map(TimestampResolution::resolution).collect(Collectors.joining(", "));

private TimestampResolution(final String resolution) {
this.resolution = resolution;
}

public String resolution() {
return resolution;
}

public static TimestampResolution fromString(final String value) {
for (final var r : values()) {
if (r.resolution.equals(value)) {
return r;
}
}
throw new IllegalArgumentException(
"Unsupported resolution type '" + value + "'. Supported are: " + RESOLUTIONS);
}

}

ExtractTimestampConfig(final Map<?, ?> originals) {
super(config(), originals);
}

static ConfigDef config() {
return new ConfigDef()
.define(
FIELD_NAME_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(),
ConfigDef.Importance.HIGH,
FIELD_NAME_DOC);
.define(
FIELD_NAME_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(),
ConfigDef.Importance.HIGH,
FIELD_NAME_DOC)
.define(
EPOCH_RESOLUTION_CONFIG,
ConfigDef.Type.STRING,
TimestampResolution.MILLISECONDS.resolution,
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
assert value instanceof String;
try {
TimestampResolution.fromString((String) value);
} catch (final IllegalArgumentException e) {
throw new ConfigException(EPOCH_RESOLUTION_CONFIG, value, e.getMessage());
}
}
},
ConfigDef.Importance.LOW,
EPOCH_RESOLUTION_DOC);
}

final String fieldName() {
return getString(FIELD_NAME_CONFIG);
}

final TimestampResolution timestampResolution() {
return TimestampResolution.fromString(getString(EPOCH_RESOLUTION_CONFIG));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,52 @@ void definedFieldName() {
final ExtractTimestampConfig config = new ExtractTimestampConfig(props);
assertEquals("test", config.fieldName());
}

@Test
void emptyTimestampResolution() {
final var props = new HashMap<>();
props.put("field.name", "test");
final var config = new ExtractTimestampConfig(props);
assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
}

@Test
void definedTimestampResolutionInSeconds() {
final var props = new HashMap<>();
props.put("field.name", "test");
props.put(
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
ExtractTimestampConfig.TimestampResolution.SECONDS.resolution
);
final var config = new ExtractTimestampConfig(props);
assertEquals(ExtractTimestampConfig.TimestampResolution.SECONDS, config.timestampResolution());
}

@Test
void definedTimestampResolutionInMillis() {
final var props = new HashMap<>();
props.put("field.name", "test");
props.put(
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
ExtractTimestampConfig.TimestampResolution.MILLISECONDS.resolution
);
final var config = new ExtractTimestampConfig(props);
assertEquals(ExtractTimestampConfig.TimestampResolution.MILLISECONDS, config.timestampResolution());
}

@Test
void wrongTimestampResolution() {
final var props = new HashMap<>();
props.put("field.name", "test");
props.put(
ExtractTimestampConfig.EPOCH_RESOLUTION_CONFIG,
"foo"
);
final var e = assertThrows(ConfigException.class, () -> new ExtractTimestampConfig(props));
assertEquals(
"Invalid value foo for configuration timestamp.resolution: "
+ "Unsupported resolution type 'foo'. Supported are: milliseconds, seconds",
e.getMessage());
}

}
Loading