Skip to content

Commit

Permalink
[FLINK-35164][table] Support ALTER CATALOG RESET syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed May 9, 2024
1 parent 4611817 commit 4ff585e
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 10 deletions.
17 changes: 16 additions & 1 deletion docs/content.zh/docs/dev/table/sql/alter.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION

Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。

{{< top >}}

## ALTER CATALOG

```sql
ALTER CATALOG catalog_name SET (key1=val1, ...)
ALTER CATALOG catalog_name
SET (key1=val1, ...)
| RESET (key1, ...)
```

### SET
Expand All @@ -555,4 +559,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...)
ALTER CATALOG cat2 SET ('default-database'='db');
```

### RESET

为指定的 catalog 重置一个或多个属性。

`RESET` 语句示例如下。

```sql
-- reset 'default-database'
ALTER CATALOG cat2 RESET ('default-database');
```

{{< top >}}
17 changes: 16 additions & 1 deletion docs/content/docs/dev/table/sql/alter.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,14 @@ If the function doesn't exist, nothing happens.

Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA.

{{< top >}}

## ALTER CATALOG

```sql
ALTER CATALOG catalog_name SET (key1=val1, ...)
ALTER CATALOG catalog_name
SET (key1=val1, ...)
| RESET (key1, ...)
```

### SET
Expand All @@ -557,4 +561,15 @@ The following examples illustrate the usage of the `SET` statements.
ALTER CATALOG cat2 SET ('default-database'='db');
```

### RESET

Reset one or more properties to its default value in the specified catalog.

The following examples illustrate the usage of the `RESET` statements.

```sql
-- reset 'default-database'
ALTER CATALOG cat2 RESET ('default-database');
```

{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -769,3 +769,28 @@ desc catalog extended cat2;
+-------------------------+-------------------+
4 rows in set
!ok

alter catalog cat2 reset ('default-database', 'k1');
[INFO] Execute statement succeeded.
!info

desc catalog extended cat2;
+-----------+-------------------+
| info name | info value |
+-----------+-------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | |
+-----------+-------------------+
3 rows in set
!ok

alter catalog cat2 reset ('type');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type'
!error

alter catalog cat2 reset ();
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
!error
Original file line number Diff line number Diff line change
Expand Up @@ -911,3 +911,35 @@ desc catalog extended cat2;
+-------------------------+-------------------+
4 rows in set
!ok

alter catalog cat2 reset ('default-database', 'k1');
!output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
!ok

desc catalog extended cat2;
!output
+-----------+-------------------+
| info name | info value |
+-----------+-------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | |
+-----------+-------------------+
3 rows in set
!ok

alter catalog cat2 reset ('type');
!output
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support changing 'type'
!error

alter catalog cat2 reset ();
!output
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
!error
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalog"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,23 @@ SqlAlterCatalog SqlAlterCatalog() :
{
<ALTER> <CATALOG> { startPos = getPos(); }
catalogName = SimpleIdentifier()
<SET>
propertyList = TableProperties()
{
return new SqlAlterCatalogOptions(startPos.plus(getPos()),
catalogName,
propertyList);
}
(
<SET>
propertyList = TableProperties()
{
return new SqlAlterCatalogOptions(startPos.plus(getPos()),
catalogName,
propertyList);
}
|
<RESET>
propertyList = TablePropertyKeys()
{
return new SqlAlterCatalogReset(startPos.plus(getPos()),
catalogName,
propertyList);
}
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.flink.sql.parser.SqlUnparseUtils;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.calcite.util.NlsString;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/** ALTER CATALOG catalog_name RESET (key1, ...). */
public class SqlAlterCatalogReset extends SqlAlterCatalog {

private final SqlNodeList propertyKeyList;

public SqlAlterCatalogReset(
SqlParserPos position, SqlIdentifier catalogName, SqlNodeList propertyKeyList) {
super(position, catalogName);
this.propertyKeyList = requireNonNull(propertyKeyList, "propertyKeyList cannot be null");
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(catalogName, propertyKeyList);
}

public SqlNodeList getPropertyList() {
return propertyKeyList;
}

public Set<String> getResetKeys() {
return propertyKeyList.getList().stream()
.map(key -> ((NlsString) SqlLiteral.value(key)).getValue())
.collect(Collectors.toSet());
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("RESET");
SqlWriter.Frame withFrame = writer.startList("(", ")");
for (SqlNode property : propertyKeyList) {
SqlUnparseUtils.printIndent(writer);
property.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
writer.endList(withFrame);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ void testDescribeCatalog() {

@Test
void testAlterCatalog() {
sql("alter catalog a set ('k1'='v1','k2'='v2')")
sql("alter catalog a set ('k1'='v1', 'k2'='v2')")
.ok("ALTER CATALOG `A` SET (\n" + " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")");
sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + " 'k1'\n" + ")");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,15 @@ public void alterCatalog(String catalogName, CatalogDescriptor catalogDescriptor
if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) {
Configuration conf = oldCatalogDescriptor.get().getConfiguration();
conf.addAll(catalogDescriptor.getConfiguration());
catalogDescriptor
.getConfiguration()
.toMap()
.forEach(
(key, value) -> {
if (value.isEmpty()) {
conf.removeKey(key);
}
});
CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf);
Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor);
catalogStore.removeCatalog(catalogName, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.exceptions.CatalogException;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operation to describe an ALTER CATALOG RESET statement. */
@Internal
public class AlterCatalogResetOperation implements AlterOperation {
private final String catalogName;
private final Set<String> resetKeys;

public AlterCatalogResetOperation(String catalogName, Set<String> resetKeys) {
this.catalogName = checkNotNull(catalogName);
this.resetKeys = Collections.unmodifiableSet(checkNotNull(resetKeys));
}

public String getCatalogName() {
return catalogName;
}

public Set<String> getResetKeys() {
return resetKeys;
}

@Override
public String asSummaryString() {
return String.format(
"ALTER CATALOG %s\n%s",
catalogName,
resetKeys.stream()
.map(key -> String.format(" RESET '%s'", key))
.collect(Collectors.joining(",\n")));
}

@Override
public TableResultInternal execute(Context ctx) {
try {
Configuration resetConf = new Configuration();
resetKeys.forEach(key -> resetConf.setString(key, ""));
ctx.getCatalogManager()
.alterCatalog(catalogName, CatalogDescriptor.of(catalogName, resetConf));

return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(
String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;

import java.util.Set;

/** A converter for {@link SqlAlterCatalogReset}. */
public class SqlAlterCatalogResetConverter implements SqlNodeConverter<SqlAlterCatalogReset> {

@Override
public Operation convertSqlNode(
SqlAlterCatalogReset sqlAlterCatalogReset, ConvertContext context) {
String type = CommonCatalogOptions.CATALOG_TYPE.key();
Set<String> resetKeys = sqlAlterCatalogReset.getResetKeys();
if (resetKeys.isEmpty() || resetKeys.contains(type)) {
String exMsg =
resetKeys.isEmpty()
? "ALTER CATALOG RESET does not support empty key"
: String.format(
"ALTER CATALOG RESET does not support changing '%s'", type);
throw new ValidationException(exMsg);
}
return new AlterCatalogResetOperation(
sqlAlterCatalogReset.catalogName(), sqlAlterCatalogReset.getResetKeys());
}
}

0 comments on commit 4ff585e

Please sign in to comment.