From c63196db0726e171f504f77d1c8f4b6b6ed16e2e Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Wed, 8 May 2024 11:47:26 +0800 Subject: [PATCH 1/3] [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax --- docs/content.zh/docs/dev/table/sql/alter.md | 17 +++- docs/content/docs/dev/table/sql/alter.md | 17 +++- .../src/test/resources/sql/catalog_database.q | 25 ++++++ .../src/test/resources/sql/catalog_database.q | 32 ++++++++ .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 24 ++++-- .../sql/parser/ddl/SqlAlterCatalogReset.java | 76 ++++++++++++++++++ .../sql/parser/FlinkSqlParserImplTest.java | 3 +- .../flink/table/catalog/CatalogManager.java | 9 +++ .../ddl/AlterCatalogResetOperation.java | 78 +++++++++++++++++++ .../SqlAlterCatalogResetConverter.java | 48 ++++++++++++ .../converters/SqlNodeConverters.java | 1 + .../SqlDdlToOperationConverterTest.java | 23 ++++++ 13 files changed, 344 insertions(+), 10 deletions(-) create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md index 808ee89302354..1cedc036c179a 100644 --- a/docs/content.zh/docs/dev/table/sql/alter.md +++ b/docs/content.zh/docs/dev/table/sql/alter.md @@ -538,10 +538,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。 +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -555,4 +559,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...) ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +为指定的 catalog 重置一个或多个属性。 + +`RESET` 语句示例如下。 + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md index 54fdf8f15b027..1594fec4957d6 100644 --- a/docs/content/docs/dev/table/sql/alter.md +++ b/docs/content/docs/dev/table/sql/alter.md @@ -540,10 +540,14 @@ If the function doesn't exist, nothing happens. Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA. +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name + SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -557,4 +561,15 @@ The following examples illustrate the usage of the `SET` statements. ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +Reset one or more properties to its default value in the specified catalog. + +The following examples illustrate the usage of the `RESET` statements. + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index c9a5b02116ae0..6b500307859b6 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -769,3 +769,28 @@ desc catalog extended cat2; +-------------------------+-------------------+ 4 rows in set !ok + +alter catalog cat2 reset ('default-database', 'k1'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index 063edbabd239d..60fc7f7732849 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -911,3 +911,35 @@ desc catalog extended cat2; +-------------------------+-------------------+ 4 rows in set !ok + +alter catalog cat2 reset ('default-database', 'k1'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index b92154605552f..6f1365951f4e3 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -35,6 +35,7 @@ "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterCatalog" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions" + "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 6d524422625ca..4d7214325d431 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -160,13 +160,23 @@ SqlAlterCatalog SqlAlterCatalog() : { { startPos = getPos(); } catalogName = SimpleIdentifier() - - propertyList = Properties() - { - return new SqlAlterCatalogOptions(startPos.plus(getPos()), - catalogName, - propertyList); - } + ( + + propertyList = Properties() + { + return new SqlAlterCatalogOptions(startPos.plus(getPos()), + catalogName, + propertyList); + } + | + + propertyList = PropertyKeys() + { + return new SqlAlterCatalogReset(startPos.plus(getPos()), + catalogName, + propertyList); + } + ) } /** diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java new file mode 100644 index 0000000000000..3f05a0e574d0b --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterCatalogReset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** ALTER CATALOG catalog_name RESET (key1, ...). */ +public class SqlAlterCatalogReset extends SqlAlterCatalog { + + private final SqlNodeList propertyKeyList; + + public SqlAlterCatalogReset( + SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyKeyList) { + super(position, catalogName); + this.propertyKeyList = requireNonNull(propertyKeyList, "propertyKeyList cannot be null"); + } + + @Override + public List getOperandList() { + return ImmutableNullableList.of(catalogName, propertyKeyList); + } + + public SqlNodeList getPropertyList() { + return propertyKeyList; + } + + public Set getResetKeys() { + return propertyKeyList.getList().stream() + .map(key -> ((NlsString) SqlLiteral.value(key)).getValue()) + .collect(Collectors.toSet()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyKeyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a6c2bab9b806a..8fe3c21a41f02 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -67,8 +67,9 @@ void testDescribeCatalog() { @Test void testAlterCatalog() { - sql("alter catalog a set ('k1'='v1','k2'='v2')") + sql("alter catalog a set ('k1'='v1', 'k2'='v2')") .ok("ALTER CATALOG `A` SET (\n" + " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")"); + sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + " 'k1'\n" + ")"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 5e8826e6c3a53..9a44b1981c569 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -342,6 +342,15 @@ public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) { Configuration conf = oldCatalogDescriptor.get().getConfiguration(); conf.addAll(catalogDescriptor.getConfiguration()); + catalogDescriptor + .getConfiguration() + .toMap() + .forEach( + (key, value) -> { + if (value.isEmpty()) { + conf.removeKey(key); + } + }); CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf); Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor); catalogStore.removeCatalog(catalogName, false); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java new file mode 100644 index 0000000000000..019c524d840ec --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Operation to describe an ALTER CATALOG RESET statement. */ +@Internal +public class AlterCatalogResetOperation implements AlterOperation { + private final String catalogName; + private final Set resetKeys; + + public AlterCatalogResetOperation(String catalogName, Set resetKeys) { + this.catalogName = checkNotNull(catalogName); + this.resetKeys = Collections.unmodifiableSet(checkNotNull(resetKeys)); + } + + public String getCatalogName() { + return catalogName; + } + + public Set getResetKeys() { + return resetKeys; + } + + @Override + public String asSummaryString() { + return String.format( + "ALTER CATALOG %s\n%s", + catalogName, + resetKeys.stream() + .map(key -> String.format(" RESET '%s'", key)) + .collect(Collectors.joining(",\n"))); + } + + @Override + public TableResultInternal execute(Context ctx) { + try { + Configuration resetConf = new Configuration(); + resetKeys.forEach(key -> resetConf.setString(key, "")); + ctx.getCatalogManager() + .alterCatalog(catalogName, CatalogDescriptor.of(catalogName, resetConf)); + + return TableResultImpl.TABLE_RESULT_OK; + } catch (CatalogException e) { + throw new ValidationException( + String.format("Could not execute %s", asSummaryString()), e); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java new file mode 100644 index 0000000000000..c352d43785bdf --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterCatalogResetConverter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; + +import java.util.Set; + +/** A converter for {@link SqlAlterCatalogReset}. */ +public class SqlAlterCatalogResetConverter implements SqlNodeConverter { + + @Override + public Operation convertSqlNode( + SqlAlterCatalogReset sqlAlterCatalogReset, ConvertContext context) { + String type = CommonCatalogOptions.CATALOG_TYPE.key(); + Set resetKeys = sqlAlterCatalogReset.getResetKeys(); + if (resetKeys.isEmpty() || resetKeys.contains(type)) { + String exMsg = + resetKeys.isEmpty() + ? "ALTER CATALOG RESET does not support empty key" + : String.format( + "ALTER CATALOG RESET does not support changing '%s'", type); + throw new ValidationException(exMsg); + } + return new AlterCatalogResetOperation( + sqlAlterCatalogReset.catalogName(), sqlAlterCatalogReset.getResetKeys()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index b3dca807899be..5c6fbc8a22021 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -40,6 +40,7 @@ public class SqlNodeConverters { // register all the converters here register(new SqlCreateCatalogConverter()); register(new SqlAlterCatalogOptionsConverter()); + register(new SqlAlterCatalogResetConverter()); register(new SqlCreateViewConverter()); register(new SqlAlterViewRenameConverter()); register(new SqlAlterViewPropertiesConverter()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 27139d1de7d01..488e691e324e5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -53,6 +53,7 @@ import org.apache.flink.table.operations.SourceQueryOperation; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation; +import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -83,9 +84,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -124,6 +127,26 @@ public void testAlterCatalog() { "cat2", "ALTER CATALOG cat2\n SET 'K1' = 'V1',\n SET 'k2' = 'v2_new'", expectedOptions); + + // test alter catalog reset + final Set expectedResetKeys = new HashSet<>(); + expectedResetKeys.add("K1"); + + operation = parse("ALTER CATALOG cat2 RESET ('K1')"); + assertThat(operation) + .isInstanceOf(AlterCatalogResetOperation.class) + .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogResetOperation.class)) + .extracting( + AlterCatalogResetOperation::getCatalogName, + AlterCatalogResetOperation::asSummaryString, + AlterCatalogResetOperation::getResetKeys) + .containsExactly("cat2", "ALTER CATALOG cat2\n RESET 'K1'", expectedResetKeys); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ('type')")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support changing 'type'"); + assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ()")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("ALTER CATALOG RESET does not support empty key"); } @Test From d6e8ec745050b4e207f3babc7b133acff83c9560 Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Sat, 11 May 2024 09:53:03 +0800 Subject: [PATCH 2/3] update doc --- docs/content.zh/docs/dev/table/sql/alter.md | 3 ++- docs/content/docs/dev/table/sql/alter.md | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md index 1cedc036c179a..39de6985c9d5a 100644 --- a/docs/content.zh/docs/dev/table/sql/alter.md +++ b/docs/content.zh/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义。 +ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义,或 catalog 本身的定义。 Flink SQL 目前支持以下 ALTER 语句: @@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## 执行 ALTER 语句 diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md index 1594fec4957d6..e842d137ce096 100644 --- a/docs/content/docs/dev/table/sql/alter.md +++ b/docs/content/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}). +ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself. Flink SQL supports the following ALTER statements for now: @@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## Run an ALTER statement From 771be66b67e2b6e854c325fe9d946c8ef65a00da Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Thu, 13 Jun 2024 15:02:00 +0800 Subject: [PATCH 3/3] update as Jane said --- .../src/test/resources/sql/catalog_database.q | 212 ++++++++------- .../src/test/resources/sql/catalog_database.q | 250 +++++++++--------- .../flink/table/catalog/CatalogManager.java | 21 +- .../ddl/AlterCatalogOptionsOperation.java | 4 +- .../ddl/AlterCatalogResetOperation.java | 6 +- .../SqlDdlToOperationConverterTest.java | 4 +- 6 files changed, 237 insertions(+), 260 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index 6b500307859b6..b99af7344f984 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -92,6 +92,110 @@ drop catalog c1; org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. !error +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +[INFO] Execute statement succeeded. +!info + +show create catalog cat2; ++---------------------------------------------------------------------------------------------+ +| result | ++---------------------------------------------------------------------------------------------+ +| CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) + | ++---------------------------------------------------------------------------------------------+ +1 row in set +!ok + +describe catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +[INFO] Execute statement succeeded. +!info + +desc catalog extended cat2; ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -686,111 +790,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -[INFO] Execute statement succeeded. -!info - -show create catalog cat2; -+---------------------------------------------------------------------------------------------+ -| result | -+---------------------------------------------------------------------------------------------+ -| CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) - | -+---------------------------------------------------------------------------------------------+ -1 row in set -!ok - -describe catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -[INFO] Execute statement succeeded. -!info - -desc catalog extended cat2; -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 reset ('default-database', 'k1'); -[INFO] Execute statement succeeded. -!info - -desc catalog extended cat2; -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -alter catalog cat2 reset ('type'); -[ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' -!error - -alter catalog cat2 reset (); -[ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key -!error diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index 60fc7f7732849..cd7658ec0fc56 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -108,6 +108,129 @@ drop catalog default_catalog; 1 row in set !ok +create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +show create catalog cat2; +!output +CREATE CATALOG `cat2` WITH ( + 'default-database' = 'db', + 'type' = 'generic_in_memory' +) +!ok + +describe catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +describe catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +desc catalog cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 set ('default-database'='db_new'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-------------------------+-------------------+ +| info name | info value | ++-------------------------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | +| option:default-database | db_new | ++-------------------------+-------------------+ +4 rows in set +!ok + +alter catalog cat2 reset ('default-database', 'k1'); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +desc catalog extended cat2; +!output ++-----------+-------------------+ +| info name | info value | ++-----------+-------------------+ +| name | cat2 | +| type | generic_in_memory | +| comment | | ++-----------+-------------------+ +3 rows in set +!ok + +alter catalog cat2 reset ('type'); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' +!error + +alter catalog cat2 reset (); +!output +org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key +!error + # ========================================================================== # test database # ========================================================================== @@ -816,130 +939,3 @@ show tables from db1 like 'p_r%'; +------------+ 1 row in set !ok - -# ========================================================================== -# test catalog -# ========================================================================== - -create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -show create catalog cat2; -!output -CREATE CATALOG `cat2` WITH ( - 'default-database' = 'db', - 'type' = 'generic_in_memory' -) -!ok - -describe catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -describe catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -desc catalog cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 set ('default-database'='db_new'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -desc catalog extended cat2; -!output -+-------------------------+-------------------+ -| info name | info value | -+-------------------------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -| option:default-database | db_new | -+-------------------------+-------------------+ -4 rows in set -!ok - -alter catalog cat2 reset ('default-database', 'k1'); -!output -+--------+ -| result | -+--------+ -| OK | -+--------+ -1 row in set -!ok - -desc catalog extended cat2; -!output -+-----------+-------------------+ -| info name | info value | -+-----------+-------------------+ -| name | cat2 | -| type | generic_in_memory | -| comment | | -+-----------+-------------------+ -3 rows in set -!ok - -alter catalog cat2 reset ('type'); -!output -org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type' -!error - -alter catalog cat2 reset (); -!output -org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key -!error diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 9a44b1981c569..fc16f7ae05778 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -326,31 +326,24 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto * Alters a catalog under the given name. The catalog name must be unique. * * @param catalogName the given catalog name under which to alter the given catalog - * @param catalogDescriptor catalog descriptor for altering catalog + * @param catalogUpdater catalog configuration updater to alter catalog * @throws CatalogException If the catalog neither exists in the catalog store nor in the * initialized catalogs, or if an error occurs while creating the catalog or storing the * {@link CatalogDescriptor} */ - public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor) + public void alterCatalog(String catalogName, Consumer catalogUpdater) throws CatalogException { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); - checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null"); + checkNotNull(catalogUpdater, "Catalog configuration updater cannot be null."); + CatalogStore catalogStore = catalogStoreHolder.catalogStore(); Optional oldCatalogDescriptor = getCatalogDescriptor(catalogName); + if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) { Configuration conf = oldCatalogDescriptor.get().getConfiguration(); - conf.addAll(catalogDescriptor.getConfiguration()); - catalogDescriptor - .getConfiguration() - .toMap() - .forEach( - (key, value) -> { - if (value.isEmpty()) { - conf.removeKey(key); - } - }); + catalogUpdater.accept(conf); CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf); Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor); catalogStore.removeCatalog(catalogName, false); @@ -362,7 +355,7 @@ public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor catalogStoreHolder.catalogStore().storeCatalog(catalogName, newCatalogDescriptor); } else { throw new CatalogException( - format("Catalog %s not exists in the catalog store.", catalogName)); + String.format("Catalog %s does not exist in the catalog store.", catalogName)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java index dd4d1433505e9..e523aa49eb60e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogOptionsOperation.java @@ -23,7 +23,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.exceptions.CatalogException; import java.util.Collections; @@ -70,8 +69,7 @@ public TableResultInternal execute(Context ctx) { try { ctx.getCatalogManager() .alterCatalog( - catalogName, - CatalogDescriptor.of(catalogName, Configuration.fromMap(properties))); + catalogName, conf -> conf.addAll(Configuration.fromMap(properties))); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java index 019c524d840ec..b68387def7464 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterCatalogResetOperation.java @@ -19,11 +19,9 @@ package org.apache.flink.table.operations.ddl; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.exceptions.CatalogException; import java.util.Collections; @@ -64,10 +62,8 @@ public String asSummaryString() { @Override public TableResultInternal execute(Context ctx) { try { - Configuration resetConf = new Configuration(); - resetKeys.forEach(key -> resetConf.setString(key, "")); ctx.getCatalogManager() - .alterCatalog(catalogName, CatalogDescriptor.of(catalogName, resetConf)); + .alterCatalog(catalogName, conf -> resetKeys.forEach(conf::removeKey)); return TableResultImpl.TABLE_RESULT_OK; } catch (CatalogException e) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 488e691e324e5..b10d9792bd32a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -84,7 +84,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -129,8 +128,7 @@ public void testAlterCatalog() { expectedOptions); // test alter catalog reset - final Set expectedResetKeys = new HashSet<>(); - expectedResetKeys.add("K1"); + final Set expectedResetKeys = Collections.singleton("K1"); operation = parse("ALTER CATALOG cat2 RESET ('K1')"); assertThat(operation)