From 7e2b5f7bc2a6137445c6fbca41fe705d96599932 Mon Sep 17 00:00:00 2001 From: xuyang Date: Tue, 7 May 2024 22:33:10 +0800 Subject: [PATCH 1/5] [FLINK-35193][table] Support the execution of refresh materialized table --- .../MaterializedTableManager.java | 89 ++++++++++++++++++ .../MaterializedTableStatementITCase.java | 92 +++++++++++++++++++ .../parser/ddl/SqlAlterMaterializedTable.java | 4 + .../ddl/SqlAlterMaterializedTableRefresh.java | 5 + .../table/operations/OperationUtils.java | 6 +- ...lterMaterializedTableRefreshOperation.java | 68 ++++++++++++++ ...lterMaterializedTableRefreshConverter.java | 56 +++++++++++ .../converters/SqlNodeConverters.java | 1 + 8 files changed, 320 insertions(+), 1 deletion(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index fed60634a3abb..8b5f99063bb99 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -23,6 +23,8 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.operation.OperationHandle; @@ -31,6 +33,7 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation; @@ -41,8 +44,12 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; import static org.apache.flink.configuration.DeploymentOptions.TARGET; import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; @@ -65,6 +72,11 @@ public static ResultFetcher callMaterializedTableOperation( return callCreateMaterializedTableOperation( operationExecutor, handle, (CreateMaterializedTableOperation) op); } + if (op instanceof AlterMaterializedTableRefreshOperation) { + return callAlterMaterializedTableRefreshOperation( + operationExecutor, 
handle, (AlterMaterializedTableRefreshOperation) op); + } + throw new SqlExecutionException( String.format( "Unsupported Operation %s for materialized table.", op.asSummaryString())); @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc + // job. 
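For reference, a minimal, self-contained sketch of what the insert statement assembled above expands to for a single-key partition spec. The identifiers and definition query below are hypothetical; this mirrors the string-building logic of this patch rather than calling into it:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class RefreshStatementSketch {
        public static void main(String[] args) {
            // Stand-ins for materializedTableIdentifier and getDefinitionQuery().
            String table = "`cat`.`db`.`users_shops`";
            String query = "SELECT user_id, ds FROM src";
            Map<String, String> partitionSpec = new LinkedHashMap<>();
            partitionSpec.put("ds", "2024-01-01");

            StringBuilder stmt =
                    new StringBuilder(
                            String.format("INSERT INTO %s SELECT * FROM (%s)", table, query));
            if (!partitionSpec.isEmpty()) {
                // Joining with AND is equivalent to the reduce(...) used in the patch.
                stmt.append(" WHERE ")
                        .append(
                                partitionSpec.entrySet().stream()
                                        .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                                        .collect(Collectors.joining(" AND ")));
            }
            // Prints: INSERT INTO `cat`.`db`.`users_shops` SELECT * FROM
            //   (SELECT user_id, ds FROM src) WHERE ds = '2024-01-01'
            System.out.println(stmt);
        }
    }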
+ return operationExecutor.executeStatement(handle, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Refresh job manually for materialized table {} occur exception.", + materializedTableIdentifier, + e); + throw new TableException( + String.format( + "Refresh job manually for materialized table %s occur exception.", + materializedTableIdentifier), + e); + } + } + private static List fetchAllResults(ResultFetcher resultFetcher) { Long token = 0L; List results = new ArrayList<>(); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 29ab697f384c0..e829f0d8dd367 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -21,12 +21,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; @@ -47,8 +50,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // check unknown partition keys + String alterStatementWithUnknownPartitionKey = + 
"ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = '2023-01-01')"; + OperationHandle alterStatementWithUnknownPartitionKeyHandle = + service.executeStatement( + sessionHandle, + alterStatementWithUnknownPartitionKey, + -1, + new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + alterStatementWithUnknownPartitionKeyHandle)) + .isInstanceOf(SqlExecutionException.class) + .rootCause() + .isInstanceOf(TableException.class) + .hasMessage("The partition spec contains unknown partition keys: [ds2]."); + + // check valid statement + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = '2023-01-01')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List result = fetchAllResults(sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + OperationHandle describeJobOperationHandle = + service.executeStatement( + sessionHandle, + String.format("DESCRIBE JOB '%s'", jobId), + -1, + new Configuration()); + + result = fetchAllResults(sessionHandle, describeJobOperationHandle); + assertThat(result.size()).isEqualTo(1); + RowData jobRow = result.get(0); + assertThat(jobRow.getTimestamp(3, 3).getMillisecond()).isGreaterThan(currentTime); + } + private SessionHandle initializeSession() { SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); String catalogDDL = @@ -271,4 +350,17 @@ private SessionHandle initializeSession() { service.configureSession(sessionHandle, dataGenSource, -1); return sessionHandle; } + + private List fetchAllResults( + SessionHandle sessionHandle, OperationHandle operationHandle) { + Long token = 0L; + List results = new ArrayList<>(); + while (token != null) { + ResultSet result = + service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE); + results.addAll(result.getData()); + token = result.getNextToken(); + } + return results; + } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java index f8037b05e4c70..3380b6a7e2032 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java @@ -48,6 +48,10 @@ public SqlIdentifier getTableName() { return tableIdentifier; } + public String[] fullTableName() { + return tableIdentifier.names.toArray(new String[0]); + } + @Override public SqlOperator getOperator() { return OPERATOR; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java index 26ec1bcd7c74b..db2945a7e9262 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java @@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer, 
getOperator().getLeftPrec(), getOperator().getRightPrec()); } } + + @Nullable + public SqlNodeList getPartitionSpec() { + return partitionSpec; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java index 7bf9f2c54f797..ab9a8675f887d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java @@ -145,7 +145,11 @@ public static String formatProperties(Map properties) { } public static String formatPartitionSpec(CatalogPartitionSpec spec) { - return spec.getPartitionSpec().entrySet().stream() + return formatPartitionSpec(spec.getPartitionSpec()); + } + + public static String formatPartitionSpec(Map spec) { + return spec.entrySet().stream() .map(entry -> entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining(", ")); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java new file mode 100644 index 0000000000000..5212cf58c7ff4 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Map; + +/** + * Operation to describe clause like: ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name + * REFRESH [PARTITION (key1=val1, key2=val2, ...)]. + */ +@Internal +public class AlterMaterializedTableRefreshOperation extends AlterMaterializedTableOperation { + + private final Map partitionSpec; + + public AlterMaterializedTableRefreshOperation( + ObjectIdentifier tableIdentifier, Map partitionSpec) { + super(tableIdentifier); + this.partitionSpec = partitionSpec; + } + + @Override + public TableResultInternal execute(Context ctx) { + // execute AlterMaterializedTableRefreshOperation in SqlGateway OperationExecutor. 
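The comment above refers to the gateway-side dispatch, which is not part of this patch. A hedged sketch of its assumed shape, using the manager entry point that is shown earlier in this diff (the fallback helper at the end is hypothetical):

    // Illustrative only: assumed routing inside the SQL gateway's OperationExecutor.
    // It explains why execute() below can simply throw.
    static ResultFetcher executeOperation(
            OperationExecutor operationExecutor, OperationHandle handle, Operation op) {
        if (op instanceof MaterializedTableOperation) {
            // REFRESH statements end up in
            // MaterializedTableManager#callAlterMaterializedTableRefreshOperation.
            return MaterializedTableManager.callMaterializedTableOperation(
                    operationExecutor, handle, (MaterializedTableOperation) op);
        }
        return executeRegularOperation(operationExecutor, handle, op); // hypothetical fallback
    }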
+ // see more at MaterializedTableManager#callAlterMaterializedTableRefreshOperation + throw new UnsupportedOperationException( + "AlterMaterializedTableRefreshOperation does not support ExecutableOperation yet."); + } + + public Map getPartitionSpec() { + return partitionSpec; + } + + @Override + public String asSummaryString() { + StringBuilder sb = + new StringBuilder( + String.format("ALTER MATERIALIZED TABLE %s REFRESH", tableIdentifier)); + if (!partitionSpec.isEmpty()) { + sb.append( + String.format( + " PARTITION (%s)", OperationUtils.formatPartitionSpec(partitionSpec))); + } + + return sb.toString(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java new file mode 100644 index 0000000000000..8b8f76e3b00fb --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.SqlProperty; +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; +import org.apache.flink.util.Preconditions; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** A converter for {@link SqlAlterMaterializedTableRefresh}. 
*/ +public class SqlAlterMaterializedTableRefreshConverter + implements SqlNodeConverter { + + @Override + public Operation convertSqlNode(SqlAlterMaterializedTableRefresh node, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + + Map partitionSpec = new LinkedHashMap<>(); + if (node.getPartitionSpec() != null) { + node.getPartitionSpec() + .forEach( + spec -> { + Preconditions.checkArgument(spec instanceof SqlProperty); + SqlProperty property = (SqlProperty) spec; + partitionSpec.put( + property.getKeyString(), property.getValueString()); + }); + } + + return new AlterMaterializedTableRefreshOperation(identifier, partitionSpec); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index fc5e3bd498cbc..4684759f12129 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -57,6 +57,7 @@ public class SqlNodeConverters { register(new SqlDescribeCatalogConverter()); register(new SqlDescribeJobConverter()); register(new SqlCreateMaterializedTableConverter()); + register(new SqlAlterMaterializedTableRefreshConverter()); } /** From c494455573034fca654c64bdbe07f15f6ba0eac5 Mon Sep 17 00:00:00 2001 From: xuyang Date: Fri, 10 May 2024 21:51:19 +0800 Subject: [PATCH 2/5] address comment --- .../MaterializedTableManager.java | 95 +++++--- .../MaterializedTableStatementITCase.java | 214 ++++++++++++++---- .../service/SqlGatewayServiceITCase.java | 30 +-- .../utils/SqlGatewayServiceTestUtil.java | 19 ++ .../ddl/SqlAlterMaterializedTableRefresh.java | 7 +- ...lterMaterializedTableRefreshConverter.java | 17 +- ...izedTableNodeToOperationConverterTest.java | 30 +++ .../test/junit5/MiniClusterExtension.java | 4 + 8 files changed, 312 insertions(+), 104 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 8b5f99063bb99..9ddec6d52a69d 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -20,11 +20,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.data.RowData; import 
org.apache.flink.table.gateway.api.operation.OperationHandle; @@ -39,6 +40,7 @@ import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation; import org.apache.flink.table.refresh.ContinuousRefreshHandler; import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; +import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; @@ -56,6 +59,7 @@ import static org.apache.flink.configuration.PipelineOptions.NAME; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK; +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; /** Manager is responsible for execute the {@link MaterializedTableOperation}. */ @Internal @@ -71,8 +75,7 @@ public static ResultFetcher callMaterializedTableOperation( if (op instanceof CreateMaterializedTableOperation) { return callCreateMaterializedTableOperation( operationExecutor, handle, (CreateMaterializedTableOperation) op); - } - if (op instanceof AlterMaterializedTableRefreshOperation) { + } else if (op instanceof AlterMaterializedTableRefreshOperation) { return callAlterMaterializedTableRefreshOperation( operationExecutor, handle, (AlterMaterializedTableRefreshOperation) op); } @@ -130,7 +133,7 @@ private static void createMaterializedInContinuousMode( try { // submit flink streaming job ResultFetcher resultFetcher = - operationExecutor.executeStatement(handle, insertStatement); + operationExecutor.executeStatement(handle, customConfig, insertStatement); // get execution.target and jobId, currently doesn't support yarn and k8s, so doesn't // get clusterId @@ -173,7 +176,7 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), @@ -188,8 +191,8 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation( ObjectIdentifier materializedTableIdentifier = alterMaterializedTableRefreshOperation.getTableIdentifier(); ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); - if (!(table instanceof ResolvedCatalogMaterializedTable)) { - throw new TableException( + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( String.format( "The table '%s' is not a materialized table.", materializedTableIdentifier)); @@ -201,18 +204,9 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation( Map partitionSpec = alterMaterializedTableRefreshOperation.getPartitionSpec(); - Set allPartitionKeys = - new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); - Set unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); - unknownPartitionKeys.removeAll(allPartitionKeys); - if (!unknownPartitionKeys.isEmpty()) { - throw new TableException( - String.format( - "The partition spec contains unknown partition keys: %s.", - unknownPartitionKeys)); - } + 
validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); - // Set job name, runtime mode, checkpoint interval + // Set job name, runtime mode Configuration customConfig = new Configuration(); String jobName = String.format( @@ -224,7 +218,7 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation( StringBuilder insertStatement = new StringBuilder( String.format( - "INSERT INTO %s SELECT * FROM (%s)", + "INSERT OVERWRITE %s SELECT * FROM (%s)", materializedTableIdentifier, materializedTable.getDefinitionQuery())); @@ -237,27 +231,76 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation( String.format( "%s = '%s'", entry.getKey(), entry.getValue())) .reduce((s1, s2) -> s1 + " AND " + s2) - .orElseThrow(() -> new TableException("Could not happen"))); + .get()); } try { - // return jobId for one time refresh, user should get the refresh job info via desc - // job. - return operationExecutor.executeStatement(handle, insertStatement.toString()); + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); } catch (Exception e) { // log and throw exception LOG.error( - "Refresh job manually for materialized table {} occur exception.", + "Manually refreshing the materialization table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( - "Refresh job manually for materialized table %s occur exception.", + "Manually refreshing the materialization table %s occur exception.", materializedTableIdentifier), e); } } + private static void validatePartitionSpec( + Map partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set unknownPartitionKeys = new HashSet<>(); + Set nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if (!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. All known partition keys are: [%s].", + unknownPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")), + allPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + if (!nonStringPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Currently, specifying non-char or non-string type partition fields" + + " to refresh materialized tables is not supported." 
+ + " All specific partition keys with unsupported types are: [%s].", + nonStringPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + } + private static List fetchAllResults(ResultFetcher resultFetcher) { Long token = 0L; List results = new ArrayList<>(); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index e829f0d8dd367..efb88cabb8e8a 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -18,10 +18,17 @@ package org.apache.flink.table.gateway.service; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -29,15 +36,16 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.operation.OperationHandle; -import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.types.Row; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.BeforeAll; @@ -55,12 +63,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -243,36 +254,166 @@ void testCreateMaterializedTableInFullMode() { 
@Test void testAlterMaterializedTableRefresh() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); // initialize session handle, create test-filesystem catalog and register it to catalog // store SessionHandle sessionHandle = initializeSession(); + List data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + String dataId = TestValuesTableFactory.registerData(data); + + String sourceDdl = + String.format( + "CREATE TABLE my_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + ")", + dataId); + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + String materializedTableDDL = - "CREATE MATERIALIZED TABLE users_shops" + "CREATE MATERIALIZED TABLE my_materialized_table" + " PARTITIONED BY (ds)\n" + " WITH(\n" + " 'format' = 'debezium-json'\n" + " )\n" - + " FRESHNESS = INTERVAL '30' SECOND\n" + + " FRESHNESS = INTERVAL '2' SECOND\n" + " AS SELECT \n" + " user_id,\n" + " shop_id,\n" + " ds,\n" + + " COUNT(order_id) AS order_cnt\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // verify data exists in materialized table + CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(2); + + // remove the last element + data.remove(2); + + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List result = fetchAllResults(service, sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster(); + + // 1. verify a new job is created + Optional job = + miniCluster.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream() + .filter(j -> j.getJobId().toString().equals(jobId)) + .findFirst(); + assertThat(job).isPresent(); + assertThat(job.get().getStartTime()).isGreaterThan(currentTime); + + // 2. verify the new job is a batch job + ArchivedExecutionGraph executionGraph = + miniCluster + .getArchivedExecutionGraph(JobID.fromHexString(jobId)) + .get(30, TimeUnit.SECONDS); + assertThat(executionGraph.getJobType()).isEqualTo(JobType.BATCH); + + // 3. 
verify the new job is finished + CommonTestUtils.waitUtil( + () -> { + try { + return JobStatus.FINISHED.equals( + miniCluster + .getJobStatus(JobID.fromHexString(jobId)) + .get(5000, TimeUnit.SECONDS)); + } catch (Exception ignored) { + } + return false; + }, + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify whether the job is finished."); + + // 4. verify the new job overwrite the data + CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(1); + } + + @Test + void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds1, ds2)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds1,\n" + + " ds2,\n" + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + " SUM (1) AS pv\n" + " FROM (\n" - + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds1, user_id % 10 as ds2, payment_amount_cents FROM datagenSource" + " ) AS tmp\n" - + " GROUP BY (user_id, shop_id, ds)"; + + " GROUP BY (user_id, shop_id, ds1, ds2)"; OperationHandle materializedTableHandle = service.executeStatement( sessionHandle, materializedTableDDL, -1, new Configuration()); awaitOperationTermination(service, sessionHandle, materializedTableHandle); - // check unknown partition keys + // CASE 1: check unknown partition keys String alterStatementWithUnknownPartitionKey = - "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = '2023-01-01')"; + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds3 = '2024-01-01')"; OperationHandle alterStatementWithUnknownPartitionKeyHandle = service.executeStatement( sessionHandle, @@ -288,31 +429,34 @@ void testAlterMaterializedTableRefresh() throws Exception { alterStatementWithUnknownPartitionKeyHandle)) .isInstanceOf(SqlExecutionException.class) .rootCause() - .isInstanceOf(TableException.class) - .hasMessage("The partition spec contains unknown partition keys: [ds2]."); - - // check valid statement - long currentTime = System.currentTimeMillis(); - String alterStatement = - "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = '2023-01-01')"; - OperationHandle alterHandle = - service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); - awaitOperationTermination(service, sessionHandle, alterHandle); - List result = fetchAllResults(sessionHandle, alterHandle); - assertThat(result.size()).isEqualTo(1); - String jobId = result.get(0).getString(0).toString(); + .isInstanceOf(ValidationException.class) + .hasMessage( + "The partition spec contains unknown partition keys: ['ds3']. 
" + + "All known partition keys are: ['ds2', 'ds1']."); - OperationHandle describeJobOperationHandle = + // CASE 2: check specific non-string partition keys as partition spec to refresh + String alterStatementWithNonStringPartitionKey = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 5)"; + OperationHandle alterStatementWithNonStringPartitionKeyHandle = service.executeStatement( sessionHandle, - String.format("DESCRIBE JOB '%s'", jobId), + alterStatementWithNonStringPartitionKey, -1, new Configuration()); - result = fetchAllResults(sessionHandle, describeJobOperationHandle); - assertThat(result.size()).isEqualTo(1); - RowData jobRow = result.get(0); - assertThat(jobRow.getTimestamp(3, 3).getMillisecond()).isGreaterThan(currentTime); + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + alterStatementWithNonStringPartitionKeyHandle)) + .isInstanceOf(SqlExecutionException.class) + .rootCause() + .isInstanceOf(ValidationException.class) + .hasMessage( + "Currently, specifying non-char or non-string type partition fields to refresh" + + " materialized tables is not supported. All specific partition" + + " keys with unsupported types are: ['ds2']."); } private SessionHandle initializeSession() { @@ -351,16 +495,10 @@ private SessionHandle initializeSession() { return sessionHandle; } - private List fetchAllResults( - SessionHandle sessionHandle, OperationHandle operationHandle) { - Long token = 0L; - List results = new ArrayList<>(); - while (token != null) { - ResultSet result = - service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE); - results.addAll(result.getData()); - token = result.getNextToken(); - } - return results; + private List fetchTableData(SessionHandle sessionHandle, String query) { + OperationHandle queryHandle = + service.executeStatement(sessionHandle, query, -1, new Configuration()); + + return fetchAllResults(service, sessionHandle, queryHandle); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index 012c4aed166e4..e2536b0447b8b 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -113,6 +113,7 @@ import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchResults; import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS; import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE; @@ -324,7 +325,7 @@ void testGetOperationFinishedAndFetchResults() throws Exception { awaitOperationTermination(service, sessionHandle, operationHandle); List expectedData = getDefaultResultSet().getData(); - List actualData = fetchAllResults(sessionHandle, operationHandle); + List actualData = fetchAllResults(service, sessionHandle, 
operationHandle); assertThat(actualData).isEqualTo(expectedData); service.closeOperation(sessionHandle, operationHandle); @@ -401,7 +402,7 @@ void testExecuteSqlWithConfig() { -1, Configuration.fromMap(Collections.singletonMap(key, value))); - List settings = fetchAllResults(sessionHandle, operationHandle); + List settings = fetchAllResults(service, sessionHandle, operationHandle); assertThat(settings) .contains( @@ -436,7 +437,7 @@ void testStopJobStatementWithSavepoint( OperationHandle insertOperationHandle = service.executeStatement(sessionHandle, insertSql, -1, configuration); - List results = fetchAllResults(sessionHandle, insertOperationHandle); + List results = fetchAllResults(service, sessionHandle, insertOperationHandle); assertThat(results.size()).isEqualTo(1); String jobId = results.get(0).getString(0).toString(); @@ -446,7 +447,7 @@ void testStopJobStatementWithSavepoint( OperationHandle stopOperationHandle = service.executeStatement(sessionHandle, stopSql, -1, configuration); - List stopResults = fetchAllResults(sessionHandle, stopOperationHandle); + List stopResults = fetchAllResults(service, sessionHandle, stopOperationHandle); assertThat(stopResults.size()).isEqualTo(1); if (hasSavepoint) { String savepoint = stopResults.get(0).getString(0).toString(); @@ -485,7 +486,7 @@ void testShowJobsOperation(@InjectClusterClient RestClusterClient restCluster OperationHandle insertsOperationHandle = service.executeStatement(sessionHandle, insertSql, -1, configuration); String jobId = - fetchAllResults(sessionHandle, insertsOperationHandle) + fetchAllResults(service, sessionHandle, insertsOperationHandle) .get(0) .getString(0) .toString(); @@ -496,7 +497,7 @@ void testShowJobsOperation(@InjectClusterClient RestClusterClient restCluster OperationHandle showJobsOperationHandle1 = service.executeStatement(sessionHandle, "SHOW JOBS", -1, configuration); - List result = fetchAllResults(sessionHandle, showJobsOperationHandle1); + List result = fetchAllResults(service, sessionHandle, showJobsOperationHandle1); RowData jobRow = result.stream() .filter(row -> jobId.equals(row.getString(0).toString())) @@ -532,7 +533,7 @@ void testDescribeJobOperation(@InjectClusterClient RestClusterClient restClus OperationHandle insertsOperationHandle = service.executeStatement(sessionHandle, insertSql, -1, configuration); String jobId = - fetchAllResults(sessionHandle, insertsOperationHandle) + fetchAllResults(service, sessionHandle, insertsOperationHandle) .get(0) .getString(0) .toString(); @@ -547,7 +548,7 @@ void testDescribeJobOperation(@InjectClusterClient RestClusterClient restClus -1, configuration); - List result = fetchAllResults(sessionHandle, describeJobOperationHandle); + List result = fetchAllResults(service, sessionHandle, describeJobOperationHandle); RowData jobRow = result.stream() .filter(row -> jobId.equals(row.getString(0).toString())) @@ -1164,17 +1165,4 @@ private void validateCompletionHints( assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length())) .isEqualTo(expectedCompletionHints); } - - private List fetchAllResults( - SessionHandle sessionHandle, OperationHandle operationHandle) { - Long token = 0L; - List results = new ArrayList<>(); - while (token != null) { - ResultSet result = - service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE); - results.addAll(result.getData()); - token = result.getNextToken(); - } - return results; - } } diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceTestUtil.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceTestUtil.java index 5ad9a58e2f18c..e0411d088027d 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceTestUtil.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceTestUtil.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.results.ResultSet; @@ -30,6 +31,9 @@ import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl; import org.apache.flink.table.gateway.service.SqlGatewayServiceStatementITCase; +import java.util.ArrayList; +import java.util.List; + /** Test util for {@link SqlGatewayServiceITCase} and {@link SqlGatewayServiceStatementITCase}. */ public class SqlGatewayServiceTestUtil { @@ -84,4 +88,19 @@ public static void awaitOperationTermination( .getOperationManager() .awaitOperationTermination(operationHandle); } + + public static List fetchAllResults( + SqlGatewayService service, + SessionHandle sessionHandle, + OperationHandle operationHandle) { + Long token = 0L; + List results = new ArrayList<>(); + while (token != null) { + ResultSet result = + service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE); + results.addAll(result.getData()); + token = result.getNextToken(); + } + return results; + } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java index db2945a7e9262..b2fc428847926 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java @@ -25,8 +25,6 @@ import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; -import javax.annotation.Nullable; - import java.util.List; /** @@ -38,7 +36,7 @@ public class SqlAlterMaterializedTableRefresh extends SqlAlterMaterializedTable private final SqlNodeList partitionSpec; public SqlAlterMaterializedTableRefresh( - SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) { + SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec) { super(pos, tableName); this.partitionSpec = partitionSpec; } @@ -52,14 +50,13 @@ public List getOperandList() { public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { super.unparse(writer, leftPrec, rightPrec); writer.keyword("REFRESH"); - if (partitionSpec != null && partitionSpec.size() > 0) { + if (!partitionSpec.isEmpty()) { writer.keyword("PARTITION"); partitionSpec.unparse( writer, getOperator().getLeftPrec(), getOperator().getRightPrec()); } } - @Nullable public SqlNodeList getPartitionSpec() { return partitionSpec; } diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java index 8b8f76e3b00fb..a1dacdc02f1f6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java @@ -18,15 +18,13 @@ package org.apache.flink.table.planner.operations.converters; -import org.apache.flink.sql.parser.SqlProperty; +import org.apache.flink.sql.parser.SqlPartitionUtils; import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; -import org.apache.flink.util.Preconditions; -import java.util.LinkedHashMap; import java.util.Map; /** A converter for {@link SqlAlterMaterializedTableRefresh}. */ @@ -39,17 +37,8 @@ public Operation convertSqlNode(SqlAlterMaterializedTableRefresh node, ConvertCo ObjectIdentifier identifier = context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); - Map partitionSpec = new LinkedHashMap<>(); - if (node.getPartitionSpec() != null) { - node.getPartitionSpec() - .forEach( - spec -> { - Preconditions.checkArgument(spec instanceof SqlProperty); - SqlProperty property = (SqlProperty) spec; - partitionSpec.put( - property.getKeyString(), property.getValueString()); - }); - } + Map partitionSpec = + SqlPartitionUtils.getPartitionKVs(node.getPartitionSpec()); return new AlterMaterializedTableRefreshOperation(identifier, partitionSpec); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 9514e12367e0e..96452996157c2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -24,8 +24,11 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + import org.junit.jupiter.api.Test; import java.time.Duration; @@ -256,4 +259,31 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() { .hasMessageContaining( "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); } + + @Test + public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() { + final String sql = + "ALTER MATERIALIZED TABLE mtbl1 
REFRESH PARITITON (ds1 = '1', ds2 = '2')"; + + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class); + + AlterMaterializedTableRefreshOperation op = + (AlterMaterializedTableRefreshOperation) operation; + assertThat(op.getTableIdentifier().toString()).isEqualTo("default.mtbl1"); + assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", "1", "ds2", "2")); + } + + @Test + public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() { + final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH)"; + + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class); + + AlterMaterializedTableRefreshOperation op = + (AlterMaterializedTableRefreshOperation) operation; + assertThat(op.getTableIdentifier().toString()).isEqualTo("default.mtbl1"); + assertThat(op.getPartitionSpec()).isEmpty(); + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java index db95d8e9a63aa..1dbbaa7395b25 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java @@ -312,6 +312,10 @@ public boolean isRunning() { return internalMiniClusterExtension.getMiniCluster().isRunning(); } + public MiniCluster getMiniCluster() { + return internalMiniClusterExtension.getMiniCluster(); + } + private static class CloseableParameter implements ExtensionContext.Store.CloseableResource { private final T autoCloseable; From ce19270c48151c8124541a91e2806d428fe1412a Mon Sep 17 00:00:00 2001 From: xuyang Date: Sat, 11 May 2024 09:30:14 +0800 Subject: [PATCH 3/5] fix failed test --- .../SqlMaterializedTableNodeToOperationConverterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 96452996157c2..e044c48b0a3ac 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -263,27 +263,27 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() { @Test public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() { final String sql = - "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARITITON (ds1 = '1', ds2 = '2')"; + "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARTITION (ds1 = '1', ds2 = '2')"; Operation operation = parse(sql); assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class); AlterMaterializedTableRefreshOperation op = (AlterMaterializedTableRefreshOperation) operation; - assertThat(op.getTableIdentifier().toString()).isEqualTo("default.mtbl1"); + assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`"); assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", "1", "ds2", "2")); } @Test public void 
From ce19270c48151c8124541a91e2806d428fe1412a Mon Sep 17 00:00:00 2001
From: xuyang
Date: Sat, 11 May 2024 09:30:14 +0800
Subject: [PATCH 3/5] fix failed test

---
 .../SqlMaterializedTableNodeToOperationConverterTest.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 96452996157c2..e044c48b0a3ac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -263,27 +263,27 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() {
     @Test
     public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() {
         final String sql =
-                "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARITITON (ds1 = '1', ds2 = '2')";
+                "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARTITION (ds1 = '1', ds2 = '2')";
 
         Operation operation = parse(sql);
         assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class);
 
         AlterMaterializedTableRefreshOperation op =
                 (AlterMaterializedTableRefreshOperation) operation;
-        assertThat(op.getTableIdentifier().toString()).isEqualTo("default.mtbl1");
+        assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
         assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", "1", "ds2", "2"));
     }
 
     @Test
     public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() {
-        final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH)";
+        final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH";
 
         Operation operation = parse(sql);
         assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class);
 
         AlterMaterializedTableRefreshOperation op =
                 (AlterMaterializedTableRefreshOperation) operation;
-        assertThat(op.getTableIdentifier().toString()).isEqualTo("default.mtbl1");
+        assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
         assertThat(op.getPartitionSpec()).isEmpty();
     }
 }
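Why the expected strings changed in this fix: the converter qualifies a bare table name against the session's current catalog and database before building the operation, and the qualified identifier prints each part back-quoted. A hedged sketch of the step involved (assuming a CatalogManager whose current catalog and database are "builtin" and "default", as in this test base; the variable names are illustrative):

    // Sketch: qualification of a bare name, as the converter does it.
    UnresolvedIdentifier unresolved = UnresolvedIdentifier.of("mtbl1");
    ObjectIdentifier qualified = catalogManager.qualifyIdentifier(unresolved);
    // qualified.toString() renders as: `builtin`.`default`.`mtbl1`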
From 34aecb73f0b173cfb198c84563a36f94a44bc777 Mon Sep 17 00:00:00 2001
From: xuyang
Date: Sat, 11 May 2024 15:23:15 +0800
Subject: [PATCH 4/5] address comments

---
 .../materializedtable/MaterializedTableManager.java  | 64 ++++++++++---------
 .../service/MaterializedTableStatementITCase.java    | 46 +++++++------
 .../materializedtable/MaterializedTableManagerTest.java | 54 ++++++++++++++++
 3 files changed, 116 insertions(+), 48 deletions(-)
 create mode 100644 flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 9ddec6d52a69d..ff0670462e0cd 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.service.materializedtable;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -50,7 +51,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
@@ -215,24 +215,11 @@ private static ResultFetcher callAlterMaterializedTableRefreshOperation(
         customConfig.set(NAME, jobName);
         customConfig.set(RUNTIME_MODE, BATCH);
 
-        StringBuilder insertStatement =
-                new StringBuilder(
-                        String.format(
-                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
-                                materializedTableIdentifier,
-                                materializedTable.getDefinitionQuery()));
-
-        if (!partitionSpec.isEmpty()) {
-            insertStatement.append(" WHERE ");
-            insertStatement.append(
-                    partitionSpec.entrySet().stream()
-                            .map(
-                                    entry ->
-                                            String.format(
-                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
-                            .reduce((s1, s2) -> s1 + " AND " + s2)
-                            .get());
-        }
+        String insertStatement =
+                getManuallyRefreshStatement(
+                        materializedTableIdentifier.toString(),
+                        materializedTable.getDefinitionQuery(),
+                        partitionSpec);
 
         try {
             LOG.debug(
@@ -283,24 +270,43 @@ private static void validatePartitionSpec(
         if (!unknownPartitionKeys.isEmpty()) {
             throw new ValidationException(
                     String.format(
-                            "The partition spec contains unknown partition keys: [%s]. All known partition keys are: [%s].",
-                            unknownPartitionKeys.stream()
-                                    .collect(Collectors.joining("', '", "'", "'")),
-                            allPartitionKeys.stream()
-                                    .collect(Collectors.joining("', '", "'", "'"))));
+                            "The partition spec contains unknown partition keys:\n\n%s\n\nAll known partition keys are:\n\n%s",
+                            String.join("\n", unknownPartitionKeys),
+                            String.join("\n", allPartitionKeys)));
         }
 
         if (!nonStringPartitionKeys.isEmpty()) {
             throw new ValidationException(
                     String.format(
-                            "Currently, specifying non-char or non-string type partition fields"
-                                    + " to refresh materialized tables is not supported."
-                                    + " All specific partition keys with unsupported types are: [%s].",
-                            nonStringPartitionKeys.stream()
-                                    .collect(Collectors.joining("', '", "'", "'"))));
+                            "Currently, manually refreshing materialized table only supports specifying char and string type"
+                                    + " partition keys. All specific partition keys with unsupported types are:\n\n%s",
+                            String.join("\n", nonStringPartitionKeys)));
         }
     }
 
+    @VisibleForTesting
+    protected static String getManuallyRefreshStatement(
+            String tableIdentifier, String query, Map<String, String> partitionSpec) {
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s\n SELECT * FROM (%s)",
+                                tableIdentifier, query));
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append("\n WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        return insertStatement.toString();
+    }
+
     private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         Long token = 0L;
         List<RowData> results = new ArrayList<>();
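For a concrete feel of the clause assembly above, this self-contained snippet (illustrative only, not part of the patch) shows what the stream/reduce pipeline produces for a two-key spec:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Each entry becomes key = 'value'; entries are joined with AND.
    public class WhereClauseDemo {
        public static void main(String[] args) {
            Map<String, String> partitionSpec = new LinkedHashMap<>();
            partitionSpec.put("ds1", "1");
            partitionSpec.put("ds2", "2");

            String where =
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                            .reduce((s1, s2) -> s1 + " AND " + s2)
                            .orElse("");

            System.out.println(where); // prints: ds1 = '1' AND ds2 = '2'
        }
    }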
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index efb88cabb8e8a..0246a97a4a994 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -20,12 +20,12 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
@@ -43,6 +43,7 @@
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.types.Row;
@@ -253,7 +254,8 @@ void testCreateMaterializedTableInFullMode() {
     }
 
     @Test
-    void testAlterMaterializedTableRefresh() throws Exception {
+    void testAlterMaterializedTableRefresh(
+            @InjectClusterClient RestClusterClient restClusterClient) throws Exception {
         long timeout = Duration.ofSeconds(20).toMillis();
         long pause = Duration.ofSeconds(2).toMillis();
         // initialize session handle, create test-filesystem catalog and register it to catalog
@@ -280,7 +282,9 @@ void testAlterMaterializedTableRefresh() throws Exception {
                         + " 'data-id' = '%s'\n"
                         + ")",
                 dataId);
-        service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+        OperationHandle sourceHandle =
+                service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, sourceHandle);
 
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE my_materialized_table"
@@ -332,31 +336,29 @@ void testAlterMaterializedTableRefresh() throws Exception {
         assertThat(result.size()).isEqualTo(1);
         String jobId = result.get(0).getString(0).toString();
 
-        MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();
-
         // 1. verify a new job is created
         Optional<JobStatusMessage> job =
-                miniCluster.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream()
+                restClusterClient.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream()
                         .filter(j -> j.getJobId().toString().equals(jobId))
                         .findFirst();
         assertThat(job).isPresent();
         assertThat(job.get().getStartTime()).isGreaterThan(currentTime);
 
         // 2. verify the new job is a batch job
-        ArchivedExecutionGraph executionGraph =
-                miniCluster
-                        .getArchivedExecutionGraph(JobID.fromHexString(jobId))
-                        .get(30, TimeUnit.SECONDS);
-        assertThat(executionGraph.getJobType()).isEqualTo(JobType.BATCH);
+        JobDetailsInfo jobDetailsInfo =
+                restClusterClient
+                        .getJobDetails(JobID.fromHexString(jobId))
+                        .get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(jobDetailsInfo.getJobType()).isEqualTo(JobType.BATCH);
 
         // 3. verify the new job is finished
         CommonTestUtils.waitUtil(
                 () -> {
                     try {
                         return JobStatus.FINISHED.equals(
-                                miniCluster
+                                restClusterClient
                                         .getJobStatus(JobID.fromHexString(jobId))
-                                        .get(5000, TimeUnit.SECONDS));
+                                        .get(5, TimeUnit.SECONDS));
                     } catch (Exception ignored) {
                     }
                     return false;
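The hunks above move the test from MiniCluster internals onto the public REST API. For reference, a hedged sketch of the injection pattern used (method name and assertions are illustrative, not part of the patch):

    // Sketch: the registered MiniClusterExtension resolves test-method
    // parameters annotated with @InjectClusterClient; requesting the
    // RestClusterClient subtype yields a REST-backed client.
    @Test
    void sketchInjectedClient(@InjectClusterClient RestClusterClient restClusterClient)
            throws Exception {
        Collection<JobStatusMessage> jobs =
                restClusterClient.listJobs().get(20, TimeUnit.SECONDS);
        assertThat(jobs).isNotEmpty();
    }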
@@ -431,8 +433,14 @@ void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exceptio
                 .rootCause()
                 .isInstanceOf(ValidationException.class)
                 .hasMessage(
-                        "The partition spec contains unknown partition keys: ['ds3']. "
-                                + "All known partition keys are: ['ds2', 'ds1'].");
+                        "The partition spec contains unknown partition keys:\n"
+                                + "\n"
+                                + "ds3\n"
+                                + "\n"
+                                + "All known partition keys are:\n"
+                                + "\n"
+                                + "ds2\n"
+                                + "ds1");
 
         // CASE 2: check specific non-string partition keys as partition spec to refresh
         String alterStatementWithNonStringPartitionKey =
@@ -454,9 +462,9 @@ void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exceptio
                 .rootCause()
                 .isInstanceOf(ValidationException.class)
                 .hasMessage(
-                        "Currently, specifying non-char or non-string type partition fields to refresh"
-                                + " materialized tables is not supported. All specific partition"
-                                + " keys with unsupported types are: ['ds2'].");
+                        "Currently, manually refreshing materialized table only supports specifying char and string type partition keys. All specific partition keys with unsupported types are:\n"
+                                + "\n"
+                                + "ds2");
     }
 
     private SessionHandle initializeSession() {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
new file mode 100644
index 0000000000000..c2a1cad893115
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.materializedtable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MaterializedTableManager}. */
+class MaterializedTableManagerTest {
+
+    @Test
+    void testGetManuallyRefreshStatement() {
+        String tableIdentifier = "my_materialized_table";
+        String query = "SELECT * FROM my_source_table";
+        assertThat(
+                        MaterializedTableManager.getManuallyRefreshStatement(
+                                tableIdentifier, query, Collections.emptyMap()))
+                .isEqualTo(
+                        "INSERT OVERWRITE my_materialized_table\n"
+                                + " SELECT * FROM (SELECT * FROM my_source_table)");
+
+        Map<String, String> partitionSpec = new LinkedHashMap<>();
+        partitionSpec.put("k1", "v1");
+        partitionSpec.put("k2", "v2");
+        assertThat(
+                        MaterializedTableManager.getManuallyRefreshStatement(
+                                tableIdentifier, query, partitionSpec))
+                .isEqualTo(
+                        "INSERT OVERWRITE my_materialized_table\n"
+                                + " SELECT * FROM (SELECT * FROM my_source_table)\n"
+                                + " WHERE k1 = 'v1' AND k2 = 'v2'");
+    }
+}
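One further case worth keeping in mind (a hypothetical extra assertion, not part of the patch): a single-entry spec exercises the reduce step without producing any AND separator. Given the format string shown in the manager above, the expected output would be:

    // Hypothetical addition to MaterializedTableManagerTest: with exactly one
    // partition key, the generated WHERE clause contains no AND.
    assertThat(
                    MaterializedTableManager.getManuallyRefreshStatement(
                            "t1", "SELECT 1", Collections.singletonMap("ds", "2024-05-11")))
            .isEqualTo(
                    "INSERT OVERWRITE t1\n"
                            + " SELECT * FROM (SELECT 1)\n"
                            + " WHERE ds = '2024-05-11'");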
From d5717f774b432afa747fa9164daa8c6fd67e0d98 Mon Sep 17 00:00:00 2001
From: xuyang
Date: Sat, 11 May 2024 16:12:04 +0800
Subject: [PATCH 5/5] remove useless code

---
 .../org/apache/flink/test/junit5/MiniClusterExtension.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
index 1dbbaa7395b25..db95d8e9a63aa 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
@@ -312,10 +312,6 @@ public boolean isRunning() {
         return internalMiniClusterExtension.getMiniCluster().isRunning();
     }
 
-    public MiniCluster getMiniCluster() {
-        return internalMiniClusterExtension.getMiniCluster();
-    }
-
     private static class CloseableParameter<T extends AutoCloseable>
             implements ExtensionContext.Store.CloseableResource {
         private final T autoCloseable;