Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
/** The util class for merging the schemas. */
public class SchemaMergingUtils {

public static final String ARRAY_ELEMENT_FIELD_NAME = "element";
public static final String MAP_VALUE_FIELD_NAME = "value";

public static TableSchema mergeSchemas(
TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) {
RowType currentType = currentTableSchema.logicalRowType();
Expand Down Expand Up @@ -266,19 +269,68 @@ private static void diffFields(
changes.add(
SchemaChange.addColumn(
fieldNames, newField.type(), newField.description(), null));
} else if (!oldField.type().equals(newField.type())) {
// type changed — check if it's a nested struct change
if (oldField.type() instanceof RowType && newField.type() instanceof RowType) {
diffFields(
((RowType) oldField.type()).getFields(),
((RowType) newField.type()).getFields(),
fieldNames,
changes);
} else {
changes.add(SchemaChange.updateColumnType(fieldNames, newField.type(), true));
}
} else if (!oldField.type().equals(newField.type())
&& !diffNestedTypeChanges(
oldField.type(), newField.type(), fieldNames, changes)) {
changes.add(SchemaChange.updateColumnType(fieldNames, newField.type(), true));
}
}
}

/**
 * Tries to express the difference between {@code oldType} and {@code newType} as nested schema
 * changes rooted at {@code fieldNames}.
 *
 * <p>The nested changes are collected in a scratch list first and copied into {@code changes}
 * only on success, so a failed attempt leaves {@code changes} untouched.
 *
 * @param oldType the type before the change
 * @param newType the type after the change
 * @param fieldNames path of the column whose type is being compared
 * @param changes output list that receives the nested changes on success
 * @return {@code true} when the difference was represented by nested changes; {@code false}
 *     when the caller should fall back to {@link SchemaChange.UpdateColumnType}
 */
private static boolean diffNestedTypeChanges(
        DataType oldType, DataType newType, String[] fieldNames, List<SchemaChange> changes) {
    List<SchemaChange> scratch = new ArrayList<>();
    if (!diffNestedTypeChangesInner(oldType, newType, fieldNames, scratch)) {
        // Discard whatever was collected before the attempt failed.
        return false;
    }
    changes.addAll(scratch);
    return true;
}

/**
 * Performs the structural comparison behind {@link #diffNestedTypeChanges}.
 *
 * <ul>
 *   <li>Row vs. row: fails if any old field name is missing from the new row; otherwise the
 *       field lists are diffed under {@code fieldNames}.
 *   <li>Array vs. array: recurses into the element types under the {@link
 *       #ARRAY_ELEMENT_FIELD_NAME} path segment.
 *   <li>Map vs. map: fails if the key types differ; otherwise recurses into the value types
 *       under the {@link #MAP_VALUE_FIELD_NAME} path segment.
 *   <li>Any other combination fails.
 * </ul>
 *
 * <p>NOTE(review): only fields, element types and key/value types are inspected here;
 * attributes of the container types themselves (for example nullability) are not compared —
 * confirm whether such differences need separate handling.
 *
 * @return {@code true} when the difference was handled as nested changes, {@code false}
 *     otherwise
 */
private static boolean diffNestedTypeChangesInner(
        DataType oldType, DataType newType, String[] fieldNames, List<SchemaChange> changes) {
    if (oldType instanceof RowType && newType instanceof RowType) {
        List<DataField> before = ((RowType) oldType).getFields();
        List<DataField> after = ((RowType) newType).getFields();
        boolean nothingRemoved = !hasRemovedFields(before, after);
        if (nothingRemoved) {
            diffFields(before, after, fieldNames, changes);
        }
        return nothingRemoved;
    }

    if (oldType instanceof ArrayType && newType instanceof ArrayType) {
        DataType oldElement = ((ArrayType) oldType).getElementType();
        DataType newElement = ((ArrayType) newType).getElementType();
        String[] elementPath = appendFieldName(fieldNames, ARRAY_ELEMENT_FIELD_NAME);
        return diffNestedTypeChanges(oldElement, newElement, elementPath, changes);
    }

    if (oldType instanceof MapType && newType instanceof MapType) {
        MapType before = (MapType) oldType;
        MapType after = (MapType) newType;
        // Key changes are never expressed as nested changes; only the value side is diffed.
        boolean sameKeyType = before.getKeyType().equals(after.getKeyType());
        return sameKeyType
                && diffNestedTypeChanges(
                        before.getValueType(),
                        after.getValueType(),
                        appendFieldName(fieldNames, MAP_VALUE_FIELD_NAME),
                        changes);
    }

    return false;
}

/**
 * Reports whether at least one field of {@code oldFields} has no same-named counterpart in
 * {@code newFields}. Only field names are compared.
 *
 * @param oldFields fields before the change
 * @param newFields fields after the change
 * @return {@code true} if some old field name is absent from {@code newFields}
 */
private static boolean hasRemovedFields(List<DataField> oldFields, List<DataField> newFields) {
    // Index the new fields by name; Collectors.toMap rejects duplicate names.
    Map<String, DataField> survivingByName =
            newFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));
    return oldFields.stream()
            .map(DataField::name)
            .anyMatch(name -> !survivingByName.containsKey(name));
}

private static String[] appendFieldName(String[] parentNames, String fieldName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,185 @@ public void testMergeSchemas() {
assertThat(r6.getFieldCount()).isEqualTo(8);
}

/**
 * Adding a field to a row nested in an array element and to a row nested in a map value must be
 * reported as two nested {@code AddColumn} changes rather than whole-column type updates.
 */
@Test
public void testDiffNestedSchemaChangesInArrayAndMap() {
    // Baseline: items ARRAY<ROW<f1, f2>>, attributes MAP<VARCHAR, ROW<v1>>.
    RowType itemRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(1, "f1", new IntType()),
                            new DataField(
                                    2, "f2", new VarCharType(VarCharType.MAX_LENGTH))));
    RowType attributeRow =
            new RowType(Lists.newArrayList(new DataField(4, "v1", new IntType())));
    TableSchema oldSchema =
            new TableSchema(
                    0,
                    Lists.newArrayList(
                            new DataField(0, "items", new ArrayType(itemRow)),
                            new DataField(
                                    3,
                                    "attributes",
                                    new MapType(
                                            new VarCharType(VarCharType.MAX_LENGTH),
                                            attributeRow))),
                    4,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    // Evolved: the array element row gains f3, the map value row gains v2.
    RowType evolvedItemRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(1, "f1", new IntType()),
                            new DataField(2, "f2", new VarCharType(VarCharType.MAX_LENGTH)),
                            new DataField(
                                    5, "f3", new VarCharType(VarCharType.MAX_LENGTH))));
    RowType evolvedAttributeRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(4, "v1", new IntType()),
                            new DataField(6, "v2", new BigIntType())));
    TableSchema newSchema =
            new TableSchema(
                    1,
                    Lists.newArrayList(
                            new DataField(0, "items", new ArrayType(evolvedItemRow)),
                            new DataField(
                                    3,
                                    "attributes",
                                    new MapType(
                                            new VarCharType(VarCharType.MAX_LENGTH),
                                            evolvedAttributeRow))),
                    6,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    List<SchemaChange> changes = SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);

    assertThat(changes).hasSize(2);

    // First change: the new field inside the array element row.
    SchemaChange.AddColumn arrayAddition = (SchemaChange.AddColumn) changes.get(0);
    assertThat(arrayAddition.fieldNames())
            .containsExactly("items", SchemaMergingUtils.ARRAY_ELEMENT_FIELD_NAME, "f3");
    assertThat(arrayAddition.dataType()).isEqualTo(new VarCharType(VarCharType.MAX_LENGTH));

    // Second change: the new field inside the map value row.
    SchemaChange.AddColumn mapValueAddition = (SchemaChange.AddColumn) changes.get(1);
    assertThat(mapValueAddition.fieldNames())
            .containsExactly("attributes", SchemaMergingUtils.MAP_VALUE_FIELD_NAME, "v2");
    assertThat(mapValueAddition.dataType()).isEqualTo(new BigIntType());
}

