Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4706] Fix InternalSchemaChangeApplier#applyAddChange error to add nest type #6486

Merged
merged 2 commits into from Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,7 +51,8 @@ public InternalSchema applyAddChange(
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
TableChanges.ColumnAddChange add = TableChanges.ColumnAddChange.get(latestSchema);
String parentName = TableChangesHelper.getParentName(colName);
add.addColumns(parentName, colName, colType, doc);
String leafName = TableChangesHelper.getLeafName(colName);
add.addColumns(parentName, leafName, colType, doc);
Comment on lines +54 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests around nested fields to guard the changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add some UT about nested fields adding or reordering in TestTableChanges.

if (positionType != null) {
switch (positionType) {
case NO_OPERATION:
Expand Down
Expand Up @@ -76,4 +76,9 @@ public static String getParentName(String fullColName) {
int offset = fullColName.lastIndexOf(".");
return offset > 0 ? fullColName.substring(0, offset) : "";
}

public static String getLeafName(String fullColName) {
int offset = fullColName.lastIndexOf(".");
return offset > 0 ? fullColName.substring(offset + 1) : fullColName;
}
}
Expand Up @@ -20,8 +20,11 @@

import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.Types;

import org.apache.hudi.internal.schema.Types.StringType;
import org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType;
import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -225,5 +228,88 @@ public void testNestUpdate() {
);
Assertions.assertEquals(newSchema.getRecord(), checkSchema.getRecord());
}

@Test
public void testChangeApplier() {
// We add test here to verify the logic of applyAddChange and applyReOrderColPositionChange
InternalSchema oldSchema = new InternalSchema(Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
Types.Field.get(2, true, "preferences",
Types.RecordType.get(Types.Field.get(7, false, "feature1",
Types.BooleanType.get()), Types.Field.get(8, true, "feature2", Types.BooleanType.get()))),
Types.Field.get(3, false, "locations", Types.MapType.get(9, 10, Types.StringType.get(),
Types.RecordType.get(Types.Field.get(11, false, "lat", Types.FloatType.get()), Types.Field.get(12, false, "long", Types.FloatType.get())), false)),
Types.Field.get(4, true, "points", Types.ArrayType.get(13, true,
Types.RecordType.get(Types.Field.get(14, false, "x", Types.LongType.get()), Types.Field.get(15, false, "y", Types.LongType.get())))),
Types.Field.get(5, false,"doubles", Types.ArrayType.get(16, false, Types.DoubleType.get())),
Types.Field.get(6, true, "properties", Types.MapType.get(17, 18, Types.StringType.get(), Types.StringType.get()))
);

// add c1 first
InternalSchema newSchema = addOperationForSchemaChangeApplier(oldSchema, "c1", StringType.get(), "add c1 first",
"id", ColumnPositionType.BEFORE);
//add preferences.cx before preferences.feature2
newSchema = addOperationForSchemaChangeApplier(newSchema, "preferences.cx", Types.BooleanType.get(), "add preferences.cx before preferences.feature2",
"preferences.feature2", ColumnPositionType.BEFORE);
// check repeated add.
InternalSchema currSchema = newSchema;
Assertions.assertThrows(HoodieSchemaException.class, () -> addOperationForSchemaChangeApplier(currSchema, "preferences.cx", Types.BooleanType.get(),
"add preferences.cx before preferences.feature2"));
// add locations.value.lax before locations.value.long
newSchema = addOperationForSchemaChangeApplier(newSchema, "locations.value.lax", Types.BooleanType.get(), "add locations.value.lax before locations.value.long");
newSchema = reOrderOperationForSchemaChangeApplier(newSchema, "locations.value.lax", "locations.value.long", ColumnPositionType.BEFORE);
//
// add points.element.z after points.element.y
newSchema = addOperationForSchemaChangeApplier(newSchema, "points.element.z", Types.BooleanType.get(), "add points.element.z after points.element.y", "points.element.y", ColumnPositionType.AFTER);
InternalSchema checkedSchema = new InternalSchema(
Types.Field.get(19, true, "c1", Types.StringType.get(), "add c1 first"),
Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
Types.Field.get(2, true, "preferences",
Types.RecordType.get(Types.Field.get(7, false, "feature1", Types.BooleanType.get()),
Types.Field.get(20, true, "cx", Types.BooleanType.get(), "add preferences.cx before preferences.feature2"),
Types.Field.get(8, true, "feature2", Types.BooleanType.get()))),
Types.Field.get(3, false, "locations", Types.MapType.get(9, 10, Types.StringType.get(),
Types.RecordType.get(Types.Field.get(11, false, "lat", Types.FloatType.get()),
Types.Field.get(21, true, "lax", Types.BooleanType.get(), "add locations.value.lax before locations.value.long"),
Types.Field.get(12, false, "long", Types.FloatType.get())), false)),
Types.Field.get(4, true, "points", Types.ArrayType.get(13, true,
Types.RecordType.get(Types.Field.get(14, false, "x", Types.LongType.get()),
Types.Field.get(15, false, "y", Types.LongType.get()),
Types.Field.get(22, true, "z", Types.BooleanType.get(), "add points.element.z after points.element.y")))),
Types.Field.get(5, false,"doubles", Types.ArrayType.get(16, false, Types.DoubleType.get())),
Types.Field.get(6, true, "properties", Types.MapType.get(17, 18, Types.StringType.get(), Types.StringType.get()))
);
Assertions.assertEquals(newSchema.getRecord(), checkedSchema.getRecord());
}

private static InternalSchema addOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
Type colType,
String doc,
String position,
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schema);
return applier.applyAddChange(colName, colType, doc, position, positionType);
}

private static InternalSchema reOrderOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
String position,
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schema);
return applier.applyReOrderColPositionChange(colName, position, positionType);
}

private static InternalSchema addOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
Type colType,
String doc) {
return addOperationForSchemaChangeApplier(schema, colName, colType, doc, "",
ColumnPositionType.NO_OPERATION);
}
}