Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,26 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types

@Override
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
Schema elementSchema = sourceSchema.getElementType();
List<?> sourceList = (List<?>) sourceValue;
Schema listSchema = sourceSchema;
if (listSchema.getType() == Schema.Type.UNION) {
listSchema = listSchema.getTypes().stream()
.filter(s -> s.getType() == Schema.Type.ARRAY)
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"UNION schema does not contain an ARRAY type: " + sourceSchema));
}

Schema elementSchema = listSchema.getElementType();

List<?> sourceList;
if (sourceValue instanceof GenericData.Array) {
sourceList = (GenericData.Array<?>) sourceValue;
} else if (sourceValue instanceof List) {
sourceList = (List<?>) sourceValue;
} else {
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to LIST");
}

List<Object> list = new ArrayList<>(sourceList.size());
for (Object element : sourceList) {
Object convert = convert(element, elementSchema, targetType.elementType());
Expand Down Expand Up @@ -157,13 +175,21 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
}
return recordMap;
} else {
Schema mapSchema = sourceSchema;
if (mapSchema.getType() == Schema.Type.UNION) {
mapSchema = mapSchema.getTypes().stream()
.filter(s -> s.getType() == Schema.Type.MAP)
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"UNION schema does not contain a Map type: " + sourceSchema.getElementType()));
}
// Handle standard Java Map
Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
Map<Object, Object> adaptedMap = new HashMap<>();
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
// Avro map keys are always strings
Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType());
Object value = convert(entry.getValue(), sourceSchema.getValueType(), targetType.valueType());
Object value = convert(entry.getValue(), mapSchema.getValueType(), targetType.valueType());
adaptedMap.put(key, value);
}
return adaptedMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public RecordProcessor create(WorkerConfig config, String topic) {
if (config.schemaType() == TableTopicSchemaType.SCHEMA) {
return new DefaultRecordProcessor(topic,
StringConverter.INSTANCE,
converterFactory.createValueConverter(topic, config), List.of(FlattenTransform.INSTANCE));
converterFactory.createForSchemaId(topic, false), List.of(FlattenTransform.INSTANCE));
}

var keyConverter = converterFactory.createKeyConverter(topic, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ public void testStringConversion() {

// Convert Avro record to Iceberg record using the wrapper
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);

Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record);

// Verify the field value
Expand Down Expand Up @@ -751,6 +752,43 @@ public void testListConversion() {
testSendRecord(icebergSchema, icebergRecord);
}

@Test
public void testUnionListConversion() {
    // Field "listField" is an optional array (union<null, array>) whose elements
    // are themselves optional strings (union<null, string>).
    String schemaJson = " {\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"TestRecord\",\n" +
            " \"fields\": [\n" +
            " {\n" +
            " \"name\": \"listField\",\n" +
            " \"type\": [\"null\", {\"type\": \"array\", \"items\": [\"null\", \"string\"]}],\n" +
            " \"default\": null\n" +
            " }\n" +
            " ]\n" +
            " }\n";
    avroSchema = new Schema.Parser().parse(schemaJson);

    // Locate the ARRAY branch of the union so the GenericData.Array can be
    // constructed against the concrete array schema.
    Schema arrayBranch = null;
    for (Schema branch : avroSchema.getField("listField").schema().getTypes()) {
        if (branch.getType() == Schema.Type.ARRAY) {
            arrayBranch = branch;
            break;
        }
    }
    if (arrayBranch == null) {
        throw new IllegalStateException("UNION schema does not contain an ARRAY type");
    }

    GenericData.Array<Object> elements = new GenericData.Array<>(3, arrayBranch);
    elements.add(new Utf8("a"));
    elements.add(null);
    elements.add(new Utf8("c"));

    GenericRecord sourceRecord = new GenericData.Record(avroSchema);
    sourceRecord.put("listField", elements);

    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
    // Round-trip through Avro serialization before binding, as the other tests do.
    Record icebergRecord = new RecordBinder(icebergSchema, avroSchema)
            .bind(serializeAndDeserialize(sourceRecord, avroSchema));

    // The null element inside the union'd array must survive the conversion.
    assertEquals(Arrays.asList("a", null, "c"), normalizeValue(icebergRecord.getField("listField")));

    testSendRecord(icebergSchema, icebergRecord);
}

