From 13e573fadd4779ac1315dab6608e3e5910ce42bb Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Fri, 23 Nov 2018 23:10:28 +0800 Subject: [PATCH 1/2] is_not_distinct_from --- .../flink/table/codegen/CodeGenerator.scala | 5 +++ .../table/codegen/calls/ScalarOperators.scala | 12 +++++- .../expressions/ScalarOperatorsTest.scala | 4 ++ .../runtime/batch/sql/AggregateITCase.scala | 37 +++++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 13bf50a5ef227..c126c8cd5148f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -838,6 +838,11 @@ abstract class CodeGenerator( val right = operands(1) generateEquals(nullCheck, left, right) + case IS_NOT_DISTINCT_FROM => + val left = operands.head + val right = operands(1) + generateDistinctFrom(nullCheck, left, right); + case NOT_EQUALS => val left = operands.head val right = operands(1) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 282b167e91429..09f11a6be8876 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -191,7 +191,17 @@ object ScalarOperators { ) ) } - } + } + + def generateDistinctFrom( + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + val newleft = left.copy(nullTerm = GeneratedExpression.NEVER_NULL) + val newright = left.copy(nullTerm = GeneratedExpression.NEVER_NULL) + generateEquals(nullCheck, newleft, newright) + } def generateEquals( nullCheck: Boolean, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala index de4c804f075b0..d61627b2c41b0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala @@ -406,6 +406,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "((((true) === true) || false).cast(STRING) + 'X ').trim", "trueX") testTableApi(12.isNull, "12.isNull", "false") + + testSqlApi("f12 IS NOT DISTINCT FROM NULL", "true") + testSqlApi("f9 IS NOT DISTINCT FROM NULL", "false") + testSqlApi("f9 IS NOT DISTINCT FROM 10", "true") } @Test diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala index 09ccfc47edaba..fff252f4ea4f2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala @@ -559,4 +559,41 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + + @Test + def testMultipleDistinctWithDiffParams(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlWithNull = "SELECT a, " + + " CASE WHEN b = 2 THEN null ELSE b END AS b, " + + " c FROM MyTable" + + val sqlQuery = + "SELECT b, " + + " COUNT(DISTINCT b), " + + " SUM(DISTINCT (a / 3)), " + + " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," + + " COUNT(DISTINCT c) " + + "FROM (" + + sqlWithNull + + ") GROUP BY b " + + "ORDER BY b" + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + + val expected = Seq( + "1,1,0,1,1", + "3,1,3,3,3", + "4,1,5,1,4", + "5,1,12,1,5", + "6,1,18,1,6", + "null,0,1,1,2" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } } From 57d14d811d8a12119fb681604f431a1e65c0e6d6 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Wed, 5 Dec 2018 13:52:37 +0800 Subject: [PATCH 2/2] feedback address --- .../flink/table/codegen/CodeGenerator.scala | 2 +- .../table/codegen/calls/ScalarOperators.scala | 30 +++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index c126c8cd5148f..afef97120d6a9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -841,7 +841,7 @@ abstract class CodeGenerator( case IS_NOT_DISTINCT_FROM => val left = operands.head val right = operands(1) - generateDistinctFrom(nullCheck, left, right); + generateIsNotDistinctFrom(left, right); case NOT_EQUALS => val left = operands.head diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 09f11a6be8876..afa534a70caf6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -193,14 +193,34 @@ object ScalarOperators { } } - def generateDistinctFrom( - nullCheck: Boolean, + def generateIsNotDistinctFrom( left: GeneratedExpression, right: GeneratedExpression) : GeneratedExpression = { - val newleft = left.copy(nullTerm = GeneratedExpression.NEVER_NULL) - val newright = left.copy(nullTerm = GeneratedExpression.NEVER_NULL) - generateEquals(nullCheck, newleft, newright) + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val resultTypeTerm = primitiveTypeTermForTypeInfo(BOOLEAN_TYPE_INFO) + val equalExpression = generateEquals( + false, + left.copy(code = GeneratedExpression.NO_CODE), + right.copy(code = GeneratedExpression.NO_CODE)) + + val resultCode = + s""" + |${left.code} + |${right.code} + |$resultTypeTerm $resultTerm; + |if (${left.nullTerm}) { + | $resultTerm = ${right.nullTerm}; + |} else if (${right.nullTerm}) { + | $resultTerm = ${left.nullTerm}; + |} else { + | ${equalExpression.code} + | $resultTerm = ${equalExpression.resultTerm}; + |} + |""".stripMargin + + GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, resultCode, BOOLEAN_TYPE_INFO) } def generateEquals(