diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md index 808ee89302354..39de6985c9d5a 100644 --- a/docs/content.zh/docs/dev/table/sql/alter.md +++ b/docs/content.zh/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义。 +ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义,或 catalog 本身的定义。 Flink SQL 目前支持以下 ALTER 语句: @@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## 执行 ALTER 语句 @@ -538,10 +539,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。 +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -555,4 +560,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...) ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +为指定的 catalog 重置一个或多个属性。 + +`RESET` 语句示例如下。 + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md index 54fdf8f15b027..e842d137ce096 100644 --- a/docs/content/docs/dev/table/sql/alter.md +++ b/docs/content/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}). +ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself. Flink SQL supports the following ALTER statements for now: @@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## Run an ALTER statement @@ -540,10 +541,14 @@ If the function doesn't exist, nothing happens. Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA. +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -557,4 +562,15 @@ The following examples illustrate the usage of the `SET` statements. ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +Reset one or more properties to its default value in the specified catalog. + +The following examples illustrate the usage of the `RESET` statements. + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index c9a5b02116ae0..b99af7344f984 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -92,6 +92,110 @@ drop catalog c1; org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. !error +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +[INFO] Execute statement succeeded. +!info + +show create catalog cat2; ++---------------------------------------------------------------------------------------------+ +| result | ++---------------------------------------------------------------------------------------------+ +| CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) + | ++---------------------------------------------------------------------------------------------+ +1 row in set +!ok + +describe catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -686,86 +790,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -[INFO] Execute statement succeeded. -!info - -show create catalog cat2; -+---------------------------------------------------------------------------------------------+ -| result | -+---------------------------------------------------------------------------------------------+ -| CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) - | -+---------------------------------------------------------------------------------------------+ -1 row in set -!ok - -describe catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -[INFO] Execute statement succeeded. -!info - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index 063edbabd239d..cd7658ec0fc56 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -108,6 +108,129 @@ drop catalog default_catalog; 1 row in set !ok +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +show create catalog cat2; +!output +CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) +!ok + +describe catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -816,98 +939,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -show create catalog cat2; -!output -CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) -!ok - -describe catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index b92154605552f..6f1365951f4e3 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -35,6 +35,7 @@ "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterCatalog" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions" + "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 6d524422625ca..4d7214325d431 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -160,13 +160,23 @@ SqlAlterCatalog SqlAlterCatalog() : { { startPos = getPos(); } catalogName = SimpleIdentifier() - - propertyList = Properties() - { - return new SqlAlterCatalogOptions(startPos.plus(getPos()), - catalogName, - propertyList); - } + ( + + propertyList = Properties() + { + return new SqlAlterCatalogOptions(startPos.plus(getPos()), + catalogName, + propertyList); + } + | + + propertyList = PropertyKeys() + { + return new SqlAlterCatalogReset(startPos.plus(getPos()), + catalogName, + propertyList); + } + ) } /** diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java new file mode 100644 index 0000000000000..3f05a0e574d0b --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** ALTER CATALOG catalog_name RESET (key1, ...). */ +public class SqlAlterCatalogReset extends SqlAlterCatalog { + + private final SqlNodeList propertyKeyList; + + public SqlAlterCatalogReset( + SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyKeyList) { + super(position, catalogName); + this.propertyKeyList = requireNonNull(propertyKeyList, "propertyKeyList cannot be null"); + } + + @Override + public List getOperandList() { + return ImmutableNullableList.of(catalogName, propertyKeyList); + } + + public SqlNodeList getPropertyList() { + return propertyKeyList; + } + + public Set getResetKeys() { + return propertyKeyList.getList().stream() + .map(key -> ((NlsString) SqlLiteral.value(key)).getValue()) + .collect(Collectors.toSet()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyKeyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a6c2bab9b806a..8fe3c21a41f02 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -67,8 +67,9 @@ void testDescribeCatalog() { @Test void testAlterCatalog() { - sql("alter catalog a set ('k1'='v1','k2'='v2')") + sql("alter catalog a set ('k1'='v1', 'k2'='v2')") .ok("ALTER CATALOG `A` SET (\n" + " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")"); + sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + " 'k1'\n" + ")"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 5e8826e6c3a53..fc16f7ae05778 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -326,22 +326,24 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto * Alters a catalog under the given name. The catalog name must be unique. * * @param catalogName the given catalog name under which to alter the given catalog - * @param catalogDescriptor catalog descriptor for altering catalog + * @param catalogUpdater catalog configuration updater to alter catalog * @throws CatalogException If the catalog neither exists in the catalog store nor in the * initialized catalogs, or if an error occurs while creating the catalog or storing the * {@link CatalogDescriptor} */ - public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + public void alterCatalog(String catalogName, Consumer catalogUpdater) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); - checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); + checkNotNull(catalogUpdater, "Catalog configuration updater cannot be null."); + CatalogStore catalogStore = catalogStoreHolder.catalogStore(); Optional oldCatalogDescriptor = getCatalogDescriptor(catalogName); + if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) { Configuration conf = oldCatalogDescriptor.get().getConfiguration(); - conf.addAll(catalogDescriptor.getConfiguration()); + catalogUpdater.accept(conf); CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf); Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor); catalogStore.removeCatalog(catalogName, false); @@ -353,7 +355,7 @@ public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor catalogStoreHolder.catalogStore().storeCatalog(catalogName, newCatalogDescriptor); } else { throw new CatalogException( - format("Catalog %s not exists in the catalog store.", catalogName)); + String.format("Catalog %s does not exist in the catalog store.", catalogName)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java index dd4d1433505e9..e523aa49eb60e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java @@ -23,7 +23,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.exceptions.CatalogException; import java.util.Collections; @@ -70,8 +69,7 @@ public TableResultInternal execute(Context ctx) { try { ctx.getCatalogManager() .alterCatalog( - catalogName, - CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + catalogName, conf -> conf.addAll(Configuration.fromMap(properties))); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java new file mode 100644 index 0000000000000..b68387def7464 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Operation to describe an ALTER CATALOG RESET statement. */ +@Internal +public class AlterCatalogResetOperation implements AlterOperation { + private final String catalogName; + private final Set resetKeys; + + public AlterCatalogResetOperation(String catalogName, Set resetKeys) { + this.catalogName = checkNotNull(catalogName); + this.resetKeys = Collections.unmodifiableSet(checkNotNull(resetKeys)); + } + + public String getCatalogName() { + return catalogName; + } + + public Set getResetKeys() { + return resetKeys; + } + + @Override + public String asSummaryString() { + return String.format( + "ALTER CATALOG %s\n%s", + catalogName, + resetKeys.stream() + .map(key -> String.format(" RESET '%s'", key)) + .collect(Collectors.joining(",\n"))); + } + + @Override + public TableResultInternal execute(Context ctx) { + try { + ctx.getCatalogManager() + .alterCatalog(catalogName, conf -> resetKeys.forEach(conf::removeKey)); + + return TableResultImpl.TABLE_RESULT_OK; + } catch (CatalogException e) { + throw new ValidationException( + String.format("Could not execute %s", asSummaryString()), e); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java new file mode 100644 index 0000000000000..c352d43785bdf --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; + +import java.util.Set; + +/** A converter for {@link SqlAlterCatalogReset}. */ +public class SqlAlterCatalogResetConverter implements SqlNodeConverter { + + @Override + public Operation convertSqlNode( + SqlAlterCatalogReset sqlAlterCatalogReset, ConvertContext context) { + String type = CommonCatalogOptions.CATALOG_TYPE.key(); + Set resetKeys = sqlAlterCatalogReset.getResetKeys(); + if (resetKeys.isEmpty() || resetKeys.contains(type)) { + String exMsg = + resetKeys.isEmpty() + ? "ALTER CATALOG RESET does not support empty key" + : String.format( + "ALTER CATALOG RESET does not support changing '%s'", type); + throw new ValidationException(exMsg); + } + return new AlterCatalogResetOperation( + sqlAlterCatalogReset.catalogName(), sqlAlterCatalogReset.getResetKeys()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index b3dca807899be..5c6fbc8a22021 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -40,6 +40,7 @@ public class SqlNodeConverters { // register all the converters here register(new SqlCreateCatalogConverter()); register(new SqlAlterCatalogOptionsConverter()); + register(new SqlAlterCatalogResetConverter()); register(new SqlCreateViewConverter()); register(new SqlAlterViewRenameConverter()); register(new SqlAlterViewPropertiesConverter()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 27139d1de7d01..b10d9792bd32a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -53,6 +53,7 @@ import org.apache.flink.table.operations.SourceQueryOperation; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -86,6 +87,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -124,6 +126,25 @@ public void testAlterCatalog() { "cat2", "ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2' = 'v2_new'", expectedOptions); + + // test alter catalog reset + final Set expectedResetKeys = Collections.singleton("K1"); + + operation = parse("ALTER CATALOG cat2 RESET ('K1')"); + assertThat(operation) + .isInstanceOf(AlterCatalogResetOperation.class) + .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogResetOperation.class)) + .extracting( + AlterCatalogResetOperation::getCatalogName, + AlterCatalogResetOperation::asSummaryString, + AlterCatalogResetOperation::getResetKeys) + .containsExactly("cat2", "ALTER CATALOG cat2\n RESET 'K1'", expectedResetKeys); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ('type')")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support changing 'type'"); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ()")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support empty key"); } @Test