// Test method for converting a map field
@Test
public void testStringMapConversion() {
Expand Down Expand Up @@ -868,6 +906,96 @@ public void testMapWithNonStringKeysConversion() {
testSendRecord(icebergSchema, icebergRecord);
}

@Test
public void testUnionStringMapConversion() {
    // Field "mapField" is an optional map (union<null, map>) whose values are
    // optional strings (union<null, string>).
    String schemaJson = " {\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"TestRecord\",\n" +
            " \"fields\": [\n" +
            " {\n" +
            " \"name\": \"mapField\",\n" +
            " \"type\": [\"null\", {\"type\": \"map\", \"values\": [\"null\", \"string\"]}],\n" +
            " \"default\": null\n" +
            " }\n" +
            " ]\n" +
            " }\n";
    avroSchema = new Schema.Parser().parse(schemaJson);

    // Include a null value so the optional value union is exercised end-to-end.
    Map<String, Object> expectedMap = new HashMap<>();
    expectedMap.put("key1", "value1");
    expectedMap.put("key2", null);

    GenericRecord sourceRecord = new GenericData.Record(avroSchema);
    sourceRecord.put("mapField", expectedMap);

    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
    // Serialize/deserialize first so the bound record reflects real wire data.
    Record icebergRecord = new RecordBinder(icebergSchema, avroSchema)
            .bind(serializeAndDeserialize(sourceRecord, avroSchema));

    assertEquals(expectedMap, normalizeValue(icebergRecord.getField("mapField")));

    testSendRecord(icebergSchema, icebergRecord);
}

@Test
public void testUnionArrayMapConversion() {
    // "mapField" is an array-backed map (logicalType=map) whose entries are
    // union<null, record{key:int, value:string}> — i.e. optional key/value records.
    String schemaJson = " {\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"TestRecord\",\n" +
            " \"fields\": [\n" +
            " {\n" +
            " \"name\": \"mapField\",\n" +
            " \"type\": {\n" +
            " \"type\": \"array\",\n" +
            " \"logicalType\": \"map\",\n" +
            " \"items\": [\n" +
            " \"null\",\n" +
            " {\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"UnionMapEntry\",\n" +
            " \"fields\": [\n" +
            " {\"name\": \"key\", \"type\": \"int\"},\n" +
            " {\"name\": \"value\", \"type\": \"string\"}\n" +
            " ]\n" +
            " }\n" +
            " ]\n" +
            " }\n" +
            " }\n" +
            " ]\n" +
            " }\n";
    avroSchema = new Schema.Parser().parse(schemaJson);

    Map<Integer, String> expectedMap = new HashMap<>();
    expectedMap.put(10, "alpha");
    expectedMap.put(20, "beta");

    Schema mapFieldSchema = avroSchema.getField("mapField").schema();
    // Find the RECORD branch of the element union; map entries are built against it.
    Schema entryRecordSchema = null;
    for (Schema branch : mapFieldSchema.getElementType().getTypes()) {
        if (branch.getType() == Schema.Type.RECORD) {
            entryRecordSchema = branch;
            break;
        }
    }
    if (entryRecordSchema == null) {
        throw new IllegalStateException("Array element UNION schema does not contain a RECORD type");
    }

    GenericData.Array<Object> entryArray = new GenericData.Array<>(expectedMap.size() + 1, mapFieldSchema);
    for (Map.Entry<Integer, String> mapping : expectedMap.entrySet()) {
        GenericRecord entryRecord = new GenericData.Record(entryRecordSchema);
        entryRecord.put("key", mapping.getKey());
        entryRecord.put("value", mapping.getValue());
        entryArray.add(entryRecord);
    }
    // A null entry is legal per the union; the converted map must still equal
    // expectedMap, i.e. the null entry contributes no key/value pair.
    entryArray.add(null);

    Types.MapType targetMapType = Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get());

    @SuppressWarnings("unchecked")
    Map<Integer, Object> converted =
            (Map<Integer, Object>) new AvroValueAdapter().convert(entryArray, mapFieldSchema, targetMapType);

    assertEquals(expectedMap, converted);
}

// Test method for converting a record with nested fields
@Test
public void testNestedRecordConversion() {
Expand Down
Loading