/**
 * When the map key type changes alongside the value type, the diff must emit a single
 * {@code UpdateColumnType} for the whole column instead of nested value changes.
 */
@Test
public void testDiffNestedSchemaChangesDoesNotTreatMapKeyAsValueChange() {
    // Baseline: attributes MAP<ROW<k1>, ROW<v1>>.
    RowType keyRow = new RowType(Lists.newArrayList(new DataField(1, "k1", new IntType())));
    RowType valueRow = new RowType(Lists.newArrayList(new DataField(2, "v1", new IntType())));
    TableSchema oldSchema =
            new TableSchema(
                    0,
                    Lists.newArrayList(
                            new DataField(0, "attributes", new MapType(keyRow, valueRow))),
                    2,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    // Evolved: both the key row and the value row gain a field.
    RowType evolvedKeyRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(1, "k1", new IntType()),
                            new DataField(3, "k2", new BigIntType())));
    RowType evolvedValueRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(2, "v1", new IntType()),
                            new DataField(4, "v2", new BigIntType())));
    DataType evolvedMapType = new MapType(evolvedKeyRow, evolvedValueRow);
    TableSchema newSchema =
            new TableSchema(
                    1,
                    Lists.newArrayList(new DataField(0, "attributes", evolvedMapType)),
                    4,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    List<SchemaChange> changes = SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);

    assertThat(changes).hasSize(1);
    SchemaChange.UpdateColumnType wholeColumnUpdate =
            (SchemaChange.UpdateColumnType) changes.get(0);
    assertThat(wholeColumnUpdate.fieldNames()).containsExactly("attributes");
    assertThat(wholeColumnUpdate.newDataType()).isEqualTo(evolvedMapType);
}

/**
 * Removing a field from a row nested in an array element must be reported as a single
 * {@code UpdateColumnType} on the top-level column.
 */
@Test
public void testDiffNestedSchemaChangesFallsBackToTypeUpdateWhenNestedFieldRemoved() {
    // Baseline: items ARRAY<ROW<f1, f2>>.
    RowType elementRow =
            new RowType(
                    Lists.newArrayList(
                            new DataField(1, "f1", new IntType()),
                            new DataField(
                                    2, "f2", new VarCharType(VarCharType.MAX_LENGTH))));
    DataType itemsType = new ArrayType(elementRow);
    TableSchema oldSchema =
            new TableSchema(
                    0,
                    Lists.newArrayList(new DataField(0, "items", itemsType)),
                    2,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    // Evolved: f2 is gone from the element row.
    RowType shrunkElementRow =
            new RowType(Lists.newArrayList(new DataField(1, "f1", new IntType())));
    DataType evolvedItemsType = new ArrayType(shrunkElementRow);
    TableSchema newSchema =
            new TableSchema(
                    1,
                    Lists.newArrayList(new DataField(0, "items", evolvedItemsType)),
                    2,
                    Lists.newArrayList(),
                    Lists.newArrayList(),
                    new HashMap<>(),
                    "");

    List<SchemaChange> changes = SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);

    assertThat(changes).hasSize(1);
    SchemaChange.UpdateColumnType wholeColumnUpdate =
            (SchemaChange.UpdateColumnType) changes.get(0);
    assertThat(wholeColumnUpdate.fieldNames()).containsExactly("items");
    assertThat(wholeColumnUpdate.newDataType()).isEqualTo(evolvedItemsType);
}

@Test
public void testMergeArrayTypes() {
AtomicInteger highestFieldId = new AtomicInteger(1);
Expand Down
Loading