diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index d8a02c299439e..812a11f3bd4f6 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -27,6 +27,7 @@
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.command.ClearOperation;
@@ -58,6 +59,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -315,6 +317,9 @@ private void callOperation(Operation operation) {
         } else if (operation instanceof QueryOperation) {
             // SELECT
             callSelect((QueryOperation) operation);
+        } else if (operation instanceof ExplainOperation) {
+            // EXPLAIN
+            callExplain((ExplainOperation) operation);
         } else {
             // fallback to default implementation
             executeOperation(operation);
@@ -447,6 +452,21 @@ private boolean callInsert(CatalogSinkModifyOperation operation) {
         return true;
     }
 
+    public void callExplain(ExplainOperation operation) {
+        final String explanation;
+        try {
+            TableResult tableResult = executor.executeOperation(sessionId, operation);
+            // show raw content instead of tableau style
+            explanation =
+                    Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+        } catch (SqlExecutionException | NullPointerException e) {
+            printExecutionException(e);
+            return;
+        }
+        terminal.writer().println(explanation);
+        terminal.flush();
+    }
+
     private void executeOperation(Operation operation) {
         try {
             TableResult result = executor.executeOperation(sessionId, operation);
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index f1dbc13bc8b8e..275fa8614a0c2 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -204,3 +204,119 @@ drop table `mod`;
 show tables;
 Empty set
 !ok
+
+# ==========================================================================
+# test explain
+# ==========================================================================
+
+CREATE TABLE IF NOT EXISTS orders (
+  `user` BIGINT NOT NULL,
+  product VARCHAR(32),
+  amount INT,
+  ts TIMESTAMP(3),
+  ptime AS PROCTIME(),
+  PRIMARY KEY(`user`) NOT ENFORCED,
+  WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+) with (
+  'connector' = 'datagen'
+);
+[INFO] Execute statement succeed.
+!info
+
+CREATE TABLE IF NOT EXISTS orders2 (
+  `user` BIGINT NOT NULL,
+  product VARCHAR(32),
+  amount INT,
+  ts TIMESTAMP(3),
+  PRIMARY KEY(`user`) NOT ENFORCED
+) with (
+  'connector' = 'blackhole'
+);
+[INFO] Execute statement succeed.
+!info
+
+# test explain plan for select
+explain plan for select `user`, product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+!ok
+
+# test explain plan for insert
+explain plan for insert into orders2 select `user`, product, amount, ts from orders;
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+!ok
+
+# test explain select
+explain select `user`, product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+!ok
+
+# test explain insert
+explain insert into orders2 select `user`, product, amount, ts from orders;
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+!ok
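Note on the CliClient change above: an EXPLAIN statement comes back from the executor as an ordinary TableResult containing a single row with a single STRING column, which is why callExplain prints getField(0) of the first row as raw text instead of rendering a tableau. A minimal sketch of the same consumption pattern through a TableEnvironment (table name, schema, and settings here are illustrative, not part of this patch):

    import java.util.Objects;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;

    public class ExplainDemo {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            env.executeSql(
                    "CREATE TABLE orders (`user` BIGINT, product STRING) "
                            + "WITH ('connector' = 'datagen')");
            // Both forms parse after this patch; PLAN FOR is optional.
            TableResult result = env.executeSql("EXPLAIN SELECT `user`, product FROM orders");
            // Same access pattern as CliClient#callExplain: first row, first column.
            String plan = Objects.requireNonNull(result.collect().next().getField(0)).toString();
            System.out.println(plan);
        }
    }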
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index b519cc5284490..a778bf7e9f1d2 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -89,6 +89,7 @@
     "org.apache.flink.sql.parser.dql.SqlShowTables"
     "org.apache.flink.sql.parser.dql.SqlShowPartitions"
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+    "org.apache.flink.sql.parser.dql.SqlRichExplain"
     "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
@@ -537,6 +538,7 @@
     "SqlShowPartitions()"
     "SqlUnloadModule()"
     "SqlUseModules()"
+    "SqlRichExplain()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index c39f476d6cc15..6457f32a144e4 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -1605,3 +1605,17 @@ SqlShowModules SqlShowModules() :
         return new SqlShowModules(startPos.plus(getPos()), requireFull);
     }
 }
+
+/**
+* Parses an EXPLAIN statement.
+*/
+SqlNode SqlRichExplain() :
+{
+    SqlNode stmt;
+}
+{
+    <EXPLAIN> [ <PLAN> <FOR> ]
+    stmt = SqlQueryOrDml() {
+        return new SqlRichExplain(getPos(), stmt);
+    }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 255fc5f1ab1ab..0bdf11b619178 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -482,4 +482,54 @@ public void testShowModules() {
         sql("show full modules").ok("SHOW FULL MODULES");
     }
+
+    @Test
+    public void testExplain() {
+        String sql = "explain plan for select * from emps";
+        String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainJsonFormat() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithImpl() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithoutImpl() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithType() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainAsXml() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainAsJson() {
+        // TODO: FLINK-20562
+    }
+
+    @Test
+    public void testExplainInsert() {
+        String expected = "EXPLAIN INSERT INTO `EMPS1`\n" + "(SELECT *\n" + "FROM `EMPS2`)";
+        this.sql("explain plan for insert into emps1 select * from emps2").ok(expected);
+    }
+
+    @Test
+    public void testExplainUpsert() {
+        String sql = "explain plan for upsert into emps1 values (1, 2)";
+        String expected = "EXPLAIN UPSERT INTO `EMPS1`\n" + "VALUES (ROW(1, 2))";
+        this.sql(sql).ok(expected);
+    }
 }
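The empty @Test bodies above appear to be overrides of tests inherited from Calcite's SqlParserTest: JUnit 4 runs the subclass version, so an empty override is how a parent test for an unsupported EXPLAIN variant gets disabled. A standalone sketch of the mechanism (class names here are illustrative, not Calcite's):

    import org.junit.Test;

    class BaseParserTest {
        @Test
        public void testExplainAsXml() {
            // would exercise EXPLAIN ... AS XML in the base suite
        }
    }

    class DialectParserTest extends BaseParserTest {
        @Override
        @Test
        public void testExplainAsXml() {
            // Unsupported feature in this dialect; overriding with an
            // empty body skips the inherited test.
        }
    }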
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 04879b2cc9c01..5ad878b684cf6 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -60,6 +60,7 @@
     "org.apache.flink.sql.parser.dml.SqlEndStatementSet"
     "org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
     "org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+    "org.apache.flink.sql.parser.dql.SqlRichExplain"
     "org.apache.flink.sql.parser.dql.SqlLoadModule"
     "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
     "org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
@@ -483,6 +484,7 @@
     "SqlShowViews()"
     "SqlUnloadModule()"
     "SqlUseModules()"
+    "SqlRichExplain()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index fd7c6c4d4877f..7eee59e3312cb 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1569,3 +1569,17 @@ SqlEndStatementSet SqlEndStatementSet() :
         return new SqlEndStatementSet(getPos());
     }
 }
+
+/**
+* Parses an EXPLAIN statement.
+*/
+SqlNode SqlRichExplain() :
+{
+    SqlNode stmt;
+}
+{
+    <EXPLAIN> [ <PLAN> <FOR> ]
+    stmt = SqlQueryOrDml() {
+        return new SqlRichExplain(getPos(), stmt);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
new file mode 100644
index 0000000000000..4df9c6a890bcc
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** EXPLAIN [PLAN FOR] STATEMENT sql call. */
+public class SqlRichExplain extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+
+    public SqlRichExplain(SqlParserPos pos, SqlNode statement) {
+        super(pos);
+        this.statement = statement;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        statement.unparse(writer, leftPrec, rightPrec);
+    }
+
+    @Override
+    public void setOperand(int i, SqlNode operand) {
+        if (i == 0) {
+            statement = operand;
+        } else {
+            throw new UnsupportedOperationException(
+                    "SqlRichExplain SqlNode only supports operand index 0");
+        }
+    }
+}
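In both grammar files the new production is <EXPLAIN> [ <PLAN> <FOR> ] followed by a query or DML statement, so EXPLAIN <stmt> and EXPLAIN PLAN FOR <stmt> produce the same SqlRichExplain node, and SqlRichExplain#unparse always prints the normalized EXPLAIN <stmt> form; that round-trip is what the parser tests below assert. A sketch of driving the generated parser directly (the config-builder calls follow Calcite 1.26-era APIs and may differ in other versions):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;

    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public class ParseExplain {
        public static void main(String[] args) throws Exception {
            SqlParser parser =
                    SqlParser.create(
                            "explain plan for select * from emps",
                            SqlParser.configBuilder()
                                    .setParserFactory(FlinkSqlParserImpl.FACTORY)
                                    .build());
            SqlNode node = parser.parseStmt();
            // Unparsing drops the optional PLAN FOR:
            //   EXPLAIN SELECT * FROM `EMPS`
            System.out.println(node.toString());
        }
    }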
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 2959ba00770c9..7c7fd5efc2886 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1215,6 +1215,56 @@ public void testEnd() {
         sql("end").ok("END");
     }
 
+    @Test
+    public void testExplain() {
+        String sql = "explain plan for select * from emps";
+        String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainJsonFormat() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithImpl() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithoutImpl() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainWithType() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainAsXml() {
+        // Unsupported feature. Skip the test.
+    }
+
+    @Test
+    public void testExplainAsJson() {
+        // TODO: FLINK-20562
+    }
+
+    @Test
+    public void testExplainInsert() {
+        String expected = "EXPLAIN INSERT INTO `EMPS1`\n" + "(SELECT *\n" + "FROM `EMPS2`)";
+        this.sql("explain plan for insert into emps1 select * from emps2").ok(expected);
+    }
+
+    @Test
+    public void testExplainUpsert() {
+        String sql = "explain plan for upsert into emps1 values (1, 2)";
+        String expected = "EXPLAIN UPSERT INTO `EMPS1`\n" + "VALUES (ROW(1, 2))";
+        this.sql(sql).ok(expected);
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
index c3834d98d345e..21241a3b2b030 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -21,8 +21,8 @@
 import java.util.Collections;
 
 /**
- * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior(EXPLAIN PLAN
- * FOR xx) is supported.
+ * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior (EXPLAIN
+ * [PLAN FOR] xx) is supported.
  */
 public class ExplainOperation implements Operation {
     private final Operation child;
@@ -38,7 +38,7 @@ public Operation getChild() {
     @Override
     public String asSummaryString() {
         return OperationUtils.formatWithChildren(
-                "EXPLAIN PLAN FOR",
+                "EXPLAIN",
                 Collections.emptyMap(),
                 Collections.singletonList(child),
                 Operation::asSummaryString);
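The ExplainOperation tweak above only changes documentation and the debug summary prefix (EXPLAIN instead of EXPLAIN PLAN FOR), since PLAN FOR is now optional syntax. For reference, the same plan text is also reachable from the Table API without going through SQL, which this patch leaves untouched (a minimal sketch; section headers in the output can vary between Flink versions):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableApiExplain {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            env.executeSql(
                    "CREATE TABLE orders (`user` BIGINT, product STRING) "
                            + "WITH ('connector' = 'datagen')");
            Table t = env.sqlQuery("SELECT `user`, product FROM orders");
            // Table#explain returns plan sections comparable to the table.q
            // expectations earlier in this patch.
            System.out.println(t.explain());
        }
    }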
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 0bc384102d415..8806fa74edf2e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -53,6 +53,7 @@
 import org.apache.flink.sql.parser.dml.SqlEndStatementSet;
 import org.apache.flink.sql.parser.dql.SqlLoadModule;
 import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
@@ -140,9 +141,6 @@
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlExplainFormat;
-import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -262,8 +260,8 @@ public static Optional<Operation> convert(
             return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
         } else if (validated instanceof SqlShowPartitions) {
             return Optional.of(converter.convertShowPartitions((SqlShowPartitions) validated));
-        } else if (validated instanceof SqlExplain) {
-            return Optional.of(converter.convertExplain((SqlExplain) validated));
+        } else if (validated instanceof SqlRichExplain) {
+            return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));
         } else if (validated instanceof SqlRichDescribeTable) {
             return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
         } else if (validated instanceof RichSqlInsert) {
@@ -883,16 +881,9 @@ private Operation convertShowViews(SqlShowViews sqlShowViews) {
         return new ShowViewsOperation();
     }
 
-    /** Convert EXPLAIN statement. */
-    private Operation convertExplain(SqlExplain sqlExplain) {
-        Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
-
-        if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES
-                || sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL
-                || sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
-            throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
-        }
-
+    /** Convert RICH EXPLAIN statement. */
+    private Operation convertRichExplain(SqlRichExplain sqlExplain) {
+        Operation operation = convertSqlQuery(sqlExplain.getStatement());
         return new ExplainOperation(operation);
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index de5d448bc1263..6e3afbc79955e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -142,10 +142,10 @@
       return sqlNode
     }
     sqlNode match {
-      case explain: SqlExplain =>
-        val validated = validator.validate(explain.getExplicandum)
-        explain.setOperand(0, validated)
-        explain
+      case richExplain: SqlRichExplain =>
+        val validated = validator.validate(richExplain.getStatement)
+        richExplain.setOperand(0, validated)
+        richExplain
       case _ =>
         validator.validate(sqlNode)
     }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index f635ea0a74b1f..c0bf232ccf13e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1271,6 +1271,9 @@
     } catch {
       case e: TableException =>
         assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e: SqlParserException =>
+        assertTrue(e.getMessage
+          .contains("Was expecting:\n    \"FOR\" ..."))
       case e =>
         fail("This should not happen, " + e.getMessage)
     }
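Both planners now special-case SqlRichExplain identically during validation: the EXPLAIN wrapper itself is never validated as a query; only the wrapped statement is, and the validated node is written back into operand 0. A Java rendering of the Scala pattern in FlinkPlannerImpl above (a stand-alone sketch; the real method also handles other node kinds):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.validate.SqlValidator;

    import org.apache.flink.sql.parser.dql.SqlRichExplain;

    final class ValidateSketch {
        static SqlNode validateInternal(SqlNode sqlNode, SqlValidator validator) {
            if (sqlNode instanceof SqlRichExplain) {
                SqlRichExplain explain = (SqlRichExplain) sqlNode;
                // validate only the inner statement, then swap it back in
                SqlNode validated = validator.validate(explain.getStatement());
                explain.setOperand(0, validated);
                return explain;
            }
            return validator.validate(sqlNode);
        }
    }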
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index a8cd563f4f47c..70acb49ec6347 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -37,6 +37,7 @@
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
 import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
@@ -100,9 +101,6 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlExplainFormat;
-import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -196,8 +194,8 @@ public static Optional<Operation> convert(
             return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
         } else if (validated instanceof SqlShowFunctions) {
             return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
-        } else if (validated instanceof SqlExplain) {
-            return Optional.of(converter.convertExplain((SqlExplain) validated));
+        } else if (validated instanceof SqlRichExplain) {
+            return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));
         } else if (validated instanceof SqlRichDescribeTable) {
             return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
         } else if (validated instanceof RichSqlInsert) {
@@ -646,16 +644,9 @@ private Operation convertShowViews(SqlShowViews sqlShowViews) {
         return new ShowViewsOperation();
     }
 
-    /** Convert EXPLAIN statement. */
-    private Operation convertExplain(SqlExplain sqlExplain) {
-        Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
-
-        if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES
-                || sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL
-                || sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
-            throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
-        }
-
+    /** Convert RICH EXPLAIN statement. */
+    private Operation convertRichExplain(SqlRichExplain sqlExplain) {
+        Operation operation = convertSqlQuery(sqlExplain.getStatement());
         return new ExplainOperation(operation);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 6b6b6dc5ab447..a45fa14fe8f1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,9 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlRichExplain, SqlShowCatalogs,
+  SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases,
+  SqlShowFunctions, SqlShowTables, SqlShowViews}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.CatalogReader
 import org.apache.flink.table.parse.CalciteParser
@@ -43,11 +45,11 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 /**
-  * NOTE: this is heavily inspired by Calcite's PlannerImpl.
-  * We need it in order to share the planner between the Table API relational plans
-  * and the SQL relation plans that are created by the Calcite parser.
-  * The main difference is that we do not create a new RelOptPlanner in the ready() method.
-  */
+ * NOTE: this is heavily inspired by Calcite's PlannerImpl.
+ * We need it in order to share the planner between the Table API relational plans
+ * and the SQL relation plans that are created by the Calcite parser.
+ * The main difference is that we do not create a new RelOptPlanner in the ready() method.
+ */
 class FlinkPlannerImpl(
     val config: FrameworkConfig,
     val catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
@@ -72,14 +74,14 @@ class FlinkPlannerImpl(
   }
 
   /**
-    * Get the [[FlinkCalciteSqlValidator]] instance from this planner, create a new instance
-    * if current validator has not been initialized, or returns the validator
-    * instance directly.
-    *
-    * <p>The validator instance creation is not thread safe.
-    *
-    * @return a new validator instance or current existed one
-    */
+   * Get the [[FlinkCalciteSqlValidator]] instance from this planner, create a new instance
+   * if current validator has not been initialized, or returns the validator
+   * instance directly.
+   *
+   * <p>The validator instance creation is not thread safe.
+   *
+   * @return a new validator instance or current existed one
+   */
   def getOrCreateSqlValidator(): FlinkCalciteSqlValidator = {
     if (validator == null) {
       val catalogReader = catalogReaderSupplier.apply(false)
@@ -132,10 +134,10 @@
       return sqlNode
     }
     sqlNode match {
-      case explain: SqlExplain =>
-        val validated = validator.validate(explain.getExplicandum)
-        explain.setOperand(0, validated)
-        explain
+      case richExplain: SqlRichExplain =>
+        val validated = validator.validate(richExplain.getStatement)
+        richExplain.setOperand(0, validated)
+        richExplain
       case _ =>
         validator.validate(sqlNode)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 83430b81a1d10..6554a4cb16628 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -540,8 +540,9 @@
       tableEnv.executeSql(explain)
       fail("This should not happen")
     } catch {
-      case e: TableException =>
-        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e: SqlParserException =>
+        assertTrue(e.getMessage
+          .contains("Was expecting:\n    \"FOR\" ..."))
       case e =>
         fail("This should not happen, " + e.getMessage)
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 944402cba23ed..0112c6f0b5858 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -320,8 +320,10 @@
       tableEnv.executeSql(explain)
       fail("This should not happen")
     } catch {
-      case e: TableException =>
-        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e: SqlParserException => {
+        assertTrue(e.getMessage
+          .contains("Was expecting:\n    \"FOR\" ..."))
+      }
       case e =>
         fail("This should not happen, " + e.getMessage)
     }
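Net effect asserted by the last three test hunks: EXPLAIN variants outside the supported grammar now fail at parse time instead of at conversion time. Previously, Calcite's parser accepted variants such as EXPLAIN PLAN WITH TYPE FOR and the converter then rejected them with a TableException ("Only default behavior is supported now"); with the SqlRichExplain production, anything between PLAN and FOR is a parse error whose message contains `Was expecting: "FOR"`. A sketch of the new failure mode (the actual `explain` statement used by those tests is defined elsewhere in the test files; the one below is a hypothetical stand-in):

    import org.apache.flink.table.api.SqlParserException;
    import org.apache.flink.table.api.TableEnvironment;

    final class ExplainErrorSketch {
        static void run(TableEnvironment tableEnv) {
            try {
                // a Calcite-style variant the new Flink grammar does not accept
                tableEnv.executeSql("EXPLAIN PLAN WITH TYPE FOR SELECT * FROM orders");
            } catch (SqlParserException e) {
                // message now includes: Was expecting:\n    "FOR" ...
            }
        }
    }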