From d23a41a15da9455ff08543e8e9f21bb01b6210c3 Mon Sep 17 00:00:00 2001 From: Xpray Date: Fri, 17 Nov 2017 11:01:27 +0800 Subject: [PATCH] [FLINK-8095][TableAPI & SQL] Introduce ProjectSetOpTransposeRule to Flink --- .../table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 58 +++++++++++++++++++ .../api/stream/table/SetOperatorsTest.scala | 29 ++++++++++ 3 files changed, 89 insertions(+) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index a20d14fe5baa0..10d68814bf24e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -54,6 +54,8 @@ object FlinkRuleSets { FilterAggregateTransposeRule.INSTANCE, // push filter through set operation FilterSetOpTransposeRule.INSTANCE, + // push project through set operation + ProjectSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 35f4429662383..929ce9c656d9e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -215,4 +215,62 @@ class SetOperatorsTest extends TableTestBase { util.verifyTable(result, expected) } + + @Test + def testProjectUnionTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .unionAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b", "c") + ), + term("union", "b", "c") + ) + + util.verifyTable(result, expected) + + } + + @Test + def testProjectMinusTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .minusAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataSetMinus", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b", "c") + ), + term("minus", "b", "c") + ) + + util.verifyTable(result, expected) + + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala index b1b700bb522d6..c0fc05b53ee34 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala @@ -65,4 +65,33 @@ class SetOperatorsTest extends TableTestBase { util.verifyTable(result, expected) } + + @Test + def testProjectUnionTranspose(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .unionAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "b", "c") + ), + term("union all", "b", "c") + ) + + util.verifyTable(result, expected) + } + }