
Commit

DGS-8901 Ensure logical type config is applied to Reflect/Specific Avro data types (#2805)

* Use GenericData from getGenericData to handle Avro logical types
* Ensure logical type is used with all Avro data instances
* minor fixes
* minor fix
* Minor cleanup
rayokota committed Oct 24, 2023
1 parent 2e69121 commit 1c390a0
Showing 4 changed files with 134 additions and 68 deletions.
Changed file: the Avro deserializer's DatumReader setup
@@ -29,6 +29,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
@@ -74,23 +75,25 @@ public DatumReader<?> load(IdentityPair<Schema, Schema> key) {
           boolean writerSchemaIsPrimitive =
               AvroSchemaUtils.getPrimitiveSchemas().containsValue(writerSchema);
           if (writerSchemaIsPrimitive) {
-            if (avroUseLogicalTypeConverters) {
-              return new GenericDatumReader<>(writerSchema, finalReaderSchema,
-                  AvroSchemaUtils.getGenericData());
-            } else {
-              return new GenericDatumReader<>(writerSchema, finalReaderSchema);
-            }
+            return new GenericDatumReader<>(writerSchema, finalReaderSchema,
+                avroUseLogicalTypeConverters
+                    ? AvroSchemaUtils.getGenericData()
+                    : GenericData.get());
           } else if (useSchemaReflection) {
-            return new ReflectDatumReader<>(writerSchema, finalReaderSchema);
+            return new ReflectDatumReader<>(writerSchema, finalReaderSchema,
+                avroUseLogicalTypeConverters
+                    ? AvroSchemaUtils.getReflectData()
+                    : ReflectData.get());
           } else if (useSpecificAvroReader) {
-            return new SpecificDatumReader<>(writerSchema, finalReaderSchema);
+            return new SpecificDatumReader<>(writerSchema, finalReaderSchema,
+                avroUseLogicalTypeConverters
+                    ? AvroSchemaUtils.getSpecificData()
+                    : SpecificData.get());
           } else {
-            if (avroUseLogicalTypeConverters) {
-              return new GenericDatumReader<>(writerSchema, finalReaderSchema,
-                  AvroSchemaUtils.getGenericData());
-            } else {
-              return new GenericDatumReader<>(writerSchema, finalReaderSchema);
-            }
+            return new GenericDatumReader<>(writerSchema, finalReaderSchema,
+                avroUseLogicalTypeConverters
+                    ? AvroSchemaUtils.getGenericData()
+                    : GenericData.get());
           }
         }
       };
@@ -490,7 +493,8 @@ Object read(AvroSchema writerAvroSchema, AvroSchema readerAvroSchema) {
     DatumReader<?> reader;
     if (!migrations.isEmpty()) {
       // if migration is required, then initially use GenericDatumReader
-      reader = new GenericDatumReader<>(writerSchema);
+      reader = new GenericDatumReader<>(
+          writerSchema, writerSchema, AvroSchemaUtils.getGenericData());
     } else {
       reader = getDatumReader(writerSchema, readerSchema);
     }
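The crux of this file's change: every DatumReader branch now receives an explicit Avro data model, and that model is the converter-enabled singleton only when `avro.use.logical.type.converters` is set. Below is a minimal, self-contained sketch (not from the commit; the `Event` schema and `ts` field are invented) of what the extra constructor argument changes at read time:

```java
import java.io.ByteArrayOutputStream;
import java.time.Instant;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class LogicalTypeReaderSketch {
  public static void main(String[] args) throws Exception {
    // A record with one timestamp-millis field (hypothetical schema).
    Schema schema = SchemaBuilder.record("Event").fields()
        .name("ts")
        .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
        .noDefault()
        .endRecord();

    // A GenericData with a logical-type conversion registered, analogous to
    // what AvroSchemaUtils.getGenericData() returns after its static block runs.
    GenericData converting = new GenericData();
    converting.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

    GenericRecord record = new GenericData.Record(schema);
    record.put("ts", Instant.parse("2023-10-24T00:00:00Z"));

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema, converting).write(record, encoder);
    encoder.flush();

    // Default GenericData: the logical type is ignored, ts decodes as a raw Long.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord plain =
        new GenericDatumReader<GenericRecord>(schema, schema, GenericData.get()).read(null, decoder);
    System.out.println(plain.get("ts").getClass()); // class java.lang.Long

    // Converter-enabled GenericData: ts decodes as java.time.Instant.
    decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord rich =
        new GenericDatumReader<GenericRecord>(schema, schema, converting).read(null, decoder);
    System.out.println(rich.get("ts").getClass()); // class java.time.Instant
  }
}
```

The same principle applies to ReflectDatumReader and SpecificDatumReader, which is the gap this commit closes: previously only the GenericDatumReader branches honored the flag.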
Changed file: AvroSchemaUtils
@@ -56,6 +56,7 @@
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.common.errors.SerializationException;
@@ -76,14 +77,26 @@
 
 public class AvroSchemaUtils {
 
-  private static final GenericData INSTANCE = new GenericData();
+  private static final GenericData GENERIC_DATA_INSTANCE = new GenericData();
+  private static final ReflectData REFLECT_DATA_INSTANCE = new ReflectData();
+  private static final SpecificData SPECIFIC_DATA_INSTANCE = new SpecificData();
 
   static {
-    addLogicalTypeConversion(INSTANCE);
+    addLogicalTypeConversion(GENERIC_DATA_INSTANCE);
+    addLogicalTypeConversion(REFLECT_DATA_INSTANCE);
+    addLogicalTypeConversion(SPECIFIC_DATA_INSTANCE);
   }
 
   public static GenericData getGenericData() {
-    return INSTANCE;
+    return GENERIC_DATA_INSTANCE;
   }
 
+  public static ReflectData getReflectData() {
+    return REFLECT_DATA_INSTANCE;
+  }
+
+  public static SpecificData getSpecificData() {
+    return SPECIFIC_DATA_INSTANCE;
+  }
+
   public static void addLogicalTypeConversion(GenericData avroData) {
@@ -255,7 +268,8 @@ private static void removeProperty(JsonNode node, String propertyName) {
   }
 
   public static Object toObject(JsonNode value, AvroSchema schema) throws IOException {
-    return toObject(value, schema, new GenericDatumReader<>(schema.rawSchema()));
+    return toObject(value, schema, new GenericDatumReader<>(
+        schema.rawSchema(), schema.rawSchema(), getGenericData()));
   }
 
   public static Object toObject(
@@ -271,7 +285,8 @@ public static Object toObject(
   }
 
   public static Object toObject(String value, AvroSchema schema) throws IOException {
-    return toObject(value, schema, new GenericDatumReader<>(schema.rawSchema()));
+    return toObject(value, schema, new GenericDatumReader<>(
+        schema.rawSchema(), schema.rawSchema(), getGenericData()));
   }
 
   public static Object toObject(
@@ -309,15 +324,14 @@ public static void toJson(Object value, OutputStream out) throws IOException {
   public static DatumWriter<?> getDatumWriter(
       Object value, Schema schema, boolean avroUseLogicalTypeConverters) {
     if (value instanceof SpecificRecord) {
-      return new SpecificDatumWriter<>(schema);
+      return new SpecificDatumWriter<>(schema,
+          avroUseLogicalTypeConverters ? getSpecificData() : SpecificData.get());
     } else if (value instanceof GenericRecord) {
-      if (avroUseLogicalTypeConverters) {
-        return new GenericDatumWriter<>(schema, getGenericData());
-      } else {
-        return new GenericDatumWriter<>(schema);
-      }
+      return new GenericDatumWriter<>(schema,
+          avroUseLogicalTypeConverters ? getGenericData() : GenericData.get());
     } else {
-      return new ReflectDatumWriter<>(schema);
+      return new ReflectDatumWriter<>(schema,
+          avroUseLogicalTypeConverters ? getReflectData() : ReflectData.get());
     }
   }
 
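AvroSchemaUtils now holds one pre-configured singleton per Avro data model, with conversions registered exactly once in a static initializer. A condensed sketch of the pattern (class and method names here are hypothetical, and the conversion set is a guess at representative Avro built-ins; the real list lives in `addLogicalTypeConversion`, whose body is collapsed in this diff):

```java
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificData;

public final class AvroDataSingletons {
  // ReflectData and SpecificData both extend GenericData, so a single
  // loop can configure all three data models identically.
  private static final GenericData GENERIC = new GenericData();
  private static final ReflectData REFLECT = new ReflectData();
  private static final SpecificData SPECIFIC = new SpecificData();

  static {
    for (GenericData data : new GenericData[] {GENERIC, REFLECT, SPECIFIC}) {
      data.addLogicalTypeConversion(new Conversions.DecimalConversion());
      data.addLogicalTypeConversion(new Conversions.UUIDConversion());
      data.addLogicalTypeConversion(new TimeConversions.DateConversion());
      data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
    }
  }

  private AvroDataSingletons() {}

  public static GenericData generic() {
    return GENERIC;
  }

  public static ReflectData reflect() {
    return REFLECT;
  }

  public static SpecificData specific() {
    return SPECIFIC;
  }
}
```

Registering conversions only during class initialization keeps the shared instances effectively immutable afterwards, which is what makes handing them to every reader and writer safe.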
Changed file: JsonataExecutorIntegrationTest
@@ -28,6 +28,7 @@
 import io.confluent.kafka.schemaregistry.ClusterTestHarness;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
@@ -74,10 +75,10 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.SortedMap;
+import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -104,6 +105,8 @@ public class JsonataExecutorIntegrationTest extends ClusterTestHarness {
 
   private static final String TOPIC = "widget";
 
+  private static final UUID ID = UUID.fromString("2182b6f9-6422-43d8-819e-822b2b678eec");
+
   public JsonataExecutorIntegrationTest() {
     super(1, true);
   }
@@ -139,6 +142,7 @@ private static Properties createConsumerProps(
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
     props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
+    props.put("avro.use.logical.type.converters", "true");
     props.putAll(additionalProps);
     return props;
   }
@@ -174,6 +178,7 @@ private static Properties createProducerProps(
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put("auto.register.schemas", "false");
     props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
+    props.put("avro.use.logical.type.converters", "true");
     props.put("latest.compatibility.strict", "false");
     props.putAll(additionalProps);
     return props;
@@ -196,13 +201,13 @@ public void testAvroReflectionFullyCompatible() throws Exception {
     additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
     additionalProps.put(KafkaAvroSerializerConfig.SCHEMA_REFLECTION_CONFIG, "true");
     List<Object> payloads = new ArrayList<>();
-    OldWidget widget = new OldWidget("alice");
+    OldWidget widget = new OldWidget(ID, "alice");
     widget.setSize(123);
     payloads.add(widget);
-    NewWidget newWidget = new NewWidget("alice");
+    NewWidget newWidget = new NewWidget(ID, "alice");
     newWidget.setHeight(123);
     payloads.add(newWidget);
-    NewerWidget newerWidget = new NewerWidget("alice");
+    NewerWidget newerWidget = new NewerWidget(ID, "alice");
     newerWidget.setLength(123);
     payloads.add(newerWidget);
     produceAndConsume(additionalProps, payloads);
@@ -231,14 +236,14 @@ private static void registerReflectSchemas(String schemaRegistryUrl) throws Exception {
     config.setCompatibilityLevel("NONE");
     schemaRegistry.updateConfig(TOPIC + "-value", config);
 
-    Schema schema = ReflectData.get().getSchema(OldWidget.class);
+    Schema schema = AvroSchemaUtils.getReflectData().getSchema(OldWidget.class);
     AvroSchema avroSchema = new AvroSchema(schema);
     SortedMap<String, String> props = ImmutableSortedMap.of("application.version", "v1");
     Metadata metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet());
     avroSchema = avroSchema.copy(metadata, null);
     schemaRegistry.register(TOPIC + "-value", avroSchema);
 
-    schema = ReflectData.get().getSchema(NewWidget.class);
+    schema = AvroSchemaUtils.getReflectData().getSchema(NewWidget.class);
     avroSchema = new AvroSchema(schema);
     Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE,
         JsonataExecutor.TYPE, null, null, rule1To2, null, null, false);
@@ -250,7 +255,7 @@ private static void registerReflectSchemas(String schemaRegistryUrl) throws Exception {
     avroSchema = avroSchema.copy(metadata, ruleSet);
     schemaRegistry.register(TOPIC + "-value", avroSchema);
 
-    schema = ReflectData.get().getSchema(NewerWidget.class);
+    schema = AvroSchemaUtils.getReflectData().getSchema(NewerWidget.class);
     avroSchema = new AvroSchema(schema);
     rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE,
         JsonataExecutor.TYPE, null, null, rule2To3, null, null, false);
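The switch from `ReflectData.get()` to `AvroSchemaUtils.getReflectData()` matters because the widgets now carry a `java.util.UUID` id: only a ReflectData with a UUID conversion registered can map that field to a logical-type schema rather than reflecting over the class internals. A hedged sketch (the `Widget` POJO is hypothetical, and it assumes Avro's `Conversions.UUIDConversion` is among the registered conversions):

```java
import java.util.UUID;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectUuidSketch {
  // Hypothetical stand-in for OldWidget.
  public static class Widget {
    public UUID id;
    public String name;
  }

  public static void main(String[] args) {
    ReflectData reflect = new ReflectData();
    reflect.addLogicalTypeConversion(new Conversions.UUIDConversion());

    // With the conversion registered, ReflectData can use the conversion's
    // recommended schema (a string with the uuid logical type) for the field.
    Schema schema = reflect.getSchema(Widget.class);
    System.out.println(schema.getField("id").schema());
  }
}
```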
@@ -474,13 +479,13 @@ public void testJsonSchemaPojoFullyCompatible() throws Exception {
     additionalProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
     additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
     List<Object> payloads = new ArrayList<>();
-    OldWidget widget = new OldWidget("alice");
+    OldWidget widget = new OldWidget(ID, "alice");
     widget.setSize(123);
     payloads.add(widget);
-    NewWidget newWidget = new NewWidget("alice");
+    NewWidget newWidget = new NewWidget(ID, "alice");
     newWidget.setHeight(123);
     payloads.add(newWidget);
-    NewerWidget newerWidget = new NewerWidget("alice");
+    NewerWidget newerWidget = new NewerWidget(ID, "alice");
     newerWidget.setLength(123);
     payloads.add(newerWidget);
     produceAndConsume(additionalProps, payloads);
@@ -494,13 +499,13 @@ public void testJsonSchemaJsonNodeFullyCompatible() throws Exception {
     additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
     additionalProps.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, JsonNode.class);
     List<Object> payloads = new ArrayList<>();
-    OldWidget widget = new OldWidget("alice");
+    OldWidget widget = new OldWidget(ID, "alice");
     widget.setSize(123);
     payloads.add(mapper.valueToTree(widget));
-    NewWidget newWidget = new NewWidget("alice");
+    NewWidget newWidget = new NewWidget(ID, "alice");
     newWidget.setHeight(123);
     payloads.add(mapper.valueToTree(newWidget));
-    NewerWidget newerWidget = new NewerWidget("alice");
+    NewerWidget newerWidget = new NewerWidget(ID, "alice");
     newerWidget.setLength(123);
     payloads.add(mapper.valueToTree(newerWidget));
     produceAndConsume(additionalProps, payloads);
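The test's producer and consumer props both set `avro.use.logical.type.converters=true`, the client-side switch the whole commit hangs off. A standalone configuration sketch (bootstrap servers and registry URL are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class LogicalTypeProducerConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081"); // placeholder
    // With the flag on, the serializer obtains its Generic/Reflect/Specific
    // data model from AvroSchemaUtils, so logical-type fields (timestamps,
    // UUIDs, decimals) are written from their rich Java representations.
    props.put("avro.use.logical.type.converters", "true");

    try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
      // send records whose values contain logical-type fields...
    }
  }
}
```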
