From b8c5fe35b5ea08ac82aaf739f0e36842ca972dbc Mon Sep 17 00:00:00 2001 From: vasia Date: Mon, 11 Apr 2016 19:43:59 +0200 Subject: [PATCH] [FLINK-3731] make embedded SQL outer joins fail during translation --- .../plan/rules/dataSet/DataSetJoinRule.scala | 12 +++-- .../flink/api/scala/sql/test/JoinITCase.scala | 53 ++++++++++++++++++- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala index 01b803c084737..89d33c9dd7208 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala @@ -21,12 +21,10 @@ package org.apache.flink.api.table.plan.rules.dataSet import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rex.{RexInputRef, RexCall} -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention} -import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ class DataSetJoinRule @@ -37,6 +35,14 @@ class DataSetJoinRule "FlinkJoinRule") { + /** + * Only translate INNER joins for now + */ + override def matches(call: RelOptRuleCall): Boolean = { + val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin] + join.getJoinType.equals(JoinRelType.INNER) + } + def convert(rel: RelNode): RelNode = { val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala index 74844aec6cfa0..b09aa75f115a2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.{TableException, Row} -import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.plan.{PlanGenException, TranslationContext} import org.apache.flink.api.table.test.utils.TableProgramsTestBase import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode @@ -256,4 +256,55 @@ class JoinITCase( val results = result.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test(expected = classOf[PlanGenException]) + def testFullOuterJoin(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect() + } + + @Test(expected = classOf[PlanGenException]) + def testLeftOuterJoin(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect() + } + + @Test(expected = classOf[PlanGenException]) + def testRightOuterJoin(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect() + } }