Skip to content

Commit

Permalink
KAFKA-5891; Proper handling of LogicalTypes in Cast (#4633)
Browse files Browse the repository at this point in the history
Currently logical types are dropped during Cast Transformation.
This patch fixes this behaviour.

Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
maver1ck authored and hachikuji committed Aug 20, 2018
1 parent 0b81ef4 commit e8d343d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
Expand Up @@ -164,13 +164,17 @@ private Schema getOrBuildSchema(Schema valueSchema) {
} else {
builder = SchemaUtil.copySchemaBasics(valueSchema, SchemaBuilder.struct());
for (Field field : valueSchema.fields()) {
SchemaBuilder fieldBuilder =
convertFieldType(casts.containsKey(field.name()) ? casts.get(field.name()) : field.schema().type());
if (field.schema().isOptional())
fieldBuilder.optional();
if (field.schema().defaultValue() != null)
fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type()));
builder.field(field.name(), fieldBuilder.build());
if (casts.containsKey(field.name())) {
SchemaBuilder fieldBuilder = convertFieldType(casts.get(field.name()));
if (field.schema().isOptional())
fieldBuilder.optional();
if (field.schema().defaultValue() != null)
fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type()));
builder.field(field.name(), fieldBuilder.build());
} else {
builder.field(field.name(), field.schema());
}

}
}

Expand Down
Expand Up @@ -21,12 +21,14 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Test;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -304,6 +306,7 @@ public void castFieldsWithSchema() {
builder.field("boolean", Schema.BOOLEAN_SCHEMA);
builder.field("string", Schema.STRING_SCHEMA);
builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
builder.field("timestamp", Timestamp.SCHEMA);
Schema supportedTypesSchema = builder.build();

Struct recordValue = new Struct(supportedTypesSchema);
Expand All @@ -315,6 +318,7 @@ public void castFieldsWithSchema() {
recordValue.put("float64", -64.);
recordValue.put("boolean", true);
recordValue.put("string", "42");
recordValue.put("timestamp", new Date(0));
// optional field intentionally omitted

SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Expand All @@ -331,6 +335,7 @@ public void castFieldsWithSchema() {
assertEquals(true, ((Struct) transformed.value()).schema().field("float64").schema().defaultValue());
assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean"));
assertEquals(42, ((Struct) transformed.value()).get("string"));
assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp"));
assertNull(((Struct) transformed.value()).get("optional"));
}

Expand Down

0 comments on commit e8d343d

Please sign in to comment.