@@ -27,6 +27,7 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ClearOperation;
@@ -58,6 +59,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -315,6 +317,9 @@ private void callOperation(Operation operation) {
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else {
// fallback to default implementation
executeOperation(operation);
@@ -447,6 +452,21 @@ private boolean callInsert(CatalogSinkModifyOperation operation) {
return true;
}

private void callExplain(ExplainOperation operation) {
    final String explanation;
    try {
        TableResult tableResult = executor.executeOperation(sessionId, operation);
        // print the raw plan text instead of rendering it in tableau style
        explanation =
                Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
    } catch (SqlExecutionException | NullPointerException e) {
        printExecutionException(e);
        return;
    }
    terminal.writer().println(explanation);
    terminal.flush();
}

private void executeOperation(Operation operation) {
try {
TableResult result = executor.executeOperation(sessionId, operation);
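For context, a minimal sketch (not part of this diff; the class name and the orders DDL are hypothetical) of the same round trip through the public Table API: TableEnvironment.explainSql returns the multi-section plan string that callExplain above unwraps from the single result column.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Hypothetical table; any connector works for planning purposes.
        tEnv.executeSql(
                "CREATE TABLE orders (`user` BIGINT, product VARCHAR(32)) "
                        + "WITH ('connector' = 'datagen')");
        // explainSql returns the AST / optimized plan sections as one string,
        // matching what the SQL client prints for an EXPLAIN statement.
        System.out.println(tEnv.explainSql("SELECT `user`, product FROM orders"));
    }
}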
116 changes: 116 additions & 0 deletions flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -204,3 +204,119 @@ drop table `mod`;
show tables;
Empty set
!ok

# ==========================================================================
# test explain
# ==========================================================================

CREATE TABLE IF NOT EXISTS orders (
`user` BIGINT NOT NULL,
product VARCHAR(32),
amount INT,
ts TIMESTAMP(3),
ptime AS PROCTIME(),
PRIMARY KEY(`user`) NOT ENFORCED,
WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
Comment on lines +212 to +219 (Member): please style the DDL as above. Add "  " before fields.

) with (
'connector' = 'datagen'
);
[INFO] Execute statement succeed.
!info

CREATE TABLE IF NOT EXISTS orders2 (
`user` BIGINT NOT NULL,
product VARCHAR(32),
amount INT,
ts TIMESTAMP(3),
PRIMARY KEY(`user`) NOT ENFORCED
) with (
'connector' = 'blackhole'
);
[INFO] Execute statement succeed.
!info

# test explain plan for select
explain plan for select `user`, product from orders;
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

!ok

# test explain plan for insert
explain plan for insert into orders2 select `user`, product, amount, ts from orders;
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

!ok

# test explain select
explain select `user`, product from orders;
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

!ok

# test explain insert
explain insert into orders2 select `user`, product, amount, ts from orders;
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])

!ok
@@ -89,6 +89,7 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowPartitions"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
@@ -537,6 +538,7 @@
"SqlShowPartitions()"
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
]

# List of methods for parsing custom literals.
@@ -1605,3 +1605,17 @@ SqlShowModules SqlShowModules() :
return new SqlShowModules(startPos.plus(getPos()), requireFull);
}
}

/**
* Parses an EXPLAIN statement.
*/
SqlNode SqlRichExplain() :
{
SqlNode stmt;
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
stmt = SqlQueryOrDml() {
return new SqlRichExplain(getPos(), stmt);
}
}
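For illustration, a minimal sketch of what this production accepts, assuming Calcite's SqlParser API and the generated FlinkSqlParserImpl.FACTORY (the sketch class name is hypothetical): both the bare form and the PLAN FOR form reduce to the same SqlRichExplain call, since the optional tokens are consumed without being recorded.

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class RichExplainParseSketch {
    public static void main(String[] args) throws SqlParseException {
        SqlParser.Config config =
                SqlParser.configBuilder()
                        .setParserFactory(FlinkSqlParserImpl.FACTORY)
                        .build();
        // The optional <PLAN> <FOR> tokens are parsed and discarded, so both
        // statements produce an equivalent SqlRichExplain node of kind EXPLAIN.
        SqlNode bare = SqlParser.create("EXPLAIN SELECT * FROM emps", config).parseStmt();
        SqlNode rich =
                SqlParser.create("EXPLAIN PLAN FOR SELECT * FROM emps", config).parseStmt();
        System.out.println(bare.getKind() + " / " + rich.getKind()); // EXPLAIN / EXPLAIN
    }
}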
@@ -482,4 +482,54 @@ public void testShowModules() {

sql("show full modules").ok("SHOW FULL MODULES");
}

@Test
public void testExplain() {
String sql = "explain plan for select * from emps";
String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
this.sql(sql).ok(expected);
}

@Test
public void testExplainJsonFormat() {
// Unsupported feature. Skip the test.
}

@Test
public void testExplainWithImpl() {
// Unsupported feature. Skip the test.
}

@Test
public void testExplainWithoutImpl() {
// Unsupported feature. Skip the test.
}

@Test
public void testExplainWithType() {
// Unsupported feature. Skip the test.
}

@Test
public void testExplainAsXml() {
// Unsupported feature. Skip the test.
}

@Test
public void testExplainAsJson() {
// TODO: FLINK-20562
}

@Test
public void testExplainInsert() {
String expected = "EXPLAIN INSERT INTO `EMPS1`\n" + "(SELECT *\n" + "FROM `EMPS2`)";
this.sql("explain plan for insert into emps1 select * from emps2").ok(expected);
}

@Test
public void testExplainUpsert() {
String sql = "explain plan for upsert into emps1 values (1, 2)";
String expected = "EXPLAIN UPSERT INTO `EMPS1`\n" + "VALUES (ROW(1, 2))";
this.sql(sql).ok(expected);
}
}
2 changes: 2 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -60,6 +60,7 @@
"org.apache.flink.sql.parser.dml.SqlEndStatementSet"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
@@ -483,6 +484,7 @@
"SqlShowViews()"
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
]

# List of methods for parsing custom literals.
@@ -1569,3 +1569,17 @@ SqlEndStatementSet SqlEndStatementSet() :
return new SqlEndStatementSet(getPos());
}
}

/**
* Parses an EXPLAIN statement.
*/
SqlNode SqlRichExplain() :
{
SqlNode stmt;
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
stmt = SqlQueryOrDml() {
return new SqlRichExplain(getPos(), stmt);
}
}
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** SqlCall for an EXPLAIN [PLAN FOR] statement. */
public class SqlRichExplain extends SqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);

private SqlNode statement;

public SqlRichExplain(SqlParserPos pos, SqlNode statement) {
super(pos);
this.statement = statement;
}

public SqlNode getStatement() {
return statement;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.singletonList(statement);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("EXPLAIN");
statement.unparse(writer, leftPrec, rightPrec);
}

@Override
public void setOperand(int i, SqlNode operand) {
if (i == 0) {
statement = operand;
} else {
throw new UnsupportedOperationException(
        "SqlRichExplain only supports setting the operand at index 0");
}
}
}
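To close the loop with the parser tests above, a hedged round-trip sketch (class name hypothetical; assumes Calcite's AnsiSqlDialect): because unparse emits only the EXPLAIN keyword before the inner statement, an input written with PLAN FOR comes back normalized to plain EXPLAIN, which is what testExplain and testExplainInsert assert.

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class RichExplainUnparseSketch {
    public static void main(String[] args) throws SqlParseException {
        SqlNode explain =
                SqlParser.create(
                                "explain plan for select * from emps",
                                SqlParser.configBuilder()
                                        .setParserFactory(FlinkSqlParserImpl.FACTORY)
                                        .build())
                        .parseStmt();
        // The rendered SQL starts with plain EXPLAIN; PLAN FOR is parsed
        // but never re-emitted by SqlRichExplain.unparse.
        System.out.println(explain.toSqlString(AnsiSqlDialect.DEFAULT).getSql());
    }
}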