From 60d29d265ea14892a33f6dcac9eb2d98b851cef1 Mon Sep 17 00:00:00 2001 From: chobeat Date: Fri, 9 Sep 2016 11:27:41 +0200 Subject: [PATCH 1/4] AST DataStream table explain --- .../api/table/StreamTableEnvironment.scala | 16 +++++ .../api/scala/stream/ExplainStreamTest.scala | 64 +++++++++++++++++++ .../resources/stream/testFilterStream0.out | 3 + .../resources/stream/testUnionStream0.out | 4 ++ 4 files changed, 87 insertions(+) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala create mode 100644 flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out create mode 100644 flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 4f57ae993d522..214aa5246047f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -271,5 +271,21 @@ abstract class StreamTableEnvironment( } } + /* + * Returns the AST of the specified Table API and SQL queries and the execution plan to compute + * the result of the given [[Table]]. + * + * @param table The table for which the AST and execution plan will be returned. + * @param extended Flag to include detailed optimizer estimates. + */ + def explain(table: Table): String = { + + val ast = RelOptUtil.toString(table.getRelNode) + + s"== Abstract Syntax Tree ==" + + System.lineSeparator + + s"$ast" + + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala new file mode 100644 index 0000000000000..c11c6842fe781 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.stream + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert.assertEquals +import org.junit._ + +class ExplainStreamTest + extends StreamingMultipleProgramsTestBase { + + val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile + + @Test + def testFilter() : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table = env.fromElements((1, "hello")) + .toTable(tEnv, 'a, 'b) + .filter("a % 2 = 0") + + val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/stream/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n") + assertEquals(result, source) + } + + @Test + def testUnion() : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table = table1.unionAll(table2) + + val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") + val source = scala.io.Source.fromFile(testFilePath + + "../../src/test/scala/resources/stream/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n") + assertEquals(result, source) + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out new file mode 100644 index 0000000000000..3fda6dea39c1c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out @@ -0,0 +1,3 @@ +== Abstract Syntax Tree == +LogicalFilter(condition=[=(MOD($0, 2), 0)]) + LogicalTableScan(table=[[_DataStreamTable_0]]) diff --git a/flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out new file mode 100644 index 0000000000000..b2e30003f10ad --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out @@ -0,0 +1,4 @@ +== Abstract Syntax Tree == +LogicalUnion(all=[true]) + LogicalTableScan(table=[[_DataStreamTable_0]]) + LogicalTableScan(table=[[_DataStreamTable_1]]) From 287ecf6300c24b1076939ee04780020946079515 Mon Sep 17 00:00:00 2001 From: chobeat Date: Fri, 9 Sep 2016 14:30:50 +0200 Subject: [PATCH 2/4] moved stream explain test resources to main folder --- .../org/apache/flink/api/scala/stream/ExplainStreamTest.scala | 4 ++-- .../test/scala/resources/{stream => }/testFilterStream0.out | 0 .../test/scala/resources/{stream => }/testUnionStream0.out | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename flink-libraries/flink-table/src/test/scala/resources/{stream => }/testFilterStream0.out (100%) rename flink-libraries/flink-table/src/test/scala/resources/{stream => }/testUnionStream0.out (100%) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala index c11c6842fe781..7a1f50817515c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -42,7 +42,7 @@ class ExplainStreamTest val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/stream/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n") + "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } @@ -57,7 +57,7 @@ class ExplainStreamTest val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/stream/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n") + "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } diff --git a/flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out similarity index 100% rename from flink-libraries/flink-table/src/test/scala/resources/stream/testFilterStream0.out rename to flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out diff --git a/flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out similarity index 100% rename from flink-libraries/flink-table/src/test/scala/resources/stream/testUnionStream0.out rename to flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out From 417fa57c9f7745c668a59514fe1ce4d5ef3f9b1b Mon Sep 17 00:00:00 2001 From: chobeat Date: Fri, 9 Sep 2016 17:10:18 +0200 Subject: [PATCH 3/4] Added Table explain documentation --- docs/dev/table_api.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 59998f0133a5a..2f659ac202f67 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -2457,3 +2457,27 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r By default, the Table API supports `null` values. Null handling can be disabled to improve preformance by setting the `nullCheck` property in the `TableConfig` to `false`. {% top %} + +Explaining a Table +---- +The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and the Flink's Execution Plan of the equivalent Flink's Job. + +Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support the explanation of the Execution Plan. + + +
+{% highlight scala %} + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table = table1.unionAll(table2) + + val explanation:String = tEnv.explain(table) +{% endhighlight %} +
+{% top %} + + + From ae5f7fcffa9cb8e4dcb38fd3176536396ac7b482 Mon Sep 17 00:00:00 2001 From: chobeat Date: Tue, 13 Sep 2016 17:50:39 +0200 Subject: [PATCH 4/4] Format review --- docs/dev/table_api.md | 2 +- .../org/apache/flink/api/table/StreamTableEnvironment.scala | 2 +- .../org/apache/flink/api/scala/stream/ExplainStreamTest.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 2f659ac202f67..a3bedd417999b 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -2474,7 +2474,7 @@ Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEn val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1.unionAll(table2) - val explanation:String = tEnv.explain(table) + val explanation: String = tEnv.explain(table) {% endhighlight %} {% top %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 214aa5246047f..e055e9a33212e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -284,7 +284,7 @@ abstract class StreamTableEnvironment( s"== Abstract Syntax Tree ==" + System.lineSeparator + - s"$ast" + ast } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala index 7a1f50817515c..71500f10f8fba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -32,7 +32,7 @@ class ExplainStreamTest val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile @Test - def testFilter() : Unit = { + def testFilter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -47,7 +47,7 @@ class ExplainStreamTest } @Test - def testUnion() : Unit = { + def testUnion(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env)