diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index fed60634a3abb..ff0670462e0cd 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -19,10 +19,14 @@
 package org.apache.flink.table.gateway.service.materializedtable;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
@@ -31,24 +35,31 @@
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
 import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
 import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
 import org.apache.flink.table.refresh.ContinuousRefreshHandler;
 import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
 import static org.apache.flink.configuration.DeploymentOptions.TARGET;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.configuration.PipelineOptions.NAME;
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
 
 /** Manager is responsible for execute the {@link MaterializedTableOperation}. */
@@ -64,7 +75,11 @@ public static ResultFetcher callMaterializedTableOperation(
         if (op instanceof CreateMaterializedTableOperation) {
             return callCreateMaterializedTableOperation(
                     operationExecutor, handle, (CreateMaterializedTableOperation) op);
+        } else if (op instanceof AlterMaterializedTableRefreshOperation) {
+            return callAlterMaterializedTableRefreshOperation(
+                    operationExecutor, handle, (AlterMaterializedTableRefreshOperation) op);
         }
+
         throw new SqlExecutionException(
                 String.format(
                         "Unsupported Operation %s for materialized table.", op.asSummaryString()));
@@ -118,7 +133,7 @@ private static void createMaterializedInContinuousMode(
         try {
             // submit flink streaming job
             ResultFetcher resultFetcher =
-                    operationExecutor.executeStatement(handle, insertStatement);
+                    operationExecutor.executeStatement(handle, customConfig, insertStatement);
 
             // get execution.target and jobId, currently doesn't support yarn and k8s, so doesn't
             // get clusterId
@@ -161,7 +176,7 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized table %s occur exception.",
                             materializedTableIdentifier),
@@ -169,6 +184,129 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table =
+                operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, materializedTable);
+
+        // set the job name and runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        String insertStatement =
+                getManuallyRefreshStatement(
+                        materializedTableIdentifier.toString(),
+                        materializedTable.getDefinitionQuery(),
+                        partitionSpec);
+
+        try {
+            LOG.debug(
+                    "Begin to manually refresh the materialized table {}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(handle, customConfig, insertStatement);
+        } catch (Exception e) {
+            // log and rethrow the exception
+            LOG.error(
+                    "Failed to manually refresh the materialized table {}.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to manually refresh the materialized table %s.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition keys:\n\n%s\n\nAll known partition keys are:\n\n%s",
+                            String.join("\n", unknownPartitionKeys),
+                            String.join("\n", allPartitionKeys)));
+        }
+
+        if (!nonStringPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Currently, manually refreshing materialized table only supports specifying char and string type"
+                                    + " partition keys. All specified partition keys with unsupported types are:\n\n%s",
+                            String.join("\n", nonStringPartitionKeys)));
+        }
+    }
+
+    @VisibleForTesting
+    protected static String getManuallyRefreshStatement(
+            String tableIdentifier, String query, Map<String, String> partitionSpec) {
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s\n SELECT * FROM (%s)",
+                                tableIdentifier, query));
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append("\n WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        return insertStatement.toString();
+    }
+
     private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         Long token = 0L;
         List<RowData> results = new ArrayList<>();
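For reference, `getManuallyRefreshStatement` wraps the table's definition query in an `INSERT OVERWRITE ... SELECT * FROM (...)` and folds the partition spec into a `WHERE` conjunction. A minimal standalone sketch of that assembly (class and identifier names are illustrative, not part of the patch):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Standalone sketch of the statement assembly in getManuallyRefreshStatement above. */
public class RefreshStatementSketch {

    static String refreshStatement(
            String tableIdentifier, String query, Map<String, String> partitionSpec) {
        StringBuilder sb =
                new StringBuilder(
                        String.format(
                                "INSERT OVERWRITE %s\n SELECT * FROM (%s)",
                                tableIdentifier, query));
        if (!partitionSpec.isEmpty()) {
            sb.append("\n WHERE ");
            sb.append(
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                            .reduce((s1, s2) -> s1 + " AND " + s2)
                            .get());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024-01-02");
        // Prints:
        // INSERT OVERWRITE my_materialized_table
        //  SELECT * FROM (SELECT * FROM my_source)
        //  WHERE ds = '2024-01-02'
        System.out.println(
                refreshStatement("my_materialized_table", "SELECT * FROM my_source", spec));
    }
}
```

Note that partition values are always rendered as quoted string literals, which is exactly why `validatePartitionSpec` rejects partition keys outside the `CHARACTER_STRING` type family.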
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 29ab697f384c0..0246a97a4a994 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -18,14 +18,23 @@
 package org.apache.flink.table.gateway.service;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
@@ -33,8 +42,11 @@
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.junit.jupiter.api.BeforeAll;
@@ -47,15 +59,20 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
+import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -236,6 +253,220 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh(
+            @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+        // initialize session handle, create test-filesystem catalog and register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE my_source (\n"
+                                + " order_id BIGINT,\n"
+                                + " user_id BIGINT,\n"
+                                + " shop_id BIGINT,\n"
+                                + " order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = 'true',\n"
+                                + " 'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        OperationHandle sourceHandle =
+                service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, sourceHandle);
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE my_materialized_table"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + " 'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '2' SECOND\n"
+                        + " AS SELECT \n"
+                        + " user_id,\n"
+                        + " shop_id,\n"
+                        + " ds,\n"
+                        + " COUNT(order_id) AS order_cnt\n"
+                        + " FROM (\n"
+                        + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+        // verify data exists in materialized table
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table WHERE ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(2);
+
+        // remove the last element
+        data.remove(2);
+
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        // 1. verify a new job is created
+        Optional<JobStatusMessage> job =
+                restClusterClient.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream()
+                        .filter(j -> j.getJobId().toString().equals(jobId))
+                        .findFirst();
+        assertThat(job).isPresent();
+        assertThat(job.get().getStartTime()).isGreaterThan(currentTime);
+
+        // 2. verify the new job is a batch job
+        JobDetailsInfo jobDetailsInfo =
+                restClusterClient
+                        .getJobDetails(JobID.fromHexString(jobId))
+                        .get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(jobDetailsInfo.getJobType()).isEqualTo(JobType.BATCH);
+
+        // 3. verify the new job is finished
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        return JobStatus.FINISHED.equals(
+                                restClusterClient
+                                        .getJobStatus(JobID.fromHexString(jobId))
+                                        .get(5, TimeUnit.SECONDS));
+                    } catch (Exception ignored) {
+                    }
+                    return false;
+                },
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify whether the job is finished.");
+
+        // 4. verify the new job overwrites the data
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table WHERE ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exception {
+        // initialize session handle, create test-filesystem catalog and register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds1, ds2)\n"
+                        + " WITH(\n"
+                        + " 'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + " user_id,\n"
+                        + " shop_id,\n"
+                        + " ds1,\n"
+                        + " ds2,\n"
+                        + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+                        + " SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds1, user_id % 10 as ds2, payment_amount_cents FROM datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds1, ds2)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+        // CASE 1: check unknown partition keys
+        String alterStatementWithUnknownPartitionKey =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds3 = '2024-01-01')";
+        OperationHandle alterStatementWithUnknownPartitionKeyHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        alterStatementWithUnknownPartitionKey,
+                        -1,
+                        new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        alterStatementWithUnknownPartitionKeyHandle))
+                .isInstanceOf(SqlExecutionException.class)
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "The partition spec contains unknown partition keys:\n"
+                                + "\n"
+                                + "ds3\n"
+                                + "\n"
+                                + "All known partition keys are:\n"
+                                + "\n"
+                                + "ds2\n"
+                                + "ds1");
+
+        // CASE 2: check refreshing with a non-string partition key
+        String alterStatementWithNonStringPartitionKey =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 5)";
+        OperationHandle alterStatementWithNonStringPartitionKeyHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        alterStatementWithNonStringPartitionKey,
+                        -1,
+                        new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        alterStatementWithNonStringPartitionKeyHandle))
+                .isInstanceOf(SqlExecutionException.class)
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "Currently, manually refreshing materialized table only supports specifying char and string type partition keys."
+                                + " All specified partition keys with unsupported types are:\n"
+                                + "\n"
+                                + "ds2");
+    }
+
     private SessionHandle initializeSession() {
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
         String catalogDDL =
@@ -271,4 +502,11 @@ private SessionHandle initializeSession() {
         service.configureSession(sessionHandle, dataGenSource, -1);
         return sessionHandle;
     }
+
+    private List<RowData> fetchTableData(SessionHandle sessionHandle, String query) {
+        OperationHandle queryHandle =
+                service.executeStatement(sessionHandle, query, -1, new Configuration());
+
+        return fetchAllResults(service, sessionHandle, queryHandle);
+    }
 }
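The wait-for-FINISHED polling in step 3 of the first test is a reusable pattern; distilled into a helper it might look like the following (a sketch using the same test utilities; the class and method names are illustrative, not part of the patch):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.testutils.CommonTestUtils;

/** Sketch of the wait-for-FINISHED loop used in the ITCase above. */
final class JobCompletionProbe {

    static void awaitJobFinished(
            RestClusterClient<?> client, String jobId, Duration timeout, Duration pause)
            throws Exception {
        CommonTestUtils.waitUtil(
                () -> {
                    try {
                        return JobStatus.FINISHED.equals(
                                client.getJobStatus(JobID.fromHexString(jobId))
                                        .get(5, TimeUnit.SECONDS));
                    } catch (Exception ignored) {
                        // treat transient REST failures as "not finished yet"
                        return false;
                    }
                },
                timeout,
                pause,
                "Job " + jobId + " did not reach FINISHED.");
    }
}
```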
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 012c4aed166e4..e2536b0447b8b 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -113,6 +113,7 @@
 import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
+import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchResults;
 import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
 import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
@@ -324,7 +325,7 @@ void testGetOperationFinishedAndFetchResults() throws Exception {
         awaitOperationTermination(service, sessionHandle, operationHandle);
 
         List<RowData> expectedData = getDefaultResultSet().getData();
-        List<RowData> actualData = fetchAllResults(sessionHandle, operationHandle);
+        List<RowData> actualData = fetchAllResults(service, sessionHandle, operationHandle);
         assertThat(actualData).isEqualTo(expectedData);
 
         service.closeOperation(sessionHandle, operationHandle);
@@ -401,7 +402,7 @@ void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);
+        List<RowData> settings = fetchAllResults(service, sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
@@ -436,7 +437,7 @@ void testStopJobStatementWithSavepoint(
         OperationHandle insertOperationHandle =
                 service.executeStatement(sessionHandle, insertSql, -1, configuration);
 
-        List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
+        List<RowData> results = fetchAllResults(service, sessionHandle, insertOperationHandle);
         assertThat(results.size()).isEqualTo(1);
         String jobId = results.get(0).getString(0).toString();
@@ -446,7 +447,7 @@ void testStopJobStatementWithSavepoint(
         OperationHandle stopOperationHandle =
                 service.executeStatement(sessionHandle, stopSql, -1, configuration);
 
-        List<RowData> stopResults = fetchAllResults(sessionHandle, stopOperationHandle);
+        List<RowData> stopResults = fetchAllResults(service, sessionHandle, stopOperationHandle);
         assertThat(stopResults.size()).isEqualTo(1);
         if (hasSavepoint) {
             String savepoint = stopResults.get(0).getString(0).toString();
@@ -485,7 +486,7 @@ void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restCluster
         OperationHandle insertsOperationHandle =
                 service.executeStatement(sessionHandle, insertSql, -1, configuration);
         String jobId =
-                fetchAllResults(sessionHandle, insertsOperationHandle)
+                fetchAllResults(service, sessionHandle, insertsOperationHandle)
                         .get(0)
                         .getString(0)
                         .toString();
@@ -496,7 +497,7 @@ void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restCluster
         OperationHandle showJobsOperationHandle1 =
                 service.executeStatement(sessionHandle, "SHOW JOBS", -1, configuration);
 
-        List<RowData> result = fetchAllResults(sessionHandle, showJobsOperationHandle1);
+        List<RowData> result = fetchAllResults(service, sessionHandle, showJobsOperationHandle1);
         RowData jobRow =
                 result.stream()
                         .filter(row -> jobId.equals(row.getString(0).toString()))
@@ -532,7 +533,7 @@ void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClus
         OperationHandle insertsOperationHandle =
                 service.executeStatement(sessionHandle, insertSql, -1, configuration);
         String jobId =
-                fetchAllResults(sessionHandle, insertsOperationHandle)
+                fetchAllResults(service, sessionHandle, insertsOperationHandle)
                         .get(0)
                         .getString(0)
                         .toString();
@@ -547,7 +548,7 @@ void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClus
                         -1,
                         configuration);
 
-        List<RowData> result = fetchAllResults(sessionHandle, describeJobOperationHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, describeJobOperationHandle);
         RowData jobRow =
                 result.stream()
                         .filter(row -> jobId.equals(row.getString(0).toString()))
@@ -1164,17 +1165,4 @@ private void validateCompletionHints(
         assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
                 .isEqualTo(expectedCompletionHints);
     }
-
-    private List<RowData> fetchAllResults(
-            SessionHandle sessionHandle, OperationHandle operationHandle) {
-        Long token = 0L;
-        List<RowData> results = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            results.addAll(result.getData());
-            token = result.getNextToken();
-        }
-        return results;
-    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
new file mode 100644
index 0000000000000..c2a1cad893115
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.materializedtable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MaterializedTableManager}. */
+class MaterializedTableManagerTest {
+
+    @Test
+    void testGetManuallyRefreshStatement() {
+        String tableIdentifier = "my_materialized_table";
+        String query = "SELECT * FROM my_source_table";
+        assertThat(
+                        MaterializedTableManager.getManuallyRefreshStatement(
+                                tableIdentifier, query, Collections.emptyMap()))
+                .isEqualTo(
+                        "INSERT OVERWRITE my_materialized_table\n"
+                                + " SELECT * FROM (SELECT * FROM my_source_table)");
+
+        Map<String, String> partitionSpec = new LinkedHashMap<>();
+        partitionSpec.put("k1", "v1");
+        partitionSpec.put("k2", "v2");
+        assertThat(
+                        MaterializedTableManager.getManuallyRefreshStatement(
+                                tableIdentifier, query, partitionSpec))
+                .isEqualTo(
+                        "INSERT OVERWRITE my_materialized_table\n"
+                                + " SELECT * FROM (SELECT * FROM my_source_table)\n"
+                                + " WHERE k1 = 'v1' AND k2 = 'v2'");
+    }
+}
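A note on the test's choice of `LinkedHashMap`: the expected `WHERE k1 = 'v1' AND k2 = 'v2'` string is only deterministic because the predicate order follows the map's iteration order. A minimal illustration of the difference (hypothetical snippet, not part of the patch):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Why an order-preserving map keeps the generated WHERE clause deterministic. */
class PartitionSpecOrdering {

    public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("k1", "v1");
        ordered.put("k2", "v2");
        // Iteration order is insertion order: always k1, then k2.
        ordered.forEach((k, v) -> System.out.println(k + " = '" + v + "'"));

        Map<String, String> unordered = new HashMap<>();
        unordered.put("k1", "v1");
        unordered.put("k2", "v2");
        // Iteration order is unspecified; a WHERE clause built from this map
        // could legally come out as k2 = 'v2' AND k1 = 'v1'.
        unordered.forEach((k, v) -> System.out.println(k + " = '" + v + "'"));
    }
}
```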
*/
 public class SqlGatewayServiceTestUtil {
 
@@ -84,4 +88,19 @@ public static void awaitOperationTermination(
                 .getOperationManager()
                 .awaitOperationTermination(operationHandle);
     }
+
+    public static List<RowData> fetchAllResults(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle) {
+        Long token = 0L;
+        List<RowData> results = new ArrayList<>();
+        while (token != null) {
+            ResultSet result =
+                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+            results.addAll(result.getData());
+            token = result.getNextToken();
+        }
+        return results;
+    }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
index f8037b05e4c70..3380b6a7e2032 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
@@ -48,6 +48,10 @@ public SqlIdentifier getTableName() {
         return tableIdentifier;
     }
 
+    public String[] fullTableName() {
+        return tableIdentifier.names.toArray(new String[0]);
+    }
+
     @Override
     public SqlOperator getOperator() {
         return OPERATOR;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
index 26ec1bcd7c74b..b2fc428847926 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
@@ -25,8 +25,6 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 
 /**
@@ -38,7 +36,7 @@ public class SqlAlterMaterializedTableRefresh extends SqlAlterMaterializedTable
     private final SqlNodeList partitionSpec;
 
     public SqlAlterMaterializedTableRefresh(
-            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec) {
         super(pos, tableName);
         this.partitionSpec = partitionSpec;
     }
@@ -52,10 +50,14 @@ public List<SqlNode> getOperandList() {
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
         writer.keyword("REFRESH");
-        if (partitionSpec != null && partitionSpec.size() > 0) {
+        if (!partitionSpec.isEmpty()) {
             writer.keyword("PARTITION");
             partitionSpec.unparse(
                     writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
         }
     }
+
+    public SqlNodeList getPartitionSpec() {
+        return partitionSpec;
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
index 7bf9f2c54f797..ab9a8675f887d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java
@@ -145,7 +145,11 @@ public static String formatProperties(Map<String, String> properties) {
     }
 
     public static String formatPartitionSpec(CatalogPartitionSpec spec) {
-        return spec.getPartitionSpec().entrySet().stream()
+        return formatPartitionSpec(spec.getPartitionSpec());
+    }
+
+    public static String formatPartitionSpec(Map<String, String> spec) {
+        return spec.entrySet().stream()
                 .map(entry -> entry.getKey() + "=" + entry.getValue())
                 .collect(Collectors.joining(", "));
     }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java
new file mode 100644
index 0000000000000..5212cf58c7ff4
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Map;
+
+/**
+ * Operation to describe a clause like: ALTER MATERIALIZED TABLE
+ * [catalog_name.][db_name.]table_name REFRESH [PARTITION (key1=val1, key2=val2, ...)].
+ */
+@Internal
+public class AlterMaterializedTableRefreshOperation extends AlterMaterializedTableOperation {
+
+    private final Map<String, String> partitionSpec;
+
+    public AlterMaterializedTableRefreshOperation(
+            ObjectIdentifier tableIdentifier, Map<String, String> partitionSpec) {
+        super(tableIdentifier);
+        this.partitionSpec = partitionSpec;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // AlterMaterializedTableRefreshOperation is executed by the SqlGateway's
+        // OperationExecutor; see MaterializedTableManager#callAlterMaterializedTableRefreshOperation.
+        throw new UnsupportedOperationException(
+                "AlterMaterializedTableRefreshOperation does not support ExecutableOperation yet.");
+    }
+
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec;
+    }
+
+    @Override
+    public String asSummaryString() {
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format("ALTER MATERIALIZED TABLE %s REFRESH", tableIdentifier));
+        if (!partitionSpec.isEmpty()) {
+            sb.append(
+                    String.format(
+                            " PARTITION (%s)", OperationUtils.formatPartitionSpec(partitionSpec)));
+        }
+
+        return sb.toString();
+    }
+}
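Given `asSummaryString()` above together with `OperationUtils.formatPartitionSpec`, the summary strings come out as follows (a small demo; the identifier values are illustrative, and the expected output matches the `toString()` format asserted by the converter test further below):

```java
import java.util.Collections;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;

/** Demo of the summary strings produced by AlterMaterializedTableRefreshOperation. */
class SummaryStringDemo {

    public static void main(String[] args) {
        ObjectIdentifier id = ObjectIdentifier.of("cat", "db", "t");

        // Without a partition spec:
        // ALTER MATERIALIZED TABLE `cat`.`db`.`t` REFRESH
        System.out.println(
                new AlterMaterializedTableRefreshOperation(id, Collections.emptyMap())
                        .asSummaryString());

        // With a partition spec:
        // ALTER MATERIALIZED TABLE `cat`.`db`.`t` REFRESH PARTITION (ds=2024-01-02)
        System.out.println(
                new AlterMaterializedTableRefreshOperation(
                                id, Collections.singletonMap("ds", "2024-01-02"))
                        .asSummaryString());
    }
}
```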
*/
+public class SqlAlterMaterializedTableRefreshConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableRefresh> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterMaterializedTableRefresh node, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+
+        Map<String, String> partitionSpec =
+                SqlPartitionUtils.getPartitionKVs(node.getPartitionSpec());
+
+        return new AlterMaterializedTableRefreshOperation(identifier, partitionSpec);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index fc5e3bd498cbc..4684759f12129 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -57,6 +57,7 @@ public class SqlNodeConverters {
         register(new SqlDescribeCatalogConverter());
         register(new SqlDescribeJobConverter());
         register(new SqlCreateMaterializedTableConverter());
+        register(new SqlAlterMaterializedTableRefreshConverter());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 9514e12367e0e..e044c48b0a3ac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -24,8 +24,11 @@
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
 import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -256,4 +259,31 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() {
                 .hasMessageContaining(
                         "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit.");
     }
+
+    @Test
+    public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() {
+        final String sql =
+                "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARTITION (ds1 = '1', ds2 = '2')";
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class);
+
+        AlterMaterializedTableRefreshOperation op =
+                (AlterMaterializedTableRefreshOperation) operation;
+        assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
+        assertThat(op.getPartitionSpec()).isEqualTo(ImmutableMap.of("ds1", "1", "ds2", "2"));
+    }
+
+    @Test
+    public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() {
+        final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH";
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(AlterMaterializedTableRefreshOperation.class);
+
+        AlterMaterializedTableRefreshOperation op =
+                (AlterMaterializedTableRefreshOperation) operation;
+        assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
+        assertThat(op.getPartitionSpec()).isEmpty();
+    }
 }
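Taken together, the user-facing flow this patch enables looks like the following (statements condensed from the ITCase above; table and column names are illustrative):

```java
/** Condensed view of the SQL a user runs against the SQL Gateway for this feature. */
class ManualRefreshFlow {

    static final String CREATE_MT =
            "CREATE MATERIALIZED TABLE my_materialized_table"
                    + " PARTITIONED BY (ds)"
                    + " FRESHNESS = INTERVAL '2' SECOND"
                    + " AS SELECT user_id, shop_id, ds, COUNT(order_id) AS order_cnt"
                    + " FROM my_source GROUP BY user_id, shop_id, ds";

    // Refresh the whole table, or a single (string-typed) partition.
    static final String REFRESH_ALL =
            "ALTER MATERIALIZED TABLE my_materialized_table REFRESH";
    static final String REFRESH_PARTITION =
            "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')";

    // Each REFRESH is submitted as a bounded batch INSERT OVERWRITE job; the
    // operation's single-row result set carries the submitted job id.
}
```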