diff --git a/core/src/main/java/kafka/automq/table/worker/IcebergTableManager.java b/core/src/main/java/kafka/automq/table/worker/IcebergTableManager.java
index 765ae51ba9..5d5775d434 100644
--- a/core/src/main/java/kafka/automq/table/worker/IcebergTableManager.java
+++ b/core/src/main/java/kafka/automq/table/worker/IcebergTableManager.java
@@ -152,7 +152,7 @@ protected List<SchemaChange> checkSchemaChanges(Table table, Schema currentSchem
      */
     @VisibleForTesting
     protected synchronized void applySchemaChange(Table table, List<SchemaChange> changes) {
-        LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes);
+        LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes.stream().map(c -> c.getType() + ":" + c.getColumnFullName()).toList());
         Tasks.range(1)
             .retry(2)
             .run(notUsed -> applyChanges(table, changes));
@@ -221,16 +221,18 @@ private void collectRemovedField(Types.NestedField tableField, String parentName
-        // if field doesn't exist in current schema and it's not a struct, mark it as optional (soft removal)
+        // if the field doesn't exist in the current schema and isn't already optional, mark it as optional (soft removal)
         if (currentField == null && !tableField.isOptional()) {
             changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName,
-                tableField.type().asPrimitiveType(), parentName));
+                null, parentName));
             return;
         }
         // if it is a nested field, recursively process subfields
         if (tableField.type().isStructType()) {
-            List<Types.NestedField> tableSubFields = tableField.type().asStructType().fields();
-
-            for (Types.NestedField tableSubField : tableSubFields) {
-                collectRemovedField(tableSubField, fullFieldName, currentSchema, changes);
-            }
+            collectRemovedStructFields(tableField.type().asStructType().fields(), fullFieldName, currentSchema, changes);
+        } else if (isStructList(tableField.type())) {
+            collectRemovedStructFields(tableField.type().asListType().elementType().asStructType().fields(),
+                fullFieldName + ".element", currentSchema, changes);
+        } else if (isStructMap(tableField.type())) {
+            collectRemovedStructFields(tableField.type().asMapType().valueType().asStructType().fields(),
+                fullFieldName + ".value", currentSchema, changes);
         }
     }
 
@@ -241,38 +243,59 @@ private void collectFieldChanges(Types.NestedField currentField, String parentNa
         Types.NestedField tableField = tableSchema.findField(fullFieldName);
 
         if (tableField == null) {
-            // if it is a nested field, recursively process subfields
-            if (currentField.type().isStructType()) {
-                List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();
-
-                for (Types.NestedField currentSubField : currentSubFields) {
-                    collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
+            changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName,
+                currentField.type(), parentName));
+            return;
+        } else {
+            Type currentType = currentField.type();
+            Type tableType = tableField.type();
+            if (currentType.isStructType() && tableType.isStructType()) {
+                collectStructFieldChanges(currentType.asStructType().fields(), fullFieldName, tableSchema, changes);
+                collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);
+            } else if (isStructList(currentType) && isStructList(tableType)) {
+                collectStructFieldChanges(currentType.asListType().elementType().asStructType().fields(),
+                    fullFieldName + ".element", tableSchema, changes);
+            } else if (isStructMap(currentType) && isStructMap(tableType)) {
+                collectStructFieldChanges(currentType.asMapType().valueType().asStructType().fields(),
+                    fullFieldName + ".value", tableSchema, changes);
+            } else if (!currentType.isStructType() && !tableType.isStructType()) {
+                collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);
+
+                if (!tableType.equals(currentType) && canPromoteType(tableType, currentType)) {
+                    changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentType, parentName));
                 }
-            } else {
-                changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName, currentField.type(), parentName));
             }
-        } else {
-            // if it is a nested field, recursively process subfields
-            if (currentField.type().isStructType() && tableField.type().isStructType()) {
-                List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();
+        }
+    }
 
-                for (Types.NestedField currentSubField : currentSubFields) {
-                    collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
-                }
-            } else if (!currentField.type().isStructType() && !tableField.type().isStructType()) {
-                // process optional fields
-                if (!tableField.isOptional() && currentField.isOptional()) {
-                    changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName));
-                }
+    private static void collectOptionalFieldChanges(Types.NestedField currentField, String parentName, List<SchemaChange> changes, Types.NestedField tableField, String fieldName) {
+        if (!tableField.isOptional() && currentField.isOptional()) {
+            changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName));
+        }
+    }
 
-                // promote type if needed
-                if (!tableField.type().equals(currentField.type()) && canPromoteType(tableField.type(), currentField.type())) {
-                    changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentField.type(), parentName));
-                }
-            }
+    private void collectStructFieldChanges(List<Types.NestedField> currentSubFields, String parentFullName,
+                                           Schema tableSchema, List<SchemaChange> changes) {
+        for (Types.NestedField currentSubField : currentSubFields) {
+            collectFieldChanges(currentSubField, parentFullName, tableSchema, changes);
+        }
+    }
+
+    private void collectRemovedStructFields(List<Types.NestedField> tableSubFields, String parentFullName,
+                                            Schema currentSchema, List<SchemaChange> changes) {
+        for (Types.NestedField tableSubField : tableSubFields) {
+            collectRemovedField(tableSubField, parentFullName, currentSchema, changes);
         }
     }
 
+    private boolean isStructList(Type type) {
+        return type.typeId() == Type.TypeID.LIST && type.asListType().elementType().isStructType();
+    }
+
+    private boolean isStructMap(Type type) {
+        return type.typeId() == Type.TypeID.MAP && type.asMapType().valueType().isStructType();
+    }
+
     private boolean canPromoteType(Type oldType, Type newType) {
         if (oldType.typeId() == Type.TypeID.INTEGER && newType.typeId() == Type.TypeID.LONG) {
             return true;
diff --git a/core/src/test/java/kafka/automq/table/worker/IcebergSchemaChangeCollectorTest.java b/core/src/test/java/kafka/automq/table/worker/IcebergSchemaChangeCollectorTest.java
new file mode 100644
index 0000000000..7e63451603
--- /dev/null
+++ b/core/src/test/java/kafka/automq/table/worker/IcebergSchemaChangeCollectorTest.java
@@ -0,0 +1,621 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.automq.table.worker; + +import com.google.common.collect.ImmutableMap; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class IcebergSchemaChangeCollectorTest { + private InMemoryCatalog catalog; + + @BeforeEach + public void setup() { + catalog = initializeCatalog(); + catalog.createNamespace(Namespace.of("default")); + } + + @Test + public void shouldReturnEmptyWhenSchemasMatch() { + Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + List changes = collectChanges(schema, schema); + assertTrue(changes.isEmpty()); + } + + @Test + public void shouldDetectTopLevelAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "name", null); + assertEquals(Types.StringType.get(), change.getNewType()); + } + + @Test + public void shouldDetectTopLevelOptional() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get())); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "id", null); + } + + @Test + public void shouldDetectTopLevelPromotion() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get())); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, "id", null); + assertEquals(Types.LongType.get(), change.getNewType()); + } + + @Test + public void shouldDetectNestedAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get())))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get()), + 
Types.NestedField.optional(4, "email", Types.StringType.get())))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "email", "user"); + } + + @Test + public void shouldDetectListElementStructAddition() { + Schema initial = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "street", Types.StringType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()), + Types.NestedField.optional(4, "zip", Types.IntegerType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "zip", "addresses.element"); + } + + @Test + public void shouldPromoteListElementStructFieldType() { + Schema initial = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.IntegerType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.LongType.get()))))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, "zip", "addresses.element"); + assertEquals(Types.LongType.get(), change.getNewType()); + } + + @Test + public void shouldPromoteMapValueStructFieldType() { + Schema initial = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of(Types.NestedField.optional(4, "zip", Types.IntegerType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of(Types.NestedField.optional(4, "zip", Types.LongType.get()))))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, "zip", "attributes.value"); + assertEquals(Types.LongType.get(), change.getNewType()); + } + + @Test + public void shouldMakeListElementStructFieldOptional() { + Schema initial = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.required(3, "zip", Types.IntegerType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.IntegerType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "zip", "addresses.element"); + } + + @Test + public void shouldSoftRemoveMapValueStructField() { + Schema initial = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of(Types.NestedField.required(4, "zip", Types.IntegerType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of()))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "zip", "attributes.value"); + } + 
+ @Test + public void shouldSkipDuplicateListElementChanges() { + Schema schema = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.LongType.get()))))); + + List changes = List.of( + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "zip", null, "addresses.element"), + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, "zip", Types.LongType.get(), "addresses.element")); + + IcebergTableManager manager = new IcebergTableManager(initializeCatalog(), TableIdentifier.of("default", "dummy"), mock(WorkerConfig.class)); + Table table = manager.getTableOrCreate(schema); + manager.applySchemaChange(table, changes); + } + + @Test + public void ignoresElementTypeReplacementAsIncompatible() { + Schema initial = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.IntegerType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, Types.IntegerType.get()))); + + List changes = collectChanges(initial, current); + assertTrue(changes.isEmpty(), "Incompatible list element type change should be ignored"); + } + + @Test + public void ignoresPrimitiveListElementPromotion() { + Schema initial = new Schema( + Types.NestedField.optional(1, "nums", Types.ListType.ofOptional(2, Types.IntegerType.get()))); + Schema current = new Schema( + Types.NestedField.optional(1, "nums", Types.ListType.ofOptional(2, Types.LongType.get()))); + + List changes = collectChanges(initial, current); + assertTrue(changes.isEmpty(), "list -> list promotion is unsupported and should be ignored"); + } + + @Test + public void shouldDetectMapValueStructAddition() { + Schema initial = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of(Types.NestedField.optional(4, "city", Types.StringType.get()))))); + Schema current = new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.optional(4, "city", Types.StringType.get()), + Types.NestedField.optional(5, "country", Types.StringType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "country", "attributes.value"); + } + + @Test + public void shouldDetectDeepNestedAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get())))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get()), + Types.NestedField.optional(5, "age", Types.IntegerType.get())))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "age", "user.profile"); + } + + @Test + public void shouldDetectNestedFieldMadeOptional() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", 
Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get()), + Types.NestedField.required(5, "age", Types.IntegerType.get())))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get()), + Types.NestedField.optional(5, "age", Types.IntegerType.get())))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "age", "user.profile"); + } + + @Test + public void shouldDetectNestedPromotion() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "age", Types.IntegerType.get())))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "age", Types.LongType.get())))))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, "age", "user.profile"); + assertEquals(Types.LongType.get(), change.getNewType()); + } + + @Test + public void shouldSoftRemoveTopLevelField() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "email", Types.StringType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "email", null); + } + + @Test + public void shouldSoftRemoveNonPrimitiveField() { + Schema initial = new Schema( + Types.NestedField.required(1, "ids", Types.ListType.ofRequired(2, Types.LongType.get()))); + Schema current = new Schema(); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "ids", null); + // ensure newType is null for non-primitive types so runtime won't call asPrimitiveType + assertEquals(null, change.getNewType()); + } + + @Test + public void shouldSoftRemoveNestedField() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get()), + Types.NestedField.required(5, "age", Types.IntegerType.get())))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "profile", Types.StructType.of( + Types.NestedField.required(4, "name", Types.StringType.get())))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "age", "user.profile"); + } + + @Test + public void shouldReportMixedChanges() { 
+ Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "score", Types.FloatType.get()), + Types.NestedField.optional(4, "old_field", Types.StringType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "score", Types.DoubleType.get()), + Types.NestedField.optional(5, "new_field", Types.StringType.get())); + + List changes = collectChanges(initial, current); + assertEquals(4, changes.size()); + assertTrue(changes.stream().anyMatch(c -> c.getType() == IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE + && c.getColumnName().equals("id"))); + assertTrue(changes.stream().anyMatch(c -> c.getType() == IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL + && c.getColumnName().equals("name"))); + assertTrue(changes.stream().anyMatch(c -> c.getType() == IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE + && c.getColumnName().equals("score"))); + assertTrue(changes.stream().anyMatch(c -> c.getType() == IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN + && c.getColumnName().equals("new_field"))); + } + + + @Test + public void shouldDetectOptionalListFieldAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "tags", Types.ListType.ofOptional(3, Types.StringType.get()))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "tags", null); + assertEquals(Types.ListType.ofOptional(3, Types.StringType.get()), change.getNewType()); + } + + @Test + public void shouldDetectOptionalMapFieldAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "metadata", Types.MapType.ofOptional(3, 4, + Types.StringType.get(), Types.StringType.get()))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "metadata", null); + assertEquals(Types.MapType.ofOptional(3, 4, Types.StringType.get(), Types.StringType.get()), + change.getNewType()); + } + + @Test + public void shouldDetectRequiredListFieldAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "scores", Types.ListType.ofRequired(3, Types.IntegerType.get()))); + + IcebergTableManager.SchemaChange change = assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "scores", null); + assertEquals(Types.ListType.ofRequired(3, Types.IntegerType.get()), change.getNewType()); + } + + @Test + public void shouldDetectRequiredMapFieldAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "attributes", 
Types.MapType.ofRequired(3, 4,
+                Types.StringType.get(), Types.IntegerType.get())));
+
+        IcebergTableManager.SchemaChange change = assertSingleChange(
+            collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "attributes", null);
+        assertEquals(Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.IntegerType.get()),
+            change.getNewType());
+    }
+
+    @Test
+    public void shouldSoftRemoveRequiredListField() {
+        Schema initial = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "tags", Types.ListType.ofOptional(3, Types.StringType.get())));
+        Schema current = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+        IcebergTableManager.SchemaChange change = assertSingleChange(
+            collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "tags", null);
+        // newType should be null for non-primitive types
+        assertEquals(null, change.getNewType());
+    }
+
+    @Test
+    public void shouldSoftRemoveRequiredMapField() {
+        Schema initial = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "metadata", Types.MapType.ofOptional(3, 4,
+                Types.StringType.get(), Types.StringType.get())));
+        Schema current = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+        IcebergTableManager.SchemaChange change = assertSingleChange(
+            collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "metadata", null);
+        assertEquals(null, change.getNewType());
+    }
+
+    @Test
+    public void shouldSoftRemoveOptionalListField() {
+        Schema initial = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "tags", Types.ListType.ofOptional(3, Types.StringType.get())));
+        Schema current = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+        // Removing an already-optional field yields no change; anything emitted must be a MAKE_OPTIONAL on 'tags'
+        List<IcebergTableManager.SchemaChange> changes = collectChanges(initial, current);
+        assertTrue(changes.isEmpty() || changes.stream().allMatch(c ->
+            c.getType() == IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL &&
+            c.getColumnName().equals("tags")));
+    }
+
+    @Test
+    public void shouldSoftRemoveOptionalMapField() {
+        Schema initial = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "metadata", Types.MapType.ofOptional(3, 4,
+                Types.StringType.get(), Types.StringType.get())));
+        Schema current = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+        List<IcebergTableManager.SchemaChange> changes = collectChanges(initial, current);
+        assertTrue(changes.isEmpty() || changes.stream().allMatch(c ->
+            c.getType() == IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL &&
+            c.getColumnName().equals("metadata")));
+    }
+
+    @Test
+    public void shouldMakeListFieldOptional() {
+        Schema initial = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "tags", Types.ListType.ofOptional(3, Types.StringType.get())));
+        Schema current = new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "tags", Types.ListType.ofOptional(3, Types.StringType.get())));
+
+        assertSingleChange(
+            collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "tags", null);
+    }
+
+    @Test
+    public void shouldMakeMapFieldOptional() {
+        Schema 
initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "metadata", Types.MapType.ofOptional(3, 4, + Types.StringType.get(), Types.StringType.get()))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "metadata", Types.MapType.ofOptional(3, 4, + Types.StringType.get(), Types.StringType.get()))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "metadata", null); + } + + @Test + public void shouldDetectNestedListFieldAddition() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get())))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get()), + Types.NestedField.optional(4, "hobbies", Types.ListType.ofOptional(5, Types.StringType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, "hobbies", "user"); + } + + @Test + public void shouldSoftRemoveNestedListField() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get()), + Types.NestedField.required(4, "hobbies", Types.ListType.ofOptional(5, Types.StringType.get()))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get())))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "hobbies", "user"); + } + + @Test + public void shouldMakeNestedMapFieldOptional() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get()), + Types.NestedField.required(4, "preferences", Types.MapType.ofOptional(5, 6, + Types.StringType.get(), Types.StringType.get()))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "user", Types.StructType.of( + Types.NestedField.required(3, "name", Types.StringType.get()), + Types.NestedField.optional(4, "preferences", Types.MapType.ofOptional(5, 6, + Types.StringType.get(), Types.StringType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "preferences", "user"); + } + + + @Test + public void shouldMakeStructFieldOptional() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "address", Types.StructType.of( + Types.NestedField.required(3, "street", Types.StringType.get()), + Types.NestedField.required(4, "city", Types.StringType.get())))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "address", Types.StructType.of( + Types.NestedField.required(3, "street", 
Types.StringType.get()), + Types.NestedField.required(4, "city", Types.StringType.get())))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "address", null); + } + + @Test + public void shouldMakeListNestedStructFieldOptional() { + Schema initial = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "contacts", Types.ListType.ofRequired(3, + Types.StructType.of( + Types.NestedField.required(4, "type", Types.StringType.get()), + Types.NestedField.required(5, "detail", Types.StringType.get()))))); + Schema current = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "contacts", Types.ListType.ofRequired(3, + Types.StructType.of( + Types.NestedField.optional(4, "type", Types.StringType.get()), + Types.NestedField.required(5, "detail", Types.StringType.get()))))); + + assertSingleChange( + collectChanges(initial, current), IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, "type", "contacts.element"); + } + + private List collectChanges(Schema tableSchema, Schema currentSchema) { + TableIdentifier tableId = TableIdentifier.of("default", generateRandomTableName()); + IcebergTableManager tableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + Table table = tableManager.getTableOrCreate(tableSchema); + return tableManager.collectSchemaChanges(currentSchema, table); + } + + private IcebergTableManager.SchemaChange assertSingleChange(List changes, + IcebergTableManager.SchemaChange.ChangeType type, + String columnName, + String parentName) { + assertEquals(1, changes.size(), "Expected exactly one schema change"); + IcebergTableManager.SchemaChange change = changes.get(0); + assertEquals(type, change.getType()); + assertEquals(columnName, change.getColumnName()); + assertEquals(parentName, change.getParentName()); + return change; + } + + private String generateRandomTableName() { + int randomNum = ThreadLocalRandom.current().nextInt(1000, 10000); + return "schema_table_" + randomNum; + } + + private InMemoryCatalog initializeCatalog() { + InMemoryCatalog inMemoryCatalog = new InMemoryCatalog(); + inMemoryCatalog.initialize("test", ImmutableMap.of()); + return inMemoryCatalog; + } +} diff --git a/core/src/test/java/kafka/automq/table/worker/IcebergTableManagerTest.java b/core/src/test/java/kafka/automq/table/worker/IcebergTableManagerTest.java index d0428d70f3..f52a5b082c 100644 --- a/core/src/test/java/kafka/automq/table/worker/IcebergTableManagerTest.java +++ b/core/src/test/java/kafka/automq/table/worker/IcebergTableManagerTest.java @@ -38,23 +38,28 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Tag("S3Unit") public class IcebergTableManagerTest { private InMemoryCatalog catalog; - private IcebergTableManager icebergTableManager; @BeforeEach public void setup() { @@ -62,241 +67,250 @@ public void setup() { catalog.createNamespace(Namespace.of("default")); } - private String generateRandomTableName() { - int randomNum = ThreadLocalRandom.current().nextInt(1000, 10000); - return "my_table_" + randomNum; + @Test + public void shouldCreateTableOnceAndReuseInstance() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); + + Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + + Table table = manager.getTableOrCreate(schema); + assertNotNull(table); + assertEquals(table, manager.getTableOrCreate(schema)); } @Test - public void testTableCreationAndLoad() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void createsMissingNamespaceBeforeCreatingTable() { + String namespace = "ns_" + ThreadLocalRandom.current().nextInt(1000, 10000); + TableIdentifier tableId = TableIdentifier.of(namespace, "table_" + ThreadLocalRandom.current().nextInt(1000, 10000)); + IcebergTableManager manager = newManager(tableId); Schema schema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); - - Table table = icebergTableManager.getTableOrCreate(schema); - assertNotNull(table, "Table should not be null"); + Types.NestedField.required(1, "id", Types.IntegerType.get())); - Table loadedTable = icebergTableManager.getTableOrCreate(schema); - assertEquals(table, loadedTable, "Loaded table should be the same as the created table"); + Table table = manager.getTableOrCreate(schema); + assertNotNull(table); + // Namespace should now exist and table should be loadable + assertNotNull(catalog.loadTable(tableId)); } @Test - public void testTableCreationWithDotInName() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void supportsFieldNamesContainingDots() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name.test", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(schema); - assertNotNull(table, "Table should not be null"); - - Table loadedTable = icebergTableManager.getTableOrCreate(schema); - assertEquals(table, loadedTable, "Loaded table should be the same as the created table"); - - Types.NestedField field = loadedTable.schema().findField("name.test"); - assertNotNull(field, "Field 'name.test' should exist in the table schema"); - assertEquals(field.name(), "name.test", "Field name should be 'name.test'"); + Table table = manager.getTableOrCreate(schema); + Types.NestedField field = table.schema().findField("name.test"); + assertNotNull(field); + assertEquals("name.test", field.name()); } @Test - public void testCheckAndApplySchemaChanges() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = 
TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); - - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + public void addsPrimitiveColumn() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - Schema updatedSchema = new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get()), - Types.NestedField.optional(3, "email", Types.StringType.get())); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - Record record = GenericRecord.create(updatedSchema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, record.struct().asSchema()); - assertFalse(schemaChanges.isEmpty(), "Schema changes should be applied"); - icebergTableManager.applySchemaChange(table, schemaChanges); + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "email", Types.StringType.get()))); - // Reload table and verify the schema changes - Table updatedTable = catalog.loadTable(tableId); - List columns = updatedTable.schema().columns(); - assertEquals(3, columns.size(), "Table schema should have three columns"); - assertEquals(Types.LongType.get(), columns.get(0).type(), "Column 'id' should be of type Long"); + assertNotNull(updated.schema().findField("email")); } @Test - public void testAddNewColumn() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); - - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + public void promotesPrimitiveColumnType() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - Schema updatedSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get()), - Types.NestedField.optional(3, "email", Types.StringType.get())); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - Record record = GenericRecord.create(updatedSchema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, record.struct().asSchema()); - assertFalse(schemaChanges.isEmpty(), "New columns should be added"); - icebergTableManager.applySchemaChange(table, schemaChanges); + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()))); - // Reload table and verify the new columns - Table updatedTable = catalog.loadTable(tableId); - List columns = updatedTable.schema().columns(); - assertEquals(3, columns.size(), "Table schema should have three columns"); - assertEquals(Types.StringType.get(), columns.get(2).type(), "Column 'email' should be of type String"); + assertEquals(Types.LongType.get(), updated.schema().findField("id").type()); } @Test - public void testMakeColumnOptional() { - String tableName = 
generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void makesColumnOptional() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - Schema updatedSchema = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get())); - - Record record = GenericRecord.create(updatedSchema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, record.struct().asSchema()); - assertFalse(schemaChanges.isEmpty(), "Column 'id' should be made optional"); - icebergTableManager.applySchemaChange(table, schemaChanges); + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()))); - // Reload table and verify column is optional - Table updatedTable = catalog.loadTable(tableId); - Types.NestedField column = updatedTable.schema().findField("id"); - assertTrue(column.isOptional(), "Column 'id' should be optional"); + assertTrue(updated.schema().findField("id").isOptional()); } @Test - public void testNoSchemaChanges() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void addsStructColumn() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - Record record = GenericRecord.create(initialSchema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, record.struct().asSchema()); - assertTrue(schemaChanges.isEmpty(), "No schema changes should be detected"); + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "address", Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()), + Types.NestedField.optional(4, "zipCode", Types.IntegerType.get()))))); + + Types.NestedField address = updated.schema().findField("address"); + assertNotNull(address); + assertTrue(address.type().isStructType()); + assertNotNull(updated.schema().findField("address.street")); + assertNotNull(updated.schema().findField("address.zipCode")); } @Test - public void testNoChangesWhenV2AddsColumnAndV1RecordProvided() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void addsNestedFieldInsideStruct() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // v2 Schema with additional column - Schema v2Schema = new Schema( + Table table = manager.getTableOrCreate(new Schema( Types.NestedField.required(1, "id", 
Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get()), - Types.NestedField.optional(3, "email", Types.StringType.get())); - - // Create the table with v2 schema - Table table = icebergTableManager.getTableOrCreate(v2Schema); + Types.NestedField.optional(2, "address", Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()))))); - // v1 Record Schema without the new column - Schema v1Schema = new Schema( + Table updated = applyChanges(manager, table, new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); + Types.NestedField.optional(2, "address", Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()), + Types.NestedField.optional(4, "zipCode", Types.IntegerType.get()))))); + + assertNotNull(updated.schema().findField("address.zipCode")); + } - Record v1Record = GenericRecord.create(v1Schema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, v1Record.struct().asSchema()); - assertTrue(schemaChanges.isEmpty(), "No schema changes should be applied in this case"); + @Test + public void addsFieldInsideListElementStruct() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Verify the schema remains unchanged - List columns = catalog.loadTable(tableId).schema().columns(); - assertEquals(3, columns.size(), "Table schema should have three columns"); - assertEquals(Types.StringType.get(), columns.get(2).type(), "Column 'email' should exist and be of type String"); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "street", Types.StringType.get())))))); + + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()), + Types.NestedField.optional(4, "zip", Types.IntegerType.get())))))); + + assertNotNull(updated.schema().findField("addresses.element.zip")); } @Test - public void testNoChangesWhenV2SetsFieldToOptionalAndV1RecordProvided() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void addsFieldInsideMapValueStruct() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); + + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of(Types.NestedField.optional(4, "city", Types.StringType.get())))))); + + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.optional(1, "attributes", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.optional(4, "city", Types.StringType.get()), + Types.NestedField.optional(5, "country", Types.StringType.get())))))); + + assertNotNull(updated.schema().findField("attributes.value.country")); + } - // v2 Schema with "id" set to optional - Schema v2Schema = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); + @Test + public void 
promotesFieldInsideListElementStruct() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Create the table with v2 schema - Table table = icebergTableManager.getTableOrCreate(v2Schema); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.IntegerType.get())))))); - // v1 Record Schema with "id" required - Schema v1Schema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); + Table updated = applyChanges(manager, table, new Schema( + Types.NestedField.optional(1, "addresses", Types.ListType.ofOptional(2, + Types.StructType.of(Types.NestedField.optional(3, "zip", Types.LongType.get())))))); + + assertEquals(Types.LongType.get(), updated.schema().findField("addresses.element.zip").type()); + } + + @Test + public void exposesPartitionSpecAndAllowsReset() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - Record v1Record = GenericRecord.create(v1Schema); - List schemaChanges = icebergTableManager.checkSchemaChanges(table, v1Record.struct().asSchema()); - assertTrue(schemaChanges.isEmpty(), "No schema changes should be applied in this case"); + assertNotNull(manager.spec(), "Partition spec should be captured after table creation"); - // Verify the schema remains unchanged - List columns = catalog.loadTable(tableId).schema().columns(); - assertEquals(2, columns.size(), "Table schema should have two columns"); - assertTrue(columns.get(0).isOptional(), "Column 'id' should remain optional"); + manager.reset(); + Table reloaded = manager.getTableOrCreate(table.schema()); + assertEquals(table.schema().asStruct(), reloaded.schema().asStruct()); } @Test - public void testNoChangesWhenV2PromotesTypeAndV1RecordProvided() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void handleSchemaChangesWithFlushTriggersFlushOnlyWhenNeeded() throws Exception { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); + manager.getTableOrCreate(new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()))); - // v2 Schema with "id" type promoted to Long - Schema v2Schema = new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), + Schema newSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get())); - // Create the table with v2 schema - Table table = icebergTableManager.getTableOrCreate(v2Schema); + AtomicInteger flushCount = new AtomicInteger(); + boolean changed = manager.handleSchemaChangesWithFlush(newSchema, () -> { + flushCount.incrementAndGet(); + }); + assertTrue(changed); + assertEquals(1, flushCount.get()); + assertNotNull(catalog.loadTable(tableId).schema().findField("name")); + + boolean noChange = manager.handleSchemaChangesWithFlush(newSchema, () -> { + flushCount.incrementAndGet(); + }); + assertFalse(noChange); + assertEquals(1, flushCount.get()); + } - // v1 Record Schema with "id" as Integer - Schema v1Schema = new Schema( + 
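+    // Assumed flush-then-commit ordering behind the two handleSchemaChangesWithFlush
+    // tests around this comment; a sketch only, not the verified implementation (the
+    // FlushCallback name is hypothetical):
+    //
+    //   boolean handleSchemaChangesWithFlush(Schema newSchema, FlushCallback flush) throws IOException {
+    //       List<SchemaChange> changes = collectSchemaChanges(newSchema, table);
+    //       if (changes.isEmpty()) {
+    //           return false;                      // no flush, no schema commit
+    //       }
+    //       flush.run();                           // drain pending writes; an IOException aborts here
+    //       applySchemaChange(table, changes);     // only reached after a successful flush
+    //       return true;
+    //   }
+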
+    @Test
+    public void handleSchemaChangesWithFlushPropagatesFlushFailures() throws Exception {
+        TableIdentifier tableId = randomTableId();
+        IcebergTableManager manager = newManager(tableId);
+        manager.getTableOrCreate(new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get())));
+
+        Schema newSchema = new Schema(
             Types.NestedField.required(1, "id", Types.IntegerType.get()),
             Types.NestedField.optional(2, "name", Types.StringType.get()));
 
-        Record v1Record = GenericRecord.create(v1Schema);
-        List<IcebergTableManager.SchemaChange> schemaChanges = icebergTableManager.checkSchemaChanges(table, v1Record.struct().asSchema());
-
-        assertTrue(schemaChanges.isEmpty(), "No schema changes should be applied in this case");
+        assertThrows(IOException.class, () -> manager.handleSchemaChangesWithFlush(newSchema, () -> {
+            throw new IOException("flush failed");
+        }));
 
-        // Verify the schema remains unchanged
-        List<Types.NestedField> columns = catalog.loadTable(tableId).schema().columns();
-        assertEquals(2, columns.size(), "Table schema should have two columns");
-        assertEquals(Types.LongType.get(), columns.get(0).type(), "Column 'id' should remain of type Long");
+        assertNull(catalog.loadTable(tableId).schema().findField("name"));
     }
 
     @Test
-    public void testUpdateTableOnErrorRetry() {
-        String tableName = generateRandomTableName();
-        TableIdentifier tableId = TableIdentifier.of("default", tableName);
-
-        Schema schema = new Schema(
+    public void retriesSchemaCommitOnFailure() {
+        TableIdentifier tableId = randomTableId();
+        Schema baseSchema = new Schema(
             Types.NestedField.required(1, "id", Types.IntegerType.get()),
             Types.NestedField.optional(2, "name", Types.StringType.get()));
-
         Schema updatedSchema = new Schema(
             Types.NestedField.required(1, "id", Types.IntegerType.get()),
             Types.NestedField.optional(2, "name", Types.StringType.get()),
@@ -307,252 +321,173 @@ public void testUpdateTableOnErrorRetry() {
         UpdateSchema mockUpdateSchema = mock(UpdateSchema.class);
 
         when(mockCatalog.loadTable(eq(tableId))).thenThrow(new NoSuchTableException("Table not found"));
-        when(mockCatalog.createTable(eq(tableId), eq(schema), any(), any())).thenReturn(mockTable);
-        when(mockTable.schema()).thenReturn(schema);
+        when(mockCatalog.createTable(eq(tableId), eq(baseSchema), any(), any())).thenReturn(mockTable);
+        when(mockTable.schema()).thenReturn(baseSchema);
         when(mockTable.updateSchema()).thenReturn(mockUpdateSchema);
-
-        // Ensure that addColumn method also returns the mock for fluent API chaining
         when(mockUpdateSchema.addColumn("email", Types.StringType.get())).thenReturn(mockUpdateSchema);
 
-        // Configure commit to throw exception on first call and succeed on second call
         doAnswer(new Answer<Void>() {
             private int count = 0;
 
             @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                if (count < 2) {
+            public Void answer(InvocationOnMock invocation) {
+                if (count < 1) {
                     count++;
                     throw new RuntimeException("Commit Error");
                 }
-                return null; // No exception on the second call
+                return null;
             }
         }).when(mockUpdateSchema).commit();
 
-        IcebergTableManager icebergTableManager = new IcebergTableManager(mockCatalog, tableId, mock(WorkerConfig.class));
-        Table table = icebergTableManager.getTableOrCreate(schema);
+        IcebergTableManager manager = new IcebergTableManager(mockCatalog, tableId, mock(WorkerConfig.class));
+        Table table = manager.getTableOrCreate(baseSchema);
 
         Record record = GenericRecord.create(updatedSchema);
-        List<IcebergTableManager.SchemaChange> schemaChanges = icebergTableManager.checkSchemaChanges(table, record.struct().asSchema());
-        assertFalse(schemaChanges.isEmpty(), 
"Schema changes should be applied after retrying on error"); - icebergTableManager.applySchemaChange(table, schemaChanges); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertFalse(schemaChanges.isEmpty()); + manager.applySchemaChange(table, schemaChanges); } @Test - public void testCollectSchemaChanges_AddNewColumn() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void ignoresOlderRecordMissingColumn() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Create table with initial schema - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); - - // Current schema with new column - Schema currentSchema = new Schema( + Table table = manager.getTableOrCreate(new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get()), - Types.NestedField.optional(3, "email", Types.StringType.get())); + Types.NestedField.optional(3, "email", Types.StringType.get()))); - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); + Schema olderSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); - assertEquals(1, changes.size()); - assertEquals(IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, changes.get(0).getType()); - assertEquals("email", changes.get(0).getColumnName()); - assertEquals(null, changes.get(0).getParentName()); + Record record = GenericRecord.create(olderSchema); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertTrue(schemaChanges.isEmpty()); + assertNotNull(catalog.loadTable(tableId).schema().findField("email")); } @Test - public void testCollectSchemaChanges_MakeOptional() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void ignoresOlderRecordWhenFieldAlreadyOptional() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Create table with required field - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + Table table = manager.getTableOrCreate(new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()))); - // Current schema with optional field - Schema currentSchema = new Schema( + Schema olderSchema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get())); - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); - - assertEquals(1, changes.size()); - assertEquals(IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, changes.get(0).getType()); - assertEquals("name", changes.get(0).getColumnName()); + Record record = 
GenericRecord.create(olderSchema); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertTrue(schemaChanges.isEmpty()); + assertTrue(catalog.loadTable(tableId).schema().findField("id").isOptional()); } @Test - public void testCollectSchemaChanges_PromoteType() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); - - // Create table with integer type - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "count", Types.IntegerType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + public void ignoresOlderRecordWhenTypeAlreadyPromoted() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Current schema with promoted types - Schema currentSchema = new Schema( + Table table = manager.getTableOrCreate(new Schema( Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "count", Types.LongType.get())); + Types.NestedField.optional(2, "name", Types.StringType.get()))); - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); + Schema olderSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); - assertEquals(2, changes.size()); - assertTrue(changes.stream().allMatch(c -> c.getType() == IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE)); - assertTrue(changes.stream().anyMatch(c -> c.getColumnName().equals("id"))); - assertTrue(changes.stream().anyMatch(c -> c.getColumnName().equals("count"))); + Record record = GenericRecord.create(olderSchema); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertTrue(schemaChanges.isEmpty()); + assertEquals(Types.LongType.get(), catalog.loadTable(tableId).schema().findField("id").type()); } @Test - public void testCollectSchemaChanges_NestedFields() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void doesNothingWhenSchemasMatch() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Create table with nested struct - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "user", Types.StructType.of( - Types.NestedField.required(3, "name", Types.StringType.get())))); - Table table = icebergTableManager.getTableOrCreate(initialSchema); - - // Current schema with additional nested field - Schema currentSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "user", Types.StructType.of( - Types.NestedField.required(3, "name", Types.StringType.get()), - Types.NestedField.optional(4, "email", Types.StringType.get())))); - - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); + Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get())); + Table table = manager.getTableOrCreate(schema); - assertEquals(1, changes.size()); - assertEquals(IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, 
changes.get(0).getType()); - assertEquals("email", changes.get(0).getColumnName()); - assertEquals("user", changes.get(0).getParentName()); + Record record = GenericRecord.create(schema); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertTrue(schemaChanges.isEmpty()); } @Test - public void testCollectSchemaChanges_RemovedField() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + public void skipsDuplicateNestedAdditions() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - // Create table with more fields - Schema initialSchema = new Schema( + Table table = manager.getTableOrCreate(new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get()), - Types.NestedField.required(3, "email", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + Types.NestedField.optional(2, "address", Types.StructType.of( + Types.NestedField.optional(3, "street", Types.StringType.get()))))); - // Current schema with removed field - Schema currentSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); + List schemaChanges = List.of( + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, + "street", Types.StringType.get(), "address"), + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, + "zipCode", Types.IntegerType.get(), "address")); - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); + manager.applySchemaChange(table, schemaChanges); - assertEquals(1, changes.size()); - assertEquals(IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, changes.get(0).getType()); - assertEquals("email", changes.get(0).getColumnName()); + Table updatedTable = catalog.loadTable(tableId); + Types.NestedField addressField = updatedTable.schema().findField("address"); + assertNotNull(addressField); + List nestedFields = addressField.type().asStructType().fields(); + assertEquals(2, nestedFields.size()); + assertNotNull(updatedTable.schema().findField("address.street")); + assertNotNull(updatedTable.schema().findField("address.zipCode")); } @Test - public void testCollectSchemaChanges_NoChanges() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); - - // Create table with schema - Schema schema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "name", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(schema); + public void skipsMakeOptionalAndPromoteWhenAlreadyApplied() { + TableIdentifier tableId = randomTableId(); + IcebergTableManager manager = newManager(tableId); - List changes = icebergTableManager.collectSchemaChanges(schema, table); + Schema tableSchema = new Schema( + Types.NestedField.optional(1, "name", Types.StringType.get()), + Types.NestedField.required(2, "id", Types.LongType.get())); - assertTrue(changes.isEmpty()); - } + Table mockTable = mock(Table.class); + UpdateSchema mockUpdateSchema = 
mock(UpdateSchema.class); + when(mockTable.schema()).thenReturn(tableSchema); + when(mockTable.updateSchema()).thenReturn(mockUpdateSchema); - @Test - public void testCollectSchemaChanges_ComplexNestedStructure() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + List schemaChanges = List.of( + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL, + "name", null, null), + new IcebergTableManager.SchemaChange(IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE, + "id", Types.LongType.get(), null)); - // Create table with deeply nested structure - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "user", Types.StructType.of( - Types.NestedField.required(3, "profile", Types.StructType.of( - Types.NestedField.required(4, "name", Types.StringType.get())))))); - Table table = icebergTableManager.getTableOrCreate(initialSchema); + manager.applySchemaChange(mockTable, schemaChanges); - // Current schema with additional deeply nested field - Schema currentSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "user", Types.StructType.of( - Types.NestedField.required(3, "profile", Types.StructType.of( - Types.NestedField.required(4, "name", Types.StringType.get()), - Types.NestedField.optional(5, "age", Types.IntegerType.get())))))); - - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); + verify(mockUpdateSchema, never()).makeColumnOptional("name"); + verify(mockUpdateSchema, never()).updateColumn(eq("id"), any()); + verify(mockUpdateSchema).commit(); + } - assertEquals(1, changes.size()); - assertEquals(IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN, changes.get(0).getType()); - assertEquals("age", changes.get(0).getColumnName()); - assertEquals("user.profile", changes.get(0).getParentName()); + private Table applyChanges(IcebergTableManager manager, Table table, Schema newSchema) { + Record record = GenericRecord.create(newSchema); + List schemaChanges = manager.checkSchemaChanges(table, record.struct().asSchema()); + assertFalse(schemaChanges.isEmpty(), "Expected schema changes to be detected"); + manager.applySchemaChange(table, schemaChanges); + return catalog.loadTable(manager.tableId()); } - @Test - public void testCollectSchemaChanges_MixedChanges() { - String tableName = generateRandomTableName(); - TableIdentifier tableId = TableIdentifier.of("default", tableName); - icebergTableManager = new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + private IcebergTableManager newManager(TableIdentifier tableId) { + return new IcebergTableManager(catalog, tableId, mock(WorkerConfig.class)); + } - // Create table with initial schema - Schema initialSchema = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.required(2, "name", Types.StringType.get()), - Types.NestedField.optional(3, "score", Types.FloatType.get()), - Types.NestedField.optional(4, "old_field", Types.StringType.get())); - Table table = icebergTableManager.getTableOrCreate(initialSchema); - - // Current schema with mixed changes - Schema currentSchema = new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), // promote type - Types.NestedField.optional(2, "name", 
Types.StringType.get()), // make optional - Types.NestedField.optional(3, "score", Types.DoubleType.get()), // promote type - Types.NestedField.optional(5, "new_field", Types.StringType.get())); // add column - // old_field is removed - - List changes = icebergTableManager.collectSchemaChanges(currentSchema, table); - - assertEquals(4, changes.size()); - - assertTrue(changes.stream().anyMatch(c -> - c.getType() == IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE && - c.getColumnName().equals("id"))); - - assertTrue(changes.stream().anyMatch(c -> - c.getType() == IcebergTableManager.SchemaChange.ChangeType.MAKE_OPTIONAL && - c.getColumnName().equals("name"))); - - assertTrue(changes.stream().anyMatch(c -> - c.getType() == IcebergTableManager.SchemaChange.ChangeType.PROMOTE_TYPE && - c.getColumnName().equals("score"))); - - assertTrue(changes.stream().anyMatch(c -> - c.getType() == IcebergTableManager.SchemaChange.ChangeType.ADD_COLUMN && - c.getColumnName().equals("new_field"))); + private TableIdentifier randomTableId() { + return TableIdentifier.of("default", "table_" + ThreadLocalRandom.current().nextInt(1000, 10000)); } private InMemoryCatalog initializeCatalog() { diff --git a/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java b/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java index 05ae9736b0..5361032c27 100644 --- a/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java +++ b/core/src/test/java/kafka/automq/table/worker/IcebergWriterSchemaEvolutionTest.java @@ -46,18 +46,14 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.concurrent.ThreadLocalRandom; import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -93,348 +89,1283 @@ void setUp() { writer.setOffset(0, 0); } - private String generateRandomTableName() { - int randomNum = ThreadLocalRandom.current().nextInt(1000, 10000); - return "test_table_" + randomNum; + @Test + void testAddRequiredFieldsInNestedStruct() throws IOException { + // v1: {id, user{name}} + Schema userV1 = Schema.createRecord("User", null, null, false); + userV1.setFields(Collections.singletonList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV1, null, null))); + + GenericRecord userRecordV1 = new GenericData.Record(userV1); + userRecordV1.put("name", "alice"); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("user", userRecordV1); + + // v2: Add required primitive field in nested struct (age) + // Add required nested struct field (profile) + // Add required list field in nested struct (hobbies) + Schema profileV2 = Schema.createRecord("Profile", null, null, false); + profileV2.setFields(Arrays.asList( + new 
Schema.Field("city", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("level", Schema.create(Schema.Type.INT), null, null))); + + Schema userV2 = Schema.createRecord("User", null, null, false); + Schema hobbiesSchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + userV2.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("age", Schema.create(Schema.Type.INT), null, null), + new Schema.Field("profile", profileV2, null, null), + new Schema.Field("hobbies", hobbiesSchema, null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV2, null, null))); + + GenericRecord profileRecordV2 = new GenericData.Record(profileV2); + profileRecordV2.put("city", "Shanghai"); + profileRecordV2.put("level", 5); + + GenericRecord userRecordV2 = new GenericData.Record(userV2); + userRecordV2.put("name", "bob"); + userRecordV2.put("age", 30); + userRecordV2.put("profile", profileRecordV2); + userRecordV2.put("hobbies", Arrays.asList("reading", "coding")); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("user", userRecordV2); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + // Verify nested primitive field + assertNotNull(table.schema().findField("_kafka_value.user.age")); + // Verify nested struct field + assertNotNull(table.schema().findField("_kafka_value.user.profile")); + assertNotNull(table.schema().findField("_kafka_value.user.profile.city")); + // Verify nested list field + assertNotNull(table.schema().findField("_kafka_value.user.hobbies")); } @Test - void testSchemaEvolutionAddColumn() throws IOException { - // Given: Initial Avro schema (v1) - Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV1.setFields(fieldsV1); + void testAddRequiredFieldsInCollectionElement() throws IOException { + // v1: {id, addresses: list<{street}>} + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Collections.singletonList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null))); - // Create v1 record + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(addressV1); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("street", "Main St"); GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("name", "test"); + avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: Add required 
primitive field in list element (zipCode, floor) + // Add required nested struct field in list element (location) + Schema locationV2 = Schema.createRecord("Location", null, null, false); + locationV2.setFields(Arrays.asList( + new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE), null, null), + new Schema.Field("lng", Schema.create(Schema.Type.DOUBLE), null, null))); + + Schema addressV2 = Schema.createRecord("Address", null, null, false); + addressV2.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("zipCode", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("floor", Schema.create(Schema.Type.INT), null, null), + new Schema.Field("location", locationV2, null, null))); - // Given: Updated Avro schema (v2) with new email field Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - Schema emailSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - fieldsV2.add(new Schema.Field("email", emailSchema, null, null)); - avroSchemaV2.setFields(fieldsV2); - - // Create v2 record + Schema listV2 = Schema.createArray(addressV2); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV2, null, null))); + + GenericRecord locationRecordV2 = new GenericData.Record(locationV2); + locationRecordV2.put("lat", 39.9042); + locationRecordV2.put("lng", 116.4074); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + addressRecordV2.put("street", "Second St"); + addressRecordV2.put("zipCode", "100000"); + addressRecordV2.put("floor", 5); + addressRecordV2.put("location", locationRecordV2); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); avroRecordV2.put("id", 2L); - avroRecordV2.put("name", "test2"); - avroRecordV2.put("email", "test@example.com"); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); - // Mock deserializer behavior when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) .thenReturn(avroRecordV2); - // Write records - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - writer.write(0, kafkaRecordV1); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + // Verify primitive fields in list element + assertNotNull(table.schema().findField("_kafka_value.addresses.element.zipCode")); + assertNotNull(table.schema().findField("_kafka_value.addresses.element.floor")); + // Verify struct field in list element + assertNotNull(table.schema().findField("_kafka_value.addresses.element.location")); + assertNotNull(table.schema().findField("_kafka_value.addresses.element.location.lat")); + } + + @Test + void testAddOptionalCollection() throws IOException { + // v1: {id, name} + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, 
null))); + + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("name", "alice"); - Record kafkaRecordV2 = createMockKafkaRecord(2, 1); - writer.write(0, kafkaRecordV2); + // v2: {id, name, tags: list} + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema listSchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema optionalList = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listSchema)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("tags", optionalList, null, null))); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("name", "bob"); + avroRecordV2.put("tags", Arrays.asList("tag1", "tag2")); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - // Verify schema evolution Table table = catalog.loadTable(tableId); - assertNotNull(table); - assertEquals(4, table.schema().columns().size()); - assertNotNull(table.schema().findField("_kafka_value.email")); + assertNotNull(table.schema().findField("_kafka_value.tags")); + assertEquals(true, table.schema().findField("_kafka_value.tags").type().isListType()); } + @Test - void testSchemaEvolutionMakeColumnOptional() throws IOException { - // Given: Initial Avro schema with required fields + void testAddRequiredCollections() throws IOException { + // v1: {id, name} Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV1.setFields(fieldsV1); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null))); - // Create v1 record GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("name", "test"); + avroRecordV1.put("name", "alice"); + + // v2: Add list (tags) + // Add list (addresses) + // Add map (scores) + // Add map (locations) + Schema addressSchema = Schema.createRecord("Address", null, null, false); + addressSchema.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); + + Schema locationSchema = Schema.createRecord("Location", null, null, false); + locationSchema.setFields(Arrays.asList( + new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE), null, null), + new Schema.Field("lng", Schema.create(Schema.Type.DOUBLE), null, null))); - // Given: Updated schema with optional name field Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - Schema optionalName = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - fieldsV2.add(new Schema.Field("name", optionalName, 
null, null)); - avroSchemaV2.setFields(fieldsV2); + Schema tagsList = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema addressesList = Schema.createArray(addressSchema); + Schema scoresMap = Schema.createMap(Schema.create(Schema.Type.INT)); + Schema locationsMap = Schema.createMap(locationSchema); + + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("tags", tagsList, null, null), + new Schema.Field("addresses", addressesList, null, null), + new Schema.Field("scores", scoresMap, null, null), + new Schema.Field("locations", locationsMap, null, null))); + + GenericRecord addressRecord = new GenericData.Record(addressSchema); + addressRecord.put("street", "Main St"); + addressRecord.put("city", "Beijing"); + + GenericRecord locationRecord = new GenericData.Record(locationSchema); + locationRecord.put("lat", 39.9042); + locationRecord.put("lng", 116.4074); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("name", "bob"); + avroRecordV2.put("tags", Arrays.asList("tag1", "tag2")); + avroRecordV2.put("addresses", Collections.singletonList(addressRecord)); + java.util.Map<String, Integer> scoresMapData = new java.util.HashMap<>(); + scoresMapData.put("math", 95); + avroRecordV2.put("scores", scoresMapData); + java.util.Map<String, GenericRecord> locationsMapData = new java.util.HashMap<>(); + locationsMapData.put("home", locationRecord); + avroRecordV2.put("locations", locationsMapData); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + // Verify list<string> + assertNotNull(table.schema().findField("_kafka_value.tags")); + assertEquals(true, table.schema().findField("_kafka_value.tags").type().isListType()); + // Verify list<Address> + assertNotNull(table.schema().findField("_kafka_value.addresses")); + assertNotNull(table.schema().findField("_kafka_value.addresses.element.street")); + // Verify map<string, int> + assertNotNull(table.schema().findField("_kafka_value.scores")); + assertEquals(true, table.schema().findField("_kafka_value.scores").type().isMapType()); + // Verify map<string, Location> + assertNotNull(table.schema().findField("_kafka_value.locations")); + assertNotNull(table.schema().findField("_kafka_value.locations.value.lat")); + } + + + @Test + void testAddRequiredFieldInCollectionElement() throws IOException { + // v1: addresses: list<{street}> + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Collections.singletonList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null))); - // Create v2 record + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(addressV1); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("street", "Main St");
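// --- Editorial sketch, not part of this patch (field ids are illustrative). ---
// Iceberg models a list-of-struct column as a ListType whose element struct is
// addressed via the ".element" suffix; that is why these tests look up paths such
// as "_kafka_value.addresses.element.city". Roughly:
//     Types.ListType addresses = Types.ListType.ofOptional(100,
//         Types.StructType.of(Types.NestedField.optional(101, "street", Types.StringType.get())));
//     addresses.elementType().asStructType().fields(); // -> [street]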
avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: addresses: list<{street, required city}> + Schema addressV2 = Schema.createRecord("Address", null, null, false); + addressV2.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema listV2 = Schema.createArray(addressV2); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV2, null, null))); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + addressRecordV2.put("street", "Second St"); + addressRecordV2.put("city", "Beijing"); GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); avroRecordV2.put("id", 2L); - avroRecordV2.put("name", null); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); - // Mock deserializer behavior when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) .thenReturn(avroRecordV2); - // Write records - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - writer.write(0, kafkaRecordV1); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.addresses.element.city")); + assertEquals(Types.StringType.get(), table.schema().findField("_kafka_value.addresses.element.city").type()); + } + + @Test + void testAddRequiredFieldInMapValueStruct() throws IOException { + // v1: locations: map + Schema locationV1 = Schema.createRecord("Location", null, null, false); + locationV1.setFields(Collections.singletonList( + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema mapV1 = Schema.createMap(locationV1); + Schema optionalMapV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("locations", optionalMapV1, null, null))); + + GenericRecord locationRecordV1 = new GenericData.Record(locationV1); + locationRecordV1.put("city", "Shanghai"); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + java.util.Map mapData1 = new java.util.HashMap<>(); + mapData1.put("home", locationRecordV1); + avroRecordV1.put("locations", mapData1); + + // v2: locations: map + Schema locationV2 = Schema.createRecord("Location", null, null, false); + locationV2.setFields(Arrays.asList( + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("country", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema mapV2 = Schema.createMap(locationV2); + Schema optionalMapV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("locations", optionalMapV2, null, null))); + + GenericRecord 
locationRecordV2 = new GenericData.Record(locationV2); + locationRecordV2.put("city", "Beijing"); + locationRecordV2.put("country", "China"); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + java.util.Map mapData2 = new java.util.HashMap<>(); + mapData2.put("work", locationRecordV2); + avroRecordV2.put("locations", mapData2); - Record kafkaRecordV2 = createMockKafkaRecord(2, 1); - writer.write(0, kafkaRecordV2); + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - // Verify schema evolution Table table = catalog.loadTable(tableId); - assertNotNull(table); - assertEquals(false, table.schema().findField("_kafka_value.name").isRequired()); + assertNotNull(table.schema().findField("_kafka_value.locations.value.country")); + assertEquals(Types.StringType.get(), table.schema().findField("_kafka_value.locations.value.country").type()); } + + // ========== Add Optional Field Tests ========== + @Test - void testSchemaEvolutionChangeColumnType() throws IOException { - // Given: Initial Avro schema with integer type + void testAddOptionalFieldInNestedStruct() throws IOException { + // v1: user{name} + Schema userV1 = Schema.createRecord("User", null, null, false); + userV1.setFields(Collections.singletonList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null))); + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("count", Schema.create(Schema.Type.INT), null, null)); - avroSchemaV1.setFields(fieldsV1); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV1, null, null))); - // Create v1 record + GenericRecord userRecordV1 = new GenericData.Record(userV1); + userRecordV1.put("name", "alice"); GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("count", 100); + avroRecordV1.put("user", userRecordV1); + + // v2: user{name, email} + Schema userV2 = Schema.createRecord("User", null, null, false); + Schema optionalEmail = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + userV2.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("email", optionalEmail, null, null))); - // Given: Updated schema with long type Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("count", Schema.create(Schema.Type.LONG), null, null)); - avroSchemaV2.setFields(fieldsV2); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV2, null, null))); - // Create v2 record + GenericRecord userRecordV2 = new GenericData.Record(userV2); + userRecordV2.put("name", "bob"); + userRecordV2.put("email", "bob@example.com"); GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); avroRecordV2.put("id", 2L); - avroRecordV2.put("count", 1000L); + avroRecordV2.put("user", 
userRecordV2); - // Mock deserializer behavior when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) .thenReturn(avroRecordV2); - // Write records - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - writer.write(0, kafkaRecordV1); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - Record kafkaRecordV2 = createMockKafkaRecord(2, 1); - writer.write(0, kafkaRecordV2); + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.user.email")); + } + + + @Test + void testDropRequiredCollection() throws IOException { + // v1: {id, tags: list} + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listSchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("tags", listSchema, null, null))); + + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("tags", Arrays.asList("tag1", "tag2")); + + // v2: {id} - dropped tags + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV2.setFields(Collections.singletonList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null))); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.tags")); + assertEquals(false, table.schema().findField("_kafka_value.tags").isRequired()); + } + + @Test + void testDropRequiredFieldInNestedStruct() throws IOException { + // v1: user{name, email, age} + Schema userV1 = Schema.createRecord("User", null, null, false); + userV1.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("age", Schema.create(Schema.Type.INT), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV1, null, null))); + + GenericRecord userRecordV1 = new GenericData.Record(userV1); + userRecordV1.put("name", "alice"); + userRecordV1.put("email", "alice@example.com"); + userRecordV1.put("age", 25); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("user", userRecordV1); + + // v2: user{name, email} - dropped age + Schema userV2 = Schema.createRecord("User", null, null, false); + userV2.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV2, null, null))); + + GenericRecord userRecordV2 = new GenericData.Record(userV2); + 
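// --- Editorial sketch, assumed mechanics rather than code from this patch. ---
// Dropping a field never physically removes the Iceberg column: the manager only
// relaxes it, roughly equivalent to
//     table.updateSchema().makeColumnOptional("_kafka_value.user.age").commit();
// so old data files keep a valid (now nullable) column, which is exactly what the
// assertions at the end of this test verify.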
userRecordV2.put("name", "bob"); + userRecordV2.put("email", "bob@example.com"); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("user", userRecordV2); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - // Verify schema evolution Table table = catalog.loadTable(tableId); - assertNotNull(table); - assertEquals(Types.LongType.get(), table.schema().findField("_kafka_value.count").type()); + assertNotNull(table.schema().findField("_kafka_value.user.age")); + assertEquals(false, table.schema().findField("_kafka_value.user.age").isRequired()); } @Test - void testSchemaEvolutionDropColumn() throws IOException { - // Given: Initial Avro schema (v1) + void testMakeRequiredFieldOptionalInNestedStruct() throws IOException { + // v1: user{required name, required age} + Schema userV1 = Schema.createRecord("User", null, null, false); + userV1.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("age", Schema.create(Schema.Type.INT), null, null))); + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - fieldsV1.add(new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV1.setFields(fieldsV1); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV1, null, null))); - // Create v1 record + GenericRecord userRecordV1 = new GenericData.Record(userV1); + userRecordV1.put("name", "alice"); + userRecordV1.put("age", 25); GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("name", "test"); - avroRecordV1.put("email", "test@example.com"); + avroRecordV1.put("user", userRecordV1); + + // v2: user{required name, optional age} + Schema userV2 = Schema.createRecord("User", null, null, false); + Schema optionalAge = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT))); + userV2.setFields(Arrays.asList( + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("age", optionalAge, null, null))); - // Given: Updated Avro schema (v2) with dropped email field Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV2.setFields(fieldsV2); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV2, null, null))); - // Create v2 record + GenericRecord userRecordV2 = new GenericData.Record(userV2); + userRecordV2.put("name", "bob"); + userRecordV2.put("age", null); GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); avroRecordV2.put("id", 2L); - avroRecordV2.put("name", "test2"); + avroRecordV2.put("user", userRecordV2); - // Mock deserializer behavior 
when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) .thenReturn(avroRecordV2); - // Write records - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - writer.write(0, kafkaRecordV1); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - Record kafkaRecordV2 = createMockKafkaRecord(2, 1); - writer.write(0, kafkaRecordV2); + Table table = catalog.loadTable(tableId); + assertEquals(false, table.schema().findField("_kafka_value.user.age").isRequired()); + } + + @Test + void testPromoteFieldTypeInNestedStruct() throws IOException { + // v1: user{age: int} + Schema userV1 = Schema.createRecord("User", null, null, false); + userV1.setFields(Collections.singletonList( + new Schema.Field("age", Schema.create(Schema.Type.INT), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV1, null, null))); + + GenericRecord userRecordV1 = new GenericData.Record(userV1); + userRecordV1.put("age", 25); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("user", userRecordV1); + + // v2: user{age: long} + Schema userV2 = Schema.createRecord("User", null, null, false); + userV2.setFields(Collections.singletonList( + new Schema.Field("age", Schema.create(Schema.Type.LONG), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("user", userV2, null, null))); + + GenericRecord userRecordV2 = new GenericData.Record(userV2); + userRecordV2.put("age", 30L); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("user", userRecordV2); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - // Verify schema evolution Table table = catalog.loadTable(tableId); - assertNotNull(table); - assertEquals(4, table.schema().columns().size()); - assertNotNull(table.schema().findField("_kafka_value.email")); - assertEquals(false, table.schema().findField("_kafka_value.email").isRequired()); + assertEquals(Types.LongType.get(), table.schema().findField("_kafka_value.user.age").type()); } + // ========== Collection Field Tests ========== + + + @Test - void testSchemaEvolutionSwitchOptionalField() throws IOException { - // Base schema without optional fields + void testAddOptionalMapCollection() throws IOException { + // v1: {id, name} Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV1.setFields(fieldsV1); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null))); GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("name", "base"); + 
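// --- Editorial sketch, not part of this patch (field ids are illustrative). ---
// Avro map keys are always strings, so "metadata" surfaces in Iceberg as a MapType;
// struct fields inside a map value are addressed via the ".value" suffix (see
// "_kafka_value.locations.value.country" in the map tests). Roughly:
//     Types.MapType metadata = Types.MapType.ofOptional(200, 201,
//         Types.StringType.get(), Types.StringType.get());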
avroRecordV1.put("name", "alice"); - // Schema with optional field1 added on top of base schema + // v2: {id, name, metadata: map} Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - Schema optionalField1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - fieldsV2.add(new Schema.Field("field1", optionalField1, null, null)); - avroSchemaV2.setFields(fieldsV2); + Schema mapSchema = Schema.createMap(Schema.create(Schema.Type.STRING)); + Schema optionalMap = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapSchema)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("metadata", optionalMap, null, null))); GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); avroRecordV2.put("id", 2L); - avroRecordV2.put("name", "with-field1"); - avroRecordV2.put("field1", "value1"); + avroRecordV2.put("name", "bob"); + java.util.Map metadataMap = new java.util.HashMap<>(); + metadataMap.put("key1", "value1"); + metadataMap.put("key2", "value2"); + avroRecordV2.put("metadata", metadataMap); - // Schema removes field1 and introduces optional field2 instead - Schema avroSchemaV3 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV3 = new ArrayList<>(); - fieldsV3.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV3.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - Schema optionalField2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - fieldsV3.add(new Schema.Field("field2", optionalField2, null, null)); - avroSchemaV3.setFields(fieldsV3); + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); - GenericRecord avroRecordV3 = new GenericData.Record(avroSchemaV3); - avroRecordV3.put("id", 3L); - avroRecordV3.put("name", "with-field2"); - avroRecordV3.put("field2", "value2"); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.metadata")); + assertEquals(true, table.schema().findField("_kafka_value.metadata").type().isMapType()); + } + + + @Test + void testAddOptionalFieldInCollectionElement() throws IOException { + // v1: addresses: list<{street}> + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Collections.singletonList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(addressV1); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("street", "Main St"); + GenericRecord avroRecordV1 = 
+ GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: addresses: list<{street, zipCode}> + Schema addressV2 = Schema.createRecord("Address", null, null, false); + Schema optionalZip = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + addressV2.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("zipCode", optionalZip, null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema listV2 = Schema.createArray(addressV2); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV2, null, null))); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + addressRecordV2.put("street", "Second St"); + addressRecordV2.put("zipCode", "12345"); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) - .thenReturn(avroRecordV2) - .thenReturn(avroRecordV3); + .thenReturn(avroRecordV2); - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - writer.write(0, kafkaRecordV1); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - Record kafkaRecordV2 = createMockKafkaRecord(2, 1); - writer.write(0, kafkaRecordV2); + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.addresses.element.zipCode")); + } - Record kafkaRecordV3 = createMockKafkaRecord(3, 2); - writer.write(0, kafkaRecordV3); + @Test + void testDropRequiredFieldInMapValueStruct() throws IOException { + // v1: attributes: map<string, {city, country}> + Schema locationV1 = Schema.createRecord("Location", null, null, false); + locationV1.setFields(Arrays.asList( + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("country", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema mapV1 = Schema.createMap(locationV1); + Schema optionalMapV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("attributes", optionalMapV1, null, null))); + + GenericRecord locationRecordV1 = new GenericData.Record(locationV1); + locationRecordV1.put("city", "Beijing"); + locationRecordV1.put("country", "China"); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + java.util.Map<String, GenericRecord> mapData1 = new java.util.HashMap<>(); + mapData1.put("home", locationRecordV1); + avroRecordV1.put("attributes", mapData1); + + // v2: attributes: map<string, {city}> - dropped country + Schema locationV2 = Schema.createRecord("Location", null, null, false); + locationV2.setFields(Collections.singletonList( + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema mapV2 = Schema.createMap(locationV2); + Schema optionalMapV2 
= Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("attributes", optionalMapV2, null, null))); + + GenericRecord locationRecordV2 = new GenericData.Record(locationV2); + locationRecordV2.put("city", "Shanghai"); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + java.util.Map mapData2 = new java.util.HashMap<>(); + mapData2.put("work", locationRecordV2); + avroRecordV2.put("attributes", mapData2); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); Table table = catalog.loadTable(tableId); - assertNotNull(table); - assertNotNull(table.schema().findField("_kafka_value.field1")); - assertEquals(false, table.schema().findField("_kafka_value.field1").isRequired()); - assertNotNull(table.schema().findField("_kafka_value.field2")); - assertEquals(false, table.schema().findField("_kafka_value.field2").isRequired()); + assertNotNull(table.schema().findField("_kafka_value.attributes.value.country")); + assertEquals(false, table.schema().findField("_kafka_value.attributes.value.country").isRequired()); } @Test - void testSchemaEvolutionReorderColumn() throws IOException { - // Given: Initial Avro schema (v1) + void testPromoteFieldTypeInCollectionElement() throws IOException { + // v1: addresses: list<{zip: int}> + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Collections.singletonList( + new Schema.Field("zip", Schema.create(Schema.Type.INT), null, null))); + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - fieldsV1.add(new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV1.setFields(fieldsV1); + Schema listV1 = Schema.createArray(addressV1); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("zip", 12345); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: addresses: list<{zip: long}> + Schema addressV2 = Schema.createRecord("Address", null, null, false); + addressV2.setFields(Collections.singletonList( + new Schema.Field("zip", Schema.create(Schema.Type.LONG), null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema listV2 = Schema.createArray(addressV2); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV2, null, null))); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + 
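// --- Editorial sketch, assumed mechanics rather than code from this patch. ---
// The int -> long widening exercised here maps onto Iceberg's UpdateSchema API,
// which accepts only safe promotions (e.g. int -> long, float -> double):
//     table.updateSchema()
//         .updateColumn("_kafka_value.addresses.element.zip", Types.LongType.get())
//         .commit();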
addressRecordV2.put("zip", 67890L); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); - // Create v1 record + Table table = catalog.loadTable(tableId); + assertEquals(Types.LongType.get(), table.schema().findField("_kafka_value.addresses.element.zip").type()); + } + + // ========== Drop Required Field Tests ========== + + @Test + void testDropRequiredFieldInCollectionElement() throws IOException { + // v1: addresses: list<{street, city, zipCode}> + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("zipCode", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(addressV1); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", listV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("street", "Main St"); + addressRecordV1.put("city", "Beijing"); + addressRecordV1.put("zipCode", "100000"); GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); avroRecordV1.put("id", 1L); - avroRecordV1.put("name", "test"); - avroRecordV1.put("email", "test@example.com"); + avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: addresses: list<{street, city}> - dropped required zipCode + Schema addressV2 = Schema.createRecord("Address", null, null, false); + addressV2.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); - // Given: Updated Avro schema (v2) with reordered fields Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV2.setFields(fieldsV2); + Schema listV2 = Schema.createArray(addressV2); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", listV2, null, null))); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + addressRecordV2.put("street", "Second St"); + addressRecordV2.put("city", "Shanghai"); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); - // Create v2 record + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = 
+        // Verify that the dropped required field is now optional
+        assertNotNull(table.schema().findField("_kafka_value.addresses.element.zipCode"));
+        assertEquals(false, table.schema().findField("_kafka_value.addresses.element.zipCode").isRequired());
+    }
+
+    // ========== Drop Optional Field Tests ==========
+
+    @Test
+    void testDropOptionalFieldInNestedStruct() throws IOException {
+        // v1: user{name, email, phone}
+        Schema userV1 = Schema.createRecord("User", null, null, false);
+        Schema optionalEmail = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+        Schema optionalPhone = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+        userV1.setFields(Arrays.asList(
+            new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null),
+            new Schema.Field("email", optionalEmail, null, null),
+            new Schema.Field("phone", optionalPhone, null, null)));
+
+        Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV1.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("user", userV1, null, null)));
+
+        GenericRecord userRecordV1 = new GenericData.Record(userV1);
+        userRecordV1.put("name", "alice");
+        userRecordV1.put("email", "alice@example.com");
+        userRecordV1.put("phone", "123-456-7890");
+        GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1);
+        avroRecordV1.put("id", 1L);
+        avroRecordV1.put("user", userRecordV1);
+
+        // v2: user{name, email} - dropped optional phone
+        Schema userV2 = Schema.createRecord("User", null, null, false);
+        userV2.setFields(Arrays.asList(
+            new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null),
+            new Schema.Field("email", optionalEmail, null, null)));
+
+        Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV2.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("user", userV2, null, null)));
+
+        GenericRecord userRecordV2 = new GenericData.Record(userV2);
+        userRecordV2.put("name", "bob");
+        userRecordV2.put("email", "bob@example.com");
         GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2);
-        avroRecordV2.put("name", "test2");
         avroRecordV2.put("id", 2L);
-        avroRecordV2.put("email", "test2@example.com");
+        avroRecordV2.put("user", userRecordV2);
 
-        // Mock deserializer behavior
         when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class)))
             .thenReturn(avroRecordV1)
             .thenReturn(avroRecordV2);
 
-        // Write records
-        Record kafkaRecordV1 = createMockKafkaRecord(1, 0);
-        writer.write(0, kafkaRecordV1);
+        writer.write(0, createMockKafkaRecord(1, 0));
+        writer.write(0, createMockKafkaRecord(2, 1));
+
+        Table table = catalog.loadTable(tableId);
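+        // The phone column was already optional in the table, so the soft-removal pass should
+        // leave it untouched rather than emit another MAKE_OPTIONAL change.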
+        // Verify that the dropped optional field still exists and remains optional
+        assertNotNull(table.schema().findField("_kafka_value.user.phone"));
+        assertEquals(false, table.schema().findField("_kafka_value.user.phone").isRequired());
+    }
+
+    @Test
+    void testDropOptionalCollections() throws IOException {
+        // v1: {id, tags: optional list<string>, metadata: optional map<string, string>}
+        Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false);
+        Schema listSchema = Schema.createArray(Schema.create(Schema.Type.STRING));
+        Schema optionalList = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listSchema));
+        Schema mapSchema = Schema.createMap(Schema.create(Schema.Type.STRING));
+        Schema optionalMap = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapSchema));
+        avroSchemaV1.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("tags", optionalList, null, null),
+            new Schema.Field("metadata", optionalMap, null, null)));
+
+        GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1);
+        avroRecordV1.put("id", 1L);
+        avroRecordV1.put("tags", Arrays.asList("tag1", "tag2"));
+        java.util.Map<String, String> metadataMap = new java.util.HashMap<>();
+        metadataMap.put("key1", "value1");
+        avroRecordV1.put("metadata", metadataMap);
+
+        // v2: {id} - dropped both optional collections
+        Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV2.setFields(Collections.singletonList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)));
+
+        GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2);
+        avroRecordV2.put("id", 2L);
+
+        when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class)))
+            .thenReturn(avroRecordV1)
+            .thenReturn(avroRecordV2);
 
-        Record kafkaRecordV2 = createMockKafkaRecord(2, 1);
-        writer.write(0, kafkaRecordV2);
+        writer.write(0, createMockKafkaRecord(1, 0));
+        writer.write(0, createMockKafkaRecord(2, 1));
+
+        Table table = catalog.loadTable(tableId);
+        // Verify that dropped optional collections still exist and remain optional
+        assertNotNull(table.schema().findField("_kafka_value.tags"));
+        assertEquals(false, table.schema().findField("_kafka_value.tags").isRequired());
+        assertNotNull(table.schema().findField("_kafka_value.metadata"));
+        assertEquals(false, table.schema().findField("_kafka_value.metadata").isRequired());
+    }
+
+    // ========== Make Field Optional Tests ==========
+
+    @Test
+    void testMakeRequiredFieldOptional() throws IOException {
+        // v1: {id, required email}
+        Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV1.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("email", Schema.create(Schema.Type.STRING), null, null)));
+
+        GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1);
+        avroRecordV1.put("id", 1L);
+        avroRecordV1.put("email", "alice@example.com");
+
+        // v2: {id, optional email}
+        Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false);
+        Schema optionalEmail = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+        avroSchemaV2.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("email", optionalEmail, null, null)));
+
+        GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2);
+        avroRecordV2.put("id", 2L);
+        avroRecordV2.put("email", null);
+
+        when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class)))
+            .thenReturn(avroRecordV1)
+            .thenReturn(avroRecordV2);
+
+        writer.write(0, createMockKafkaRecord(1, 0));
+        writer.write(0, createMockKafkaRecord(2, 1));
 
-        // Verify schema evolution
         Table table = catalog.loadTable(tableId);
-        assertNotNull(table);
-        assertEquals(4, table.schema().columns().size());
-        assertNotNull(table.schema().findField("_kafka_value.id"));
-        assertNotNull(table.schema().findField("_kafka_value.name"));
         assertNotNull(table.schema().findField("_kafka_value.email"));
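+        // Nullability is only relaxed, never tightened: the required email column should be
+        // flipped to optional once the incoming schema marks the field nullable.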
table.schema().findField("_kafka_value.email").isRequired()); + } + + @Test + void testMakeRequiredFieldOptionalInCollectionElement() throws IOException { + // v1: addresses: list<{street, required city}> + Schema addressV1 = Schema.createRecord("Address", null, null, false); + addressV1.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", Schema.create(Schema.Type.STRING), null, null))); + + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(addressV1); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV1, null, null))); + + GenericRecord addressRecordV1 = new GenericData.Record(addressV1); + addressRecordV1.put("street", "Main St"); + addressRecordV1.put("city", "Beijing"); + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("addresses", Collections.singletonList(addressRecordV1)); + + // v2: addresses: list<{street, optional city}> + Schema addressV2 = Schema.createRecord("Address", null, null, false); + Schema optionalCity = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + addressV2.setFields(Arrays.asList( + new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("city", optionalCity, null, null))); + + Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); + Schema listV2 = Schema.createArray(addressV2); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("addresses", optionalListV2, null, null))); + + GenericRecord addressRecordV2 = new GenericData.Record(addressV2); + addressRecordV2.put("street", "Second St"); + addressRecordV2.put("city", null); + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("addresses", Collections.singletonList(addressRecordV2)); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + // Verify that the field in collection element is now optional + assertNotNull(table.schema().findField("_kafka_value.addresses.element.city")); + assertEquals(false, table.schema().findField("_kafka_value.addresses.element.city").isRequired()); + } + + @Test + void testMakeRequiredCollectionOptional() throws IOException { + // v1: {id, required tags: list} + Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); + Schema listV1 = Schema.createArray(Schema.create(Schema.Type.STRING)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("tags", listV1, null, null))); + + GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); + avroRecordV1.put("id", 1L); + avroRecordV1.put("tags", Arrays.asList("tag1", "tag2")); + + // v2: {id, optional tags: list} + Schema avroSchemaV2 = 
Schema.createRecord("TestRecord", null, null, false); + Schema listV2 = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema optionalList = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("tags", optionalList, null, null))); + + GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); + avroRecordV2.put("id", 2L); + avroRecordV2.put("tags", null); + + when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) + .thenReturn(avroRecordV1) + .thenReturn(avroRecordV2); + + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = catalog.loadTable(tableId); + assertNotNull(table.schema().findField("_kafka_value.tags")); + assertEquals(false, table.schema().findField("_kafka_value.tags").isRequired()); } + // ========== Promote Field Type Tests ========== + @Test - void testSchemaEvolutionIncompatibleTypeChange() throws IOException { - // Given: Initial Avro schema with string type + void testPromoteCollectionElementType() throws IOException { + // v1: scores: list Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV1 = new ArrayList<>(); - fieldsV1.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV1.add(new Schema.Field("age", Schema.create(Schema.Type.INT), null, null)); - avroSchemaV1.setFields(fieldsV1); + Schema listV1 = Schema.createArray(Schema.create(Schema.Type.INT)); + Schema optionalListV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV1)); + avroSchemaV1.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("scores", optionalListV1, null, null))); - // Create v1 record GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1); - avroRecordV1.put("id", 2L); - avroRecordV1.put("age", 30); + avroRecordV1.put("id", 1L); + avroRecordV1.put("scores", Arrays.asList(90, 85, 95)); - // Given: Updated schema with incompatible string type for age + // v2: scores: list Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false); - List fieldsV2 = new ArrayList<>(); - fieldsV2.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)); - fieldsV2.add(new Schema.Field("age", Schema.create(Schema.Type.STRING), null, null)); - avroSchemaV2.setFields(fieldsV2); + Schema listV2 = Schema.createArray(Schema.create(Schema.Type.LONG)); + Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2)); + avroSchemaV2.setFields(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null), + new Schema.Field("scores", optionalListV2, null, null))); - // Create v2 record GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2); - avroRecordV2.put("id", 1L); - avroRecordV2.put("age", "twenty"); + avroRecordV2.put("id", 2L); + avroRecordV2.put("scores", Arrays.asList(100L, 95L, 98L)); - // Mock deserializer behavior when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class))) .thenReturn(avroRecordV1) .thenReturn(avroRecordV2); - // Write records - Record kafkaRecordV1 = createMockKafkaRecord(1, 0); - assertDoesNotThrow(() -> writer.write(0, kafkaRecordV1)); + writer.write(0, createMockKafkaRecord(1, 0)); + writer.write(0, createMockKafkaRecord(2, 1)); + + Table table = 
+        // Verify that the collection element type cannot be promoted from int to long
+        assertNotNull(table.schema().findField("_kafka_value.scores"));
+        assertEquals(true, table.schema().findField("_kafka_value.scores").type().isListType());
+        assertEquals(Types.IntegerType.get(),
+            table.schema().findField("_kafka_value.scores").type().asListType().elementType());
+    }
+
+    @Test
+    void testPromoteMapValueType() throws IOException {
+        // v1: metadata: map<string, int>
+        Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false);
+        Schema mapV1 = Schema.createMap(Schema.create(Schema.Type.INT));
+        Schema optionalMapV1 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV1));
+        avroSchemaV1.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("metadata", optionalMapV1, null, null)));
 
-        // Verify that writing the second record with an incompatible schema throws an exception
-        Record kafkaRecordV2 = createMockKafkaRecord(2, 1);
-        assertThrows(IOException.class, () -> writer.write(0, kafkaRecordV2));
+        GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1);
+        avroRecordV1.put("id", 1L);
+        java.util.Map<String, Integer> mapData1 = new java.util.HashMap<>();
+        mapData1.put("score", 90);
+        mapData1.put("rank", 5);
+        avroRecordV1.put("metadata", mapData1);
+
+        // v2: metadata: map<string, long>
+        Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false);
+        Schema mapV2 = Schema.createMap(Schema.create(Schema.Type.LONG));
+        Schema optionalMapV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), mapV2));
+        avroSchemaV2.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("metadata", optionalMapV2, null, null)));
+
+        GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2);
+        avroRecordV2.put("id", 2L);
+        java.util.Map<String, Long> mapData2 = new java.util.HashMap<>();
+        mapData2.put("score", 100L);
+        mapData2.put("rank", 1L);
+        avroRecordV2.put("metadata", mapData2);
+
+        when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class)))
+            .thenReturn(avroRecordV1)
+            .thenReturn(avroRecordV2);
+
+        writer.write(0, createMockKafkaRecord(1, 0));
+        writer.write(0, createMockKafkaRecord(2, 1));
+
+        Table table = catalog.loadTable(tableId);
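+        // Mirrors the list case: only maps with struct values are recursed into (isStructMap),
+        // so the int value type should be left as-is rather than promoted.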
+        // Verify that the map value type cannot be promoted from int to long
+        assertNotNull(table.schema().findField("_kafka_value.metadata"));
+        assertEquals(true, table.schema().findField("_kafka_value.metadata").type().isMapType());
+        assertEquals(Types.IntegerType.get(),
+            table.schema().findField("_kafka_value.metadata").type().asMapType().valueType());
+    }
+
+    // ========== Complex Mixed Scenarios ==========
+
+    @Test
+    void testComplexNestedAndCollectionEvolution() throws IOException {
+        // v1: {id, user{name}}
+        Schema userV1 = Schema.createRecord("User", null, null, false);
+        userV1.setFields(Collections.singletonList(
+            new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null)));
+
+        Schema avroSchemaV1 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV1.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("user", userV1, null, null)));
+
+        GenericRecord userRecordV1 = new GenericData.Record(userV1);
+        userRecordV1.put("name", "alice");
+        GenericRecord avroRecordV1 = new GenericData.Record(avroSchemaV1);
+        avroRecordV1.put("id", 1L);
+        avroRecordV1.put("user", userRecordV1);
+
+        // v2: {id, user{name, addresses: list<{street}>}}
+        Schema addressV2 = Schema.createRecord("Address", null, null, false);
+        addressV2.setFields(Collections.singletonList(
+            new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null)));
+
+        Schema userV2 = Schema.createRecord("User", null, null, false);
+        Schema listV2 = Schema.createArray(addressV2);
+        Schema optionalListV2 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV2));
+        userV2.setFields(Arrays.asList(
+            new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null),
+            new Schema.Field("addresses", optionalListV2, null, null)));
+
+        Schema avroSchemaV2 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV2.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("user", userV2, null, null)));
+
+        GenericRecord addressRecordV2 = new GenericData.Record(addressV2);
+        addressRecordV2.put("street", "Main St");
+        GenericRecord userRecordV2 = new GenericData.Record(userV2);
+        userRecordV2.put("name", "bob");
+        userRecordV2.put("addresses", Collections.singletonList(addressRecordV2));
+        GenericRecord avroRecordV2 = new GenericData.Record(avroSchemaV2);
+        avroRecordV2.put("id", 2L);
+        avroRecordV2.put("user", userRecordV2);
+
+        // v3: {id, user{name, addresses: list<{street, zipCode}>}}
+        Schema addressV3 = Schema.createRecord("Address", null, null, false);
+        Schema optionalZip = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+        addressV3.setFields(Arrays.asList(
+            new Schema.Field("street", Schema.create(Schema.Type.STRING), null, null),
+            new Schema.Field("zipCode", optionalZip, null, null)));
+
+        Schema userV3 = Schema.createRecord("User", null, null, false);
+        Schema listV3 = Schema.createArray(addressV3);
+        Schema optionalListV3 = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), listV3));
+        userV3.setFields(Arrays.asList(
+            new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null),
+            new Schema.Field("addresses", optionalListV3, null, null)));
+
+        Schema avroSchemaV3 = Schema.createRecord("TestRecord", null, null, false);
+        avroSchemaV3.setFields(Arrays.asList(
+            new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null),
+            new Schema.Field("user", userV3, null, null)));
+
+        GenericRecord addressRecordV3 = new GenericData.Record(addressV3);
+        addressRecordV3.put("street", "Second St");
+        addressRecordV3.put("zipCode", "12345");
+        GenericRecord userRecordV3 = new GenericData.Record(userV3);
+        userRecordV3.put("name", "charlie");
+        userRecordV3.put("addresses", Collections.singletonList(addressRecordV3));
+        GenericRecord avroRecordV3 = new GenericData.Record(avroSchemaV3);
+        avroRecordV3.put("id", 3L);
+        avroRecordV3.put("user", userRecordV3);
+
+        when(kafkaAvroDeserializer.deserialize(anyString(), any(), any(ByteBuffer.class)))
+            .thenReturn(avroRecordV1)
+            .thenReturn(avroRecordV2)
+            .thenReturn(avroRecordV3);
+
+        writer.write(0, createMockKafkaRecord(1, 0));
+        writer.write(0, createMockKafkaRecord(2, 1));
+        writer.write(0, createMockKafkaRecord(3, 2));
+
+        Table table = catalog.loadTable(tableId);
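+        // Each version should only widen the schema: v2 adds user.addresses as a new column and
+        // v3 adds zipCode inside the existing list element, without disturbing earlier columns.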
+        assertNotNull(table.schema().findField("_kafka_value.user.addresses"));
+        assertNotNull(table.schema().findField("_kafka_value.user.addresses.element.street"));
+        assertNotNull(table.schema().findField("_kafka_value.user.addresses.element.zipCode"));
+    }
+
+    // ========== Helper Methods ==========
+
+    private String generateRandomTableName() {
+        return "test_table_" + ThreadLocalRandom.current().nextInt(100000, 999999);
     }
 
     private Record createMockKafkaRecord(int schemaId, int offset) {