diff --git a/docs/content.zh/docs/sql/materialized-table/statements.md b/docs/content.zh/docs/sql/materialized-table/statements.md index d411a1d5ffeb1..fce01e00f69d6 100644 --- a/docs/content.zh/docs/sql/materialized-table/statements.md +++ b/docs/content.zh/docs/sql/materialized-table/statements.md @@ -398,6 +398,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name | DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | DISTRIBUTION } | SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec] | + | RESET (key1, key2, ...) | AS : { | | } @@ -548,6 +549,40 @@ ALTER MATERIALIZED TABLE my_materialized_table RESUME; ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10'); ``` +## RESET + +``` +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET (key1, key2, ...) +``` + +`RESET` is used to remove table options from a materialized table. + +**Key handling:** +- Keys that are not currently set on the table are silently ignored. The statement still succeeds. +- Duplicate keys in the key list are de-duplicated and treated as a single reset for that key. +- The empty key list `RESET ()` is rejected with a validation error. +- The `connector` key is reserved and cannot be reset. Attempting to do so is rejected with a validation error. + +**Example:** + +```sql +-- Remove the 'format' and 'sink.parallelism' options +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism'); + +-- 'sink.parallelism' is not currently set on the table: this is a no-op for that key, +-- 'format' is still removed and the statement succeeds. +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism'); + +-- Duplicates collapse to a single reset for 'format'. +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'format'); +``` + +Note When run through the Flink SQL Gateway, the behavior depends on the refresh mode and current refresh status: +- `FULL` mode: the change is applied to the catalog. The refresh workflow is not touched. +- `CONTINUOUS` mode, `ACTIVATED` status: the running refresh job is stopped with savepoint, the change is applied to the catalog, and a new refresh job is started using the updated options. The new job does **not** restore from the savepoint taken during suspend, so streaming state is reset. +- `CONTINUOUS` mode, `SUSPENDED` status: the change is applied to the catalog and the savepoint stored in the refresh handler is cleared, so the next `RESUME` will also start a fresh job. +- `CONTINUOUS` mode, `INITIALIZING` status: the statement is rejected. + ## REFRESH ``` diff --git a/docs/content/docs/sql/materialized-table/statements.md b/docs/content/docs/sql/materialized-table/statements.md index acfece212e959..638e6611d65dc 100644 --- a/docs/content/docs/sql/materialized-table/statements.md +++ b/docs/content/docs/sql/materialized-table/statements.md @@ -397,6 +397,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name | DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | DISTRIBUTION } | SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec] | + | RESET (key1, key2, ...) | AS : { | | } @@ -548,6 +549,40 @@ ALTER MATERIALIZED TABLE my_materialized_table RESUME; ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10'); ``` +## RESET + +``` +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET (key1, key2, ...) +``` + +`RESET` is used to remove table options from a materialized table. + +**Key handling:** +- Keys that are not currently set on the table are silently ignored. The statement still succeeds. +- Duplicate keys in the key list are de-duplicated and treated as a single reset for that key. +- The empty key list `RESET ()` is rejected with a validation error. +- The `connector` key is reserved and cannot be reset. Attempting to do so is rejected with a validation error. + +**Example:** + +```sql +-- Remove the 'format' and 'sink.parallelism' options +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism'); + +-- 'sink.parallelism' is not currently set on the table: this is a no-op for that key, +-- 'format' is still removed and the statement succeeds. +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism'); + +-- Duplicates collapse to a single reset for 'format'. +ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'format'); +``` + +Note When run through the Flink SQL Gateway, the behavior depends on the refresh mode and current refresh status: +- `FULL` mode: the change is applied to the catalog. The refresh workflow is not touched. +- `CONTINUOUS` mode, `ACTIVATED` status: the running refresh job is stopped with savepoint, the change is applied to the catalog, and a new refresh job is started using the updated options. The new job does **not** restore from the savepoint taken during suspend, so streaming state is reset. +- `CONTINUOUS` mode, `SUSPENDED` status: the change is applied to the catalog and the savepoint stored in the refresh handler is cleared, so the next `RESUME` will also start a fresh job. +- `CONTINUOUS` mode, `INITIALIZING` status: the statement is rejected. + ## REFRESH ``` diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index e65d1fd8575e7..0fc7c51304ddf 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -1155,6 +1155,35 @@ void testAlterMaterializedTableAddDistribution() throws Exception { .isEqualTo(TableDistribution.of(Kind.HASH, 2, List.of("order_id"))); } + @Test + void testAlterMaterializedTableResetInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", List.of(), Map.of(), RefreshMode.FULL); + + ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops"); + ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier); + assertThat(oldTable.getOptions()).containsKey("format"); + + String alterMaterializedTableResetDDL = + "ALTER MATERIALIZED TABLE users_shops RESET ('format', 'unknown_key')"; + OperationHandle alterMaterializedTableResetHandle = + executeStatement(alterMaterializedTableResetDDL); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableResetHandle); + + ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier); + + // the existing key is removed, the unknown key is a no-op + assertThat(newTable.getOptions()).doesNotContainKey("format"); + + // unchanged: schema, query, distribution, freshness, refresh handler + assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema()); + assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery()); + assertThat(newTable.getDistribution()).isEqualTo(oldTable.getDistribution()); + assertThat(newTable.getDefinitionFreshness()).isEqualTo(oldTable.getDefinitionFreshness()); + assertThat(newTable.getSerializedRefreshHandler()) + .isEqualTo(oldTable.getSerializedRefreshHandler()); + } + @Test void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception { createAndVerifyCreateMaterializedTableWithData( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 256dd03ccc1f7..7a1d9b67a52f1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -31,6 +31,7 @@ import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableDropSchemaConverter.SqlAlterMaterializedTableSchemaDropColumnConverter; import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableModifyDistributionConverter; import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableRefreshConverter; +import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableResetConverter; import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableResumeConverter; import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableSchemaConverter.SqlAlterMaterializedTableAddSchemaConverter; import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableSchemaConverter.SqlAlterMaterializedTableModifySchemaConverter; @@ -148,6 +149,7 @@ private static void registerMaterializedTableConverters() { register(new SqlAlterMaterializedTableDropWatermarkConverter()); register(new SqlAlterMaterializedTableModifySchemaConverter()); register(new SqlAlterMaterializedTableRefreshConverter()); + register(new SqlAlterMaterializedTableResetConverter()); register(new SqlAlterMaterializedTableResumeConverter()); register(new SqlAlterMaterializedTableSuspendConverter()); register(new SqlCreateOrAlterMaterializedTableConverter()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableResetConverter.java new file mode 100644 index 0000000000000..f37cbed9648e1 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableResetConverter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters.materializedtable; + +import org.apache.flink.sql.parser.SqlParseUtils; +import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNodeList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** A converter for {@link SqlAlterMaterializedTableReset}. */ +public class SqlAlterMaterializedTableResetConverter + extends AbstractAlterMaterializedTableConverter { + + @Override + protected Operation convertToOperation( + SqlAlterMaterializedTableReset sqlAlterTable, + ResolvedCatalogMaterializedTable oldTable, + ConvertContext context) { + return new AlterMaterializedTableChangeOperation( + resolveIdentifier(sqlAlterTable, context), + gatherTableChanges(sqlAlterTable, context), + oldTable); + } + + @Override + protected Function> gatherTableChanges( + SqlAlterMaterializedTableReset sqlAlterTable, ConvertContext context) { + final SqlNodeList propertyKeyList = sqlAlterTable.getPropertyKeyList(); + final Set resetKeys = + SqlParseUtils.extractSet( + propertyKeyList, key -> SqlParseUtils.extractString((SqlLiteral) key)); + if (resetKeys.isEmpty()) { + throw new ValidationException( + EX_MSG_PREFIX + "ALTER MATERIALIZED TABLE RESET does not support empty key."); + } + if (resetKeys.contains(FactoryUtil.CONNECTOR.key())) { + throw new ValidationException( + EX_MSG_PREFIX + + "ALTER MATERIALIZED TABLE RESET does not support changing 'connector'."); + } + final List changes = new ArrayList<>(resetKeys.size()); + for (String key : resetKeys) { + changes.add(TableChange.reset(key)); + } + return oldTable -> changes; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 8f8dfb856186f..45e952a4b3c7f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -523,6 +523,26 @@ void testAlterMaterializedTableResume() { .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + @Test + void testAlterMaterializedTableReset() { + final String sql = "ALTER MATERIALIZED TABLE base_mtbl RESET ('format', 'unknown_key')"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(AlterMaterializedTableChangeOperation.class); + + AlterMaterializedTableChangeOperation op = + (AlterMaterializedTableChangeOperation) operation; + assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`base_mtbl`"); + assertThat(op.getTableChanges()) + .containsExactlyInAnyOrder( + TableChange.reset("format"), TableChange.reset("unknown_key")); + // resetting an unknown key is a no-op for the catalog state + assertThat(op.getNewTable().getOptions()) + .containsOnly(Map.entry("connector", "filesystem")); + assertThat(op.asSummaryString()) + .startsWith("ALTER MATERIALIZED TABLE builtin.default.base_mtbl\n") + .contains(" RESET 'format'", " RESET 'unknown_key'"); + } + @Test void testAlterMaterializedTableAsQuery() throws TableNotExistException { String sql = @@ -659,6 +679,7 @@ private static Collection testDataForCreateAlterMaterializedTableFaile list.addAll(alterModifyWithInvalidSchema()); list.addAll(alterQuery()); list.addAll(alterDrop()); + list.addAll(alterReset()); return list; } @@ -1001,6 +1022,22 @@ private static Collection alterDrop() { + "The column `m_p` is a persisted column. Dropping of persisted columns is not supported.")); } + private static Collection alterReset() { + return List.of( + TestSpec.of( + "ALTER MATERIALIZED TABLE base_mtbl RESET ()", + "ALTER MATERIALIZED TABLE RESET does not support empty key."), + TestSpec.of( + "ALTER MATERIALIZED TABLE base_mtbl RESET ('connector')", + "ALTER MATERIALIZED TABLE RESET does not support changing 'connector'."), + TestSpec.of( + "ALTER MATERIALIZED TABLE unknown_mtbl RESET ('format')", + "Materialized table `builtin`.`default`.`unknown_mtbl` doesn't exist."), + TestSpec.of( + "ALTER MATERIALIZED TABLE t3 RESET ('format')", + "ALTER MATERIALIZED TABLE for a table is not allowed")); + } + private static Collection alterSuccessCase() { List list = new ArrayList<>(); list.addAll(alterAddSchemaSuccessCase());