Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -156,6 +157,19 @@ private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTable
throw new ValidationException("Changing of REFRESH MODE is unsupported");
}

final TableDistribution oldDistribution = oldTable.getDistribution().orElse(null);
final TableDistribution newDistribution =
mergeContext.getMergedTableDistribution().orElse(null);
if (!Objects.equals(oldDistribution, newDistribution)) {
if (oldDistribution == null) {
changes.add(TableChange.add(newDistribution));
} else if (newDistribution == null) {
changes.add(TableChange.dropDistribution());
} else {
changes.add(TableChange.modify(newDistribution));
}
}

return changes;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.TableDistribution.Kind;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
Expand Down Expand Up @@ -724,6 +726,7 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
+ " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ ")\n"
+ "COMMENT 'materialized table comment'\n"
+ "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n"
+ "PARTITIONED BY (a, d)\n"
+ "WITH (\n"
+ " 'format' = 'json2'\n"
Expand All @@ -745,7 +748,8 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+ "FROM `builtin`.`default`.`t3` AS `t3`"),
TableChange.set("format", "json2"),
TableChange.reset("connector"));
TableChange.reset("connector"),
TableChange.add(TableDistribution.of(Kind.HASH, 7, List.of("b"))));
assertThat(operation.asSummaryString())
.isEqualTo(
"CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl\n"
Expand All @@ -754,7 +758,8 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
+ " MODIFY DEFINITION QUERY TO 'SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+ "FROM `builtin`.`default`.`t3` AS `t3`',\n"
+ " SET 'format' = 'json2',\n"
+ " RESET 'connector'");
+ " RESET 'connector',\n"
+ " ADD DISTRIBUTED BY HASH(`b`) INTO 7 BUCKETS");

// new table only difference schema & definition query with old table.
CatalogMaterializedTable oldTable =
Expand Down Expand Up @@ -792,6 +797,49 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
new UnresolvedPhysicalColumn("f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
}

@Test
void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() {
final String sql =
"CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata (\n"
+ " t AS current_timestamp,"
+ " m STRING METADATA VIRTUAL,"
+ " m_p STRING METADATA,"
+ " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED,"
+ " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND"
+ ")\n"
+ "COMMENT 'materialized table comment'\n"
+ "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n"
+ "WITH (\n"
+ " 'connector' = 'filesystem', \n"
+ " 'format' = 'json'\n"
+ ")\n"
+ "FRESHNESS = INTERVAL '30' SECOND\n"
+ "REFRESH_MODE = FULL\n"
+ "AS SELECT t1.* FROM t1";
Operation operation = parse(sql);

assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);

FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation;
assertThat(op.getTableChanges())
.containsExactly(
TableChange.modifyDefinitionQuery(
"SELECT `t1`.*\n" + "FROM `t1`",
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS `t1`"),
TableChange.set("connector", "filesystem"),
TableChange.set("format", "json"),
TableChange.modify(TableDistribution.of(Kind.HASH, 5, List.of("a"))));
assertThat(operation.asSummaryString())
.isEqualTo(
"CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl_with_metadata\n"
+ " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS `t1`',\n"
+ " SET 'connector' = 'filesystem',\n"
+ " SET 'format' = 'json',\n"
+ " MODIFY DISTRIBUTED BY HASH(`a`) INTO 5 BUCKETS");
}

private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFailedCase() {
final Collection<TestSpec> list = new ArrayList<>();
list.addAll(createWithInvalidSchema());
Expand Down