Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private ResultFetcher callCreateMaterializedTableOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
CreateMaterializedTableOperation createMaterializedTableOperation) {
CatalogMaterializedTable materializedTable =
ResolvedCatalogMaterializedTable materializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
createMaterializedTableInContinuousMode(
Expand All @@ -220,7 +220,7 @@ private void createMaterializedTableInContinuousMode(

ObjectIdentifier materializedTableIdentifier =
createMaterializedTableOperation.getTableIdentifier();
CatalogMaterializedTable catalogMaterializedTable =
ResolvedCatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();

try {
Expand Down Expand Up @@ -257,7 +257,7 @@ private void createMaterializedTableInFullMode(

ObjectIdentifier materializedTableIdentifier =
createMaterializedTableOperation.getTableIdentifier();
CatalogMaterializedTable catalogMaterializedTable =
ResolvedCatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();

// convert duration to cron expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public class SqlCreateMaterializedTable extends SqlCreate {

private final SqlIdentifier tableName;

private final SqlCharStringLiteral comment;
private final @Nullable SqlTableConstraint tableConstraint;

private final SqlTableConstraint tableConstraint;
private final @Nullable SqlCharStringLiteral comment;

private final SqlDistribution distribution;
private final @Nullable SqlDistribution distribution;

private final SqlNodeList partitionKeyList;

private final SqlNodeList propertyList;

private final SqlIntervalLiteral freshness;

@Nullable private final SqlLiteral refreshMode;
private final @Nullable SqlLiteral refreshMode;

private final SqlNode asQuery;

Expand All @@ -80,8 +80,8 @@ public SqlCreateMaterializedTable(
SqlNode asQuery) {
super(OPERATOR, pos, false, false);
this.tableName = requireNonNull(tableName, "tableName should not be null");
this.comment = comment;
this.tableConstraint = tableConstraint;
this.comment = comment;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-ordered the fields they way they are defined in the constructor

this.distribution = distribution;
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not be null");
Expand Down Expand Up @@ -124,7 +124,7 @@ public Optional<SqlTableConstraint> getTableConstraint() {
return Optional.ofNullable(tableConstraint);
}

public SqlDistribution getDistribution() {
public @Nullable SqlDistribution getDistribution() {
return distribution;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.operations.Operation;
Expand All @@ -38,7 +37,7 @@ public class CreateMaterializedTableOperation
implements CreateOperation, MaterializedTableOperation {

private final ObjectIdentifier tableIdentifier;
private final CatalogMaterializedTable materializedTable;
private final ResolvedCatalogMaterializedTable materializedTable;

public CreateMaterializedTableOperation(
ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) {
Expand All @@ -57,7 +56,7 @@ public ObjectIdentifier getTableIdentifier() {
return tableIdentifier;
}

public CatalogMaterializedTable getCatalogMaterializedTable() {
public ResolvedCatalogMaterializedTable getCatalogMaterializedTable() {
return materializedTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ void testCreateMaterializedTable() {
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);

CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();

Map<String, String> options = new HashMap<>();
options.put("connector", "filesystem");
Expand All @@ -152,8 +151,7 @@ void testCreateMaterializedTable() {
.definitionQuery("SELECT *\n" + "FROM `builtin`.`default`.`t1`")
.build();

assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
.isEqualTo(expected);
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
}

@Test
Expand Down Expand Up @@ -202,8 +200,7 @@ void testContinuousRefreshMode() {
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);

CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();

assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
Expand All @@ -220,8 +217,7 @@ void testContinuousRefreshMode() {
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);

CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();

assertThat(materializedTable2.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
Expand All @@ -240,8 +236,7 @@ void testFullRefreshMode() {
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);

CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();

assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
Expand All @@ -258,8 +253,7 @@ void testFullRefreshMode() {
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);

CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();

assertThat(materializedTable2.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
Expand Down