From be50be94109ca63ce0587f10ab5479978231237b Mon Sep 17 00:00:00 2001 From: Xpray Date: Mon, 6 Nov 2017 23:47:33 +0800 Subject: [PATCH] [FLINK-7986][TableAPI & SQL] Introduce FilterSetOpTransposeRule to Flink --- .../table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOpTransposeTest.scala | 109 ++++++++++++++++++ .../api/stream/table/SetOpTransposeTest.scala | 69 +++++++++++ 3 files changed, 180 insertions(+) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOpTransposeTest.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOpTransposeTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index dcc735dca4a26..a20d14fe5baa0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -52,6 +52,8 @@ object FlinkRuleSets { FilterJoinRule.JOIN, // push filter through an aggregation FilterAggregateTransposeRule.INSTANCE, + // push filter through set operation + FilterSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOpTransposeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOpTransposeTest.scala new file mode 100644 index 0000000000000..ebea084f8231c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOpTransposeTest.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil.{binaryNode, batchTableNode, term, unaryNode} +import org.junit.Test + +class SetOpTransposeTest extends TableTestBase { + + @Test + def testFilterUnionTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataSetCalc", + unaryNode( + "DataSetAggregate", + binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.minusAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataSetCalc", + unaryNode( + "DataSetAggregate", + binaryNode( + "DataSetMinus", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOpTransposeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOpTransposeTest.scala new file mode 100644 index 0000000000000..808dbd64922eb --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOpTransposeTest.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode} +import org.junit.Test + +class SetOpTransposeTest extends TableTestBase { + + @Test + def testFilterUnionTranspose(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupAggregate", + binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("union all", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } + +}