diff --git a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java
index 2635bd3e89..fa35a9b8e5 100644
--- a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java
+++ b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java
@@ -118,14 +118,6 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
     @Override
     protected List convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
         Schema listSchema = sourceSchema;
-        if (listSchema.getType() == Schema.Type.UNION) {
-            listSchema = listSchema.getTypes().stream()
-                .filter(s -> s.getType() == Schema.Type.ARRAY)
-                .findFirst()
-                .orElseThrow(() -> new IllegalStateException(
-                    "UNION schema does not contain an ARRAY type: " + sourceSchema));
-        }
-
         Schema elementSchema = listSchema.getElementType();
 
         List sourceList;
@@ -151,8 +143,7 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis
             GenericData.Array arrayValue = (GenericData.Array) sourceValue;
             Map recordMap = new HashMap<>(arrayValue.size());
 
-            Schema kvSchema = resolveUnionElement(sourceSchema.getElementType(), Schema.Type.RECORD,
-                "Map element UNION schema does not contain a RECORD type");
+            Schema kvSchema = sourceSchema.getElementType();
             Schema.Field keyField = kvSchema.getFields().get(0);
             Schema.Field valueField = kvSchema.getFields().get(1);
 
@@ -177,8 +168,7 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis
             return recordMap;
         }
 
-        Schema mapSchema = resolveUnionElement(sourceSchema, Schema.Type.MAP,
-            "UNION schema does not contain a MAP type");
+        Schema mapSchema = sourceSchema;
         Map sourceMap = (Map) sourceValue;
         Map adaptedMap = new HashMap<>(sourceMap.size());
 
@@ -195,21 +185,4 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis
         }
         return adaptedMap;
     }
-
-    private Schema resolveUnionElement(Schema schema, Schema.Type expectedType, String errorMessage) {
-        Schema resolved = schema;
-        if (schema.getType() == Schema.Type.UNION) {
-            resolved = null;
-            for (Schema unionMember : schema.getTypes()) {
-                if (unionMember.getType() == expectedType) {
-                    resolved = unionMember;
-                    break;
-                }
-            }
-            if (resolved == null) {
-                throw new IllegalStateException(errorMessage + ": " + schema);
-            }
-        }
-        return resolved;
-    }
 }
diff --git a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
index 6f0d116518..0c1be6a3db 100644
--- a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
+++ b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
@@ -34,6 +34,8 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.avro.Schema.Type.NULL;
+
 /**
  * A factory that creates lazy-evaluation Record views of Avro GenericRecords.
  * Field values are converted only when accessed, avoiding upfront conversion overhead.
@@ -154,9 +156,33 @@ private FieldMapping createOptimizedMapping(String avroFieldName, int avroPositi
             nestedSchema = icebergType.asStructType().asSchema();
             nestedSchemaId = icebergType.toString();
         }
+        if (Type.TypeID.MAP.equals(icebergType.typeId()) || Type.TypeID.LIST.equals(icebergType.typeId())) {
+            avroType = resolveUnionElement(avroType);
+        }
         return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
     }
 
+    /**
+     * Unwraps an Avro UNION schema to its first non-NULL member. Called for MAP and LIST
+     * fields before the Avro schema is stored in the {@code FieldMapping}.
+     *
+     * @param schema the Avro schema of the field; returned unchanged when it is not a UNION
+     * @return the first non-NULL member of the union, or {@code schema} itself
+     * @throws IllegalStateException if the union has no non-NULL member
+     */
+    private Schema resolveUnionElement(Schema schema) {
+        if (schema.getType() != Schema.Type.UNION) {
+            return schema;
+        }
+        for (Schema unionMember : schema.getTypes()) {
+            if (unionMember.getType() != NULL) {
+                return unionMember;
+            }
+        }
+        // Fail fast instead of handing a null schema to FieldMapping.
+        throw new IllegalStateException("UNION schema does not contain a non-NULL type: " + schema);
+    }
+
     /**
      * Pre-computes RecordBinders for nested STRUCT fields.
diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java index 1d6eb4769e..9051e27a6a 100644 --- a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java +++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java @@ -938,64 +938,6 @@ public void testUnionStringMapConversion() { testSendRecord(icebergSchema, icebergRecord); } - @Test - public void testUnionArrayMapConversion() { - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\n" + - " \"name\": \"mapField\",\n" + - " \"type\": {\n" + - " \"type\": \"array\",\n" + - " \"logicalType\": \"map\",\n" + - " \"items\": [\n" + - " \"null\",\n" + - " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"UnionMapEntry\",\n" + - " \"fields\": [\n" + - " {\"name\": \"key\", \"type\": \"int\"},\n" + - " {\"name\": \"value\", \"type\": \"string\"}\n" + - " ]\n" + - " }\n" + - " ]\n" + - " }\n" + - " }\n" + - " ]\n" + - " }\n"; - - avroSchema = new Schema.Parser().parse(avroSchemaStr); - - Map expectedMap = new HashMap<>(); - expectedMap.put(10, "alpha"); - expectedMap.put(20, "beta"); - - Schema mapFieldSchema = avroSchema.getField("mapField").schema(); - Schema elementUnionSchema = mapFieldSchema.getElementType(); - Schema entrySchema = elementUnionSchema.getTypes().stream() - .filter(s -> s.getType() == Schema.Type.RECORD) - .findFirst() - .orElseThrow(() -> new IllegalStateException("Array element UNION schema does not contain a RECORD type")); - - GenericData.Array mapEntries = new GenericData.Array<>(expectedMap.size() + 1, mapFieldSchema); - for (Map.Entry entry : expectedMap.entrySet()) { - GenericRecord mapEntry = new GenericData.Record(entrySchema); - mapEntry.put("key", entry.getKey()); - mapEntry.put("value", entry.getValue()); - mapEntries.add(mapEntry); - } - mapEntries.add(null); - - AvroValueAdapter 
adapter = new AvroValueAdapter(); - Types.MapType mapType = Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get()); - - @SuppressWarnings("unchecked") - Map result = (Map) adapter.convert(mapEntries, mapFieldSchema, mapType); - - assertEquals(expectedMap, result); - } - // Test method for converting a record with nested fields @Test public void testNestedRecordConversion() { @@ -1132,6 +1074,40 @@ public void testUnionFieldConversion() { " {\n" + " \"name\": \"unionField4\",\n" + " \"type\": [\"null\", \"string\"]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionListField\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " {\n" + + " \"type\": \"array\",\n" + + " \"items\": \"string\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionMapField\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " {\n" + + " \"type\": \"map\",\n" + + " \"values\": \"int\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionStructField\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"UnionStruct\",\n" + + " \"fields\": [\n" + + " {\"name\": \"innerString\", \"type\": \"string\"},\n" + + " {\"name\": \"innerInt\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n" + + " ]\n" + " }\n" + " ]\n" + " }\n"; @@ -1141,6 +1117,17 @@ public void testUnionFieldConversion() { avroRecord.put("unionField1", "union_string"); avroRecord.put("unionField2", 42); avroRecord.put("unionField3", true); + List unionList = Arrays.asList("item1", "item2"); + avroRecord.put("unionListField", unionList); + Map unionMap = new HashMap<>(); + unionMap.put("one", 1); + unionMap.put("two", 2); + avroRecord.put("unionMapField", unionMap); + Schema unionStructSchema = avroSchema.getField("unionStructField").schema().getTypes().get(1); + GenericRecord unionStruct = new GenericData.Record(unionStructSchema); + unionStruct.put("innerString", "nested"); + unionStruct.put("innerInt", 99); + 
avroRecord.put("unionStructField", unionStruct); // Convert Avro record to Iceberg record using the wrapper org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); @@ -1156,6 +1143,15 @@ public void testUnionFieldConversion() { Object unionField3 = icebergRecord.getField("unionField3"); assertEquals(true, unionField3); + assertNull(icebergRecord.getField("unionField4")); + + assertEquals(unionList, normalizeValue(icebergRecord.getField("unionListField"))); + assertEquals(unionMap, normalizeValue(icebergRecord.getField("unionMapField"))); + + Record unionStructRecord = (Record) icebergRecord.getField("unionStructField"); + assertEquals("nested", unionStructRecord.getField("innerString").toString()); + assertEquals(99, unionStructRecord.getField("innerInt")); + // Send the record to the table testSendRecord(icebergSchema, icebergRecord); }