Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,7 @@ public static void applySchemaChanges(
UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
for (TableChange change : schemaChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
Column flinkColumn = addColumn.getColumn();
Preconditions.checkArgument(
FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
"Unsupported table change: Adding computed column %s.",
flinkColumn.getName());
Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
if (flinkColumn.getDataType().getLogicalType().isNullable()) {
pendingUpdate.addColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
} else {
pendingUpdate.addRequiredColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
}
applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
} else if (change instanceof TableChange.ModifyColumn) {
TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change;
applyModifyColumn(pendingUpdate, modifyColumn);
Expand All @@ -164,6 +151,31 @@ public static void applySchemaChanges(
}
}

private static void applyAddColumn(UpdateSchema pendingUpdate, TableChange.AddColumn addColumn) {
Column flinkColumn = addColumn.getColumn();
Preconditions.checkArgument(
FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
"Unsupported table change: Adding computed column %s.",
flinkColumn.getName());

Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());

if (flinkColumn.getDataType().getLogicalType().isNullable()) {
pendingUpdate.addColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
} else {
pendingUpdate.addRequiredColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
}

if (addColumn.getPosition() != null) {
TableChange.ColumnPosition position = addColumn.getPosition();
TableChange.ModifyColumnPosition modifyColumnPosition =
new TableChange.ModifyColumnPosition(addColumn.getColumn(), position);
applyModifyColumnPosition(pendingUpdate, modifyColumnPosition);
}
}

/**
* Applies a list of Flink table property changes to an {@link UpdateProperties} operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,31 @@ public void testAlterTableAddColumn() {
.hasMessageContaining("Try to add a column `id` which already exists in the table.");
}

@TestTemplate
public void testAlterTableAddColumnPosition() {
sql("CREATE TABLE tl(id BIGINT, name STRING)");
Schema schemaBefore = table("tl").schema();
assertThat(schemaBefore.asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()))
.asStruct());

sql("ALTER TABLE tl ADD (col1 STRING FIRST)");
sql("ALTER TABLE tl ADD (col2 INT AFTER id)");

Schema schemaAfter = table("tl").schema();
assertThat(schemaAfter.asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(3, "col1", Types.StringType.get()),
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(4, "col2", Types.IntegerType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()))
.asStruct());
}

@TestTemplate
public void testAlterTableDropColumn() {
sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,7 @@ public static void applySchemaChanges(
UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
for (TableChange change : schemaChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
Column flinkColumn = addColumn.getColumn();
Preconditions.checkArgument(
FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
"Unsupported table change: Adding computed column %s.",
flinkColumn.getName());
Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
if (flinkColumn.getDataType().getLogicalType().isNullable()) {
pendingUpdate.addColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
} else {
pendingUpdate.addRequiredColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
}
applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
} else if (change instanceof TableChange.ModifyColumn) {
TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change;
applyModifyColumn(pendingUpdate, modifyColumn);
Expand All @@ -164,6 +151,31 @@ public static void applySchemaChanges(
}
}

private static void applyAddColumn(UpdateSchema pendingUpdate, TableChange.AddColumn addColumn) {
Column flinkColumn = addColumn.getColumn();
Preconditions.checkArgument(
FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
"Unsupported table change: Adding computed column %s.",
flinkColumn.getName());

Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());

if (flinkColumn.getDataType().getLogicalType().isNullable()) {
pendingUpdate.addColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
} else {
pendingUpdate.addRequiredColumn(
flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
}

if (addColumn.getPosition() != null) {
TableChange.ColumnPosition position = addColumn.getPosition();
TableChange.ModifyColumnPosition modifyColumnPosition =
new TableChange.ModifyColumnPosition(addColumn.getColumn(), position);
applyModifyColumnPosition(pendingUpdate, modifyColumnPosition);
}
}

/**
* Applies a list of Flink table property changes to an {@link UpdateProperties} operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,31 @@ public void testAlterTableAddColumn() {
.hasMessageContaining("Try to add a column `id` which already exists in the table.");
}

@TestTemplate
public void testAlterTableAddColumnPosition() {
sql("CREATE TABLE tl(id BIGINT, name STRING)");
Schema schemaBefore = table("tl").schema();
assertThat(schemaBefore.asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()))
.asStruct());

sql("ALTER TABLE tl ADD (col1 STRING FIRST)");
sql("ALTER TABLE tl ADD (col2 INT AFTER id)");

Schema schemaAfter = table("tl").schema();
assertThat(schemaAfter.asStruct())
.isEqualTo(
new Schema(
Types.NestedField.optional(3, "col1", Types.StringType.get()),
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(4, "col2", Types.IntegerType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()))
.asStruct());
}

@TestTemplate
public void testAlterTableDropColumn() {
sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");
Expand Down