Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,6 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
@Override
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
Schema listSchema = sourceSchema;
if (listSchema.getType() == Schema.Type.UNION) {
listSchema = listSchema.getTypes().stream()
.filter(s -> s.getType() == Schema.Type.ARRAY)
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"UNION schema does not contain an ARRAY type: " + sourceSchema));
}

Schema elementSchema = listSchema.getElementType();

List<?> sourceList;
Expand All @@ -151,8 +143,7 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());

Schema kvSchema = resolveUnionElement(sourceSchema.getElementType(), Schema.Type.RECORD,
"Map element UNION schema does not contain a RECORD type");
Schema kvSchema = sourceSchema.getElementType();

Schema.Field keyField = kvSchema.getFields().get(0);
Schema.Field valueField = kvSchema.getFields().get(1);
Expand All @@ -177,8 +168,7 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
return recordMap;
}

Schema mapSchema = resolveUnionElement(sourceSchema, Schema.Type.MAP,
"UNION schema does not contain a MAP type");
Schema mapSchema = sourceSchema;

Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
Map<Object, Object> adaptedMap = new HashMap<>(sourceMap.size());
Expand All @@ -195,21 +185,4 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
}
return adaptedMap;
}

/**
 * Unwraps a UNION schema to its member of {@code expectedType}.
 * A schema that is not a UNION is returned unchanged.
 *
 * @param schema       the schema to inspect (possibly a UNION)
 * @param expectedType the Avro type the UNION is required to contain
 * @param errorMessage prefix for the failure message when no member matches
 * @return the first UNION member of the expected type, or {@code schema} itself
 *         when it is not a UNION
 * @throws IllegalStateException if the UNION contains no member of the expected type
 */
private Schema resolveUnionElement(Schema schema, Schema.Type expectedType, String errorMessage) {
    // Fast path: nothing to unwrap for non-UNION schemas.
    if (schema.getType() != Schema.Type.UNION) {
        return schema;
    }
    for (Schema member : schema.getTypes()) {
        if (member.getType() == expectedType) {
            return member;
        }
    }
    // Same failure contract as before: message prefix plus the offending schema.
    throw new IllegalStateException(errorMessage + ": " + schema);
}
}
19 changes: 19 additions & 0 deletions core/src/main/java/kafka/automq/table/binder/RecordBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.avro.Schema.Type.NULL;

/**
* A factory that creates lazy-evaluation Record views of Avro GenericRecords.
* Field values are converted only when accessed, avoiding upfront conversion overhead.
Expand Down Expand Up @@ -154,9 +156,26 @@ private FieldMapping createOptimizedMapping(String avroFieldName, int avroPositi
nestedSchema = icebergType.asStructType().asSchema();
nestedSchemaId = icebergType.toString();
}
if (Type.TypeID.MAP.equals(icebergType.typeId()) || Type.TypeID.LIST.equals(icebergType.typeId())) {
avroType = resolveUnionElement(avroType);
}
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
}

/**
 * Resolves a UNION schema to its first non-NULL member (the usual
 * ["null", T] optional-field encoding); any non-UNION schema is returned
 * as-is. Returns {@code null} when every UNION member is NULL — callers
 * are expected to handle that degenerate case.
 */
private Schema resolveUnionElement(Schema schema) {
    // Non-UNION schemas need no resolution.
    if (schema.getType() != Schema.Type.UNION) {
        return schema;
    }
    for (Schema member : schema.getTypes()) {
        if (member.getType() != NULL) {
            return member;
        }
    }
    // All members were NULL: preserve the original behavior of yielding null.
    return null;
}


/**
* Pre-computes RecordBinders for nested STRUCT fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,64 +938,6 @@ public void testUnionStringMapConversion() {
testSendRecord(icebergSchema, icebergRecord);
}

/**
 * Verifies conversion of an Avro ARRAY carrying {@code "logicalType": "map"}
 * whose element type is a UNION of [null, record{key,value}] into an Iceberg
 * map (Integer -> String).
 *
 * NOTE(review): the input array includes a trailing null element and the
 * assertion expects it to be absent from the result — this assumes the adapter
 * silently skips null map entries; confirm against AvroValueAdapter.convertMap.
 */
@Test
public void testUnionArrayMapConversion() {
    // Array-encoded map: items are a UNION of null and a key/value record.
    String avroSchemaStr = " {\n" +
        " \"type\": \"record\",\n" +
        " \"name\": \"TestRecord\",\n" +
        " \"fields\": [\n" +
        " {\n" +
        " \"name\": \"mapField\",\n" +
        " \"type\": {\n" +
        " \"type\": \"array\",\n" +
        " \"logicalType\": \"map\",\n" +
        " \"items\": [\n" +
        " \"null\",\n" +
        " {\n" +
        " \"type\": \"record\",\n" +
        " \"name\": \"UnionMapEntry\",\n" +
        " \"fields\": [\n" +
        " {\"name\": \"key\", \"type\": \"int\"},\n" +
        " {\"name\": \"value\", \"type\": \"string\"}\n" +
        " ]\n" +
        " }\n" +
        " ]\n" +
        " }\n" +
        " }\n" +
        " ]\n" +
        " }\n";

    avroSchema = new Schema.Parser().parse(avroSchemaStr);

    // The map contents we expect the adapter to reconstruct.
    Map<Integer, String> expectedMap = new HashMap<>();
    expectedMap.put(10, "alpha");
    expectedMap.put(20, "beta");

    // Dig the entry-record schema out of the element UNION by hand, since the
    // test builds GenericRecords directly against it.
    Schema mapFieldSchema = avroSchema.getField("mapField").schema();
    Schema elementUnionSchema = mapFieldSchema.getElementType();
    Schema entrySchema = elementUnionSchema.getTypes().stream()
        .filter(s -> s.getType() == Schema.Type.RECORD)
        .findFirst()
        .orElseThrow(() -> new IllegalStateException("Array element UNION schema does not contain a RECORD type"));

    // Populate the array-encoded map; +1 capacity for the trailing null entry.
    GenericData.Array<Object> mapEntries = new GenericData.Array<>(expectedMap.size() + 1, mapFieldSchema);
    for (Map.Entry<Integer, String> entry : expectedMap.entrySet()) {
        GenericRecord mapEntry = new GenericData.Record(entrySchema);
        mapEntry.put("key", entry.getKey());
        mapEntry.put("value", entry.getValue());
        mapEntries.add(mapEntry);
    }
    // Deliberate null element: exercises the null-tolerant union handling.
    mapEntries.add(null);

    AvroValueAdapter adapter = new AvroValueAdapter();
    Types.MapType mapType = Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get());

    @SuppressWarnings("unchecked")
    Map<Integer, Object> result = (Map<Integer, Object>) adapter.convert(mapEntries, mapFieldSchema, mapType);

    // Null entry is expected to be dropped; only the two real pairs remain.
    assertEquals(expectedMap, result);
}

// Test method for converting a record with nested fields
@Test
public void testNestedRecordConversion() {
Expand Down Expand Up @@ -1132,6 +1074,40 @@ public void testUnionFieldConversion() {
" {\n" +
" \"name\": \"unionField4\",\n" +
" \"type\": [\"null\", \"string\"]\n" +
" },\n" +
" {\n" +
" \"name\": \"unionListField\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"array\",\n" +
" \"items\": \"string\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"unionMapField\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"map\",\n" +
" \"values\": \"int\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"unionStructField\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"record\",\n" +
" \"name\": \"UnionStruct\",\n" +
" \"fields\": [\n" +
" {\"name\": \"innerString\", \"type\": \"string\"},\n" +
" {\"name\": \"innerInt\", \"type\": \"int\"}\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }\n";
Expand All @@ -1141,6 +1117,17 @@ public void testUnionFieldConversion() {
avroRecord.put("unionField1", "union_string");
avroRecord.put("unionField2", 42);
avroRecord.put("unionField3", true);
List<String> unionList = Arrays.asList("item1", "item2");
avroRecord.put("unionListField", unionList);
Map<String, Integer> unionMap = new HashMap<>();
unionMap.put("one", 1);
unionMap.put("two", 2);
avroRecord.put("unionMapField", unionMap);
Schema unionStructSchema = avroSchema.getField("unionStructField").schema().getTypes().get(1);
GenericRecord unionStruct = new GenericData.Record(unionStructSchema);
unionStruct.put("innerString", "nested");
unionStruct.put("innerInt", 99);
avroRecord.put("unionStructField", unionStruct);

// Convert Avro record to Iceberg record using the wrapper
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
Expand All @@ -1156,6 +1143,15 @@ public void testUnionFieldConversion() {
Object unionField3 = icebergRecord.getField("unionField3");
assertEquals(true, unionField3);

assertNull(icebergRecord.getField("unionField4"));

assertEquals(unionList, normalizeValue(icebergRecord.getField("unionListField")));
assertEquals(unionMap, normalizeValue(icebergRecord.getField("unionMapField")));

Record unionStructRecord = (Record) icebergRecord.getField("unionStructField");
assertEquals("nested", unionStructRecord.getField("innerString").toString());
assertEquals(99, unionStructRecord.getField("innerInt"));

// Send the record to the table
testSendRecord(icebergSchema, icebergRecord);
}
Expand Down
Loading