Skip to content

Commit

Permalink
[FLINK-15001] [table-planner-blink] The digest of sub-plan reuse shou…
Browse files Browse the repository at this point in the history
…ld contain retraction traits for stream physical nodes

This closes #10377.
  • Loading branch information
godfreyhe authored and hequn8128 committed Dec 12, 2019
1 parent f283edb commit ef0a033
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 304 deletions.
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.planner.plan.nodes.calcite.Sink
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan
import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, RelDigestUtil}
import org.apache.flink.table.planner.plan.utils.{DefaultRelShuttle, FlinkRelOptUtil}

import com.google.common.collect.{Maps, Sets}
import org.apache.calcite.rel.core.{Exchange, TableFunctionScan}
Expand Down Expand Up @@ -106,7 +106,7 @@ object SubplanReuser {
if (digest != null) {
digest
} else {
val newDigest = RelDigestUtil.getDigest(node)
val newDigest = FlinkRelOptUtil.getDigest(node)
mapRelToDigest.put(node, newDigest)
newDigest
}
Expand Down
Expand Up @@ -78,11 +78,71 @@ object FlinkRelOptUtil {
detailLevel,
withIdPrefix,
withRetractTraits,
withRowType)
withRowType,
withTreeStyle = true)
rel.explain(planWriter)
sw.toString
}

/**
* Gets the digest for a rel tree.
*
* The digest of a RelNode should contain the result of the RelNode#explain method, the
* retraction traits (for StreamPhysicalRel) and the RelNode's row type.
*
* Row type is part of the digest for the rare occasion that similar
* expressions have different types, e.g.
* "WITH
* t1 AS (SELECT CAST(a as BIGINT) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as BIGINT)),
* t2 AS (SELECT CAST(a as DOUBLE) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as DOUBLE))
* SELECT t1.*, t2.* FROM t1, t2 WHERE t1.b = t2.b"
*
* the physical plan is:
* {{{
* HashJoin(where=[=(b, b0)], join=[a, b, a0, b0], joinType=[InnerJoin],
* isBroadcast=[true], build=[right])
* :- HashAggregate(groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
* : +- Exchange(distribution=[hash[a]])
* : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
* : +- Calc(select=[CAST(a) AS a, b])
* : +- ScanTable(table=[[builtin, default, x]], fields=[a, b, c])
* +- Exchange(distribution=[broadcast])
* +- HashAggregate(groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
* +- Exchange(distribution=[hash[a]])
* +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
* +- Calc(select=[CAST(a) AS a, b])
* +- ScanTable(table=[[builtin, default, x]], fields=[a, b, c])
* }}}
*
* The two sub-plans rooted at `HashAggregate(groupBy=[a], select=[a, Final_SUM(sum$0) AS b])`
* are different because `CAST(a) AS a` has different types: one is of BIGINT type
* and the other is of DOUBLE type.
*
* If we used the result of `RelOptUtil.toString(aggregate, SqlExplainLevel.DIGEST_ATTRIBUTES)`
* on `HashAggregate(groupBy=[a], select=[a, Final_SUM(sum$0) AS b])` as the digest,
* we would get an incorrect result. So the `explain_` method of `RelWriterImpl` is rewritten
* to add the row type to the digest value.
*
* @param rel rel node tree
* @return The digest of given rel tree.
*/
def getDigest(rel: RelNode): String = {
  val buffer = new StringWriter
  // Writer configuration used for digests:
  //  - DIGEST_ATTRIBUTES level without id prefix: only the RelNode's attributes are printed
  //  - retraction traits are included (relevant for StreamPhysicalRel nodes)
  //  - row type is included, so similar expressions with different types differ
  //  - tree style is disabled, so no tree-drawing prefixes end up in the digest
  val digestWriter = new RelTreeWriterImpl(
    new PrintWriter(buffer),
    explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
    withIdPrefix = false,
    withRetractTraits = true,
    withRowType = true,
    withTreeStyle = false)
  rel.explain(digestWriter)
  buffer.toString
}

/**
* Returns the null direction if not specified.
*
Expand Down

This file was deleted.

Expand Up @@ -38,7 +38,8 @@ class RelTreeWriterImpl(
explainLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withIdPrefix: Boolean = false,
withRetractTraits: Boolean = false,
withRowType: Boolean = false)
withRowType: Boolean = false,
withTreeStyle: Boolean = true)
extends RelWriterImpl(pw, explainLevel, withIdPrefix) {

var lastChildren: Seq[Boolean] = Nil
Expand All @@ -55,11 +56,13 @@ class RelTreeWriterImpl(
}

val s = new StringBuilder
if (depth > 0) {
lastChildren.init.foreach { isLast =>
s.append(if (isLast) " " else ": ")
if (withTreeStyle) {
if (depth > 0) {
lastChildren.init.foreach { isLast =>
s.append(if (isLast) " " else ": ")
}
s.append(if (lastChildren.last) "+- " else ":- ")
}
s.append(if (lastChildren.last) "+- " else ":- ")
}

if (withIdPrefix) {
Expand Down Expand Up @@ -112,18 +115,30 @@ class RelTreeWriterImpl(
}
pw.println(s)
if (inputs.length > 1) inputs.toSeq.init.foreach { rel =>
depth = depth + 1
lastChildren = lastChildren :+ false
if (withTreeStyle) {
depth = depth + 1
lastChildren = lastChildren :+ false
}

rel.explain(this)
depth = depth - 1
lastChildren = lastChildren.init

if (withTreeStyle) {
depth = depth - 1
lastChildren = lastChildren.init
}
}
if (!inputs.isEmpty) {
depth = depth + 1
lastChildren = lastChildren :+ true
if (withTreeStyle) {
depth = depth + 1
lastChildren = lastChildren :+ true
}

inputs.toSeq.last.explain(this)
depth = depth - 1
lastChildren = lastChildren.init

if (withTreeStyle) {
depth = depth - 1
lastChildren = lastChildren.init
}
}
}
}
@@ -1,14 +1,14 @@
LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]
@@ -1,14 +1,14 @@
LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
LogicalIntersect(all=[false],rowType=[RecordType(INTEGER random)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalProject(random=[$0],rowType=[RecordType(INTEGER random)])
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalProject(random=[$1], EXPR$1=[RAND()],rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)])
LogicalTableScan(table=[[default_catalog, default_database, MyTable]],rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)])
LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
LogicalIntersect(all=[false]), rowType=[RecordType(INTEGER random)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]
LogicalProject(random=[$0]), rowType=[RecordType(INTEGER random)]
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalProject(random=[$1], EXPR$1=[RAND()]), rowType=[RecordType(INTEGER random, DOUBLE EXPR$1)]
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]), rowType=[RecordType(VARCHAR(2147483647) first, INTEGER id, DOUBLE score, VARCHAR(2147483647) last)]

0 comments on commit ef0a033

Please sign in to comment.