diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 854ea910c285f..323e0ca17b757 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -197,7 +197,7 @@ private ResultFetcher callCreateMaterializedTableOperation( OperationExecutor operationExecutor, OperationHandle handle, CreateMaterializedTableOperation createMaterializedTableOperation) { - CatalogMaterializedTable materializedTable = + ResolvedCatalogMaterializedTable materializedTable = createMaterializedTableOperation.getCatalogMaterializedTable(); if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) { createMaterializedTableInContinuousMode( @@ -220,7 +220,7 @@ private void createMaterializedTableInContinuousMode( ObjectIdentifier materializedTableIdentifier = createMaterializedTableOperation.getTableIdentifier(); - CatalogMaterializedTable catalogMaterializedTable = + ResolvedCatalogMaterializedTable catalogMaterializedTable = createMaterializedTableOperation.getCatalogMaterializedTable(); try { @@ -257,7 +257,7 @@ private void createMaterializedTableInFullMode( ObjectIdentifier materializedTableIdentifier = createMaterializedTableOperation.getTableIdentifier(); - CatalogMaterializedTable catalogMaterializedTable = + ResolvedCatalogMaterializedTable catalogMaterializedTable = createMaterializedTableOperation.getCatalogMaterializedTable(); // convert duration to cron expression diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 626c3c63ebda1..5a9245c5dd02d 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -51,11 +51,11 @@ public class SqlCreateMaterializedTable extends SqlCreate { private final SqlIdentifier tableName; - private final SqlCharStringLiteral comment; + private final @Nullable SqlTableConstraint tableConstraint; - private final SqlTableConstraint tableConstraint; + private final @Nullable SqlCharStringLiteral comment; - private final SqlDistribution distribution; + private final @Nullable SqlDistribution distribution; private final SqlNodeList partitionKeyList; @@ -63,7 +63,7 @@ public class SqlCreateMaterializedTable extends SqlCreate { private final SqlIntervalLiteral freshness; - @Nullable private final SqlLiteral refreshMode; + private final @Nullable SqlLiteral refreshMode; private final SqlNode asQuery; @@ -80,8 +80,8 @@ public SqlCreateMaterializedTable( SqlNode asQuery) { super(OPERATOR, pos, false, false); this.tableName = requireNonNull(tableName, "tableName should not be null"); - this.comment = comment; this.tableConstraint = tableConstraint; + this.comment = comment; this.distribution = distribution; this.partitionKeyList = requireNonNull(partitionKeyList, "partitionKeyList should not be null"); @@ -124,7 +124,7 @@ public Optional getTableConstraint() { return Optional.ofNullable(tableConstraint); } - public SqlDistribution getDistribution() { + public @Nullable SqlDistribution getDistribution() { return distribution; } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java index d4eff00254d9e..9fb68dc71b00e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.operations.Operation; @@ -38,7 +37,7 @@ public class CreateMaterializedTableOperation implements CreateOperation, MaterializedTableOperation { private final ObjectIdentifier tableIdentifier; - private final CatalogMaterializedTable materializedTable; + private final ResolvedCatalogMaterializedTable materializedTable; public CreateMaterializedTableOperation( ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) { @@ -57,7 +56,7 @@ public ObjectIdentifier getTableIdentifier() { return tableIdentifier; } - public CatalogMaterializedTable getCatalogMaterializedTable() { + public ResolvedCatalogMaterializedTable getCatalogMaterializedTable() { return materializedTable; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 49d74708c675e..acc651b1e5b55 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -126,8 +126,7 @@ void testCreateMaterializedTable() { assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; - CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); - assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); Map options = new HashMap<>(); options.put("connector", "filesystem"); @@ -152,8 +151,7 @@ void testCreateMaterializedTable() { .definitionQuery("SELECT *\n" + "FROM `builtin`.`default`.`t1`") .build(); - assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin()) - .isEqualTo(expected); + assertThat(materializedTable.getOrigin()).isEqualTo(expected); } @Test @@ -202,8 +200,7 @@ void testContinuousRefreshMode() { assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; - CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); - assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); assertThat(materializedTable.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); @@ -220,8 +217,7 @@ void testContinuousRefreshMode() { assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class); CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2; - CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); - assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class); + ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); assertThat(materializedTable2.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS); @@ -240,8 +236,7 @@ void testFullRefreshMode() { assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; - CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); - assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); assertThat(materializedTable.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); @@ -258,8 +253,7 @@ void testFullRefreshMode() { assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class); CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2; - CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); - assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class); + ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable(); assertThat(materializedTable2.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);