Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-8901 Ensure logical type config is applied to Reflect/Specific Avro data types #2805

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
Expand Down Expand Up @@ -74,23 +75,25 @@ public DatumReader<?> load(IdentityPair<Schema, Schema> key) {
boolean writerSchemaIsPrimitive =
AvroSchemaUtils.getPrimitiveSchemas().containsValue(writerSchema);
if (writerSchemaIsPrimitive) {
if (avroUseLogicalTypeConverters) {
return new GenericDatumReader<>(writerSchema, finalReaderSchema,
AvroSchemaUtils.getGenericData());
} else {
return new GenericDatumReader<>(writerSchema, finalReaderSchema);
}
return new GenericDatumReader<>(writerSchema, finalReaderSchema,
avroUseLogicalTypeConverters
? AvroSchemaUtils.getGenericData()
: GenericData.get());
} else if (useSchemaReflection) {
return new ReflectDatumReader<>(writerSchema, finalReaderSchema);
return new ReflectDatumReader<>(writerSchema, finalReaderSchema,
avroUseLogicalTypeConverters
? AvroSchemaUtils.getReflectData()
: ReflectData.get());
} else if (useSpecificAvroReader) {
return new SpecificDatumReader<>(writerSchema, finalReaderSchema);
return new SpecificDatumReader<>(writerSchema, finalReaderSchema,
avroUseLogicalTypeConverters
? AvroSchemaUtils.getSpecificData()
: SpecificData.get());
} else {
if (avroUseLogicalTypeConverters) {
return new GenericDatumReader<>(writerSchema, finalReaderSchema,
AvroSchemaUtils.getGenericData());
} else {
return new GenericDatumReader<>(writerSchema, finalReaderSchema);
}
return new GenericDatumReader<>(writerSchema, finalReaderSchema,
avroUseLogicalTypeConverters
? AvroSchemaUtils.getGenericData()
: GenericData.get());
}
}
};
Expand Down Expand Up @@ -490,7 +493,8 @@ Object read(AvroSchema writerAvroSchema, AvroSchema readerAvroSchema) {
DatumReader<?> reader;
if (!migrations.isEmpty()) {
// if migration is required, then initially use GenericDatumReader
reader = new GenericDatumReader<>(writerSchema);
reader = new GenericDatumReader<>(
writerSchema, writerSchema, AvroSchemaUtils.getGenericData());
} else {
reader = getDatumReader(writerSchema, readerSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
Expand All @@ -76,14 +77,26 @@

public class AvroSchemaUtils {

private static final GenericData INSTANCE = new GenericData();
private static final GenericData GENERIC_DATA_INSTANCE = new GenericData();
private static final ReflectData REFLECT_DATA_INSTANCE = new ReflectData();
private static final SpecificData SPECIFIC_DATA_INSTANCE = new SpecificData();

static {
addLogicalTypeConversion(INSTANCE);
addLogicalTypeConversion(GENERIC_DATA_INSTANCE);
addLogicalTypeConversion(REFLECT_DATA_INSTANCE);
addLogicalTypeConversion(SPECIFIC_DATA_INSTANCE);
}

public static GenericData getGenericData() {
return INSTANCE;
return GENERIC_DATA_INSTANCE;
}

public static ReflectData getReflectData() {
return REFLECT_DATA_INSTANCE;
}

public static SpecificData getSpecificData() {
return SPECIFIC_DATA_INSTANCE;
}

public static void addLogicalTypeConversion(GenericData avroData) {
Expand Down Expand Up @@ -255,7 +268,8 @@ private static void removeProperty(JsonNode node, String propertyName) {
}

public static Object toObject(JsonNode value, AvroSchema schema) throws IOException {
return toObject(value, schema, new GenericDatumReader<>(schema.rawSchema()));
return toObject(value, schema, new GenericDatumReader<>(
schema.rawSchema(), schema.rawSchema(), getGenericData()));
}

public static Object toObject(
Expand All @@ -271,7 +285,8 @@ public static Object toObject(
}

public static Object toObject(String value, AvroSchema schema) throws IOException {
return toObject(value, schema, new GenericDatumReader<>(schema.rawSchema()));
return toObject(value, schema, new GenericDatumReader<>(
schema.rawSchema(), schema.rawSchema(), getGenericData()));
}

public static Object toObject(
Expand Down Expand Up @@ -309,15 +324,14 @@ public static void toJson(Object value, OutputStream out) throws IOException {
public static DatumWriter<?> getDatumWriter(
Object value, Schema schema, boolean avroUseLogicalTypeConverters) {
if (value instanceof SpecificRecord) {
return new SpecificDatumWriter<>(schema);
return new SpecificDatumWriter<>(schema,
avroUseLogicalTypeConverters ? getSpecificData() : SpecificData.get());
} else if (value instanceof GenericRecord) {
if (avroUseLogicalTypeConverters) {
return new GenericDatumWriter<>(schema, getGenericData());
} else {
return new GenericDatumWriter<>(schema);
}
return new GenericDatumWriter<>(schema,
avroUseLogicalTypeConverters ? getGenericData() : GenericData.get());
} else {
return new ReflectDatumWriter<>(schema);
return new ReflectDatumWriter<>(schema,
avroUseLogicalTypeConverters ? getReflectData() : ReflectData.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
Expand Down Expand Up @@ -74,10 +75,10 @@
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -104,6 +105,8 @@ public class JsonataExecutorIntegrationTest extends ClusterTestHarness {

private static final String TOPIC = "widget";

private static final UUID ID = UUID.fromString("2182b6f9-6422-43d8-819e-822b2b678eec");

public JsonataExecutorIntegrationTest() {
super(1, true);
}
Expand Down Expand Up @@ -139,6 +142,7 @@ private static Properties createConsumerProps(
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
props.put("avro.use.logical.type.converters", "true");
props.putAll(additionalProps);
return props;
}
Expand Down Expand Up @@ -174,6 +178,7 @@ private static Properties createProducerProps(
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("auto.register.schemas", "false");
props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
props.put("avro.use.logical.type.converters", "true");
props.put("latest.compatibility.strict", "false");
props.putAll(additionalProps);
return props;
Expand All @@ -196,13 +201,13 @@ public void testAvroReflectionFullyCompatible() throws Exception {
additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
additionalProps.put(KafkaAvroSerializerConfig.SCHEMA_REFLECTION_CONFIG, "true");
List<Object> payloads = new ArrayList<>();
OldWidget widget = new OldWidget("alice");
OldWidget widget = new OldWidget(ID, "alice");
widget.setSize(123);
payloads.add(widget);
NewWidget newWidget = new NewWidget("alice");
NewWidget newWidget = new NewWidget(ID, "alice");
newWidget.setHeight(123);
payloads.add(newWidget);
NewerWidget newerWidget = new NewerWidget("alice");
NewerWidget newerWidget = new NewerWidget(ID, "alice");
newerWidget.setLength(123);
payloads.add(newerWidget);
produceAndConsume(additionalProps, payloads);
Expand Down Expand Up @@ -231,14 +236,14 @@ private static void registerReflectSchemas(String schemaRegistryUrl) throws Exce
config.setCompatibilityLevel("NONE");
schemaRegistry.updateConfig(TOPIC + "-value", config);

Schema schema = ReflectData.get().getSchema(OldWidget.class);
Schema schema = AvroSchemaUtils.getReflectData().getSchema(OldWidget.class);
AvroSchema avroSchema = new AvroSchema(schema);
SortedMap<String, String> props = ImmutableSortedMap.of("application.version", "v1");
Metadata metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet());
avroSchema = avroSchema.copy(metadata, null);
schemaRegistry.register(TOPIC + "-value", avroSchema);

schema = ReflectData.get().getSchema(NewWidget.class);
schema = AvroSchemaUtils.getReflectData().getSchema(NewWidget.class);
avroSchema = new AvroSchema(schema);
Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE,
JsonataExecutor.TYPE, null, null, rule1To2, null, null, false);
Expand All @@ -250,7 +255,7 @@ private static void registerReflectSchemas(String schemaRegistryUrl) throws Exce
avroSchema = avroSchema.copy(metadata, ruleSet);
schemaRegistry.register(TOPIC + "-value", avroSchema);

schema = ReflectData.get().getSchema(NewerWidget.class);
schema = AvroSchemaUtils.getReflectData().getSchema(NewerWidget.class);
avroSchema = new AvroSchema(schema);
rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE,
JsonataExecutor.TYPE, null, null, rule2To3, null, null, false);
Expand Down Expand Up @@ -474,13 +479,13 @@ public void testJsonSchemaPojoFullyCompatible() throws Exception {
additionalProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
List<Object> payloads = new ArrayList<>();
OldWidget widget = new OldWidget("alice");
OldWidget widget = new OldWidget(ID, "alice");
widget.setSize(123);
payloads.add(widget);
NewWidget newWidget = new NewWidget("alice");
NewWidget newWidget = new NewWidget(ID, "alice");
newWidget.setHeight(123);
payloads.add(newWidget);
NewerWidget newerWidget = new NewerWidget("alice");
NewerWidget newerWidget = new NewerWidget(ID, "alice");
newerWidget.setLength(123);
payloads.add(newerWidget);
produceAndConsume(additionalProps, payloads);
Expand All @@ -494,13 +499,13 @@ public void testJsonSchemaJsonNodeFullyCompatible() throws Exception {
additionalProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
additionalProps.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, JsonNode.class);
List<Object> payloads = new ArrayList<>();
OldWidget widget = new OldWidget("alice");
OldWidget widget = new OldWidget(ID, "alice");
widget.setSize(123);
payloads.add(mapper.valueToTree(widget));
NewWidget newWidget = new NewWidget("alice");
NewWidget newWidget = new NewWidget(ID, "alice");
newWidget.setHeight(123);
payloads.add(mapper.valueToTree(newWidget));
NewerWidget newerWidget = new NewerWidget("alice");
NewerWidget newerWidget = new NewerWidget(ID, "alice");
newerWidget.setLength(123);
payloads.add(mapper.valueToTree(newerWidget));
produceAndConsume(additionalProps, payloads);
Expand Down