diff --git a/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java b/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java index d095ad6f75..6ec81384e0 100644 --- a/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java +++ b/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java @@ -48,9 +48,10 @@ */ public abstract class AbstractTypeAdapter implements TypeAdapter { + @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) @Override - public Object convert(Object sourceValue, S sourceSchema, Type targetType) { + public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter structConverter) { if (sourceValue == null) { return null; } @@ -83,9 +84,11 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) { case TIMESTAMP: return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType); case LIST: - return convertList(sourceValue, sourceSchema, (Types.ListType) targetType); + return convertList(sourceValue, sourceSchema, (Types.ListType) targetType, structConverter); case MAP: - return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType); + return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType, structConverter); + case STRUCT: + return structConverter.convert(sourceValue, sourceSchema, targetType); default: return sourceValue; } @@ -203,10 +206,13 @@ protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.Time Instant instant = Instant.parse(sourceValue.toString()); return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant)); } + if (sourceValue instanceof Number) { + return DateTimeUtil.timestamptzFromMicros(((Number) sourceValue).longValue()); + } throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); } - protected abstract List convertList(Object sourceValue, S sourceSchema, Types.ListType targetType); + protected abstract List convertList(Object sourceValue, S sourceSchema, Types.ListType targetType, StructConverter structConverter); - protected abstract Map convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType); + protected abstract Map convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType, StructConverter structConverter); } diff --git a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java index fa35a9b8e5..d7eae3a19b 100644 --- a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java +++ b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java @@ -116,7 +116,7 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types } @Override - protected List convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) { + protected List convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType, StructConverter structConverter) { Schema listSchema = sourceSchema; Schema elementSchema = listSchema.getElementType(); @@ -131,14 +131,14 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis List list = new ArrayList<>(sourceList.size()); for (Object element : sourceList) { - Object convert = convert(element, elementSchema, targetType.elementType()); + Object convert = convert(element, elementSchema, targetType.elementType(), structConverter); list.add(convert); } return list; } @Override - protected Map convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) { + protected Map convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType, StructConverter structConverter) { if (sourceValue instanceof GenericData.Array) { GenericData.Array arrayValue = (GenericData.Array) sourceValue; Map recordMap = new HashMap<>(arrayValue.size()); @@ -161,8 +161,8 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis continue; } GenericRecord record = (GenericRecord) element; - Object key = convert(record.get(keyField.pos()), keySchema, keyType); - Object value = convert(record.get(valueField.pos()), valueSchema, valueType); + Object key = convert(record.get(keyField.pos()), keySchema, keyType, structConverter); + Object value = convert(record.get(valueField.pos()), valueSchema, valueType, structConverter); recordMap.put(key, value); } return recordMap; @@ -179,10 +179,32 @@ protected List convertList(Object sourceValue, Schema sourceSchema, Types.Lis for (Map.Entry entry : sourceMap.entrySet()) { Object rawKey = entry.getKey(); - Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType); - Object value = convert(entry.getValue(), valueSchema, valueType); + Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType, structConverter); + Object value = convert(entry.getValue(), valueSchema, valueType, structConverter); adaptedMap.put(key, value); } return adaptedMap; } + + @Override + public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) { + return convert(sourceValue, sourceSchema, targetType, this::convertStruct); + } + + protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) { + org.apache.iceberg.Schema schema = targetType.asStructType().asSchema(); + org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema); + for (Types.NestedField f : schema.columns()) { + // Convert the value to the expected type + GenericRecord record = (GenericRecord) sourceValue; + Schema.Field sourceField = sourceSchema.getField(f.name()); + if (sourceField == null) { + throw new IllegalStateException("Missing field '" + f.name() + + "' in source schema: " + sourceSchema.getFullName()); + } + Object fieldValue = convert(record.get(f.name()), sourceField.schema(), f.type()); + result.setField(f.name(), fieldValue); + } + return result; + } } diff --git a/core/src/main/java/kafka/automq/table/binder/FieldMapping.java b/core/src/main/java/kafka/automq/table/binder/FieldMapping.java new file mode 100644 index 0000000000..13178235f4 --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/FieldMapping.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import org.apache.avro.Schema; +import org.apache.iceberg.types.Type; + +/** + * Represents the mapping between an Avro field and its corresponding Iceberg field. + * This class stores the position, key, schema, and type information needed to + * convert field values during record binding. + */ +public class FieldMapping { + private final int avroPosition; + private final String avroKey; + private final Type icebergType; + private final Schema avroSchema; + + public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) { + this.avroPosition = avroPosition; + this.avroKey = avroKey; + this.icebergType = icebergType; + this.avroSchema = avroSchema; + } + + public int avroPosition() { + return avroPosition; + } + + public String avroKey() { + return avroKey; + } + + public Type icebergType() { + return icebergType; + } + + public Schema avroSchema() { + return avroSchema; + } +} diff --git a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java index e6020390b4..1b47374264 100644 --- a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java +++ b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -48,7 +49,7 @@ public class RecordBinder { private final FieldMapping[] fieldMappings; // Pre-computed RecordBinders for nested STRUCT fields - private final Map nestedStructBinders; + private final Map nestedStructBinders; // Field count statistics for this batch private final AtomicLong batchFieldCount; @@ -78,11 +79,9 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema, } // Initialize field mappings - this.fieldMappings = new FieldMapping[icebergSchema.columns().size()]; - initializeFieldMappings(avroSchema); - + this.fieldMappings = buildFieldMappings(avroSchema, icebergSchema); // Pre-compute nested struct binders - this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter); + this.nestedStructBinders = precomputeBindersMap(typeAdapter); } public RecordBinder createBinderForNewSchema(org.apache.iceberg.Schema icebergSchema, Schema avroSchema) { @@ -121,8 +120,9 @@ void addFieldCount(long count) { batchFieldCount.addAndGet(count); } - private void initializeFieldMappings(Schema avroSchema) { + private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.Schema icebergSchema) { Schema recordSchema = avroSchema; + FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()]; if (recordSchema.getType() == Schema.Type.UNION) { recordSchema = recordSchema.getTypes().stream() @@ -137,32 +137,28 @@ private void initializeFieldMappings(Schema avroSchema) { Schema.Field avroField = recordSchema.getField(fieldName); if (avroField != null) { - fieldMappings[icebergPos] = createOptimizedMapping( + mappings[icebergPos] = buildFieldMapping( avroField.name(), avroField.pos(), icebergField.type(), avroField.schema() ); } else { - fieldMappings[icebergPos] = null; + mappings[icebergPos] = null; } } + return mappings; } - private FieldMapping createOptimizedMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) { - org.apache.iceberg.Schema nestedSchema = null; - String nestedSchemaId = null; - if (icebergType.isStructType()) { - nestedSchema = icebergType.asStructType().asSchema(); - nestedSchemaId = icebergType.toString(); - } + private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) { if (Type.TypeID.TIMESTAMP.equals(icebergType.typeId()) || Type.TypeID.TIME.equals(icebergType.typeId()) || Type.TypeID.MAP.equals(icebergType.typeId()) - || Type.TypeID.LIST.equals(icebergType.typeId())) { + || Type.TypeID.LIST.equals(icebergType.typeId()) + || Type.TypeID.STRUCT.equals(icebergType.typeId())) { avroType = resolveUnionElement(avroType); } - return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId); + return new FieldMapping(avroPosition, avroFieldName, icebergType, avroType); } private Schema resolveUnionElement(Schema schema) { @@ -183,24 +179,55 @@ private Schema resolveUnionElement(Schema schema) { /** * Pre-computes RecordBinders for nested STRUCT fields. */ - private Map precomputeNestedStructBinders(TypeAdapter typeAdapter) { - Map binders = new HashMap<>(); + private Map precomputeBindersMap(TypeAdapter typeAdapter) { + Map binders = new IdentityHashMap<>(); for (FieldMapping mapping : fieldMappings) { - if (mapping != null && mapping.typeId() == Type.TypeID.STRUCT) { - String structId = mapping.nestedSchemaId(); - if (!binders.containsKey(structId)) { - RecordBinder nestedBinder = new RecordBinder( - mapping.nestedSchema(), + if (mapping != null) { + Type type = mapping.icebergType(); + if (type.isPrimitiveType()) { + } else if (type.isStructType()) { + org.apache.iceberg.Schema schema = type.asStructType().asSchema(); + RecordBinder structBinder = new RecordBinder( + schema, mapping.avroSchema(), typeAdapter, batchFieldCount ); - binders.put(structId, nestedBinder); + binders.put(mapping.avroSchema(), structBinder); + } else if (type.isListType()) { + Types.ListType listType = type.asListType(); + Type elementType = listType.elementType(); + if (elementType.isStructType()) { + org.apache.iceberg.Schema schema = elementType.asStructType().asSchema(); + RecordBinder elementBinder = new RecordBinder( + schema, + mapping.avroSchema().getElementType(), + typeAdapter, + batchFieldCount + ); + binders.put(mapping.avroSchema().getElementType(), elementBinder); + } + } else if (type.isMapType()) { + Types.MapType mapType = type.asMapType(); + Type keyType = mapType.keyType(); + Type valueType = mapType.valueType(); + if (keyType.isStructType()) { + throw new UnsupportedOperationException("Struct keys in MAP types are not supported"); + } + if (valueType.isStructType()) { + org.apache.iceberg.Schema schema = valueType.asStructType().asSchema(); + RecordBinder valueBinder = new RecordBinder( + schema, + mapping.avroSchema().getValueType(), + typeAdapter, + batchFieldCount + ); + binders.put(mapping.avroSchema().getValueType(), valueBinder); + } } } } - return binders; } @@ -210,16 +237,16 @@ private static class AvroRecordView implements Record { private final TypeAdapter typeAdapter; private final Map fieldNameToPosition; private final FieldMapping[] fieldMappings; - private final Map nestedStructBinders; + private final Map nestedStructBinders; private final RecordBinder parentBinder; AvroRecordView(GenericRecord avroRecord, - org.apache.iceberg.Schema icebergSchema, - TypeAdapter typeAdapter, - Map fieldNameToPosition, - FieldMapping[] fieldMappings, - Map nestedStructBinders, - RecordBinder parentBinder) { + org.apache.iceberg.Schema icebergSchema, + TypeAdapter typeAdapter, + Map fieldNameToPosition, + FieldMapping[] fieldMappings, + Map nestedStructBinders, + RecordBinder parentBinder) { this.avroRecord = avroRecord; this.icebergSchema = icebergSchema; this.typeAdapter = typeAdapter; @@ -242,25 +269,11 @@ public Object get(int pos) { if (mapping == null) { return null; } - Object avroValue = avroRecord.get(mapping.avroPosition()); if (avroValue == null) { return null; } - - // Handle STRUCT type - delegate to nested binder - if (mapping.typeId() == Type.TypeID.STRUCT) { - String structId = mapping.nestedSchemaId(); - RecordBinder nestedBinder = nestedStructBinders.get(structId); - if (nestedBinder == null) { - throw new IllegalStateException("Nested binder not found for struct: " + structId); - } - parentBinder.addFieldCount(1); - return nestedBinder.bind((GenericRecord) avroValue); - } - - // Convert non-STRUCT types - Object result = typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType()); + Object result = convert(avroValue, mapping.avroSchema(), mapping.icebergType()); // Calculate and accumulate field count long fieldCount = calculateFieldCount(result, mapping.icebergType()); @@ -269,6 +282,17 @@ public Object get(int pos) { return result; } + public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) { + if (targetType.typeId() == Type.TypeID.STRUCT) { + RecordBinder binder = nestedStructBinders.get(sourceSchema); + if (binder == null) { + throw new IllegalStateException("Missing nested binder for schema: " + sourceSchema); + } + return binder.bind((GenericRecord) sourceValue); + } + return typeAdapter.convert(sourceValue, (Schema) sourceSchema, targetType, this::convert); + } + /** * Calculates the field count for a converted value based on its size. * Large fields are counted multiple times based on the size threshold. @@ -358,66 +382,20 @@ public T get(int pos, Class javaClass) { public void setField(String name, Object value) { throw new UnsupportedOperationException("Read-only"); } + @Override public Record copy() { throw new UnsupportedOperationException("Read-only"); } + @Override public Record copy(Map overwriteValues) { throw new UnsupportedOperationException("Read-only"); } + @Override public void set(int pos, T value) { throw new UnsupportedOperationException("Read-only"); } } - - // Field mapping structure - private static class FieldMapping { - private final int avroPosition; - private final String avroKey; - private final Type icebergType; - private final Type.TypeID typeId; - private final Schema avroSchema; - private final org.apache.iceberg.Schema nestedSchema; - private final String nestedSchemaId; - - FieldMapping(int avroPosition, String avroKey, Type icebergType, Type.TypeID typeId, Schema avroSchema, org.apache.iceberg.Schema nestedSchema, String nestedSchemaId) { - this.avroPosition = avroPosition; - this.avroKey = avroKey; - this.icebergType = icebergType; - this.typeId = typeId; - this.avroSchema = avroSchema; - this.nestedSchema = nestedSchema; - this.nestedSchemaId = nestedSchemaId; - } - - public int avroPosition() { - return avroPosition; - } - - public String avroKey() { - return avroKey; - } - - public Type icebergType() { - return icebergType; - } - - public Type.TypeID typeId() { - return typeId; - } - - public Schema avroSchema() { - return avroSchema; - } - - public org.apache.iceberg.Schema nestedSchema() { - return nestedSchema; - } - - public String nestedSchemaId() { - return nestedSchemaId; - } - } } diff --git a/core/src/main/java/kafka/automq/table/binder/StructConverter.java b/core/src/main/java/kafka/automq/table/binder/StructConverter.java new file mode 100644 index 0000000000..87372cd33c --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/StructConverter.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import org.apache.iceberg.types.Type; + +@FunctionalInterface +public interface StructConverter { + + Object convert(Object sourceValue, S sourceSchema, Type targetType); +} diff --git a/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java b/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java index b38b3861d6..2f5b06c670 100644 --- a/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java +++ b/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java @@ -37,4 +37,14 @@ public interface TypeAdapter { */ Object convert(Object sourceValue, S sourceSchema, Type targetType); + /** + * Converts a source value to the target Iceberg type with support for recursive struct conversion. + * + * @param sourceValue The source value + * @param sourceSchema The source schema + * @param targetType The target Iceberg type + * @param structConverter A callback for converting nested STRUCT types + * @return The converted value + */ + Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter structConverter); } diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java index ab589a68b1..30492c7f22 100644 --- a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java +++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java @@ -84,6 +84,8 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("S3Unit") public class AvroRecordBinderTest { @@ -120,7 +122,7 @@ private void testSendRecord(org.apache.iceberg.Schema schema, Record record) { } } - private TaskWriter createTableWriter(Table table) { + public static TaskWriter createTableWriter(Table table) { FileAppenderFactory appenderFactory = new GenericAppenderFactory( table.schema(), table.spec(), @@ -283,35 +285,6 @@ record -> record.put(fieldName, null), ); } - /** - * Writes a record whose map field may still contain Avro GenericRecord entries for non-string keys. - * The entries are rebound to Iceberg Records when necessary so the writer sees the expected structure. - */ - private void writeRecordWithConvertedMapEntries(Record icebergRecord, - org.apache.iceberg.Schema icebergSchema, - Schema entryAvroSchema) { - Object fieldValue = icebergRecord.getField("mapField"); - if (fieldValue instanceof List) { - List entries = (List) fieldValue; - if (!entries.isEmpty() && !(entries.get(0) instanceof Record)) { - org.apache.iceberg.types.Type elementType = - icebergSchema.findField("mapField").type().asListType().elementType(); - org.apache.iceberg.Schema entryIcebergSchema = elementType.asStructType().asSchema(); - RecordBinder entryBinder = new RecordBinder(entryIcebergSchema, entryAvroSchema); - List convertedEntries = new ArrayList<>(entries.size()); - for (Object entry : entries) { - convertedEntries.add(entryBinder.bind((GenericRecord) entry)); - } - org.apache.iceberg.data.GenericRecord copy = org.apache.iceberg.data.GenericRecord.create(icebergSchema.asStruct()); - copy.setField("mapField", convertedEntries); - testSendRecord(icebergSchema.asStruct().asSchema(), copy); - return; - } - } - testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord); - } - - @Test public void testSchemaEvolution() { // Original Avro schema with 3 fields @@ -631,6 +604,262 @@ public void testListWithNullableElementsConversion() { ); } + @Test + public void testListOfRecordsConversion() { + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"ListRecordContainer\",\n" + + " \"namespace\": \"" + TEST_NAMESPACE + "\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"listField\",\n" + + " \"type\": {\n" + + " \"type\": \"array\",\n" + + " \"items\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"ListRecordEntry\",\n" + + " \"fields\": [\n" + + " {\"name\": \"innerString\", \"type\": \"string\"},\n" + + " {\"name\": \"innerInt\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}\n"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + + Schema listFieldSchema = avroSchema.getField("listField").schema(); + Schema listEntrySchema = listFieldSchema.getElementType(); + + @SuppressWarnings("unchecked") + GenericData.Array listValue = new GenericData.Array<>(2, listFieldSchema); + + GenericRecord firstEntry = new GenericData.Record(listEntrySchema); + firstEntry.put("innerString", new Utf8("first")); + firstEntry.put("innerInt", 1); + listValue.add(firstEntry); + + GenericRecord secondEntry = new GenericData.Record(listEntrySchema); + secondEntry.put("innerString", new Utf8("second")); + secondEntry.put("innerInt", 2); + listValue.add(secondEntry); + + avroRecord.put("listField", listValue); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema) + .bind(serializeAndDeserialize(avroRecord, avroSchema)); + + @SuppressWarnings("unchecked") + List boundList = (List) icebergRecord.getField("listField"); + assertEquals(2, boundList.size()); + assertEquals("first", boundList.get(0).getField("innerString").toString()); + assertEquals(1, boundList.get(0).getField("innerInt")); + assertEquals("second", boundList.get(1).getField("innerString").toString()); + assertEquals(2, boundList.get(1).getField("innerInt")); + + testSendRecord(icebergSchema, icebergRecord); + } + + @Test + public void testStructBindersHandleDuplicateFullNames() { + Schema directStruct = Schema.createRecord("DuplicatedStruct", null, TEST_NAMESPACE, false); + directStruct.setFields(Arrays.asList( + new Schema.Field("directOnly", Schema.create(Schema.Type.STRING), null, null) + )); + + Schema listStruct = Schema.createRecord("DuplicatedStruct", null, TEST_NAMESPACE, false); + listStruct.setFields(Arrays.asList( + new Schema.Field("listOnly", Schema.create(Schema.Type.INT), null, null) + )); + + Schema listSchema = Schema.createArray(listStruct); + + Schema parent = Schema.createRecord("StructCollisionRoot", null, TEST_NAMESPACE, false); + parent.setFields(Arrays.asList( + new Schema.Field("directField", directStruct, null, null), + new Schema.Field("listField", listSchema, null, null) + )); + + GenericRecord parentRecord = new GenericData.Record(parent); + GenericRecord directRecord = new GenericData.Record(directStruct); + directRecord.put("directOnly", new Utf8("direct")); + parentRecord.put("directField", directRecord); + + @SuppressWarnings("unchecked") + GenericData.Array listValue = new GenericData.Array<>(1, listSchema); + GenericRecord listRecord = new GenericData.Record(listStruct); + listRecord.put("listOnly", 42); + listValue.add(listRecord); + parentRecord.put("listField", listValue); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(parent); + Record icebergRecord = new RecordBinder(icebergSchema, parent) + .bind(serializeAndDeserialize(parentRecord, parent)); + + Record directField = (Record) icebergRecord.getField("directField"); + assertEquals("direct", directField.getField("directOnly").toString()); + + @SuppressWarnings("unchecked") + List boundList = (List) icebergRecord.getField("listField"); + assertEquals(1, boundList.size()); + assertEquals(42, boundList.get(0).getField("listOnly")); + } + + @Test + public void testStructBindersHandleDuplicateFullNamesInMapValues() { + Schema directStruct = Schema.createRecord("DuplicatedStruct", null, TEST_NAMESPACE, false); + directStruct.setFields(Arrays.asList( + new Schema.Field("directOnly", Schema.create(Schema.Type.STRING), null, null) + )); + + Schema mapStruct = Schema.createRecord("DuplicatedStruct", null, TEST_NAMESPACE, false); + mapStruct.setFields(Arrays.asList( + new Schema.Field("mapOnly", Schema.create(Schema.Type.LONG), null, null) + )); + + Schema mapSchema = Schema.createMap(mapStruct); + + Schema parent = Schema.createRecord("StructCollisionMapRoot", null, TEST_NAMESPACE, false); + parent.setFields(Arrays.asList( + new Schema.Field("directField", directStruct, null, null), + new Schema.Field("mapField", mapSchema, null, null) + )); + + GenericRecord parentRecord = new GenericData.Record(parent); + GenericRecord directRecord = new GenericData.Record(directStruct); + directRecord.put("directOnly", new Utf8("direct")); + parentRecord.put("directField", directRecord); + + Map mapValue = new HashMap<>(); + GenericRecord mapEntry = new GenericData.Record(mapStruct); + mapEntry.put("mapOnly", 123L); + mapValue.put("key", mapEntry); + parentRecord.put("mapField", mapValue); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(parent); + Record icebergRecord = new RecordBinder(icebergSchema, parent) + .bind(serializeAndDeserialize(parentRecord, parent)); + + Record directField = (Record) icebergRecord.getField("directField"); + assertEquals("direct", directField.getField("directOnly").toString()); + + @SuppressWarnings("unchecked") + Map boundMap = (Map) icebergRecord.getField("mapField"); + assertEquals(1, boundMap.size()); + assertEquals(123L, boundMap.get(new Utf8("key")).getField("mapOnly")); + } + + @Test + public void testConvertStructThrowsWhenSourceFieldMissing() { + Schema nestedSchema = Schema.createRecord("NestedRecord", null, TEST_NAMESPACE, false); + nestedSchema.setFields(Arrays.asList( + new Schema.Field("presentField", Schema.create(Schema.Type.STRING), null, null) + )); + + GenericRecord nestedRecord = new GenericData.Record(nestedSchema); + nestedRecord.put("presentField", new Utf8("value")); + + Types.StructType icebergStruct = Types.StructType.of( + Types.NestedField.optional(2, "presentField", Types.StringType.get()), + Types.NestedField.optional(3, "missingField", Types.StringType.get()) + ); + + AvroValueAdapter adapter = new AvroValueAdapter(); + IllegalStateException exception = assertThrows(IllegalStateException.class, + () -> adapter.convert(nestedRecord, nestedSchema, icebergStruct)); + assertTrue(exception.getMessage().contains("missingField")); + assertTrue(exception.getMessage().contains("NestedRecord")); + } + + @Test + public void testNestedStructsBindRecursively() { + Schema innerStruct = Schema.createRecord("InnerStruct", null, TEST_NAMESPACE, false); + innerStruct.setFields(Arrays.asList( + new Schema.Field("innerField", Schema.create(Schema.Type.INT), null, null) + )); + + Schema middleStruct = Schema.createRecord("MiddleStruct", null, TEST_NAMESPACE, false); + middleStruct.setFields(Arrays.asList( + new Schema.Field("middleField", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("inner", innerStruct, null, null) + )); + + Schema outerStruct = Schema.createRecord("OuterStruct", null, TEST_NAMESPACE, false); + outerStruct.setFields(Arrays.asList( + new Schema.Field("outerField", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("middle", middleStruct, null, null) + )); + + GenericRecord innerRecord = new GenericData.Record(innerStruct); + innerRecord.put("innerField", 7); + + GenericRecord middleRecord = new GenericData.Record(middleStruct); + middleRecord.put("middleField", new Utf8("mid")); + middleRecord.put("inner", innerRecord); + + GenericRecord outerRecord = new GenericData.Record(outerStruct); + outerRecord.put("outerField", new Utf8("out")); + outerRecord.put("middle", middleRecord); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(outerStruct); + Record icebergRecord = new RecordBinder(icebergSchema, outerStruct) + .bind(serializeAndDeserialize(outerRecord, outerStruct)); + + Record middleResult = (Record) icebergRecord.getField("middle"); + assertEquals("mid", middleResult.getField("middleField").toString()); + Record innerResult = (Record) middleResult.getField("inner"); + assertEquals(7, innerResult.getField("innerField")); + } + + @Test + public void testStructSchemaInstanceReuseSharesBinder() { + Schema sharedStruct = Schema.createRecord("SharedStruct", null, TEST_NAMESPACE, false); + sharedStruct.setFields(Arrays.asList( + new Schema.Field("value", Schema.create(Schema.Type.LONG), null, null) + )); + + Schema listSchema = Schema.createArray(sharedStruct); + + Schema parent = Schema.createRecord("SharedStructReuseRoot", null, TEST_NAMESPACE, false); + parent.setFields(Arrays.asList( + new Schema.Field("directField", sharedStruct, null, null), + new Schema.Field("listField", listSchema, null, null) + )); + + GenericRecord directValue = new GenericData.Record(sharedStruct); + directValue.put("value", 1L); + + @SuppressWarnings("unchecked") + GenericData.Array listValue = new GenericData.Array<>(2, listSchema); + GenericRecord listEntry1 = new GenericData.Record(sharedStruct); + listEntry1.put("value", 2L); + listValue.add(listEntry1); + GenericRecord listEntry2 = new GenericData.Record(sharedStruct); + listEntry2.put("value", 3L); + listValue.add(listEntry2); + + GenericRecord parentRecord = new GenericData.Record(parent); + parentRecord.put("directField", directValue); + parentRecord.put("listField", listValue); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(parent); + Record icebergRecord = new RecordBinder(icebergSchema, parent) + .bind(serializeAndDeserialize(parentRecord, parent)); + + Record directRecord = (Record) icebergRecord.getField("directField"); + assertEquals(1L, directRecord.getField("value")); + + @SuppressWarnings("unchecked") + List boundList = (List) icebergRecord.getField("listField"); + assertEquals(2, boundList.size()); + assertEquals(2L, boundList.get(0).getField("value")); + assertEquals(3L, boundList.get(1).getField("value")); + } + // Test method for converting a map field @Test public void testStringMapConversion() { @@ -644,6 +873,67 @@ public void testStringMapConversion() { ); } + @Test + public void testMapWithRecordValuesConversion() { + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"MapRecordContainer\",\n" + + " \"namespace\": \"" + TEST_NAMESPACE + "\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"mapField\",\n" + + " \"type\": {\n" + + " \"type\": \"map\",\n" + + " \"values\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"MapValueRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"innerString\", \"type\": \"string\"},\n" + + " {\"name\": \"innerLong\", \"type\": \"long\"}\n" + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}\n"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + + Schema mapFieldSchema = avroSchema.getField("mapField").schema(); + Schema mapValueSchema = mapFieldSchema.getValueType(); + + Map mapValue = new HashMap<>(); + GenericRecord firstValue = new GenericData.Record(mapValueSchema); + firstValue.put("innerString", new Utf8("first")); + firstValue.put("innerLong", 10L); + mapValue.put("key1", firstValue); + + GenericRecord secondValue = new GenericData.Record(mapValueSchema); + secondValue.put("innerString", new Utf8("second")); + secondValue.put("innerLong", 20L); + mapValue.put("key2", secondValue); + + avroRecord.put("mapField", mapValue); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema) + .bind(serializeAndDeserialize(avroRecord, avroSchema)); + + Map boundMap = normalizeMapValues(icebergRecord.getField("mapField")); + assertEquals(2, boundMap.size()); + + Record key1Record = (Record) boundMap.get(new Utf8("key1")); + assertEquals("first", key1Record.getField("innerString").toString()); + assertEquals(10L, key1Record.getField("innerLong")); + + Record key2Record = (Record) boundMap.get(new Utf8("key2")); + assertEquals("second", key2Record.getField("innerString").toString()); + assertEquals(20L, key2Record.getField("innerLong")); + + testSendRecord(icebergSchema, icebergRecord); + } + // Test method for converting a map field @Test public void testIntMapConversion() { @@ -1122,12 +1412,12 @@ public void testFieldCountWithNestedStructure() { // Create nested record GenericRecord nestedRecord = new GenericData.Record(avroSchema.getField("nestedField").schema()); - nestedRecord.put("nestedString", "nested"); // 1 field - nestedRecord.put("nestedInt", 123); // 1 field + nestedRecord.put("nestedString", "nested"); + nestedRecord.put("nestedInt", 123); GenericRecord mainRecord = new GenericData.Record(avroSchema); - mainRecord.put("simpleField", "simple"); // 1 field - mainRecord.put("nestedField", nestedRecord); // STRUCT fields are counted when accessed + mainRecord.put("simpleField", "simple"); + mainRecord.put("nestedField", nestedRecord); Record icebergRecord = recordBinder.bind(mainRecord); @@ -1137,7 +1427,7 @@ public void testFieldCountWithNestedStructure() { assertEquals("nested", nested.getField("nestedString")); assertEquals(123, nested.getField("nestedInt")); - // Total: 3 (simple) + 1(struct) + 3 (nested string) + 1 (nested int) = 4 fields + // Total: 3 (simple) + 1(struct) + 3 (nested string) + 1 (nested int) = 8 fields // Note: STRUCT type itself doesn't add to count, only its leaf fields long fieldCount = recordBinder.getAndResetFieldCount(); assertEquals(8, fieldCount); diff --git a/core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java b/core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java index fffefe172f..635592db18 100644 --- a/core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java +++ b/core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java @@ -1,5 +1,6 @@ package kafka.automq.table.process.convert; +import kafka.automq.table.binder.RecordBinder; import kafka.automq.table.deserializer.proto.CustomProtobufSchema; import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider; import kafka.automq.table.deserializer.proto.parse.ProtobufSchemaParser; @@ -9,6 +10,7 @@ import org.apache.kafka.common.utils.ByteUtils; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -17,9 +19,17 @@ import com.squareup.wire.schema.internal.parser.ProtoParser; import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.io.TaskWriter; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -29,6 +39,7 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import static kafka.automq.table.binder.AvroRecordBinderTest.createTableWriter; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -37,21 +48,29 @@ public class ProtobufRegistryConverterTest { private static final String ALL_TYPES_PROTO = """ syntax = \"proto3\"; - + package kafka.automq.table.process.proto; - + import \"google/protobuf/timestamp.proto\"; - + message Nested { string name = 1; int32 count = 2; } - + enum SampleEnum { SAMPLE_ENUM_UNSPECIFIED = 0; SAMPLE_ENUM_SECOND = 1; } - + + message FloatArray { + repeated double values = 1; + } + + message StringArray { + repeated string values = 1; + } + message AllTypes { // Scalar primitives in order defined by Avro ProtobufData mapping bool f_bool = 1; @@ -79,12 +98,28 @@ enum SampleEnum { oneof choice { string choice_str = 22; int32 choice_int = 23; + FloatArray choice_float_array = 26; + StringArray choice_string_array = 27; } repeated Nested f_nested_list = 24; map f_string_nested_map = 25; } """; + private void testSendRecord(org.apache.iceberg.Schema schema, org.apache.iceberg.data.Record record) { + InMemoryCatalog catalog = new InMemoryCatalog(); + catalog.initialize("test", ImmutableMap.of()); + catalog.createNamespace(Namespace.of("default")); + String tableName = "test"; + Table table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), tableName), schema); + TaskWriter writer = createTableWriter(table); + try { + writer.write(record); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Test void testConvertAllPrimitiveAndCollectionTypes() throws Exception { String topic = "pb-all-types"; @@ -108,7 +143,7 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception { DynamicMessage message = buildAllTypesMessage(descriptor); // magic byte + schema id + single message index + serialized protobuf payload - ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 1); + ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 3); ProtobufRegistryConverter converter = new ProtobufRegistryConverter(registryClient, "http://mock:8081", false); @@ -121,6 +156,11 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception { assertPrimitiveFields(record); assertRepeatedAndMapFields(record); assertNestedAndTimestamp(record); + + org.apache.iceberg.Schema iceberg = AvroSchemaUtil.toIceberg(record.getSchema()); + RecordBinder recordBinder = new RecordBinder(iceberg, record.getSchema()); + Record bind = recordBinder.bind(record); + testSendRecord(iceberg, bind); } private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descriptor) { @@ -145,7 +185,16 @@ private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descri descriptor.findFieldByName("f_enum"), descriptor.getFile().findEnumTypeByName("SampleEnum").findValueByName("SAMPLE_ENUM_SECOND") ); - builder.setField(descriptor.findFieldByName("choice_str"), "choice-string"); + + // Build FloatArray for oneof choice + Descriptors.FieldDescriptor floatArrayField = descriptor.findFieldByName("choice_float_array"); + Descriptors.Descriptor floatArrayDescriptor = floatArrayField.getMessageType(); + DynamicMessage.Builder floatArrayBuilder = DynamicMessage.newBuilder(floatArrayDescriptor); + Descriptors.FieldDescriptor floatValuesField = floatArrayDescriptor.findFieldByName("values"); + floatArrayBuilder.addRepeatedField(floatValuesField, 1.1); + floatArrayBuilder.addRepeatedField(floatValuesField, 2.2); + floatArrayBuilder.addRepeatedField(floatValuesField, 3.3); + builder.setField(floatArrayField, floatArrayBuilder.build()); Descriptors.FieldDescriptor nestedField = descriptor.findFieldByName("f_message"); Descriptors.Descriptor nestedDescriptor = nestedField.getMessageType(); @@ -286,8 +335,14 @@ private static void assertNestedAndTimestamp(GenericRecord record) { // Optional field should fall back to proto3 default (empty string) assertEquals("", getField(record, "f_optional_string", "fOptionalString").toString()); - Object oneofValue = getField(record, "choice_str", "choiceStr"); - assertEquals("choice-string", oneofValue.toString()); + // Verify oneof with complex FloatArray type + GenericRecord floatArrayValue = (GenericRecord) getField(record, "choice_float_array", "floatArray"); + List floatValues = (List) floatArrayValue.get("values"); + List expectedFloats = List.of(1.1, 2.2, 3.3); + assertEquals(expectedFloats.size(), floatValues.size()); + for (int i = 0; i < expectedFloats.size(); i++) { + assertEquals(expectedFloats.get(i), (Double) floatValues.get(i), 1e-6); + } } private static Object getField(GenericRecord record, String... candidateNames) { diff --git a/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java b/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java index 31123a5d03..05ae9736b0 100644 --- a/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java +++ b/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java @@ -283,6 +283,69 @@ void testSchemaEvolutionDropColumn() throws IOException { assertEquals(false, table.schema().findField("_kafka_value.email").isRequired()); } + @Test + void testSchemaEvolutionSwitchOptionalField() throws IOException { + // Base schema without optional fields + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + List fieldsV1 = new ArrayList<>(); + fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); + fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); + avroSchemaV1.setFields(fieldsV1); + + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("name", "base"); + + // Schema with optional field1 added on top of base schema + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + List fieldsV2 = new ArrayList<>(); + fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); + fieldsV2.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); + Schema optionalField1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + fieldsV2.add(new Schema.Field("field1", optionalField1, null, null)); + avroSchemaV2.setFields(fieldsV2); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("name", "with-field1"); + avroRecordV2.put("field1", "value1"); + + // Schema removes field1 and introduces optional field2 instead + Schema avroSchemaV3 = Schema.createRecord("TestRecord", null, null, false); + List fieldsV3 = new ArrayList<>(); + fieldsV3.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); + fieldsV3.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); + Schema optionalField2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + fieldsV3.add(new Schema.Field("field2", optionalField2, null, null)); + avroSchemaV3.setFields(fieldsV3); + + GenericRecord avroRecordV3 = new GenericData.Record(avroSchemaV3); + avroRecordV3.put("id", 3L); + avroRecordV3.put("name", "with-field2"); + avroRecordV3.put("field2", "value2"); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2) + .thenReturn(avroRecordV3); + + Record kafkaRecordV1 = createMockKafkaRecord(1, 0); + writer.write(0, kafkaRecordV1); + + Record kafkaRecordV2 = createMockKafkaRecord(2, 1); + writer.write(0, kafkaRecordV2); + + Record kafkaRecordV3 = createMockKafkaRecord(3, 2); + writer.write(0, kafkaRecordV3); + + Table table = catalog.loadTable(tableId); + assertNotNull(table); + assertNotNull(table.schema().findField("_kafka_value.field1")); + assertEquals(false, table.schema().findField("_kafka_value.field1").isRequired()); + assertNotNull(table.schema().findField("_kafka_value.field2")); + assertEquals(false, table.schema().findField("_kafka_value.field2").isRequired()); + } + @Test void testSchemaEvolutionReorderColumn() throws IOException { // Given: Initial Avro schema (v1)