Skip to content

Commit

Permalink
[FLINK-13509][table-planner-blink] Forbidden IS NOT DISTINCT FROM (…
Browse files Browse the repository at this point in the history
…or an expanded version) in LookupJoin.
  • Loading branch information
beyond1920 committed Aug 5, 2019
1 parent 12ea4a3 commit 5d22079
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
import org.apache.flink.table.planner.plan.utils.JoinUtil
import org.apache.flink.table.sources.{LookupableTableSource, TableSource}

import org.apache.calcite.plan.RelOptRule.{any, operand}
Expand All @@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram}

import java.util

import scala.collection.JavaConversions._

/**
* Base implementation for both
* [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and
Expand Down Expand Up @@ -86,6 +91,23 @@ trait CommonLookupJoinRule {
}
}

// TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509
protected def validateJoin(join: FlinkLogicalJoin): Unit = {

val filterNulls: Array[Boolean] = {
val filterNulls = new util.ArrayList[java.lang.Boolean]
JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls)
filterNulls.map(_.booleanValue()).toArray
}

if (filterNulls.contains(false)) {
throw new TableException(
s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " +
s"'${join.getCondition}'")
}
}

protected def transform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
Expand Down Expand Up @@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String)
val tableScan = call.rel[RelNode](3)
val tableSource = findTableSource(tableScan).orNull

validateJoin(join)
val temporalJoin = transform(join, input, tableSource, None)
call.transformTo(temporalJoin)
}
Expand Down Expand Up @@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String)
val tableScan = call.rel[RelNode](4)
val tableSource = findTableSource(tableScan).orNull

validateJoin(join)
val temporalJoin = transform(
join, input, tableSource, Some(calc.getProgram))
call.transformTo(temporalJoin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,28 @@ class LookupJoinTest extends TableTestBase {
)
}

@Test
def testNotDistinctFromInJoinCondition(): Unit = {

// does not support join condition contains `IS NOT DISTINCT`
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
"alternative '(a = b) or (a IS NULL AND b IS NULL)')",
classOf[TableException]
)

// does not support join condition contains `IS NOT DISTINCT` and similar syntax
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
"alternative '(a = b) or (a IS NULL AND b IS NULL)')",
classOf[TableException]
)
}

@Test
def testLogicalPlan(): Unit = {
val sql1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ class LookupJoinTest extends TableTestBase with Serializable {
)
}

@Test
def testNotDistinctFromInJoinCondition(): Unit = {

// does not support join condition contains `IS NOT DISTINCT`
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
"alternative '(a = b) or (a IS NULL AND b IS NULL)')",
classOf[TableException]
)

// does not support join condition contains `IS NOT DISTINCT` and similar syntax
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
"alternative '(a = b) or (a IS NULL AND b IS NULL)')",
classOf[TableException]
)
}

@Test
def testInvalidLookupTableFunction(): Unit = {
streamUtil.addDataStream[(Int, String, Long, Timestamp)](
Expand Down

0 comments on commit 5d22079

Please sign in to comment.