Skip to content

Commit

Permalink
[FLINK-13433][table-planner-blink] Do not fetch data from LookupableT…
Browse files Browse the repository at this point in the history
…ableSource if the JoinKey in left side of LookupJoin contains null value.
  • Loading branch information
beyond1920 committed Jul 31, 2019
1 parent 70fe6aa commit bb70e45
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object LookupJoinCodeGenerator {
: GeneratedFunction[FlatMapFunction[BaseRow, BaseRow]] = {

val ctx = CodeGeneratorContext(config)
val (prepareCode, parameters) = prepareParameters(
val (prepareCode, parameters, parametersCheckNullCode) = prepareParameters(
ctx,
typeFactory,
inputType,
Expand All @@ -91,7 +91,11 @@ object LookupJoinCodeGenerator {
s"""
|$prepareCode
|$setCollectorCode
|$lookupFunctionTerm.eval($parameters);
|if ($parametersCheckNullCode) {
| return;
|} else {
| $lookupFunctionTerm.eval($parameters);
| }
""".stripMargin

FunctionCodeGenerator.generateFunction(
Expand All @@ -118,7 +122,7 @@ object LookupJoinCodeGenerator {
: GeneratedFunction[AsyncFunction[BaseRow, AnyRef]] = {

val ctx = CodeGeneratorContext(config)
val (prepareCode, parameters) = prepareParameters(
val (prepareCode, parameters, parametersCheckNullCode) = prepareParameters(
ctx,
typeFactory,
inputType,
Expand All @@ -133,8 +137,12 @@ object LookupJoinCodeGenerator {
val body =
s"""
|$prepareCode
|$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
|$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
|if ($parametersCheckNullCode) {
| return;
|} else {
| $DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
| $lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
|}
""".stripMargin

FunctionCodeGenerator.generateFunction(
Expand All @@ -156,7 +164,7 @@ object LookupJoinCodeGenerator {
lookupKeyInOrder: Array[Int],
allLookupFields: Map[Int, LookupKey],
isExternalArgs: Boolean,
fieldCopy: Boolean): (String, String) = {
fieldCopy: Boolean): (String, String, String) = {

val inputFieldExprs = for (i <- lookupKeyInOrder) yield {
allLookupFields.get(i) match {
Expand Down Expand Up @@ -197,7 +205,11 @@ object LookupJoinCodeGenerator {
""".stripMargin
(code, newTerm)
}
(codeAndArg.map(_._1).mkString("\n"), codeAndArg.map(_._2).mkString(", "))
val parametersContainsNull = codeAndArg.map(_._2).map(arg => s"$arg == null").mkString("|| ")
(
codeAndArg.map(_._1).mkString("\n"),
codeAndArg.map(_._2).mkString(", "),
parametersContainsNull)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ abstract class CommonLookupJoin(
joinType: JoinRelType): Unit = {

// check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) {
if (allLookupKeys.isEmpty) {
throw new TableException(
"Temporal table join requires an equality condition on fields of " +
s"table [${tableSource.explainSource()}].")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class LookupJoinITCase extends BatchTestBase {
BatchTestBase.row(8L, 11L, "Hello world"),
BatchTestBase.row(9L, 12L, "Hello world!"))

val dataWithNull = List(
BatchTestBase.row(null, 15L, "Hello"),
BatchTestBase.row(3L, 15L, "Fabian"),
BatchTestBase.row(null, 11L, "Hello world"),
BatchTestBase.row(9L, 12L, "Hello world!"))

val typeInfo = new RowTypeInfo(LONG_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)

val userData = List(
Expand All @@ -55,14 +61,43 @@ class LookupJoinITCase extends BatchTestBase {
.enableAsync()
.build()

val userDataWithNull = List(
(11, 1L, "Julian"),
(22, null, "Hello"),
(33, 3L, "Fabian"),
(44, null, "Hello world"))

val userWithNullDataTableSource = InMemoryLookupableTableSource.builder()
.data(userDataWithNull)
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
.build()

val userAsyncWithNullDataTableSource = InMemoryLookupableTableSource.builder()
.data(userDataWithNull)
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
.build()

@Before
override def before() {
super.before()
BatchTableEnvUtil.registerCollection(tEnv, "T0", data, typeInfo, "id, len, content")
val myTable = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T0")
tEnv.registerTable("T", myTable)

BatchTableEnvUtil.registerCollection(
tEnv, "T1", dataWithNull, typeInfo, "id, len, content")
val myTable1 = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T1")
tEnv.registerTable("nullableT", myTable1)

tEnv.registerTableSource("userTable", userTableSource)
tEnv.registerTableSource("userAsyncTable", userAsyncTableSource)

tEnv.registerTableSource("userWithNullDataTable", userWithNullDataTableSource)
tEnv.registerTableSource("userWithNullDataAsyncTable", userAsyncWithNullDataTableSource)
}

@Test
Expand Down Expand Up @@ -231,4 +266,105 @@ class LookupJoinITCase extends BatchTestBase {
BatchTestBase.row(9, 12, null, null))
checkResult(sql, expected, false)
}

@Test
def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, D.name FROM nullableT T JOIN userWithNullDataTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"

val expected = Seq(
BatchTestBase.row(3,15,"Fabian"))
checkResult(sql, expected, false)
}

@Test
def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN userWithNullDataTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
val expected = Seq(
BatchTestBase.row(null,15,null),
BatchTestBase.row(3,15,"Fabian"),
BatchTestBase.row(null,11,null),
BatchTestBase.row(null,12,null))
checkResult(sql, expected, false)
}

@Test
def testJoinTemporalTableOnNullConstantKey(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON D.id = null"
val expected = Seq()
checkResult(sql, expected, false)
}

@Test
def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
val expected = Seq()
checkResult(sql, expected, false)
}


@Test
def testAsyncJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, D.name FROM nullableT T JOIN userWithNullDataTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"

val expected = Seq(
BatchTestBase.row(3,15,"Fabian"))
checkResult(sql, expected, false)
}

@Test
def testAsyncLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN userWithNullDataTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
val expected = Seq(
BatchTestBase.row(null,15,null),
BatchTestBase.row(3,15,"Fabian"),
BatchTestBase.row(null,11,null),
BatchTestBase.row(null,12,null))
checkResult(sql, expected, false)
}

@Test
def testAsyncJoinTemporalTableOnNullConstantKey(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON D.id = null"
val expected = Seq()
checkResult(sql, expected, false)
}

@Test
def testAsyncJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
// TODO: enable object reuse until [FLINK-12351] is fixed.
env.getConfig.disableObjectReuse()

val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
val expected = Seq()
checkResult(sql, expected, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ class LookupJoinITCase extends StreamingTestBase {
.field("name", Types.STRING)
.build()

val userDataWithNull = List(
(11, 1L, "Julian"),
(22, null, "Hello"),
(33, 3L, "Fabian"),
(44, null, "Hello world")
)

val userWithNullDataTableSourceWith2Keys = InMemoryLookupableTableSource.builder()
.data(userDataWithNull)
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
.build()

@Test
def testJoinTemporalTable(): Unit = {
val streamTable = env.fromCollection(data)
Expand Down Expand Up @@ -418,4 +432,101 @@ class LookupJoinITCase extends StreamingTestBase {
assertEquals(0, userTableSource.getResourceCounter)
}

@Test
def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
val streamTable = env.fromCollection(dataWithNull)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)

tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)

val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"

val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()

val expected = Seq(
"3,15,Fabian")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
}

@Test
def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
val streamTable = env.fromCollection(dataWithNull)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)

tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)

val sql = "SELECT D.id, T.len, D.name FROM T LEFT JOIN userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"

val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()

val expected = Seq(
"null,15,null",
"3,15,Fabian",
"null,11,null",
"null,12,null")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
}

@Test
def testJoinTemporalTableOnNullConstantKey(): Unit = {
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
val streamTable = env.fromCollection(dataWithNull)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)

tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)

val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON D.id = null"

val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()

val expected = Seq()
assertEquals(expected.sorted, sink.getAppendResults.sorted)
assertEquals(0, userTableSource.getResourceCounter)
}

@Test
def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
val streamTable = env.fromCollection(data)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)

tEnv.registerTableSource("userTable", userTableSourceWith2Keys)

val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"

val sink = new TestingAppendSink
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
env.execute()

val expected = Seq()
assertEquals(expected.sorted, sink.getAppendResults.sorted)
assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
}

}

0 comments on commit bb70e45

Please sign in to comment.