Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/content.zh/docs/sql/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
| DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | DISTRIBUTION }
| SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)]
| REFRESH [PARTITION partition_spec] |
| RESET (key1, key2, ...)
| AS <select_statement>
<schema_component>:
{ <column_component> | <constraint_component> | <watermark_component> }
Expand Down Expand Up @@ -548,6 +549,40 @@ ALTER MATERIALIZED TABLE my_materialized_table RESUME;
ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');
```

## RESET

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET (key1, key2, ...)
```

`RESET` is used to remove table options from a materialized table.

**Key handling:**
- Keys that are not currently set on the table are silently ignored. The statement still succeeds.
- Duplicate keys in the key list are de-duplicated and treated as a single reset for that key.
- The empty key list `RESET ()` is rejected with a validation error.
- The `connector` key is reserved and cannot be reset. Attempting to do so is rejected with a validation error.

**Example:**

```sql
-- Remove the 'format' and 'sink.parallelism' options
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism');

-- 'sink.parallelism' is not currently set on the table: this is a no-op for that key,
-- 'format' is still removed and the statement succeeds.
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism');

-- Duplicates collapse to a single reset for 'format'.
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'format');
```

<span class="label label-danger">Note</span> When run through the Flink SQL Gateway, the behavior depends on the refresh mode and current refresh status:
- `FULL` mode: the change is applied to the catalog. The refresh workflow is not touched.
- `CONTINUOUS` mode, `ACTIVATED` status: the running refresh job is stopped with savepoint, the change is applied to the catalog, and a new refresh job is started using the updated options. The new job does **not** restore from the savepoint taken during suspend, so streaming state is reset.
- `CONTINUOUS` mode, `SUSPENDED` status: the change is applied to the catalog and the savepoint stored in the refresh handler is cleared, so the next `RESUME` will also start a fresh job.
- `CONTINUOUS` mode, `INITIALIZING` status: the statement is rejected.

## REFRESH

```
Expand Down
35 changes: 35 additions & 0 deletions docs/content/docs/sql/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
| DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | DISTRIBUTION }
| SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)]
| REFRESH [PARTITION partition_spec] |
| RESET (key1, key2, ...)
| AS <select_statement>
<schema_component>:
{ <column_component> | <constraint_component> | <watermark_component> }
Expand Down Expand Up @@ -548,6 +549,40 @@ ALTER MATERIALIZED TABLE my_materialized_table RESUME;
ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');
```

## RESET

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET (key1, key2, ...)
```

`RESET` is used to remove table options from a materialized table.

**Key handling:**
- Keys that are not currently set on the table are silently ignored. The statement still succeeds.
- Duplicate keys in the key list are de-duplicated and treated as a single reset for that key.
- The empty key list `RESET ()` is rejected with a validation error.
- The `connector` key is reserved and cannot be reset. Attempting to do so is rejected with a validation error.

**Example:**

```sql
-- Remove the 'format' and 'sink.parallelism' options
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism');

-- 'sink.parallelism' is not currently set on the table: this is a no-op for that key,
-- 'format' is still removed and the statement succeeds.
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'sink.parallelism');

