From 92a264801377e331ff3c18dd225caad2a3e8a4da Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Oct 2020 17:35:32 +0800 Subject: [PATCH] [FLINK-19694][table-planner-blink] Update MetadataHandlers for the newly introduced StreamExecUpsertMaterialize node This closes #13721 --- .../metadata/FlinkRelMdColumnUniqueness.scala | 8 ++++++ .../FlinkRelMdModifiedMonotonicity.scala | 22 ++++++++++------ .../plan/metadata/FlinkRelMdUniqueKeys.scala | 7 ++++++ .../plan/stream/sql/join/SemiAntiJoinTest.xml | 2 +- .../FlinkRelMdColumnUniquenessTest.scala | 9 +++++++ .../metadata/FlinkRelMdHandlerTestBase.scala | 14 ++++++++++- .../FlinkRelMdModifiedMonotonicityTest.scala | 25 +++++++++++++++++++ .../metadata/FlinkRelMdUniqueKeysTest.scala | 7 +++++- 8 files changed, 84 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index f4912f9923e96..3b237a69c0ac9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -304,6 +304,14 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata columns != null && util.Arrays.equals(columns.toArray, rel.getUniqueKeys) } + def areColumnsUnique( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery, + columns: ImmutableBitSet, + ignoreNulls: Boolean): JBoolean = { + columns != null && ImmutableBitSet.of(rel.uniqueKeys: _*).equals(columns) + } + def areColumnsUnique( rel: Aggregate, mq: RelMetadataQuery, diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 675a317ef7570..20ca5903f3422 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -164,8 +164,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } // if partitionBy a update field or partitionBy a field whose mono is null, just return null - if (rel.partitionKey.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } @@ -209,7 +208,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon rel: StreamExecDeduplicate, mq: RelMetadataQuery): RelModifiedMonotonicity = { if (allAppend(mq, rel.getInput)) { - val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(MONOTONIC)) + val mono = new RelModifiedMonotonicity( + Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC)) rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) mono } else { @@ -217,6 +217,14 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } } + def getRelModifiedMonotonicity( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery): RelModifiedMonotonicity = { + val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC)) + rel.uniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) + mono + } + def getRelModifiedMonotonicity( rel: StreamExecWatermarkAssigner, mq: 
RelMetadataQuery): RelModifiedMonotonicity = { @@ -336,8 +344,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon val inputMonotonicity = fmq.getRelModifiedMonotonicity(input) // if group by an update field or group by a field mono is null, just return null - if (grouping.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (inputMonotonicity == null || + grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } @@ -357,8 +365,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon val inputMonotonicity = fmq.getRelModifiedMonotonicity(input) // if group by a update field or group by a field mono is null, just return null - if (grouping.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (inputMonotonicity == null || + grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 88d6cb01142a7..44f7b5be6939e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -314,6 +314,13 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList)) } + def getUniqueKeys( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery, + ignoreNulls: Boolean): JSet[ImmutableBitSet] = { + ImmutableSet.of(ImmutableBitSet.of(rel.uniqueKeys.map(Integer.valueOf).toList)) + } 
+ def getUniqueKeys( rel: Aggregate, mq: RelMetadataQuery, diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml index 1d9b8d57d272b..a846c2a6906b1 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml @@ -307,7 +307,7 @@ Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUn : +- LegacyTableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- Exchange(distribution=[single]) +- Calc(select=[IS NOT NULL(m) AS $f0]) - +- GroupAggregate(select=[MIN(i) AS m]) + +- GroupAggregate(select=[MIN_RETRACT(i) AS m]) +- Exchange(distribution=[single]) +- Calc(select=[true AS i]) +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala index d9f6042b44bab..d942bff04f4fa 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala @@ -275,6 +275,15 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase { assertFalse(mq.areColumnsUnique(streamDeduplicateLastRow, 
ImmutableBitSet.of(0, 1, 2))) } + @Test + def testAreColumnsUniqueCountOnStreamExecChangelogNormalize(): Unit = { + assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(0, 1))) + assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 0))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(2))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 2))) + } + @Test def testAreColumnsUniqueOnAggregate(): Unit = { Array(logicalAgg, flinkLogicalAgg).foreach { agg => diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index fc5d15cb78bdd..46e14c8a0aaa6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -691,6 +691,18 @@ class FlinkRelMdHandlerTestBase { (calcOfFirstRow, calcOfLastRow) } + protected lazy val streamChangelogNormalize = { + val key = Array(1, 0) + val hash1 = FlinkRelDistribution.hash(key, requireStrict = true) + val streamExchange = new StreamExecExchange( + cluster, studentStreamScan.getTraitSet.replace(hash1), studentStreamScan, hash1) + new StreamExecChangelogNormalize( + cluster, + streamPhysicalTraits, + streamExchange, + key) + } + // equivalent SQL is // select a, b, c from ( // select a, b, c, rowtime @@ -703,7 +715,7 @@ class FlinkRelMdHandlerTestBase { cluster, flinkLogicalTraits, temporalLogicalScan, - ImmutableBitSet.of(5), + ImmutableBitSet.of(1), RelCollations.of(4), RankType.ROW_NUMBER, new ConstantRankRange(1, 
1), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala index 64a9ff2c1189e..d49df3859de71 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala @@ -315,5 +315,30 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase { assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys)) } + @Test + def testGetRelMonotonicityOnDeduplicate(): Unit = { + assertEquals( + new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamDeduplicateFirstRow)) + + assertEquals( + new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, CONSTANT)), + mq.getRelModifiedMonotonicity(streamDeduplicateLastRow)) + + assertEquals( + new RelModifiedMonotonicity(Array( + NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(rowtimeDeduplicate)) + } + + @Test + def testGetRelMonotonicityOnChangelogNormalize(): Unit = { + assertEquals( + new RelModifiedMonotonicity(Array( + CONSTANT, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, + NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamChangelogNormalize)) + } + } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala index e195205b5ea8f..e662c59ec0c1d 100644 --- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala @@ -155,7 +155,12 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { def testGetUniqueKeysOnStreamExecDeduplicate(): Unit = { assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(streamDeduplicateFirstRow).toSet) assertEquals(uniqueKeys(Array(1, 2)), mq.getUniqueKeys(streamDeduplicateLastRow).toSet) - assertEquals(uniqueKeys(Array(5)), mq.getUniqueKeys(rowtimeDeduplicate).toSet) + assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(rowtimeDeduplicate).toSet) + } + + @Test + def testGetUniqueKeysOnStreamExecChangelogNormalize(): Unit = { + assertEquals(uniqueKeys(Array(1, 0)), mq.getUniqueKeys(streamChangelogNormalize).toSet) } @Test