KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva <valeria.vasylieva@gmail.com>, Robert Yokota <rayokota@gmail.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch <rhauch@gmail.com>
rayokota authored and rhauch committed Jul 12, 2019
1 parent 8c93b7e commit c05ed1e
Showing 2 changed files with 243 additions and 42 deletions.
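
For context, the change affects the TimestampConverter single message transform. Below is a minimal sketch of the schema'd case the fix addresses; it is not part of this commit, and the class, topic, and field names ("event_time") are illustrative. With the fix applied, a null optional field should pass through unchanged instead of causing the transform to fail.

// Sketch only: exercises the "field" path of TimestampConverter with a null,
// optional field. Class, topic, and field names are made up for illustration.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.TimestampConverter;

public class NullFieldSketch {
    public static void main(String[] args) {
        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
        Map<String, String> config = new HashMap<>();
        config.put(TimestampConverter.FIELD_CONFIG, "event_time");
        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
        config.put(TimestampConverter.FORMAT_CONFIG, "yyyy-MM-dd");
        xform.configure(config);

        // Struct with an optional timestamp field that is left null.
        Schema schema = SchemaBuilder.struct()
                .field("event_time", Timestamp.builder().optional().schema())
                .build();
        Struct value = new Struct(schema);

        SourceRecord record = new SourceRecord(null, null, "topic", 0, schema, value);
        Struct converted = (Struct) xform.apply(record).value();

        // With this fix the null is preserved, and the field's schema in the
        // rewritten value is the optional variant of the target type.
        System.out.println(converted.get("event_time")); // prints "null"
        xform.close();
    }
}
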
@@ -47,7 +47,7 @@
import java.util.TimeZone;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
- import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+ import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;

public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {

@@ -85,6 +85,10 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements

private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

+ public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema();
+ public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema();
+ public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema();

private interface TimestampTranslator {
/**
* Convert from the type-specific format to the universal java.util.Date format
@@ -94,7 +98,7 @@ private interface TimestampTranslator {
/**
* Get the schema for this format.
*/
- Schema typeSchema();
+ Schema typeSchema(boolean isOptional);

/**
* Convert from the universal java.util.Date format to the type-specific format
@@ -118,8 +122,8 @@ public Date toRaw(Config config, Object orig) {
}

@Override
- public Schema typeSchema() {
- return Schema.STRING_SCHEMA;
+ public Schema typeSchema(boolean isOptional) {
+ return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}

@Override
@@ -139,8 +143,8 @@ public Date toRaw(Config config, Object orig) {
}

@Override
- public Schema typeSchema() {
- return Schema.INT64_SCHEMA;
+ public Schema typeSchema(boolean isOptional) {
+ return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
}

@Override
@@ -159,8 +163,8 @@ public Date toRaw(Config config, Object orig) {
}

@Override
- public Schema typeSchema() {
- return org.apache.kafka.connect.data.Date.SCHEMA;
+ public Schema typeSchema(boolean isOptional) {
+ return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA;
}

@Override
@@ -185,8 +189,8 @@ public Date toRaw(Config config, Object orig) {
}

@Override
- public Schema typeSchema() {
- return Time.SCHEMA;
+ public Schema typeSchema(boolean isOptional) {
+ return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
}

@Override
@@ -212,8 +216,8 @@ public Date toRaw(Config config, Object orig) {
}

@Override
- public Schema typeSchema() {
- return Timestamp.SCHEMA;
+ public Schema typeSchema(boolean isOptional) {
+ return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA;
}

@Override
@@ -330,16 +334,16 @@ private R applyWithSchema(R record) {
if (config.field.isEmpty()) {
Object value = operatingValue(record);
// New schema is determined by the requested target timestamp type
- Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+ Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(schema.isOptional());
return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema)));
} else {
- final Struct value = requireStruct(operatingValue(record), PURPOSE);
- Schema updatedSchema = schemaUpdateCache.get(value.schema());
+ final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
+ Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
if (field.name().equals(config.field)) {
- builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema());
+ builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
} else {
builder.field(field.name(), field.schema());
}
@@ -361,6 +365,9 @@ private R applyWithSchema(R record) {
}

private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
+ if (value == null) {
+ return null;
+ }
Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
final Object updatedFieldValue;
@@ -375,11 +382,11 @@ private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
}

private R applySchemaless(R record) {
- if (config.field.isEmpty()) {
- Object value = operatingValue(record);
- return newRecord(record, null, convertTimestamp(value));
+ Object rawValue = operatingValue(record);
+ if (rawValue == null || config.field.isEmpty()) {
+ return newRecord(record, null, convertTimestamp(rawValue));
} else {
- final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
+ final Map<String, Object> value = requireMap(rawValue, PURPOSE);
final HashMap<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(config.field, convertTimestamp(value.get(config.field)));
return newRecord(record, null, updatedValue);
@@ -424,11 +431,14 @@ private String inferTimestampType(Object timestamp) {

/**
* Convert the given timestamp to the target timestamp format.
- * @param timestamp the input timestamp
+ * @param timestamp the input timestamp, may be null
* @param timestampFormat the format of the timestamp, or null if the format should be inferred
* @return the converted timestamp
*/
private Object convertTimestamp(Object timestamp, String timestampFormat) {
+ if (timestamp == null) {
+ return null;
+ }
if (timestampFormat == null) {
timestampFormat = inferTimestampType(timestamp);
}
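
The schemaless path gets the same treatment: when the incoming value itself is null, the transform now hands it back untouched. A correspondingly minimal sketch, again illustrative rather than taken from the commit:

// Sketch only: schemaless record (no value schema) with a null value.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.TimestampConverter;

public class SchemalessNullSketch {
    public static void main(String[] args) {
        TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>();
        Map<String, String> config = new HashMap<>();
        config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix");
        xform.configure(config);

        // No schema and a null payload: applySchemaless() now short-circuits
        // and returns a record whose value is still null.
        SourceRecord record = new SourceRecord(null, null, "topic", 0, null, null);
        System.out.println(xform.apply(record).value()); // prints "null"
        xform.close();
    }
}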
