Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ protected List<SchemaChange> checkSchemaChanges(Table table, Schema currentSchem
*/
@VisibleForTesting
protected synchronized void applySchemaChange(Table table, List<SchemaChange> changes) {
LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes);
LOGGER.info("Applying schema changes to table {}, changes {}", tableId, changes.stream().map(c -> c.getType() + ":" + c.getColumnFullName()).toList());
Tasks.range(1)
.retry(2)
.run(notUsed -> applyChanges(table, changes));
Expand Down Expand Up @@ -221,16 +221,18 @@ private void collectRemovedField(Types.NestedField tableField, String parentName
// if field doesn't exist in current schema and it's not a struct, mark it as optional (soft removal)
if (currentField == null && !tableField.isOptional()) {
changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName,
tableField.type().asPrimitiveType(), parentName));
null, parentName));
return;
}
// if it is a nested field, recursively process subfields
if (tableField.type().isStructType()) {
List<Types.NestedField> tableSubFields = tableField.type().asStructType().fields();

for (Types.NestedField tableSubField : tableSubFields) {
collectRemovedField(tableSubField, fullFieldName, currentSchema, changes);
}
collectRemovedStructFields(tableField.type().asStructType().fields(), fullFieldName, currentSchema, changes);
} else if (isStructList(tableField.type())) {
collectRemovedStructFields(tableField.type().asListType().elementType().asStructType().fields(),
fullFieldName + ".element", currentSchema, changes);
} else if (isStructMap(tableField.type())) {
collectRemovedStructFields(tableField.type().asMapType().valueType().asStructType().fields(),
fullFieldName + ".value", currentSchema, changes);
}
}

Expand All @@ -241,38 +243,59 @@ private void collectFieldChanges(Types.NestedField currentField, String parentNa
Types.NestedField tableField = tableSchema.findField(fullFieldName);

if (tableField == null) {
// if it is a nested field, recursively process subfields
if (currentField.type().isStructType()) {
List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();

for (Types.NestedField currentSubField : currentSubFields) {
collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName,
currentField.type(), parentName));
return;
} else {
Type currentType = currentField.type();
Type tableType = tableField.type();
if (currentType.isStructType() && tableType.isStructType()) {
collectStructFieldChanges(currentType.asStructType().fields(), fullFieldName, tableSchema, changes);
collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);
} else if (isStructList(currentType) && isStructList(tableType)) {
collectStructFieldChanges(currentType.asListType().elementType().asStructType().fields(),
fullFieldName + ".element", tableSchema, changes);
} else if (isStructMap(currentType) && isStructMap(tableType)) {
collectStructFieldChanges(currentType.asMapType().valueType().asStructType().fields(),
fullFieldName + ".value", tableSchema, changes);
} else if (!currentType.isStructType() && !tableType.isStructType()) {
collectOptionalFieldChanges(currentField, parentName, changes, tableField, fieldName);

if (!tableType.equals(currentType) && canPromoteType(tableType, currentType)) {
changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentType, parentName));
}
} else {
changes.add(new SchemaChange(SchemaChange.ChangeType.ADD_COLUMN, fieldName, currentField.type(), parentName));
}
} else {
// if it is a nested field, recursively process subfields
if (currentField.type().isStructType() && tableField.type().isStructType()) {
List<Types.NestedField> currentSubFields = currentField.type().asStructType().fields();
}
}

for (Types.NestedField currentSubField : currentSubFields) {
collectFieldChanges(currentSubField, fullFieldName, tableSchema, changes);
}
} else if (!currentField.type().isStructType() && !tableField.type().isStructType()) {
// process optional fields
if (!tableField.isOptional() && currentField.isOptional()) {
changes.add(new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName));
}
/**
 * Appends a {@code MAKE_OPTIONAL} change when the table field is required
 * but the corresponding current field is optional.
 *
 * @param currentField field from the current (incoming) schema
 * @param parentName   parent name passed through to the recorded change
 * @param changes      list the change is appended to
 * @param tableField   field as it exists in the table schema
 * @param fieldName    field name passed through to the recorded change
 */
private static void collectOptionalFieldChanges(Types.NestedField currentField, String parentName,
    List<SchemaChange> changes, Types.NestedField tableField, String fieldName) {
    // Nothing to relax: the table field is already optional, or the current field is still required.
    if (tableField.isOptional() || !currentField.isOptional()) {
        return;
    }

    // The third constructor argument is passed as null for this change type.
    SchemaChange makeOptional =
        new SchemaChange(SchemaChange.ChangeType.MAKE_OPTIONAL, fieldName, null, parentName);
    changes.add(makeOptional);
}

// promote type if needed
if (!tableField.type().equals(currentField.type()) && canPromoteType(tableField.type(), currentField.type())) {
changes.add(new SchemaChange(SchemaChange.ChangeType.PROMOTE_TYPE, fieldName, currentField.type(), parentName));
}
}
/**
 * Runs {@code collectFieldChanges} for every given sub-field, in list order,
 * using {@code parentFullName} as the parent name for each call.
 *
 * @param currentSubFields sub-fields taken from the current schema
 * @param parentFullName   parent name handed to each per-field call
 * @param tableSchema      table schema handed to each per-field call
 * @param changes          list that the per-field calls receive for recording changes
 */
private void collectStructFieldChanges(List<Types.NestedField> currentSubFields, String parentFullName,
    Schema tableSchema, List<SchemaChange> changes) {
    currentSubFields.forEach(
        subField -> collectFieldChanges(subField, parentFullName, tableSchema, changes));
}

/**
 * Runs {@code collectRemovedField} for every given table sub-field, in list order,
 * using {@code parentFullName} as the parent name for each call.
 *
 * @param tableSubFields sub-fields taken from the table schema
 * @param parentFullName parent name handed to each per-field call
 * @param currentSchema  current schema handed to each per-field call
 * @param changes        list that the per-field calls receive for recording changes
 */
private void collectRemovedStructFields(List<Types.NestedField> tableSubFields, String parentFullName,
    Schema currentSchema, List<SchemaChange> changes) {
    tableSubFields.forEach(
        subField -> collectRemovedField(subField, parentFullName, currentSchema, changes));
}

/**
 * Tells whether {@code type} is a list whose element type is a struct.
 *
 * @param type type to inspect
 * @return {@code true} only if the type id is {@code LIST} and the element type is a struct
 */
private boolean isStructList(Type type) {
    if (type.typeId() != Type.TypeID.LIST) {
        return false;
    }
    Type elementType = type.asListType().elementType();
    return elementType.isStructType();
}

/**
 * Tells whether {@code type} is a map whose value type is a struct.
 * The key type is not examined.
 *
 * @param type type to inspect
 * @return {@code true} only if the type id is {@code MAP} and the value type is a struct
 */
private boolean isStructMap(Type type) {
    if (type.typeId() != Type.TypeID.MAP) {
        return false;
    }
    Type valueType = type.asMapType().valueType();
    return valueType.isStructType();
}

private boolean canPromoteType(Type oldType, Type newType) {
if (oldType.typeId() == Type.TypeID.INTEGER && newType.typeId() == Type.TypeID.LONG) {
return true;
Expand Down
Loading
Loading