-- Duplicates collapse to a single reset for 'format'.
ALTER MATERIALIZED TABLE my_materialized_table RESET ('format', 'format');
```

<span class="label label-danger">Note</span> When run through the Flink SQL Gateway, the behavior depends on the refresh mode and current refresh status:
- `FULL` mode: the change is applied to the catalog. The refresh workflow is not touched.
- `CONTINUOUS` mode, `ACTIVATED` status: the running refresh job is stopped with savepoint, the change is applied to the catalog, and a new refresh job is started using the updated options. The new job does **not** restore from the savepoint taken during suspend, so streaming state is reset.
- `CONTINUOUS` mode, `SUSPENDED` status: the change is applied to the catalog and the savepoint stored in the refresh handler is cleared, so the next `RESUME` will also start a fresh job.
- `CONTINUOUS` mode, `INITIALIZING` status: the statement is rejected.

## REFRESH

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,35 @@ void testAlterMaterializedTableAddDistribution() throws Exception {
.isEqualTo(TableDistribution.of(Kind.HASH, 2, List.of("order_id")));
}

@Test
void testAlterMaterializedTableResetInFullMode() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", List.of(), Map.of(), RefreshMode.FULL);

ObjectIdentifier userShopsIdentifier = getObjectIdentifier("users_shops");
ResolvedCatalogMaterializedTable oldTable = getTable(userShopsIdentifier);
assertThat(oldTable.getOptions()).containsKey("format");

String alterMaterializedTableResetDDL =
"ALTER MATERIALIZED TABLE users_shops RESET ('format', 'unknown_key')";
OperationHandle alterMaterializedTableResetHandle =
executeStatement(alterMaterializedTableResetDDL);
awaitOperationTermination(service, sessionHandle, alterMaterializedTableResetHandle);

ResolvedCatalogMaterializedTable newTable = getTable(userShopsIdentifier);

// the existing key is removed, the unknown key is a no-op
assertThat(newTable.getOptions()).doesNotContainKey("format");

// unchanged: schema, query, distribution, freshness, refresh handler
assertThat(newTable.getResolvedSchema()).isEqualTo(oldTable.getResolvedSchema());
assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());
assertThat(newTable.getDistribution()).isEqualTo(oldTable.getDistribution());
assertThat(newTable.getDefinitionFreshness()).isEqualTo(oldTable.getDefinitionFreshness());
assertThat(newTable.getSerializedRefreshHandler())
.isEqualTo(oldTable.getSerializedRefreshHandler());
}

@Test
void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableDropSchemaConverter.SqlAlterMaterializedTableSchemaDropColumnConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableModifyDistributionConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableRefreshConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableResetConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableResumeConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableSchemaConverter.SqlAlterMaterializedTableAddSchemaConverter;
import org.apache.flink.table.planner.operations.converters.materializedtable.SqlAlterMaterializedTableSchemaConverter.SqlAlterMaterializedTableModifySchemaConverter;
Expand Down Expand Up @@ -148,6 +149,7 @@ private static void registerMaterializedTableConverters() {
register(new SqlAlterMaterializedTableDropWatermarkConverter());
register(new SqlAlterMaterializedTableModifySchemaConverter());
register(new SqlAlterMaterializedTableRefreshConverter());
register(new SqlAlterMaterializedTableResetConverter());
register(new SqlAlterMaterializedTableResumeConverter());
register(new SqlAlterMaterializedTableSuspendConverter());
register(new SqlCreateOrAlterMaterializedTableConverter());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters.materializedtable;

import org.apache.flink.sql.parser.SqlParseUtils;
import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableReset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;

import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNodeList;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** A converter for {@link SqlAlterMaterializedTableReset}. */
public class SqlAlterMaterializedTableResetConverter
extends AbstractAlterMaterializedTableConverter<SqlAlterMaterializedTableReset> {

@Override
protected Operation convertToOperation(
SqlAlterMaterializedTableReset sqlAlterTable,
ResolvedCatalogMaterializedTable oldTable,
ConvertContext context) {
return new AlterMaterializedTableChangeOperation(
resolveIdentifier(sqlAlterTable, context),
gatherTableChanges(sqlAlterTable, context),
oldTable);
}

@Override
protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> gatherTableChanges(
SqlAlterMaterializedTableReset sqlAlterTable, ConvertContext context) {
final SqlNodeList propertyKeyList = sqlAlterTable.getPropertyKeyList();
final Set<String> resetKeys =
SqlParseUtils.extractSet(
propertyKeyList, key -> SqlParseUtils.extractString((SqlLiteral) key));
if (resetKeys.isEmpty()) {
throw new ValidationException(
EX_MSG_PREFIX + "ALTER MATERIALIZED TABLE RESET does not support empty key.");
}
if (resetKeys.contains(FactoryUtil.CONNECTOR.key())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we hard fail if there is connector option and do not fail hard if there is unknown option?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

@snuyanzin snuyanzin May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happen if there are 5 valid options and 5 unknown options?
will user be able to see which are reset and which are unknown?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we introduce a hard fail for non-exiting option we should consider to do the same for Tables too

throw new ValidationException(
EX_MSG_PREFIX
+ "ALTER MATERIALIZED TABLE RESET does not support changing 'connector'.");
}
final List<TableChange> changes = new ArrayList<>(resetKeys.size());
for (String key : resetKeys) {
changes.add(TableChange.reset(key));
}
return oldTable -> changes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,26 @@ void testAlterMaterializedTableResume() {
.isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])");
}

@Test
void testAlterMaterializedTableReset() {
final String sql = "ALTER MATERIALIZED TABLE base_mtbl RESET ('format', 'unknown_key')";
Operation operation = parse(sql);
assertThat(operation).isInstanceOf(AlterMaterializedTableChangeOperation.class);

AlterMaterializedTableChangeOperation op =
(AlterMaterializedTableChangeOperation) operation;
assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`base_mtbl`");
assertThat(op.getTableChanges())
.containsExactlyInAnyOrder(
TableChange.reset("format"), TableChange.reset("unknown_key"));
// resetting an unknown key is a no-op for the catalog state
assertThat(op.getNewTable().getOptions())
.containsOnly(Map.entry("connector", "filesystem"));
assertThat(op.asSummaryString())
.startsWith("ALTER MATERIALIZED TABLE builtin.default.base_mtbl\n")
.contains(" RESET 'format'", " RESET 'unknown_key'");
}

@Test
void testAlterMaterializedTableAsQuery() throws TableNotExistException {
String sql =
Expand Down Expand Up @@ -659,6 +679,7 @@ private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFaile
list.addAll(alterModifyWithInvalidSchema());
list.addAll(alterQuery());
list.addAll(alterDrop());
list.addAll(alterReset());
return list;
}

Expand Down Expand Up @@ -1001,6 +1022,22 @@ private static Collection<TestSpec> alterDrop() {
+ "The column `m_p` is a persisted column. Dropping of persisted columns is not supported."));
}

private static Collection<TestSpec> alterReset() {
return List.of(
TestSpec.of(
"ALTER MATERIALIZED TABLE base_mtbl RESET ()",
"ALTER MATERIALIZED TABLE RESET does not support empty key."),
TestSpec.of(
"ALTER MATERIALIZED TABLE base_mtbl RESET ('connector')",
"ALTER MATERIALIZED TABLE RESET does not support changing 'connector'."),
TestSpec.of(
"ALTER MATERIALIZED TABLE unknown_mtbl RESET ('format')",
"Materialized table `builtin`.`default`.`unknown_mtbl` doesn't exist."),
TestSpec.of(
"ALTER MATERIALIZED TABLE t3 RESET ('format')",
"ALTER MATERIALIZED TABLE for a table is not allowed"));
}

private static Collection<TestSpec> alterSuccessCase() {
List<TestSpec> list = new ArrayList<>();
list.addAll(alterAddSchemaSuccessCase());
Expand Down