[FLINK-19694][table-planner-blink] Update MetadataHandlers for the newly introduced StreamExecUpsertMaterialize node

This closes #13721
wuchong committed Oct 27, 2020
1 parent 3c67cce commit 92a2648
Showing 8 changed files with 84 additions and 10 deletions.
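The three handler files below all answer their metadata query from the primary-key positions (uniqueKeys) of the new node. For orientation, a hedged usage sketch — not part of this commit — of how a planner rule consults them. Here mq is assumed to be Flink's extended FlinkRelMetadataQuery (the base Calcite RelMetadataQuery has no getRelModifiedMonotonicity), and the import paths are assumed from the blink planner:

import java.util.{Set => JSet}
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecChangelogNormalize

// Sketch: query the three kinds of metadata this commit teaches the
// handlers to derive for StreamExecChangelogNormalize.
def describeChangelogNormalize(
    mq: FlinkRelMetadataQuery,
    rel: StreamExecChangelogNormalize): Unit = {
  // Exactly one unique key set, built from rel.uniqueKeys.
  val keys: JSet[ImmutableBitSet] = mq.getUniqueKeys(rel)
  // True only for the exact key column set.
  val keyIsUnique = mq.areColumnsUnique(rel, ImmutableBitSet.of(rel.uniqueKeys: _*))
  // Key fields CONSTANT, every other field NOT_MONOTONIC.
  val mono = mq.getRelModifiedMonotonicity(rel)
  println(s"keys=$keys, keyIsUnique=$keyIsUnique, mono=$mono")
}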
@@ -304,6 +304,14 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
    columns != null && util.Arrays.equals(columns.toArray, rel.getUniqueKeys)
  }

+  def areColumnsUnique(
+      rel: StreamExecChangelogNormalize,
+      mq: RelMetadataQuery,
+      columns: ImmutableBitSet,
+      ignoreNulls: Boolean): JBoolean = {
+    columns != null && ImmutableBitSet.of(rel.uniqueKeys: _*).equals(columns)
+  }

  def areColumnsUnique(
      rel: Aggregate,
      mq: RelMetadataQuery,
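The equality check in the added method compares the queried columns with the upsert key as a set of column ordinals. Calcite's ImmutableBitSet uses order-insensitive set equality, which is exactly what the symmetric assertions in the test further down exercise; a minimal standalone sketch:

import org.apache.calcite.util.ImmutableBitSet

// An ImmutableBitSet is a set of column ordinals: construction order is
// irrelevant, and a subset or overlapping set of the key is not the key.
val key = ImmutableBitSet.of(1, 0)
assert(key.equals(ImmutableBitSet.of(0, 1)))  // same set, different order
assert(!key.equals(ImmutableBitSet.of(1)))    // strict subset of the key
assert(!key.equals(ImmutableBitSet.of(1, 2))) // overlaps but differs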
@@ -164,8 +164,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
    }

    // if partitionBy an update field or a field whose mono is null, just return null
-    if (rel.partitionKey.exists(e =>
-      inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
+    if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
      return null
    }

@@ -209,14 +208,23 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
      rel: StreamExecDeduplicate,
      mq: RelMetadataQuery): RelModifiedMonotonicity = {
    if (allAppend(mq, rel.getInput)) {
-      val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(MONOTONIC))
+      val mono = new RelModifiedMonotonicity(
+        Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
      rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
      mono
    } else {
      null
    }
  }

+  def getRelModifiedMonotonicity(
+      rel: StreamExecChangelogNormalize,
+      mq: RelMetadataQuery): RelModifiedMonotonicity = {
+    val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
+    rel.uniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
+    mono
+  }
+
  def getRelModifiedMonotonicity(
      rel: StreamExecWatermarkAssigner,
      mq: RelMetadataQuery): RelModifiedMonotonicity = {
@@ -336,8 +344,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
    val inputMonotonicity = fmq.getRelModifiedMonotonicity(input)

    // if group by an update field or a field whose mono is null, just return null
-    if (grouping.exists(e =>
-      inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
+    if (inputMonotonicity == null ||
+      grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
      return null
    }

@@ -357,8 +365,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
    val inputMonotonicity = fmq.getRelModifiedMonotonicity(input)

    // if group by an update field or a field whose mono is null, just return null
-    if (grouping.exists(e =>
-      inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
+    if (inputMonotonicity == null ||
+      grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
      return null
    }

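The monotonicity handlers above build their results the same way: start from an all-NOT_MONOTONIC field array, then pin the key positions to CONSTANT, i.e. downstream rules may only rely on key fields never changing across updates. A standalone sketch of that construction for the seven-field, key = {1, 0} fixture used in the tests below (the package path of RelModifiedMonotonicity is assumed from the blink planner):

import org.apache.calcite.sql.validate.SqlMonotonicity.{CONSTANT, NOT_MONOTONIC}
import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity

val fieldCount = 7
val uniqueKeys = Array(1, 0)
// Every field starts out NOT_MONOTONIC: an upsert may change it arbitrarily.
val mono = new RelModifiedMonotonicity(Array.fill(fieldCount)(NOT_MONOTONIC))
// The upsert key is CONSTANT by definition: updates never move a row's key.
uniqueKeys.foreach(i => mono.fieldMonotonicities(i) = CONSTANT)
// fieldMonotonicities is now [CONSTANT, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC,
// NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC], matching the expected value in
// testGetRelMonotonicityOnChangelogNormalize below.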
@@ -314,6 +314,13 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
    ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList))
  }

+  def getUniqueKeys(
+      rel: StreamExecChangelogNormalize,
+      mq: RelMetadataQuery,
+      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+    ImmutableSet.of(ImmutableBitSet.of(rel.uniqueKeys.map(Integer.valueOf).toList))
+  }

  def getUniqueKeys(
      rel: Aggregate,
      mq: RelMetadataQuery,
@@ -307,7 +307,7 @@ Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUn
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[single])
+- Calc(select=[IS NOT NULL(m) AS $f0])
- +- GroupAggregate(select=[MIN(i) AS m])
+ +- GroupAggregate(select=[MIN_RETRACT(i) AS m])
+- Exchange(distribution=[single])
+- Calc(select=[true AS i])
+- Join(joinType=[LeftSemiJoin], where=[$f0], select=[d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
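The single plan change above is the observable effect of the stricter monotonicity derivation: with a null input monotonicity no longer silently ignored for this (global) aggregate, the planner cannot prove that the minimum only ever moves in the safe direction, so it must choose the retract-capable accumulator. A simplified, purely illustrative decision rule — this helper is hypothetical, not the planner's actual code:

import org.apache.calcite.sql.validate.SqlMonotonicity
import org.apache.calcite.sql.validate.SqlMonotonicity._

// Hypothetical sketch: MIN over an updating input may skip retraction
// handling only when updates can never raise the aggregated field, since
// then the running minimum only ever moves down. An unknown (None) or
// NOT_MONOTONIC derivation forces the MIN_RETRACT variant, which keeps
// enough state to recover after the current minimum is retracted.
def minVariant(fieldMono: Option[SqlMonotonicity]): String = fieldMono match {
  case Some(DECREASING) | Some(STRICTLY_DECREASING) => "MIN"
  case _ => "MIN_RETRACT"
}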
@@ -275,6 +275,15 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
    assertFalse(mq.areColumnsUnique(streamDeduplicateLastRow, ImmutableBitSet.of(0, 1, 2)))
  }

+  @Test
+  def testAreColumnsUniqueOnStreamExecChangelogNormalize(): Unit = {
+    assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(0, 1)))
+    assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 0)))
+    assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1)))
+    assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(2)))
+    assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 2)))
+  }

  @Test
  def testAreColumnsUniqueOnAggregate(): Unit = {
    Array(logicalAgg, flinkLogicalAgg).foreach { agg =>
@@ -691,6 +691,18 @@ class FlinkRelMdHandlerTestBase {
    (calcOfFirstRow, calcOfLastRow)
  }

+  protected lazy val streamChangelogNormalize = {
+    val key = Array(1, 0)
+    val hash1 = FlinkRelDistribution.hash(key, requireStrict = true)
+    val streamExchange = new StreamExecExchange(
+      cluster, studentStreamScan.getTraitSet.replace(hash1), studentStreamScan, hash1)
+    new StreamExecChangelogNormalize(
+      cluster,
+      streamPhysicalTraits,
+      streamExchange,
+      key)
+  }

  // equivalent SQL is
  // select a, b, c from (
  //   select a, b, c, rowtime
@@ -703,7 +715,7 @@ class FlinkRelMdHandlerTestBase {
      cluster,
      flinkLogicalTraits,
      temporalLogicalScan,
-      ImmutableBitSet.of(5),
+      ImmutableBitSet.of(1),
      RelCollations.of(4),
      RankType.ROW_NUMBER,
      new ConstantRankRange(1, 1),
@@ -315,5 +315,30 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
    assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys))
  }

+  @Test
+  def testGetRelMonotonicityOnDeduplicate(): Unit = {
+    assertEquals(
+      new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC)),
+      mq.getRelModifiedMonotonicity(streamDeduplicateFirstRow))
+
+    assertEquals(
+      new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, CONSTANT)),
+      mq.getRelModifiedMonotonicity(streamDeduplicateLastRow))
+
+    assertEquals(
+      new RelModifiedMonotonicity(Array(
+        NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)),
+      mq.getRelModifiedMonotonicity(rowtimeDeduplicate))
+  }
+
+  @Test
+  def testGetRelMonotonicityOnChangelogNormalize(): Unit = {
+    assertEquals(
+      new RelModifiedMonotonicity(Array(
+        CONSTANT, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC,
+        NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)),
+      mq.getRelModifiedMonotonicity(streamChangelogNormalize))
+  }

}

@@ -155,7 +155,12 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
  def testGetUniqueKeysOnStreamExecDeduplicate(): Unit = {
    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(streamDeduplicateFirstRow).toSet)
    assertEquals(uniqueKeys(Array(1, 2)), mq.getUniqueKeys(streamDeduplicateLastRow).toSet)
-    assertEquals(uniqueKeys(Array(5)), mq.getUniqueKeys(rowtimeDeduplicate).toSet)
+    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(rowtimeDeduplicate).toSet)
  }

+  @Test
+  def testGetUniqueKeysOnStreamExecChangelogNormalize(): Unit = {
+    assertEquals(uniqueKeys(Array(1, 0)), mq.getUniqueKeys(streamChangelogNormalize).toSet)
+  }
+
  @Test
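For readability of the assertions above: the uniqueKeys(...) helper is defined earlier in the test base, outside this diff. A hedged reconstruction of its likely shape, inferred only from the call sites (one Array[Int] per expected key set, compared against mq.getUniqueKeys(rel).toSet):

import org.apache.calcite.util.ImmutableBitSet

// Hypothetical reconstruction, not the actual helper from the test class:
// wrap each ordinal array into an ImmutableBitSet and collect the expected
// key sets into a Scala Set for comparison.
def uniqueKeys(keys: Array[Int]*): Set[ImmutableBitSet] =
  keys.map(k => ImmutableBitSet.of(k: _*)).toSet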
