From c2375e0a3af00cf91668375955d1caebef65f1fa Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Wed, 20 Nov 2019 23:07:23 +0800 Subject: [PATCH 1/5] Add useCatalogOperation and support it both in flink/blink planner --- .../api/internal/TableEnvironmentImpl.java | 9 ++++- .../operations/ddl/UseCatalogOperation.java | 40 +++++++++++++++++++ .../table/operations/ddl/UseOperation.java | 33 +++++++++++++++ .../operations/SqlToOperationConverter.java | 9 +++++ .../sqlexec/SqlToOperationConverter.java | 9 +++++ .../table/api/internal/TableEnvImpl.scala | 8 ++-- 6 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 65c13a7cd599e..a93993aa36be6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -60,6 +60,7 @@ import org.apache.flink.table.operations.TableSourceQueryOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; @@ -450,7 +451,7 @@ public void sqlUpdate(String stmt) { if (operations.size() != 1) { throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE"); + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"); } Operation operation = operations.get(0); @@ -473,7 +474,11 @@ public void sqlUpdate(String stmt) { catalogManager.dropTable( dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists()); - } else { + } else if (operation instanceof UseCatalogOperation) { + UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; + catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); + } + else { throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + "type INSERT, CREATE TABLE, DROP TABLE"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java new file mode 100644 index 0000000000000..9511150b89a52 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +/** + * Operation to describe a USE CATALOG statement. + */ +public class UseCatalogOperation implements UseOperation { + + private String catalogName; + + public UseCatalogOperation(String catalogName) { + this.catalogName = catalogName; + } + + public String getCatalogName() { + return catalogName; + } + + @Override + public String asSummaryString() { + return String.format("USE CATALOGS %s", catalogName); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java new file mode 100644 index 0000000000000..7b3c636c7af4d --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.operations.Operation; + +/** + * A {@link Operation} that describes the DDL statements, e.g. USE CATALOG or USE [catalogName.]dataBaseName. + * + *

Different sub operations can represent their special meanings. For example, a + * use catalog operation means switching current catalog to another, + * while use database operation means switching current database. + */ +@Internal +public interface UseOperation extends Operation { +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 9a9da5a4f2e52..5ab7c19a5bea4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; @@ -34,6 +35,7 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; @@ -100,6 +102,8 @@ public static Optional convert( return Optional.of(converter.convertDropTable((SqlDropTable) validated)); } else if (validated instanceof RichSqlInsert) { return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); + } else if (validated instanceof SqlUseCatalog) { + return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -177,6 +181,11 @@ private Operation convertSqlInsert(RichSqlInsert insert) { insert.isOverwrite()); } + /** Convert use catalog statement. */ + private Operation convertUseCatalog(SqlUseCatalog useCatalog) { + return new UseCatalogOperation(useCatalog.getCatalogName()); + } + /** Fallback method for sql query. */ private Operation convertSqlQuery(SqlNode node) { return toQueryOperation(flinkPlanner, node); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index da246e0cbc239..00771d2e1fdd8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; @@ -38,6 +39,7 @@ import org.apache.flink.table.operations.PlannerQueryOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rel.RelRoot; @@ -102,6 +104,8 @@ public static Optional convert( throw new ValidationException("Partial inserts are not supported"); } return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); + } else if (validated instanceof SqlUseCatalog) { + return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -189,6 +193,11 @@ private Operation convertSqlInsert(RichSqlInsert insert) { insert.isOverwrite()); } + /** Convert use catalog statement. */ + private Operation convertUseCatalog(SqlUseCatalog useCatalog) { + return new UseCatalogOperation(useCatalog.getCatalogName()); + } + //~ Tools ------------------------------------------------------------------ /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index ca2ecccd170d9..0f56774c5860a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -29,18 +29,16 @@ import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _} import org.apache.flink.table.module.{Module, ModuleManager} -import org.apache.flink.table.operations.ddl.{CreateTableOperation, DropTableOperation} +import org.apache.flink.table.operations.ddl.{CreateTableOperation, DropTableOperation, UseCatalogOperation} import org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _} import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.JavaScalaConversionUtil - import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.FrameworkConfig - import _root_.java.util.function.{Supplier => JSupplier} import _root_.java.util.{Optional, HashMap => JHashMap, Map => JMap} @@ -468,7 +466,7 @@ abstract class TableEnvImpl( if (operations.size != 1) throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE") + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") operations.get(0) match { case op: CatalogSinkModifyOperation => @@ -485,6 +483,8 @@ abstract class TableEnvImpl( catalogManager.dropTable( dropTableOperation.getTableIdentifier, dropTableOperation.isIfExists) + case useCatalogOperation: UseCatalogOperation => + catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + "type INSERT, CREATE TABLE, DROP TABLE") From 442d683bfdc59204390b26d5433b42ed9a350cf0 Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Thu, 21 Nov 2019 01:05:37 +0800 Subject: [PATCH 2/5] add converter test and make sqlUseCatalog sql kind to other ddl to skip row type validate --- .../apache/flink/sql/parser/ddl/SqlUseCatalog.java | 2 +- .../operations/SqlToOperationConverterTest.java | 11 +++++++++++ .../table/sqlexec/SqlToOperationConverterTest.java | 11 +++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java index 6fa2fa0d126ed..6f1e81e8b065e 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java @@ -35,7 +35,7 @@ */ public class SqlUseCatalog extends SqlCall { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE CATALOG", SqlKind.OTHER); + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE CATALOG", SqlKind.OTHER_DDL); private final SqlIdentifier catalogName; public SqlUseCatalog(SqlParserPos pos, SqlIdentifier catalogName) { diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 0dc377fd0dd0b..03f524e9e9d58 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -42,6 +42,7 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; @@ -115,6 +116,16 @@ public void after() throws TableNotExistException { catalog.dropTable(path2, true); } + @Test + public void testUseCatalog() { + final String sql = "USE CATALOG cat1"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + Operation operation = parse(sql, planner, parser); + assert operation instanceof UseCatalogOperation; + assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index 21a399f2781ce..8c746c73c0f7c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -45,6 +45,7 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.PlanningConfigurationBuilder; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; @@ -113,6 +114,16 @@ public void after() throws TableNotExistException { catalog.dropTable(path2, true); } + @Test + public void testUseCatalog() { + final String sql = "USE CATALOG cat1"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql); + Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get(); + assert operation instanceof UseCatalogOperation; + assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + From ea0572560ca5e700a8d749484aac2edb17c07ba9 Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Thu, 21 Nov 2019 01:56:46 +0800 Subject: [PATCH 3/5] add related ITcase in CatalogTableITCase to both planner --- .../table/planner/catalog/CatalogTableITCase.scala | 14 +++++++++++--- .../flink/table/catalog/CatalogTableITCase.scala | 10 ++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index d301f0770ee30..991ade58fdf96 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -21,18 +21,16 @@ package org.apache.flink.table.planner.catalog import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException} -import org.apache.flink.table.catalog.{CatalogFunctionImpl, ObjectPath} +import org.apache.flink.table.catalog.{CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit.rules.ExpectedException import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Ignore, Rule, Test} - import java.sql.Timestamp import java.util @@ -828,6 +826,16 @@ class CatalogTableITCase(isStreamingMode: Boolean) { tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1") assert(tableEnv.listTables().sameElements(Array[String]("t1"))) } + + @Test + def testUseCatalog(): Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) + tableEnv.sqlUpdate("use catalog cat1") + assertEquals("cat1", tableEnv.getCurrentCatalog) + tableEnv.sqlUpdate("use catalog cat2") + assertEquals("cat2", tableEnv.getCurrentCatalog) + } } object CatalogTableITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala index 9d948497c00c2..fa5db23c9f823 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ -534,6 +534,16 @@ class CatalogTableITCase(isStreaming: Boolean) { tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1") assert(tableEnv.listTables().sameElements(Array[String]("t1"))) } + + @Test + def testUseCatalog(): Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) + tableEnv.sqlUpdate("use catalog cat1") + assertEquals("cat1", tableEnv.getCurrentCatalog) + tableEnv.sqlUpdate("use catalog cat2") + assertEquals("cat2", tableEnv.getCurrentCatalog) + } } object CatalogTableITCase { From efe214d89bc2fd8f7fe2f9f08d8196cc01129590 Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Thu, 21 Nov 2019 11:19:38 +0800 Subject: [PATCH 4/5] Address the comments: 1.Update exception info 2.Revert unrelated change of style --- .../flink/table/api/internal/TableEnvironmentImpl.java | 5 ++--- .../org/apache/flink/table/operations/ddl/UseOperation.java | 3 ++- .../org/apache/flink/table/api/internal/TableEnvImpl.scala | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index a93993aa36be6..06b1acf793a43 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -477,11 +477,10 @@ public void sqlUpdate(String stmt) { } else if (operation instanceof UseCatalogOperation) { UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); - } - else { + } else { throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE"); + "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java index 7b3c636c7af4d..32ad617df788b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java @@ -22,7 +22,8 @@ import org.apache.flink.table.operations.Operation; /** - * A {@link Operation} that describes the DDL statements, e.g. USE CATALOG or USE [catalogName.]dataBaseName. + * An {@link Operation} that describes the catalog/database switch statements, + * e.g. USE CATALOG or USE [catalogName.]dataBaseName. * *

Different sub operations can represent their special meanings. For example, a * use catalog operation means switching current catalog to another, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 0f56774c5860a..4f78d296081a1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -36,9 +36,11 @@ import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.JavaScalaConversionUtil + import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.FrameworkConfig + import _root_.java.util.function.{Supplier => JSupplier} import _root_.java.util.{Optional, HashMap => JHashMap, Map => JMap} @@ -487,7 +489,7 @@ abstract class TableEnvImpl( catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE") + "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") } } From 53d8a75d7bc87df15fc3d2107469343d11407b33 Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Fri, 22 Nov 2019 10:32:05 +0800 Subject: [PATCH 5/5] relocate UseOperation package name --- .../apache/flink/table/api/internal/TableEnvironmentImpl.java | 2 +- .../flink/table/operations/{ddl => }/UseCatalogOperation.java | 2 +- .../apache/flink/table/operations/{ddl => }/UseOperation.java | 3 +-- .../table/planner/operations/SqlToOperationConverter.java | 2 +- .../table/planner/operations/SqlToOperationConverterTest.java | 2 +- .../apache/flink/table/sqlexec/SqlToOperationConverter.java | 2 +- .../org/apache/flink/table/api/internal/TableEnvImpl.scala | 2 +- .../flink/table/sqlexec/SqlToOperationConverterTest.java | 2 +- 8 files changed, 8 insertions(+), 9 deletions(-) rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ddl => }/UseCatalogOperation.java (96%) rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ddl => }/UseOperation.java (92%) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 06b1acf793a43..261829abb0870 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -58,9 +58,9 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; -import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java similarity index 96% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java index 9511150b89a52..e7177a117584e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseCatalogOperation.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.operations.ddl; +package org.apache.flink.table.operations; /** * Operation to describe a USE CATALOG statement. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java similarity index 92% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java index 32ad617df788b..a75f8d05eeaf7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/UseOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseOperation.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.operations.ddl; +package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.operations.Operation; /** * An {@link Operation} that describes the catalog/database switch statements, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 5ab7c19a5bea4..dd7701d7bd508 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -33,9 +33,9 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; -import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 03f524e9e9d58..4021d5bc93ba0 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -41,8 +41,8 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; -import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index 00771d2e1fdd8..00051e109e8d5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -37,9 +37,9 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; -import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rel.RelRoot; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 4f78d296081a1..8798e092daae2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _} import org.apache.flink.table.module.{Module, ModuleManager} -import org.apache.flink.table.operations.ddl.{CreateTableOperation, DropTableOperation, UseCatalogOperation} +import org.apache.flink.table.operations.ddl.{CreateTableOperation, DropTableOperation} import org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _} import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index 8c746c73c0f7c..da0e68ec74b13 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -44,8 +44,8 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; -import org.apache.flink.table.operations.ddl.UseCatalogOperation; import org.apache.flink.table.planner.PlanningConfigurationBuilder; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions;