diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java index 360b8de3c51e..97ef93f891bf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java @@ -20,9 +20,10 @@ package org.apache.hudi.utilities.schema; import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil; import org.apache.avro.Schema; @@ -38,27 +39,44 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider { * Configs supported. */ public static class Config { - public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className"; - public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers"; + private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto"; + public static final ConfigProperty PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name") + .noDefaultValue() + .sinceVersion("0.13.0") + .withDocumentation("The Protobuf Message class used as the source for the schema."); + + public static final ConfigProperty PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers") + .defaultValue(false) + .sinceVersion("0.13.0") + .withDocumentation("When set to false wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value"); + + public static final ConfigProperty PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".max.recursion.depth") + .defaultValue(5) + .sinceVersion("0.13.0") + .withDocumentation("The max depth to unravel the Proto schema when translating into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into " + + "something that can be represented in their lake format like Parquet. After a given class has been seen N times within a single branch, the schema provider will create a record with a " + + "byte array to hold the remaining proto data and a string to hold the message descriptor's name for context."); } private final String schemaString; /** - * To be lazily inited on executors. + * To be lazily initiated on executors. */ private transient Schema schema; public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList( - Config.PROTO_SCHEMA_CLASS_NAME)); - String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME); - boolean flattenWrappedPrimitives = props.getBoolean(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, false); + Config.PROTO_SCHEMA_CLASS_NAME.key())); + String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME.key()); + boolean flattenWrappedPrimitives = props.getBoolean(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES.key(), + Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES.defaultValue()); + int maxRecursionDepth = props.getInteger(Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.defaultValue()); try { - schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), flattenWrappedPrimitives).toString(); + schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), flattenWrappedPrimitives, maxRecursionDepth).toString(); } catch (Exception e) { - throw new HoodieException(String.format("Error reading proto source schema for class: %s", className), e); + throw new HoodieSchemaException(String.format("Error reading proto source schema for class: %s", className), e); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java index ae37273be6b9..1ed057e322de 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java @@ -53,10 +53,10 @@ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList( - ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME)); + ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key())); props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class); props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class); - className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME); + className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key()); this.offsetGen = new KafkaOffsetGen(props); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java index 240f0a4bd912..3feffc4971dc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java @@ -37,6 +37,7 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.kafka.common.utils.CopyOnWriteMap; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -45,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -57,10 +59,11 @@ public class ProtoConversionUtil { * Creates an Avro {@link Schema} for the provided class. Assumes that the class is a protobuf {@link Message}. * @param clazz The protobuf class * @param flattenWrappedPrimitives set to true to treat wrapped primitives like nullable fields instead of nested messages. + * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes * @return An Avro schema */ - public static Schema getAvroSchemaForMessageClass(Class clazz, boolean flattenWrappedPrimitives) { - return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives); + public static Schema getAvroSchemaForMessageClass(Class clazz, boolean flattenWrappedPrimitives, int maxRecursionDepth) { + return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives, maxRecursionDepth); } /** @@ -80,17 +83,19 @@ public static GenericRecord convertToAvro(Schema schema, Message message) { * 2. Convert directly from a protobuf {@link Message} to a {@link GenericRecord} while properly handling enums and wrapped primitives mentioned above. */ private static class AvroSupport { + private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); + private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); + private static final String OVERFLOW_DESCRIPTOR_FIELD_NAME = "descriptor_full_name"; + private static final String OVERFLOW_BYTES_FIELD_NAME = "proto_bytes"; + private static final Schema RECURSION_OVERFLOW_SCHEMA = Schema.createRecord("recursion_overflow", null, "org.apache.hudi.proto", false, + Arrays.asList(new Schema.Field(OVERFLOW_DESCRIPTOR_FIELD_NAME, STRING_SCHEMA, null, ""), + new Schema.Field(OVERFLOW_BYTES_FIELD_NAME, Schema.create(Schema.Type.BYTES), null, "".getBytes()))); private static final AvroSupport INSTANCE = new AvroSupport(); // A cache of the proto class name paired with whether wrapped primitives should be flattened as the key and the generated avro schema as the value - private static final Map, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>(); + private static final Map SCHEMA_CACHE = new ConcurrentHashMap<>(); // A cache with a key as the pair target avro schema and the proto descriptor for the source and the value as an array of proto field descriptors where the order matches the avro ordering. // When converting from proto to avro, we want to be able to iterate over the fields in the proto in the same order as they appear in the avro schema. private static final Map, Descriptors.FieldDescriptor[]> FIELD_CACHE = new ConcurrentHashMap<>(); - - - private static final Schema STRINGS = Schema.create(Schema.Type.STRING); - - private static final Schema NULL = Schema.create(Schema.Type.NULL); private static final Map WRAPPER_DESCRIPTORS_TO_TYPE = getWrapperDescriptorsToType(); private static Map getWrapperDescriptorsToType() { @@ -118,14 +123,15 @@ public GenericRecord convert(Schema schema, Message message) { return (GenericRecord) convertObject(schema, message); } - public Schema getSchema(Class c, boolean flattenWrappedPrimitives) { - return SCHEMA_CACHE.computeIfAbsent(Pair.of(c, flattenWrappedPrimitives), key -> { + public Schema getSchema(Class c, boolean flattenWrappedPrimitives, int maxRecursionDepth) { + return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c, flattenWrappedPrimitives, maxRecursionDepth), key -> { try { Object descriptor = c.getMethod("getDescriptor").invoke(null); if (c.isEnum()) { return getEnumSchema((Descriptors.EnumDescriptor) descriptor); } else { - return getMessageSchema((Descriptors.Descriptor) descriptor, new HashMap<>(), flattenWrappedPrimitives); + Descriptors.Descriptor castedDescriptor = (Descriptors.Descriptor) descriptor; + return getMessageSchema(castedDescriptor, new CopyOnWriteMap<>(), flattenWrappedPrimitives, getNamespace(castedDescriptor.getFullName()), maxRecursionDepth); } } catch (Exception e) { throw new RuntimeException(e); @@ -141,24 +147,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) { return Schema.createEnum(enumDescriptor.getName(), null, getNamespace(enumDescriptor.getFullName()), symbols); } - private Schema getMessageSchema(Descriptors.Descriptor descriptor, Map seen, boolean flattenWrappedPrimitives) { - if (seen.containsKey(descriptor)) { - return seen.get(descriptor); + /** + * Translates a Proto Message descriptor into an Avro Schema + * @param descriptor the descriptor for the proto message + * @param recursionDepths a map of the descriptor to the number of times it has been encountered in this depth first traversal of the schema. + * This is used to cap the number of times we recurse on a schema. + * @param flattenWrappedPrimitives if true, treat wrapped primitives as nullable primitives, if false, treat them as proto messages + * @param path a string prefixed with the namespace of the original message being translated to avro and containing the current dot separated path tracking progress through the schema. + * This value is used for a namespace when creating Avro records to avoid an error when reusing the same class name when unraveling a recursive schema. + * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes + * @return an avro schema + */ + private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMap recursionDepths, boolean flattenWrappedPrimitives, String path, + int maxRecursionDepth) { + // Parquet does not handle recursive schemas so we "unravel" the proto N levels + Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor, 0); + if (currentRecursionCount >= maxRecursionDepth) { + return RECURSION_OVERFLOW_SCHEMA; } - Schema result = Schema.createRecord(descriptor.getName(), null, - getNamespace(descriptor.getFullName()), false); + // The current path is used as a namespace to avoid record name collisions within recursive schemas + Schema result = Schema.createRecord(descriptor.getName(), null, path, false); - seen.put(descriptor, result); + recursionDepths.put(descriptor, ++currentRecursionCount); List fields = new ArrayList<>(descriptor.getFields().size()); for (Descriptors.FieldDescriptor f : descriptor.getFields()) { - fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, flattenWrappedPrimitives), null, getDefault(f))); + // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map + fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f))); } result.setFields(fields); return result; } - private Schema getFieldSchema(Descriptors.FieldDescriptor f, Map seen, boolean flattenWrappedPrimitives) { + private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap recursionDepths, boolean flattenWrappedPrimitives, String path, + int maxRecursionDepth) { Function schemaFinalizer = f.isRepeated() ? Schema::createArray : Function.identity(); switch (f.getType()) { case BOOL: @@ -188,16 +210,18 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor f, Map mapValue = (Map) value; Map mapCopy = new HashMap<>(mapValue.size()); for (Map.Entry entry : mapValue.entrySet()) { - mapCopy.put(convertObject(STRINGS, entry.getKey()), convertObject(schema.getValueType(), entry.getValue())); + mapCopy.put(convertObject(STRING_SCHEMA, entry.getKey()), convertObject(schema.getValueType(), entry.getValue())); } return mapCopy; case NULL: @@ -355,5 +387,38 @@ private String getNamespace(String descriptorFullName) { int lastDotIndex = descriptorFullName.lastIndexOf('.'); return descriptorFullName.substring(0, lastDotIndex); } + + private String appendFieldNameToPath(String existingPath, String fieldName) { + return existingPath + "." + fieldName; + } + + private static class SchemaCacheKey { + private final String className; + private final boolean flattenWrappedPrimitives; + private final int maxRecursionDepth; + + SchemaCacheKey(Class clazz, boolean flattenWrappedPrimitives, int maxRecursionDepth) { + this.className = clazz.getName(); + this.flattenWrappedPrimitives = flattenWrappedPrimitives; + this.maxRecursionDepth = maxRecursionDepth; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaCacheKey that = (SchemaCacheKey) o; + return flattenWrappedPrimitives == that.flattenWrappedPrimitives && maxRecursionDepth == that.maxRecursionDepth && className.equals(that.className); + } + + @Override + public int hashCode() { + return Objects.hash(className, flattenWrappedPrimitives, maxRecursionDepth); + } + } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java index 2d069f7f591c..00a030bc9938 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java @@ -20,42 +20,60 @@ package org.apache.hudi.utilities.schema; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.test.proto.Parent; import org.apache.hudi.utilities.test.proto.Sample; import org.apache.avro.Schema; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Scanner; +import java.io.IOException; public class TestProtoClassBasedSchemaProvider { @Test - public void validateDefaultSchemaGeneration() { + public void validateDefaultSchemaGeneration() throws IOException { TypedProperties properties = new TypedProperties(); - properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME, Sample.class.getName()); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName()); ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null); - Schema protoSchema = protoToAvroSchemaProvider.getSourceSchema(); + Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema(); Schema.Parser parser = new Schema.Parser(); - Schema expectedSchema = parser.parse(getExpectedSchema("schema-provider/proto/sample_schema_nested.txt")); - Assertions.assertEquals(expectedSchema, protoSchema); + Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc")); + Assertions.assertEquals(expectedSchema, convertedSchema); } @Test - public void validateFlattenedSchemaGeneration() { + public void validateFlattenedSchemaGeneration() throws IOException { TypedProperties properties = new TypedProperties(); - properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME, Sample.class.getName()); - properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, "true"); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName()); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES.key(), "true"); ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null); - Schema protoSchema = protoToAvroSchemaProvider.getSourceSchema(); + Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema(); Schema.Parser parser = new Schema.Parser(); - Schema expectedSchema = parser.parse(getExpectedSchema("schema-provider/proto/sample_schema_flattened.txt")); - Assertions.assertEquals(expectedSchema, protoSchema); + Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc")); + Assertions.assertEquals(expectedSchema, convertedSchema); } - private String getExpectedSchema(String pathToExpectedSchema) { - try (Scanner scanner = new Scanner(getClass().getClassLoader().getResourceAsStream(pathToExpectedSchema))) { - return scanner.next(); - } + @Test + public void validateRecursiveSchemaGeneration_depth2() throws IOException { + TypedProperties properties = new TypedProperties(); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName()); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), String.valueOf(2)); + ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null); + Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema(); + Schema.Parser parser = new Schema.Parser(); + Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_depth_2.avsc")); + Assertions.assertEquals(expectedSchema, convertedSchema); + } + + @Test + public void validateRecursiveSchemaGeneration_defaultDepth() throws IOException { + TypedProperties properties = new TypedProperties(); + properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName()); + ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null); + Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema(); + Schema.Parser parser = new Schema.Parser(); + Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc")); + Assertions.assertEquals(expectedSchema, convertedSchema); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java index 0b2834623de0..061378a81130 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java @@ -80,7 +80,7 @@ protected TypedProperties createPropsForKafkaSource(String topic, Long maxEvents maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue())); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME, Sample.class.getName()); + props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName()); return props; } @@ -98,7 +98,7 @@ public void testProtoKafkaSourceWithFlattenWrappedPrimitives() { final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceFlatten"; testUtils.createTopic(topic, 2); TypedProperties props = createPropsForKafkaSource(topic, null, "earliest"); - props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES, "true"); + props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES.key(), "true"); SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc()); Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(protoKafkaSource); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java index 937d914e1aea..949ee0f597cc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java @@ -18,7 +18,10 @@ package org.apache.hudi.utilities.sources.helpers; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.test.proto.Child; import org.apache.hudi.utilities.test.proto.Nested; +import org.apache.hudi.utilities.test.proto.Parent; import org.apache.hudi.utilities.test.proto.Sample; import org.apache.hudi.utilities.test.proto.SampleEnum; @@ -35,264 +38,246 @@ import com.google.protobuf.UInt64Value; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import com.google.protobuf.util.Timestamps; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Scanner; +import java.util.Random; import java.util.function.Function; import java.util.stream.Collectors; public class TestProtoConversionUtil { + private static final Random RANDOM = new Random(); + @Test - public void allFieldsSet_wellKnownTypesAreNested() { - List primitiveList = Arrays.asList(1, 2, 3); - Map primitiveMap = new HashMap<>(); - primitiveMap.put("key1", 1); - primitiveMap.put("key2", 2); + public void allFieldsSet_wellKnownTypesAreNested() throws IOException { + Schema.Parser parser = new Schema.Parser(); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc")); + Pair inputAndOutput = createInputOutputSampleWithRandomValues(convertedSchema, true); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema); + Assertions.assertEquals(inputAndOutput.getRight(), actual); + } - Nested nestedMessage = Nested.newBuilder().setNestedInt(1).build(); - List nestedList = Arrays.asList(Nested.newBuilder().setNestedInt(2).build(), Nested.newBuilder().setNestedInt(3).build()); - Map nestedMap = new HashMap<>(); - nestedMap.put("1Key", Nested.newBuilder().setNestedInt(123).build()); - nestedMap.put("2Key", Nested.newBuilder().setNestedInt(321).build()); - Timestamp time = Timestamps.fromMillis(System.currentTimeMillis()); + @Test + public void noFieldsSet_wellKnownTypesAreNested() throws IOException { + Sample sample = Sample.newBuilder().build(); + Schema.Parser parser = new Schema.Parser(); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc")); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema); + Assertions.assertEquals(createDefaultOutput(convertedSchema), actual); + } - Sample sample = Sample.newBuilder() - .setPrimitiveDouble(1.1) - .setPrimitiveFloat(2.1f) - .setPrimitiveInt(1) - .setPrimitiveLong(2L) - .setPrimitiveUnsignedInt(3) - .setPrimitiveUnsignedLong(4L) - .setPrimitiveSignedInt(5) - .setPrimitiveSignedLong(6L) - .setPrimitiveFixedInt(7) - .setPrimitiveFixedLong(8L) - .setPrimitiveFixedSignedInt(9) - .setPrimitiveFixedSignedLong(10L) - .setPrimitiveBoolean(true) - .setPrimitiveString("I am a string!") - .setPrimitiveBytes(ByteString.copyFrom("I am just bytes".getBytes())) - .addAllRepeatedPrimitive(primitiveList) - .putAllMapPrimitive(primitiveMap) - .setNestedMessage(nestedMessage) - .addAllRepeatedMessage(nestedList) - .putAllMapMessage(nestedMap) - .setWrappedString(StringValue.of("I am a wrapped string")) - .setWrappedInt(Int32Value.of(11)) - .setWrappedLong(Int64Value.of(12L)) - .setWrappedUnsignedInt(UInt32Value.of(13)) - .setWrappedUnsignedLong(UInt64Value.of(14L)) - .setWrappedDouble(DoubleValue.of(15.5)) - .setWrappedFloat(FloatValue.of(16.6f)) - .setWrappedBoolean(BoolValue.of(true)) - .setWrappedBytes(BytesValue.of(ByteString.copyFrom("I am wrapped bytes".getBytes()))) - .setEnum(SampleEnum.SECOND) - .setTimestamp(time) - .build(); + @Test + public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException { Schema.Parser parser = new Schema.Parser(); - Schema protoSchema = parser.parse(getSchema("schema-provider/proto/sample_schema_nested.txt")); - - GenericRecord actual = ProtoConversionUtil.convertToAvro(protoSchema, sample); - - Schema nestedMessageSchema = protoSchema.getField("nested_message").schema().getTypes().get(1); - - GenericData.Record wrappedStringRecord = getWrappedRecord(protoSchema, "wrapped_string", "I am a wrapped string"); - GenericData.Record wrappedIntRecord = getWrappedRecord(protoSchema, "wrapped_int", 11); - GenericData.Record wrappedLongRecord = getWrappedRecord(protoSchema, "wrapped_long", 12L); - GenericData.Record wrappedUIntRecord = getWrappedRecord(protoSchema, "wrapped_unsigned_int", 13L); - GenericData.Record wrappedULongRecord = getWrappedRecord(protoSchema, "wrapped_unsigned_long", 14L); - GenericData.Record wrappedDoubleRecord = getWrappedRecord(protoSchema, "wrapped_double", 15.5); - GenericData.Record wrappedFloatRecord = getWrappedRecord(protoSchema, "wrapped_float", 16.6f); - GenericData.Record wrappedBooleanRecord = getWrappedRecord(protoSchema, "wrapped_boolean", true); - GenericData.Record wrappedBytesRecord = getWrappedRecord(protoSchema, "wrapped_bytes", ByteBuffer.wrap("I am wrapped bytes".getBytes())); - - GenericData.Record expectedRecord = new GenericData.Record(protoSchema); - expectedRecord.put("primitive_double", 1.1); - expectedRecord.put("primitive_float", 2.1f); - expectedRecord.put("primitive_int", 1); - expectedRecord.put("primitive_long", 2L); - expectedRecord.put("primitive_unsigned_int", 3L); - expectedRecord.put("primitive_unsigned_long", 4L); - expectedRecord.put("primitive_signed_int", 5); - expectedRecord.put("primitive_signed_long", 6L); - expectedRecord.put("primitive_fixed_int", 7); - expectedRecord.put("primitive_fixed_long", 8L); - expectedRecord.put("primitive_fixed_signed_int", 9); - expectedRecord.put("primitive_fixed_signed_long", 10L); - expectedRecord.put("primitive_boolean", true); - expectedRecord.put("primitive_string", "I am a string!"); - expectedRecord.put("primitive_bytes", ByteBuffer.wrap("I am just bytes".getBytes())); - expectedRecord.put("repeated_primitive", primitiveList); - expectedRecord.put("map_primitive", convertMapToList(protoSchema, "map_primitive", primitiveMap)); - expectedRecord.put("nested_message", convertNestedMessage(nestedMessageSchema, nestedMessage)); - expectedRecord.put("repeated_message", nestedList.stream().map(m -> convertNestedMessage(nestedMessageSchema, m)).collect(Collectors.toList())); - expectedRecord.put("map_message", convertMapToList(protoSchema, "map_message", nestedMap, value -> convertNestedMessage(nestedMessageSchema, value))); - expectedRecord.put("wrapped_string", wrappedStringRecord); - expectedRecord.put("wrapped_int", wrappedIntRecord); - expectedRecord.put("wrapped_long", wrappedLongRecord); - expectedRecord.put("wrapped_unsigned_int", wrappedUIntRecord); - expectedRecord.put("wrapped_unsigned_long", wrappedULongRecord); - expectedRecord.put("wrapped_double", wrappedDoubleRecord); - expectedRecord.put("wrapped_float", wrappedFloatRecord); - expectedRecord.put("wrapped_boolean", wrappedBooleanRecord); - expectedRecord.put("wrapped_bytes", wrappedBytesRecord); - expectedRecord.put("enum", SampleEnum.SECOND.name()); - expectedRecord.put("timestamp", getTimestampRecord(protoSchema, time)); - Assertions.assertEquals(expectedRecord, actual); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc")); + Pair inputAndOutput = createInputOutputSampleWithRandomValues(convertedSchema, false); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema); + Assertions.assertEquals(inputAndOutput.getRight(), actual); } @Test - public void noFieldsSet_wellKnownTypesAreNested() { + public void noFieldsSet_wellKnownTypesAreFlattened() throws IOException { Sample sample = Sample.newBuilder().build(); Schema.Parser parser = new Schema.Parser(); - Schema protoSchema = parser.parse(getSchema("schema-provider/proto/sample_schema_nested.txt")); - - GenericRecord actual = ProtoConversionUtil.convertToAvro(protoSchema, sample); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc")); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema); + Assertions.assertEquals(createDefaultOutput(convertedSchema), actual); + } - // all fields will have default values - GenericData.Record expectedRecord = new GenericData.Record(protoSchema); - expectedRecord.put("primitive_double", 0.0); - expectedRecord.put("primitive_float", 0.0f); - expectedRecord.put("primitive_int", 0); - expectedRecord.put("primitive_long", 0L); - expectedRecord.put("primitive_unsigned_int", 0L); - expectedRecord.put("primitive_unsigned_long", 0L); - expectedRecord.put("primitive_signed_int", 0); - expectedRecord.put("primitive_signed_long", 0L); - expectedRecord.put("primitive_fixed_int", 0); - expectedRecord.put("primitive_fixed_long", 0L); - expectedRecord.put("primitive_fixed_signed_int", 0); - expectedRecord.put("primitive_fixed_signed_long", 0L); - expectedRecord.put("primitive_boolean", false); - expectedRecord.put("primitive_string", ""); - expectedRecord.put("primitive_bytes", ByteBuffer.wrap("".getBytes())); - expectedRecord.put("repeated_primitive", Collections.emptyList()); - expectedRecord.put("map_primitive", Collections.emptyList()); - expectedRecord.put("nested_message", null); - expectedRecord.put("repeated_message", Collections.emptyList()); - expectedRecord.put("map_message", Collections.emptyList()); - expectedRecord.put("wrapped_string", null); - expectedRecord.put("wrapped_int", null); - expectedRecord.put("wrapped_long", null); - expectedRecord.put("wrapped_unsigned_int", null); - expectedRecord.put("wrapped_unsigned_long", null); - expectedRecord.put("wrapped_double", null); - expectedRecord.put("wrapped_float", null); - expectedRecord.put("wrapped_boolean", null); - expectedRecord.put("wrapped_bytes", null); - expectedRecord.put("enum", SampleEnum.FIRST.name()); - expectedRecord.put("timestamp", null); - Assertions.assertEquals(expectedRecord, actual); + @Test + public void recursiveSchema_noOverflow() throws IOException { + Schema.Parser parser = new Schema.Parser(); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_depth_2.avsc")); + Pair inputAndOutput = createInputOutputForRecursiveSchemaNoOverflow(convertedSchema); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema); + Assertions.assertEquals(inputAndOutput.getRight(), actual); } @Test - public void allFieldsSet_wellKnownTypesAreFlattened() { - List primitiveList = Arrays.asList(1, 2, 3); + public void recursiveSchema_withOverflow() throws Exception { + Schema.Parser parser = new Schema.Parser(); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_depth_2.avsc")); + Pair inputAndOutput = createInputOutputForRecursiveSchemaWithOverflow(convertedSchema); + Parent input = inputAndOutput.getLeft(); + GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema); + Assertions.assertEquals(inputAndOutput.getRight(), actual); + // assert that overflow data can be read back into proto class + Child parsedSingleChildOverflow = Child.parseFrom(getOverflowBytesFromChildRecord((GenericRecord) actual.get("child"))); + Assertions.assertEquals(input.getChild().getRecurseField().getRecurseField(), parsedSingleChildOverflow); + // Get children list + GenericData.Array array = (GenericData.Array) actual.get("children"); + Child parsedChildren1Overflow = Child.parseFrom(getOverflowBytesFromChildRecord(array.get(0))); + Assertions.assertEquals(input.getChildren(0).getRecurseField().getRecurseField(), parsedChildren1Overflow); + Child parsedChildren2Overflow = Child.parseFrom(getOverflowBytesFromChildRecord(array.get(1))); + Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(), parsedChildren2Overflow); + } + + private Pair createInputOutputSampleWithRandomValues(Schema schema, boolean wellKnownTypesAsRecords) { + Schema nestedMessageSchema = schema.getField("nested_message").schema().getTypes().get(1); + Schema listMessageSchema = schema.getField("repeated_message").schema().getElementType(); + Schema mapMessageSchema = schema.getField("map_message").schema().getElementType().getField("value").schema().getTypes().get(1); + + double primitiveDouble = RANDOM.nextDouble(); + float primitiveFloat = RANDOM.nextFloat(); + int primitiveInt = RANDOM.nextInt(); + long primitiveLong = RANDOM.nextLong(); + int primitiveUnsignedInt = RANDOM.nextInt(); + long primitiveUnsignedLong = RANDOM.nextLong(); + int primitiveSignedInt = RANDOM.nextInt(); + long primitiveSignedLong = RANDOM.nextLong(); + int primitiveFixedInt = RANDOM.nextInt(); + long primitiveFixedLong = RANDOM.nextLong(); + int primitiveFixedSignedInt = RANDOM.nextInt(); + long primitiveFixedSignedLong = RANDOM.nextLong(); + boolean primitiveBoolean = RANDOM.nextBoolean(); + String primitiveString = randomString(10); + byte[] primitiveBytes = randomString(10).getBytes(); + + double wrappedDouble = RANDOM.nextDouble(); + float wrappedFloat = RANDOM.nextFloat(); + int wrappedInt = RANDOM.nextInt(); + long wrappedLong = RANDOM.nextLong(); + int wrappedUnsignedInt = RANDOM.nextInt(); + long wrappedUnsignedLong = RANDOM.nextLong(); + boolean wrappedBoolean = RANDOM.nextBoolean(); + String wrappedString = randomString(10); + byte[] wrappedBytes = randomString(10).getBytes(); + SampleEnum enumValue = SampleEnum.forNumber(RANDOM.nextInt(1)); + + List primitiveList = Arrays.asList(RANDOM.nextInt(), RANDOM.nextInt(), RANDOM.nextInt()); Map primitiveMap = new HashMap<>(); - primitiveMap.put("key1", 1); - primitiveMap.put("key2", 2); + primitiveMap.put(randomString(5), RANDOM.nextInt()); + primitiveMap.put(randomString(5), RANDOM.nextInt()); - Nested nestedMessage = Nested.newBuilder().setNestedInt(1).build(); - List nestedList = Arrays.asList(Nested.newBuilder().setNestedInt(2).build(), Nested.newBuilder().setNestedInt(3).build()); + Nested nestedMessage = Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build(); + List nestedList = Arrays.asList(Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build(), Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build()); Map nestedMap = new HashMap<>(); - nestedMap.put("1Key", Nested.newBuilder().setNestedInt(123).build()); - nestedMap.put("2Key", Nested.newBuilder().setNestedInt(321).build()); + nestedMap.put(randomString(5), Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build()); + nestedMap.put(randomString(5), Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build()); Timestamp time = Timestamps.fromMillis(System.currentTimeMillis()); - Sample sample = Sample.newBuilder() - .setPrimitiveDouble(1.1) - .setPrimitiveFloat(2.1f) - .setPrimitiveInt(1) - .setPrimitiveLong(2L) - .setPrimitiveUnsignedInt(3) - .setPrimitiveUnsignedLong(4L) - .setPrimitiveSignedInt(5) - .setPrimitiveSignedLong(6L) - .setPrimitiveFixedInt(7) - .setPrimitiveFixedLong(8L) - .setPrimitiveFixedSignedInt(9) - .setPrimitiveFixedSignedLong(10L) - .setPrimitiveBoolean(true) - .setPrimitiveString("I am a string!") - .setPrimitiveBytes(ByteString.copyFrom("I am just bytes".getBytes())) + Sample input = Sample.newBuilder() + .setPrimitiveDouble(primitiveDouble) + .setPrimitiveFloat(primitiveFloat) + .setPrimitiveInt(primitiveInt) + .setPrimitiveLong(primitiveLong) + .setPrimitiveUnsignedInt(primitiveUnsignedInt) + .setPrimitiveUnsignedLong(primitiveUnsignedLong) + .setPrimitiveSignedInt(primitiveSignedInt) + .setPrimitiveSignedLong(primitiveSignedLong) + .setPrimitiveFixedInt(primitiveFixedInt) + .setPrimitiveFixedLong(primitiveFixedLong) + .setPrimitiveFixedSignedInt(primitiveFixedSignedInt) + .setPrimitiveFixedSignedLong(primitiveFixedSignedLong) + .setPrimitiveBoolean(primitiveBoolean) + .setPrimitiveString(primitiveString) + .setPrimitiveBytes(ByteString.copyFrom(primitiveBytes)) .addAllRepeatedPrimitive(primitiveList) .putAllMapPrimitive(primitiveMap) .setNestedMessage(nestedMessage) .addAllRepeatedMessage(nestedList) .putAllMapMessage(nestedMap) - .setWrappedString(StringValue.of("I am a wrapped string")) - .setWrappedInt(Int32Value.of(11)) - .setWrappedLong(Int64Value.of(12L)) - .setWrappedUnsignedInt(UInt32Value.of(13)) - .setWrappedUnsignedLong(UInt64Value.of(14L)) - .setWrappedDouble(DoubleValue.of(15.5)) - .setWrappedFloat(FloatValue.of(16.6f)) - .setWrappedBoolean(BoolValue.of(true)) - .setWrappedBytes(BytesValue.of(ByteString.copyFrom("I am wrapped bytes".getBytes()))) - .setEnum(SampleEnum.SECOND) + .setWrappedString(StringValue.of(wrappedString)) + .setWrappedInt(Int32Value.of(wrappedInt)) + .setWrappedLong(Int64Value.of(wrappedLong)) + .setWrappedUnsignedInt(UInt32Value.of(wrappedUnsignedInt)) + .setWrappedUnsignedLong(UInt64Value.of(wrappedUnsignedLong)) + .setWrappedDouble(DoubleValue.of(wrappedDouble)) + .setWrappedFloat(FloatValue.of(wrappedFloat)) + .setWrappedBoolean(BoolValue.of(wrappedBoolean)) + .setWrappedBytes(BytesValue.of(ByteString.copyFrom(wrappedBytes))) + .setEnum(enumValue) .setTimestamp(time) .build(); - Schema.Parser parser = new Schema.Parser(); - Schema protoSchema = parser.parse(getSchema("schema-provider/proto/sample_schema_flattened.txt")); - - GenericRecord actual = ProtoConversionUtil.convertToAvro(protoSchema, sample); - - Schema nestedMessageSchema = protoSchema.getField("nested_message").schema().getTypes().get(1); - - GenericData.Record expectedRecord = new GenericData.Record(protoSchema); - expectedRecord.put("primitive_double", 1.1); - expectedRecord.put("primitive_float", 2.1f); - expectedRecord.put("primitive_int", 1); - expectedRecord.put("primitive_long", 2L); - expectedRecord.put("primitive_unsigned_int", 3L); - expectedRecord.put("primitive_unsigned_long", 4L); - expectedRecord.put("primitive_signed_int", 5); - expectedRecord.put("primitive_signed_long", 6L); - expectedRecord.put("primitive_fixed_int", 7); - expectedRecord.put("primitive_fixed_long", 8L); - expectedRecord.put("primitive_fixed_signed_int", 9); - expectedRecord.put("primitive_fixed_signed_long", 10L); - expectedRecord.put("primitive_boolean", true); - expectedRecord.put("primitive_string", "I am a string!"); - expectedRecord.put("primitive_bytes", ByteBuffer.wrap("I am just bytes".getBytes())); + + Object wrappedStringOutput; + Object wrappedIntOutput; + Object wrappedLongOutput; + Object wrappedUIntOutput; + Object wrappedULongOutput; + Object wrappedDoubleOutput; + Object wrappedFloatOutput; + Object wrappedBooleanOutput; + Object wrappedBytesOutput; + if (wellKnownTypesAsRecords) { + wrappedStringOutput = getWrappedRecord(schema, "wrapped_string", wrappedString); + wrappedIntOutput = getWrappedRecord(schema, "wrapped_int", wrappedInt); + wrappedLongOutput = getWrappedRecord(schema, "wrapped_long", wrappedLong); + wrappedUIntOutput = getWrappedRecord(schema, "wrapped_unsigned_int", (long) wrappedUnsignedInt); + wrappedULongOutput = getWrappedRecord(schema, "wrapped_unsigned_long", wrappedUnsignedLong); + wrappedDoubleOutput = getWrappedRecord(schema, "wrapped_double", wrappedDouble); + wrappedFloatOutput = getWrappedRecord(schema, "wrapped_float", wrappedFloat); + wrappedBooleanOutput = getWrappedRecord(schema, "wrapped_boolean", wrappedBoolean); + wrappedBytesOutput = getWrappedRecord(schema, "wrapped_bytes", ByteBuffer.wrap(wrappedBytes)); + } else { + wrappedStringOutput = wrappedString; + wrappedIntOutput = wrappedInt; + wrappedLongOutput = wrappedLong; + wrappedUIntOutput = (long) wrappedUnsignedInt; + wrappedULongOutput = wrappedUnsignedLong; + wrappedDoubleOutput = wrappedDouble; + wrappedFloatOutput = wrappedFloat; + wrappedBooleanOutput = wrappedBoolean; + wrappedBytesOutput = ByteBuffer.wrap(wrappedBytes); + } + + GenericData.Record expectedRecord = new GenericData.Record(schema); + expectedRecord.put("primitive_double", primitiveDouble); + expectedRecord.put("primitive_float", primitiveFloat); + expectedRecord.put("primitive_int", primitiveInt); + expectedRecord.put("primitive_long", primitiveLong); + expectedRecord.put("primitive_unsigned_int", (long) primitiveUnsignedInt); + expectedRecord.put("primitive_unsigned_long", primitiveUnsignedLong); + expectedRecord.put("primitive_signed_int", primitiveSignedInt); + expectedRecord.put("primitive_signed_long", primitiveSignedLong); + expectedRecord.put("primitive_fixed_int", primitiveFixedInt); + expectedRecord.put("primitive_fixed_long", primitiveFixedLong); + expectedRecord.put("primitive_fixed_signed_int", primitiveFixedSignedInt); + expectedRecord.put("primitive_fixed_signed_long", primitiveFixedSignedLong); + expectedRecord.put("primitive_boolean", primitiveBoolean); + expectedRecord.put("primitive_string", primitiveString); + expectedRecord.put("primitive_bytes", ByteBuffer.wrap(primitiveBytes)); expectedRecord.put("repeated_primitive", primitiveList); - expectedRecord.put("map_primitive", convertMapToList(protoSchema, "map_primitive", primitiveMap)); + expectedRecord.put("map_primitive", convertMapToList(schema, "map_primitive", primitiveMap)); expectedRecord.put("nested_message", convertNestedMessage(nestedMessageSchema, nestedMessage)); - expectedRecord.put("repeated_message", nestedList.stream().map(m -> convertNestedMessage(nestedMessageSchema, m)).collect(Collectors.toList())); - expectedRecord.put("map_message", convertMapToList(protoSchema, "map_message", nestedMap, value -> convertNestedMessage(nestedMessageSchema, value))); - expectedRecord.put("wrapped_string", "I am a wrapped string"); - expectedRecord.put("wrapped_int", 11); - expectedRecord.put("wrapped_long", 12L); - expectedRecord.put("wrapped_unsigned_int", 13L); - expectedRecord.put("wrapped_unsigned_long", 14L); - expectedRecord.put("wrapped_double", 15.5); - expectedRecord.put("wrapped_float", 16.6f); - expectedRecord.put("wrapped_boolean", true); - expectedRecord.put("wrapped_bytes", ByteBuffer.wrap("I am wrapped bytes".getBytes())); - expectedRecord.put("enum", SampleEnum.SECOND.name()); - expectedRecord.put("timestamp", getTimestampRecord(protoSchema, time)); - Assertions.assertEquals(expectedRecord, actual); - } + expectedRecord.put("repeated_message", nestedList.stream().map(m -> convertNestedMessage(listMessageSchema, m)).collect(Collectors.toList())); + expectedRecord.put("map_message", convertMapToList(schema, "map_message", nestedMap, value -> convertNestedMessage(mapMessageSchema, value))); + expectedRecord.put("wrapped_string", wrappedStringOutput); + expectedRecord.put("wrapped_int", wrappedIntOutput); + expectedRecord.put("wrapped_long", wrappedLongOutput); + expectedRecord.put("wrapped_unsigned_int", wrappedUIntOutput); + expectedRecord.put("wrapped_unsigned_long", wrappedULongOutput); + expectedRecord.put("wrapped_double", wrappedDoubleOutput); + expectedRecord.put("wrapped_float", wrappedFloatOutput); + expectedRecord.put("wrapped_boolean", wrappedBooleanOutput); + expectedRecord.put("wrapped_bytes", wrappedBytesOutput); + expectedRecord.put("enum", enumValue.name()); + expectedRecord.put("timestamp", getTimestampRecord(schema, time)); - @Test - public void noFieldsSet_wellKnownTypesAreFlattened() { - Sample sample = Sample.newBuilder().build(); - Schema.Parser parser = new Schema.Parser(); - Schema protoSchema = parser.parse(getSchema("schema-provider/proto/sample_schema_flattened.txt")); - - GenericRecord actual = ProtoConversionUtil.convertToAvro(protoSchema, sample); + return Pair.of(input, expectedRecord); + } + private GenericRecord createDefaultOutput(Schema schema) { // all fields will have default values - GenericData.Record expectedRecord = new GenericData.Record(protoSchema); + GenericData.Record expectedRecord = new GenericData.Record(schema); expectedRecord.put("primitive_double", 0.0); expectedRecord.put("primitive_float", 0.0f); expectedRecord.put("primitive_int", 0); @@ -324,7 +309,171 @@ public void noFieldsSet_wellKnownTypesAreFlattened() { expectedRecord.put("wrapped_bytes", null); expectedRecord.put("enum", SampleEnum.FIRST.name()); expectedRecord.put("timestamp", null); - Assertions.assertEquals(expectedRecord, actual); + return expectedRecord; + } + + public Pair createInputOutputForRecursiveSchemaNoOverflow(Schema schema) { + Child singleChild = Child.newBuilder() + .setBasicField(1) + .setRecurseField(Child.newBuilder() + .setBasicField(2) + .build()) + .build(); + Child children1 = Child.newBuilder() + .setBasicField(11) + .setRecurseField(Child.newBuilder() + .setBasicField(12) + .build()) + .build(); + Child children2 = Child.newBuilder() + .setBasicField(21) + .setRecurseField(Child.newBuilder() + .setBasicField(22) + .build()) + .build(); + List childrenList = Arrays.asList(children1, children2); + Parent input = Parent.newBuilder().setChild(singleChild).addAllChildren(childrenList).build(); + + Schema childAvroSchema = schema.getField("child").schema().getTypes().get(1); + Schema childLevel2AvroSchema = childAvroSchema.getField("recurse_field").schema().getTypes().get(1); + + Schema childrenAvroSchema = schema.getField("children").schema().getElementType(); + Schema childrenLevel2AvroSchema = childrenAvroSchema.getField("recurse_field").schema().getTypes().get(1); + + // setup the single child avro + GenericData.Record singleChildLevel2Avro = new GenericData.Record(childLevel2AvroSchema); + singleChildLevel2Avro.put("basic_field", 2); + GenericData.Record singleChildAvro = new GenericData.Record(childAvroSchema); + singleChildAvro.put("basic_field", 1); + singleChildAvro.put("recurse_field", singleChildLevel2Avro); + + // setup list of children + GenericData.Record children1Level2Avro = new GenericData.Record(childrenLevel2AvroSchema); + children1Level2Avro.put("basic_field", 12); + GenericData.Record children1Avro = new GenericData.Record(childrenAvroSchema); + children1Avro.put("basic_field", 11); + children1Avro.put("recurse_field", children1Level2Avro); + + GenericData.Record children2Level2Avro = new GenericData.Record(childrenLevel2AvroSchema); + children2Level2Avro.put("basic_field", 22); + GenericData.Record children2Avro = new GenericData.Record(childrenAvroSchema); + children2Avro.put("basic_field", 21); + children2Avro.put("recurse_field", children2Level2Avro); + + // setup expected parent record + GenericData.Record expected = new GenericData.Record(schema); + expected.put("child", singleChildAvro); + expected.put("children", Arrays.asList(children1Avro, children2Avro)); + + return Pair.of(input, expected); + } + + public Pair createInputOutputForRecursiveSchemaWithOverflow(Schema schema) { + Child singleChildOverflow = Child.newBuilder() + .setBasicField(3) + .setRecurseField(Child.newBuilder() + .setBasicField(4) + .build()).build(); + Child singleChild = Child.newBuilder() + .setBasicField(1) + .setRecurseField(Child.newBuilder() + .setBasicField(2) + .setRecurseField(singleChildOverflow) + .build()) + .build(); + Child children1Overflow = Child.newBuilder() + .setBasicField(13) + .setRecurseField(Child.newBuilder() + .setBasicField(14) + .build()).build(); + Child children1 = Child.newBuilder() + .setBasicField(11) + .setRecurseField(Child.newBuilder() + .setBasicField(12) + .setRecurseField(children1Overflow) + .build()) + .build(); + Child children2Overflow = Child.newBuilder() + .setBasicField(23) + .setRecurseField(Child.newBuilder() + .setBasicField(24) + .build()).build(); + Child children2 = Child.newBuilder() + .setBasicField(21) + .setRecurseField(Child.newBuilder() + .setBasicField(22) + .setRecurseField(children2Overflow) + .build()) + .build(); + List childrenList = Arrays.asList(children1, children2); + Parent input = Parent.newBuilder().setChild(singleChild).addAllChildren(childrenList).build(); + + Schema childAvroSchema = schema.getField("child").schema().getTypes().get(1); + Schema childLevel2AvroSchema = childAvroSchema.getField("recurse_field").schema().getTypes().get(1); + Schema recursionOverflowSchema = childLevel2AvroSchema.getField("recurse_field").schema().getTypes().get(1); + + Schema childrenAvroSchema = schema.getField("children").schema().getElementType(); + Schema childrenLevel2AvroSchema = childrenAvroSchema.getField("recurse_field").schema().getTypes().get(1); + + // setup the single child avro + GenericData.Record singleChildOverflowAvro = new GenericData.Record(recursionOverflowSchema); + singleChildOverflowAvro.put("descriptor_full_name", "test.Child"); + singleChildOverflowAvro.put("proto_bytes", ByteBuffer.wrap(singleChildOverflow.toByteArray())); + GenericData.Record singleChildLevel2Avro = new GenericData.Record(childLevel2AvroSchema); + singleChildLevel2Avro.put("basic_field", 2); + singleChildLevel2Avro.put("recurse_field", singleChildOverflowAvro); + GenericData.Record singleChildAvro = new GenericData.Record(childAvroSchema); + singleChildAvro.put("basic_field", 1); + singleChildAvro.put("recurse_field", singleChildLevel2Avro); + + // setup list of children + GenericData.Record children1OverflowAvro = new GenericData.Record(recursionOverflowSchema); + children1OverflowAvro.put("descriptor_full_name", "test.Child"); + children1OverflowAvro.put("proto_bytes", ByteBuffer.wrap(children1Overflow.toByteArray())); + GenericData.Record children1Level2Avro = new GenericData.Record(childrenLevel2AvroSchema); + children1Level2Avro.put("basic_field", 12); + children1Level2Avro.put("recurse_field", children1OverflowAvro); + GenericData.Record children1Avro = new GenericData.Record(childrenAvroSchema); + children1Avro.put("basic_field", 11); + children1Avro.put("recurse_field", children1Level2Avro); + + GenericData.Record children2OverflowAvro = new GenericData.Record(recursionOverflowSchema); + children2OverflowAvro.put("descriptor_full_name", "test.Child"); + children2OverflowAvro.put("proto_bytes", ByteBuffer.wrap(children2Overflow.toByteArray())); + GenericData.Record children2Level2Avro = new GenericData.Record(childrenLevel2AvroSchema); + children2Level2Avro.put("basic_field", 22); + children2Level2Avro.put("recurse_field", children2OverflowAvro); + GenericData.Record children2Avro = new GenericData.Record(childrenAvroSchema); + children2Avro.put("basic_field", 21); + children2Avro.put("recurse_field", children2Level2Avro); + + // setup expected parent record + GenericData.Record expected = new GenericData.Record(schema); + expected.put("child", singleChildAvro); + expected.put("children", Arrays.asList(children1Avro, children2Avro)); + + return Pair.of(input, expected); + } + + private ByteBuffer getOverflowBytesFromChildRecord(GenericRecord record) { + return (ByteBuffer) ((GenericRecord) ((GenericRecord) record.get("recurse_field")).get("recurse_field")).get("proto_bytes"); + } + + private GenericRecord serializeAndDeserializeAvro(GenericRecord input, Schema schema) { + // serialize and deserialize the data to make sure the avro record can be persisted and then read back + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + writer.write(input, encoder); + encoder.flush(); + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null); + GenericDatumReader reader = new GenericDatumReader<>(schema); + GenericRecord transformedRec = reader.read(null, decoder); + return transformedRec; + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } } private GenericData.Record getTimestampRecord(Schema protoSchema, Timestamp time) { @@ -359,9 +508,9 @@ private static List convertMapToList(final Schema protoSch return convertMapToList(protoSchema, fieldName, originalMap, Function.identity()); } - private String getSchema(String pathToSchema) { - try (Scanner scanner = new Scanner(getClass().getClassLoader().getResourceAsStream(pathToSchema))) { - return scanner.next(); - } + private static String randomString(int size) { + byte[] bytes = new byte[size]; + RANDOM.nextBytes(bytes); + return new String(bytes, StandardCharsets.UTF_8); } } diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_default_limit.avsc b/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_default_limit.avsc new file mode 100644 index 000000000000..91d9797347e5 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_default_limit.avsc @@ -0,0 +1,160 @@ +{ + "type" : "record", + "name" : "Parent", + "namespace" : "test", + "fields" : [ { + "name" : "child", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.child", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.child.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.child.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.child.recurse_field.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.child.recurse_field.recurse_field.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "recursion_overflow", + "namespace" : "org.apache.hudi.proto", + "fields" : [ { + "name" : "descriptor_full_name", + "type" : "string", + "default" : "" + }, { + "name" : "proto_bytes", + "type" : "bytes", + "default" : "" + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + }, { + "name" : "children", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "Child", + "namespace" : "test.children", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.children.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.children.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.children.recurse_field.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", { + "type" : "record", + "name" : "Child", + "namespace" : "test.children.recurse_field.recurse_field.recurse_field.recurse_field", + "fields" : [ { + "name" : "basic_field", + "type" : "int", + "default" : 0 + }, { + "name" : "recurse_field", + "type" : [ "null", "org.apache.hudi.proto.recursion_overflow" ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } ], + "default" : null + } ] + } + }, + "default" : [ ] + } ] +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_depth_2.avsc b/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_depth_2.avsc new file mode 100644 index 000000000000..966a68e90bdf --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-provider/proto/parent_schema_recursive_depth_2.avsc @@ -0,0 +1,115 @@ +{ + "type": "record", + "name": "Parent", + "namespace": "test", + "fields": [ + { + "name": "child", + "type": [ + "null", + { + "type": "record", + "name": "Child", + "namespace": "test.child", + "fields": [ + { + "name": "basic_field", + "type": "int", + "default": 0 + }, + { + "name": "recurse_field", + "type": [ + "null", + { + "type": "record", + "name": "Child", + "namespace": "test.child.recurse_field", + "fields": [ + { + "name": "basic_field", + "type": "int", + "default": 0 + }, + { + "name": "recurse_field", + "type": [ + "null", + { + "type": "record", + "name": "recursion_overflow", + "namespace": "org.apache.hudi.proto", + "fields": [ + { + "name": "descriptor_full_name", + "type": "string", + "default": "" + }, + { + "name": "proto_bytes", + "type": "bytes", + "default": "" + } + ] + } + ], + "default": null + } + ] + } + ], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "children", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Child", + "namespace": "test.children", + "fields": [ + { + "name": "basic_field", + "type": "int", + "default": 0 + }, + { + "name": "recurse_field", + "type": [ + "null", + { + "type": "record", + "name": "Child", + "namespace": "test.children.recurse_field", + "fields": [ + { + "name": "basic_field", + "type": "int", + "default": 0 + }, + { + "name": "recurse_field", + "type": [ + "null", + "org.apache.hudi.proto.recursion_overflow" + ], + "default": null + } + ] + } + ], + "default": null + } + ] + } + }, + "default": [] + } + ] +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/recursive.proto b/hudi-utilities/src/test/resources/schema-provider/proto/recursive.proto new file mode 100644 index 000000000000..d564d5e3347f --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-provider/proto/recursive.proto @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +package test; + +option java_package = "org.apache.hudi.utilities.test.proto"; +option java_multiple_files = true; + +// Creates a message for testing the unravelling of recursive schemas +message Parent { + Child child = 1; + repeated Child children = 2; +} + +message Child { + int32 basic_field = 1; + Child recurse_field = 2; +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.avsc b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.avsc new file mode 100644 index 000000000000..e0876480a909 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.avsc @@ -0,0 +1,310 @@ +{ + "type": "record", + "name": "Sample", + "namespace": "test", + "fields": [ + { + "name": "primitive_double", + "type": "double", + "default": 0.0 + }, + { + "name": "primitive_float", + "type": "float", + "default": 0.0 + }, + { + "name": "primitive_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_unsigned_int", + "type": "long", + "default": 0 + }, + { + "name": "primitive_unsigned_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_signed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_signed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_fixed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_fixed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_fixed_signed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_fixed_signed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_boolean", + "type": "boolean", + "default": false + }, + { + "name": "primitive_string", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "primitive_bytes", + "type": "bytes", + "default": "" + }, + { + "name": "repeated_primitive", + "type": { + "type": "array", + "items": "int" + }, + "default": [] + }, + { + "name": "map_primitive", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "MapPrimitiveEntry", + "namespace": "test.map_primitive", + "fields": [ + { + "name": "key", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "value", + "type": "int", + "default": 0 + } + ] + } + }, + "default": [] + }, + { + "name": "nested_message", + "type": [ + "null", + { + "type": "record", + "name": "Nested", + "namespace": "test.nested_message", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "repeated_message", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Nested", + "namespace": "test.repeated_message", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + }, + "default": [] + }, + { + "name": "map_message", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "MapMessageEntry", + "namespace": "test.map_message", + "fields": [ + { + "name": "key", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "value", + "type": [ + "null", + { + "type": "record", + "name": "Nested", + "namespace": "test.map_message.value", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + } + ] + } + }, + "default": [] + }, + { + "name": "wrapped_string", + "type": [ + "null", + { + "type": "string", + "avro.java.string": "String" + } + ], + "default": null + }, + { + "name": "wrapped_int", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "wrapped_long", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "wrapped_unsigned_int", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "wrapped_unsigned_long", + "type": [ + "null", + "long" + ], + "default": null + }, + { + "name": "wrapped_double", + "type": [ + "null", + "double" + ], + "default": null + }, + { + "name": "wrapped_float", + "type": [ + "null", + "float" + ], + "default": null + }, + { + "name": "wrapped_boolean", + "type": [ + "null", + "boolean" + ], + "default": null + }, + { + "name": "wrapped_bytes", + "type": [ + "null", + "bytes" + ], + "default": null + }, + { + "name": "enum", + "type": { + "type": "enum", + "name": "SampleEnum", + "symbols": [ + "FIRST", + "SECOND" + ] + }, + "default": "FIRST" + }, + { + "name": "timestamp", + "type": [ + "null", + { + "type": "record", + "name": "Timestamp", + "namespace": "test.timestamp", + "fields": [ + { + "name": "seconds", + "type": "long", + "default": 0 + }, + { + "name": "nanos", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + } + ] +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.txt b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.txt deleted file mode 100644 index f3d372486d9e..000000000000 --- a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_flattened.txt +++ /dev/null @@ -1 +0,0 @@ -{"type":"record","name":"Sample","namespace":"test","fields":[{"name":"primitive_double","type":"double","default":0.0},{"name":"primitive_float","type":"float","default":0.0},{"name":"primitive_int","type":"int","default":0},{"name":"primitive_long","type":"long","default":0},{"name":"primitive_unsigned_int","type":"long","default":0},{"name":"primitive_unsigned_long","type":"long","default":0},{"name":"primitive_signed_int","type":"int","default":0},{"name":"primitive_signed_long","type":"long","default":0},{"name":"primitive_fixed_int","type":"int","default":0},{"name":"primitive_fixed_long","type":"long","default":0},{"name":"primitive_fixed_signed_int","type":"int","default":0},{"name":"primitive_fixed_signed_long","type":"long","default":0},{"name":"primitive_boolean","type":"boolean","default":false},{"name":"primitive_string","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"primitive_bytes","type":"bytes","default":""},{"name":"repeated_primitive","type":{"type":"array","items":"int"},"default":[]},{"name":"map_primitive","type":{"type":"array","items":{"type":"record","name":"MapPrimitiveEntry","namespace":"test.Sample","fields":[{"name":"key","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"value","type":"int","default":0}]}},"default":[]},{"name":"nested_message","type":["null",{"type":"record","name":"Nested","fields":[{"name":"nested_int","type":"int","default":0}]}],"default":null},{"name":"repeated_message","type":{"type":"array","items":"Nested"},"default":[]},{"name":"map_message","type":{"type":"array","items":{"type":"record","name":"MapMessageEntry","namespace":"test.Sample","fields":[{"name":"key","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"value","type":["null","test.Nested"],"default":null}]}},"default":[]},{"name":"wrapped_string","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"wrapped_int","type":["null","int"],"default":null},{"name":"wrapped_long","type":["null","long"],"default":null},{"name":"wrapped_unsigned_int","type":["null","long"],"default":null},{"name":"wrapped_unsigned_long","type":["null","long"],"default":null},{"name":"wrapped_double","type":["null","double"],"default":null},{"name":"wrapped_float","type":["null","float"],"default":null},{"name":"wrapped_boolean","type":["null","boolean"],"default":null},{"name":"wrapped_bytes","type":["null","bytes"],"default":null},{"name":"enum","type":{"type":"enum","name":"SampleEnum","symbols":["FIRST","SECOND"]},"default":"FIRST"},{"name":"timestamp","type":["null",{"type":"record","name":"Timestamp","namespace":"google.protobuf","fields":[{"name":"seconds","type":"long","default":0},{"name":"nanos","type":"int","default":0}]}],"default":null}]} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.avsc b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.avsc new file mode 100644 index 000000000000..5c9b63aa5c2a --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.avsc @@ -0,0 +1,409 @@ +{ + "type": "record", + "name": "Sample", + "namespace": "test", + "fields": [ + { + "name": "primitive_double", + "type": "double", + "default": 0.0 + }, + { + "name": "primitive_float", + "type": "float", + "default": 0.0 + }, + { + "name": "primitive_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_unsigned_int", + "type": "long", + "default": 0 + }, + { + "name": "primitive_unsigned_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_signed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_signed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_fixed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_fixed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_fixed_signed_int", + "type": "int", + "default": 0 + }, + { + "name": "primitive_fixed_signed_long", + "type": "long", + "default": 0 + }, + { + "name": "primitive_boolean", + "type": "boolean", + "default": false + }, + { + "name": "primitive_string", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "primitive_bytes", + "type": "bytes", + "default": "" + }, + { + "name": "repeated_primitive", + "type": { + "type": "array", + "items": "int" + }, + "default": [] + }, + { + "name": "map_primitive", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "MapPrimitiveEntry", + "namespace": "test.map_primitive", + "fields": [ + { + "name": "key", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "value", + "type": "int", + "default": 0 + } + ] + } + }, + "default": [] + }, + { + "name": "nested_message", + "type": [ + "null", + { + "type": "record", + "name": "Nested", + "namespace": "test.nested_message", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "repeated_message", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Nested", + "namespace": "test.repeated_message", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + }, + "default": [] + }, + { + "name": "map_message", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "MapMessageEntry", + "namespace": "test.map_message", + "fields": [ + { + "name": "key", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "value", + "type": [ + "null", + { + "type": "record", + "name": "Nested", + "namespace": "test.map_message.value", + "fields": [ + { + "name": "nested_int", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + } + ] + } + }, + "default": [] + }, + { + "name": "wrapped_string", + "type": [ + "null", + { + "type": "record", + "name": "StringValue", + "namespace": "test.wrapped_string", + "fields": [ + { + "name": "value", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_int", + "type": [ + "null", + { + "type": "record", + "name": "Int32Value", + "namespace": "test.wrapped_int", + "fields": [ + { + "name": "value", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_long", + "type": [ + "null", + { + "type": "record", + "name": "Int64Value", + "namespace": "test.wrapped_long", + "fields": [ + { + "name": "value", + "type": "long", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_unsigned_int", + "type": [ + "null", + { + "type": "record", + "name": "UInt32Value", + "namespace": "test.wrapped_unsigned_int", + "fields": [ + { + "name": "value", + "type": "long", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_unsigned_long", + "type": [ + "null", + { + "type": "record", + "name": "UInt64Value", + "namespace": "test.wrapped_unsigned_long", + "fields": [ + { + "name": "value", + "type": "long", + "default": 0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_double", + "type": [ + "null", + { + "type": "record", + "name": "DoubleValue", + "namespace": "test.wrapped_double", + "fields": [ + { + "name": "value", + "type": "double", + "default": 0.0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_float", + "type": [ + "null", + { + "type": "record", + "name": "FloatValue", + "namespace": "test.wrapped_float", + "fields": [ + { + "name": "value", + "type": "float", + "default": 0.0 + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_boolean", + "type": [ + "null", + { + "type": "record", + "name": "BoolValue", + "namespace": "test.wrapped_boolean", + "fields": [ + { + "name": "value", + "type": "boolean", + "default": false + } + ] + } + ], + "default": null + }, + { + "name": "wrapped_bytes", + "type": [ + "null", + { + "type": "record", + "name": "BytesValue", + "namespace": "test.wrapped_bytes", + "fields": [ + { + "name": "value", + "type": "bytes", + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "enum", + "type": { + "type": "enum", + "name": "SampleEnum", + "symbols": [ + "FIRST", + "SECOND" + ] + }, + "default": "FIRST" + }, + { + "name": "timestamp", + "type": [ + "null", + { + "type": "record", + "name": "Timestamp", + "namespace": "test.timestamp", + "fields": [ + { + "name": "seconds", + "type": "long", + "default": 0 + }, + { + "name": "nanos", + "type": "int", + "default": 0 + } + ] + } + ], + "default": null + } + ] +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.txt b/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.txt deleted file mode 100644 index ba8892e02127..000000000000 --- a/hudi-utilities/src/test/resources/schema-provider/proto/sample_schema_nested.txt +++ /dev/null @@ -1 +0,0 @@ -{"type":"record","name":"Sample","namespace":"test","fields":[{"name":"primitive_double","type":"double","default":0.0},{"name":"primitive_float","type":"float","default":0.0},{"name":"primitive_int","type":"int","default":0},{"name":"primitive_long","type":"long","default":0},{"name":"primitive_unsigned_int","type":"long","default":0},{"name":"primitive_unsigned_long","type":"long","default":0},{"name":"primitive_signed_int","type":"int","default":0},{"name":"primitive_signed_long","type":"long","default":0},{"name":"primitive_fixed_int","type":"int","default":0},{"name":"primitive_fixed_long","type":"long","default":0},{"name":"primitive_fixed_signed_int","type":"int","default":0},{"name":"primitive_fixed_signed_long","type":"long","default":0},{"name":"primitive_boolean","type":"boolean","default":false},{"name":"primitive_string","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"primitive_bytes","type":"bytes","default":""},{"name":"repeated_primitive","type":{"type":"array","items":"int"},"default":[]},{"name":"map_primitive","type":{"type":"array","items":{"type":"record","name":"MapPrimitiveEntry","namespace":"test.Sample","fields":[{"name":"key","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"value","type":"int","default":0}]}},"default":[]},{"name":"nested_message","type":["null",{"type":"record","name":"Nested","fields":[{"name":"nested_int","type":"int","default":0}]}],"default":null},{"name":"repeated_message","type":{"type":"array","items":"Nested"},"default":[]},{"name":"map_message","type":{"type":"array","items":{"type":"record","name":"MapMessageEntry","namespace":"test.Sample","fields":[{"name":"key","type":{"type":"string","avro.java.string":"String"},"default":""},{"name":"value","type":["null","test.Nested"],"default":null}]}},"default":[]},{"name":"wrapped_string","type":["null",{"type":"record","name":"StringValue","namespace":"google.protobuf","fields":[{"name":"value","type":{"type":"string","avro.java.string":"String"},"default":""}]}],"default":null},{"name":"wrapped_int","type":["null",{"type":"record","name":"Int32Value","namespace":"google.protobuf","fields":[{"name":"value","type":"int","default":0}]}],"default":null},{"name":"wrapped_long","type":["null",{"type":"record","name":"Int64Value","namespace":"google.protobuf","fields":[{"name":"value","type":"long","default":0}]}],"default":null},{"name":"wrapped_unsigned_int","type":["null",{"type":"record","name":"UInt32Value","namespace":"google.protobuf","fields":[{"name":"value","type":"long","default":0}]}],"default":null},{"name":"wrapped_unsigned_long","type":["null",{"type":"record","name":"UInt64Value","namespace":"google.protobuf","fields":[{"name":"value","type":"long","default":0}]}],"default":null},{"name":"wrapped_double","type":["null",{"type":"record","name":"DoubleValue","namespace":"google.protobuf","fields":[{"name":"value","type":"double","default":0.0}]}],"default":null},{"name":"wrapped_float","type":["null",{"type":"record","name":"FloatValue","namespace":"google.protobuf","fields":[{"name":"value","type":"float","default":0.0}]}],"default":null},{"name":"wrapped_boolean","type":["null",{"type":"record","name":"BoolValue","namespace":"google.protobuf","fields":[{"name":"value","type":"boolean","default":false}]}],"default":null},{"name":"wrapped_bytes","type":["null",{"type":"record","name":"BytesValue","namespace":"google.protobuf","fields":[{"name":"value","type":"bytes","default":""}]}],"default":null},{"name":"enum","type":{"type":"enum","name":"SampleEnum","symbols":["FIRST","SECOND"]},"default":"FIRST"},{"name":"timestamp","type":["null",{"type":"record","name":"Timestamp","namespace":"google.protobuf","fields":[{"name":"seconds","type":"long","default":0},{"name":"nanos","type":"int","default":0}]}],"default":null}]} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7b7f5fce8eaf..767d046ee5b4 100644 --- a/pom.xml +++ b/pom.xml @@ -525,6 +525,7 @@ **/test/resources/*.data **/test/resources/*.commit **/test/resources/**/*.txt + **/test/resources/**/*.avsc **/target/** **/generated-sources/** .github/**