diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java index 6fa2fa0d126ed..6f1e81e8b065e 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java @@ -35,7 +35,7 @@ */ public class SqlUseCatalog extends SqlCall { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE CATALOG", SqlKind.OTHER); + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE CATALOG", SqlKind.OTHER_DDL); private final SqlIdentifier catalogName; public SqlUseCatalog(SqlParserPos pos, SqlIdentifier catalogName) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 65c13a7cd599e..261829abb0870 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -58,6 +58,7 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.utils.OperationTreeBuilder; @@ -450,7 +451,7 @@ public void sqlUpdate(String stmt) { if (operations.size() != 1) { throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE"); + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"); } Operation operation = operations.get(0); @@ -473,10 +474,13 @@ public void sqlUpdate(String stmt) { catalogManager.dropTable( dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists()); + } else if (operation instanceof UseCatalogOperation) { + UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; + catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); } else { throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE"); + "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java new file mode 100644 index 0000000000000..e7177a117584e --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +/** + * Operation to describe a USE CATALOG statement. + */ +public class UseCatalogOperation implements UseOperation { + + private String catalogName; + + public UseCatalogOperation(String catalogName) { + this.catalogName = catalogName; + } + + public String getCatalogName() { + return catalogName; + } + + @Override + public String asSummaryString() { + return String.format("USE CATALOGS %s", catalogName); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java new file mode 100644 index 0000000000000..a75f8d05eeaf7 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; + +/** + * An {@link Operation} that describes the catalog/database switch statements, + * e.g. USE CATALOG or USE [catalogName.]dataBaseName. + * + *

Different sub operations can represent their special meanings. For example, a + * use catalog operation means switching current catalog to another, + * while use database operation means switching current database. + */ +@Internal +public interface UseOperation extends Operation { +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 9a9da5a4f2e52..dd7701d7bd508 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; @@ -32,6 +33,7 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; @@ -100,6 +102,8 @@ public static Optional convert( return Optional.of(converter.convertDropTable((SqlDropTable) validated)); } else if (validated instanceof RichSqlInsert) { return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); + } else if (validated instanceof SqlUseCatalog) { + return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -177,6 +181,11 @@ private Operation convertSqlInsert(RichSqlInsert insert) { insert.isOverwrite()); } + /** Convert use catalog statement. */ + private Operation convertUseCatalog(SqlUseCatalog useCatalog) { + return new UseCatalogOperation(useCatalog.getCatalogName()); + } + /** Fallback method for sql query. */ private Operation convertSqlQuery(SqlNode node) { return toQueryOperation(flinkPlanner, node); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 0dc377fd0dd0b..4021d5bc93ba0 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -41,6 +41,7 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; @@ -115,6 +116,16 @@ public void after() throws TableNotExistException { catalog.dropTable(path2, true); } + @Test + public void testUseCatalog() { + final String sql = "USE CATALOG cat1"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + Operation operation = parse(sql, planner, parser); + assert operation instanceof UseCatalogOperation; + assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index d301f0770ee30..991ade58fdf96 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -21,18 +21,16 @@ package org.apache.flink.table.planner.catalog import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException} -import org.apache.flink.table.catalog.{CatalogFunctionImpl, ObjectPath} +import org.apache.flink.table.catalog.{CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit.rules.ExpectedException import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Ignore, Rule, Test} - import java.sql.Timestamp import java.util @@ -828,6 +826,16 @@ class CatalogTableITCase(isStreamingMode: Boolean) { tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1") assert(tableEnv.listTables().sameElements(Array[String]("t1"))) } + + @Test + def testUseCatalog(): Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) + tableEnv.sqlUpdate("use catalog cat1") + assertEquals("cat1", tableEnv.getCurrentCatalog) + tableEnv.sqlUpdate("use catalog cat2") + assertEquals("cat2", tableEnv.getCurrentCatalog) + } } object CatalogTableITCase { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index da246e0cbc239..00051e109e8d5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; @@ -36,6 +37,7 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.types.utils.TypeConversions; @@ -102,6 +104,8 @@ public static Optional convert( throw new ValidationException("Partial inserts are not supported"); } return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); + } else if (validated instanceof SqlUseCatalog) { + return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -189,6 +193,11 @@ private Operation convertSqlInsert(RichSqlInsert insert) { insert.isOverwrite()); } + /** Convert use catalog statement. */ + private Operation convertUseCatalog(SqlUseCatalog useCatalog) { + return new UseCatalogOperation(useCatalog.getCatalogName()); + } + //~ Tools ------------------------------------------------------------------ /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index ca2ecccd170d9..8798e092daae2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -468,7 +468,7 @@ abstract class TableEnvImpl( if (operations.size != 1) throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE") + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") operations.get(0) match { case op: CatalogSinkModifyOperation => @@ -485,9 +485,11 @@ abstract class TableEnvImpl( catalogManager.dropTable( dropTableOperation.getTableIdentifier, dropTableOperation.isIfExists) + case useCatalogOperation: UseCatalogOperation => + catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE") + "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index 21a399f2781ce..da0e68ec74b13 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -44,6 +44,7 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.planner.PlanningConfigurationBuilder; import org.apache.flink.table.types.DataType; @@ -113,6 +114,16 @@ public void after() throws TableNotExistException { catalog.dropTable(path2, true); } + @Test + public void testUseCatalog() { + final String sql = "USE CATALOG cat1"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql); + Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get(); + assert operation instanceof UseCatalogOperation; + assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala index 9d948497c00c2..fa5db23c9f823 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ -534,6 +534,16 @@ class CatalogTableITCase(isStreaming: Boolean) { tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1") assert(tableEnv.listTables().sameElements(Array[String]("t1"))) } + + @Test + def testUseCatalog(): Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) + tableEnv.sqlUpdate("use catalog cat1") + assertEquals("cat1", tableEnv.getCurrentCatalog) + tableEnv.sqlUpdate("use catalog cat2") + assertEquals("cat2", tableEnv.getCurrentCatalog) + } } object CatalogTableITCase {