From bdbfe89fbb61b04f3ee6f29b30382bf06983c2e5 Mon Sep 17 00:00:00 2001 From: anton solovev Date: Tue, 25 Oct 2016 15:55:42 +0400 Subject: [PATCH 1/3] [FLINK-4623] Implement explain for StreamTableEnvironment. It shows Abstract Syntax Tree and Physical Execution Plan. --- .../api/table/explain/PlanJsonParser.java | 8 +++++--- .../api/table/StreamTableEnvironment.scala | 18 +++++++++++++++--- .../api/scala/stream/ExplainStreamTest.scala | 15 ++++++++++----- .../test/scala/resources/testFilterStream0.out | 15 +++++++++++++++ .../test/scala/resources/testUnionStream0.out | 18 ++++++++++++++++++ 5 files changed, 63 insertions(+), 11 deletions(-) diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java index 3c4d3d917628d..4bd5067b0eb28 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java @@ -62,7 +62,7 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce if (dele > -1) { content = tempNode.getContents().substring(0, dele); } - + //replace with certain content if node is dataSource to pass //unit tests, because java and scala use different api to //get input element @@ -85,9 +85,11 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n"); } - printTab(tabCount + 1, pw); - pw.print(tempNode.getGlobal_properties().get(0).getName() + " : " + if (tempNode.getGlobal_properties() != null) { + printTab(tabCount + 1, pw); + pw.print(tempNode.getGlobal_properties().get(0).getName() + " : " + tempNode.getGlobal_properties().get(0).getValue() + "\n"); + } if (extended) { List globalProperties = tempNode.getGlobal_properties(); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index b9e889d59c762..b804dd1f51491 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.explain.PlanJsonParser import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} @@ -311,14 +313,24 @@ abstract class StreamTableEnvironment( * * @param table The table for which the AST and execution plan will be returned. 
*/ - def explain(table: Table): String = { + def explain(table: Table): String = { val ast = RelOptUtil.toString(table.getRelNode) + val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row])) + + val env = dataStream.getExecutionEnvironment + val jasonSqlPlan = env.getExecutionPlan + + val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, false) + s"== Abstract Syntax Tree ==" + System.lineSeparator + - ast - + s"$ast" + + System.lineSeparator + + s"== Physical Execution Plan ==" + + System.lineSeparator + + s"$sqlPlan" } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala index 71500f10f8fba..7411595ddbec8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -40,9 +40,12 @@ class ExplainStreamTest .toTable(tEnv, 'a, 'b) .filter("a % 2 = 0") - val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") + val result = tEnv.explain(table) + .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") + val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n") + "../../src/test/scala/resources/testFilterStream0.out").mkString + .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") assertEquals(result, source) } @@ -55,10 +58,12 @@ class ExplainStreamTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1.unionAll(table2) - val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") + val result = tEnv.explain(table) + .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") + val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n") + "../../src/test/scala/resources/testUnionStream0.out").mkString + .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") assertEquals(result, source) } - } diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out index 3fda6dea39c1c..888d71e5b2e8b 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out @@ -1,3 +1,18 @@ == Abstract Syntax Tree == LogicalFilter(condition=[=(MOD($0, 2), 0)]) LogicalTableScan(table=[[_DataStreamTable_0]]) + +== Physical Execution Plan == +Stage 1 : Data Source + content : collect elements with CollectionInputFormat + + Stage 2 : Operator + content : from: (a, b) + ship_strategy : REBALANCE + exchange_mode : null + + Stage 3 : Operator + content : where: (=(MOD(a, 2), 0)), select: (a, b) + ship_strategy : FORWARD + exchange_mode : null + diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out index b2e30003f10ad..dfc05ff5680fe 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out @@ -2,3 +2,21 @@ LogicalUnion(all=[true]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalTableScan(table=[[_DataStreamTable_1]]) + +== 
Physical Execution Plan == +Stage 1 : Data Source + content : collect elements with CollectionInputFormat + +Stage 2 : Data Source + content : collect elements with CollectionInputFormat + + Stage 3 : Operator + content : from: (count, word) + ship_strategy : REBALANCE + exchange_mode : null + + Stage 4 : Operator + content : from: (count, word) + ship_strategy : REBALANCE + exchange_mode : null + From ba7bb4324c5f9ab322b7ffeabefff47955862408 Mon Sep 17 00:00:00 2001 From: anton solovev Date: Mon, 31 Oct 2016 13:03:57 +0400 Subject: [PATCH 2/3] [FLINK-4623] Delete exchange_mode node from stream explain --- .../org/apache/flink/api/table/explain/PlanJsonParser.java | 7 +++++-- .../src/test/scala/resources/testFilterStream0.out | 2 -- .../src/test/scala/resources/testUnionStream0.out | 2 -- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java index 4bd5067b0eb28..bd14cd2f4a803 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/explain/PlanJsonParser.java @@ -76,8 +76,11 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce printTab(tabCount + 1, pw); pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n"); - printTab(tabCount + 1, pw); - pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n"); + String mode = predecessors.get(0).getExchange_mode(); + if (mode != null) { + printTab(tabCount + 1, pw); + pw.print("exchange_mode : " + mode + "\n"); + } } if (tempNode.getDriver_strategy() != null) { diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out index 888d71e5b2e8b..20ae2b1ed624d 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out @@ -9,10 +9,8 @@ Stage 1 : Data Source Stage 2 : Operator content : from: (a, b) ship_strategy : REBALANCE - exchange_mode : null Stage 3 : Operator content : where: (=(MOD(a, 2), 0)), select: (a, b) ship_strategy : FORWARD - exchange_mode : null diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out index dfc05ff5680fe..ac3635d59467b 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out @@ -13,10 +13,8 @@ Stage 2 : Data Source Stage 3 : Operator content : from: (count, word) ship_strategy : REBALANCE - exchange_mode : null Stage 4 : Operator content : from: (count, word) ship_strategy : REBALANCE - exchange_mode : null From 7b12e75abf7c375ddbfd55cd16062712de7bbf04 Mon Sep 17 00:00:00 2001 From: anton solovev Date: Tue, 1 Nov 2016 14:13:13 +0400 Subject: [PATCH 3/3] [FLINK-4623] Rename variable. Add comment to test. 
--- .../api/table/StreamTableEnvironment.scala | 4 ++-- .../api/scala/stream/ExplainStreamTest.scala | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index b804dd1f51491..bca8d79b0d301 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -320,9 +320,9 @@ abstract class StreamTableEnvironment( val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row])) val env = dataStream.getExecutionEnvironment - val jasonSqlPlan = env.getExecutionPlan + val jsonSqlPlan = env.getExecutionPlan - val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, false) + val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false) s"== Abstract Syntax Tree ==" + System.lineSeparator + diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala index 7411595ddbec8..5eebb340343ec 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala @@ -40,13 +40,12 @@ class ExplainStreamTest .toTable(tEnv, 'a, 'b) .filter("a % 2 = 0") - val result = tEnv.explain(table) - .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") + val result = replaceString(tEnv.explain(table)) val source = scala.io.Source.fromFile(testFilePath + "../../src/test/scala/resources/testFilterStream0.out").mkString - .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") - assertEquals(result, source) + val expect = replaceString(source) + assertEquals(result, expect) } @Test @@ -58,12 +57,18 @@ class ExplainStreamTest val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1.unionAll(table2) - val result = tEnv.explain(table) - .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") + val result = replaceString(tEnv.explain(table)) val source = scala.io.Source.fromFile(testFilePath + "../../src/test/scala/resources/testUnionStream0.out").mkString - .replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") - assertEquals(result, source) + val expect = replaceString(source) + assertEquals(result, expect) + } + + def replaceString(s: String): String = { + /* Stage {id} is ignored, because id keeps incrementing in test class + * while StreamExecutionEnvironment is up + */ + s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "") } }
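
Usage sketch (appended note, not part of the patches above): the snippet below shows how the new StreamTableEnvironment.explain(table) is meant to be called, mirroring ExplainStreamTest. The imports and the TableEnvironment.getTableEnvironment(env) factory call are assumptions based on the Table API of this Flink version and are not visible in the hunks; only tEnv.explain(table) itself is introduced by this series.

    // Sketch only: imports and environment setup are assumed (pre-1.2 Table API layout),
    // tEnv.explain(table) is the method added by patch 1/3.
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala._

    object ExplainStreamExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Assumed factory method; the tests above obtain tEnv the same way
        // outside the hunks shown here.
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Mirrors the filter test: a two-field stream turned into a Table and filtered.
        val table = env.fromElements((1, "hello"), (2, "world"))
          .toTable(tEnv, 'a, 'b)
          .filter("a % 2 = 0")

        // Returns a String with the "== Abstract Syntax Tree ==" and
        // "== Physical Execution Plan ==" sections shown in the .out fixtures.
        println(tEnv.explain(table))
      }
    }

If the output is compared against a fixture file, strip the "Stage <n>" ids first, as the replaceString helper in patch 3/3 does, because the ids keep incrementing while the StreamExecutionEnvironment stays up.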