Skip to content

Commit

Permalink
[FLINK-15577][table-planner] Add Window specs to WindowAggregate node…
Browse files Browse the repository at this point in the history
…s' digests

The RelNode's digest is used by the Calcite HepPlanner to avoid adding
duplicate vertices to the graph. If an equivalent vertex was already
present in the graph, then that vertex is used in place of the newly
generated one.
This means that the digest needs to contain all the information
necessary to identifying a vertex and distinguishing it from similar
- but not equivalent - vertices.

In the case of the `WindowAggregation` nodes, the window specs are
currently not in the digest, meaning that two aggregations with the same
signatures and expressions but different windows are considered
equivalent by the planner, which is not correct and will lead to an
invalid Physical Plan.

This commit fixes this issue and adds a test ensuring that the window
specs are in the digest, as well as similar aggregations on two
different windows will not be considered equivalent.

This closes #10854

(cherry picked from commit 2447185)
  • Loading branch information
Benoit Hanotte authored and KurtYoung committed Jan 20, 2020
1 parent a2b211a commit 578a709
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class LogicalWindowAggregate(
for (property <- namedProperties) {
pw.item(property.name, property.property)
}
pw
pw.item("window", window.toString)
}

override def copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class LogicalWindowTableAggregate(
for (property <- namedProperties) {
pw.item(property.name, property.property)
}
pw
pw.item("window", window.toString)
}

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
Expand All @@ -52,6 +52,14 @@ class FlinkLogicalWindowAggregate(

def getNamedProperties: Seq[NamedWindowProperty] = namedProperties

override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
for (property <- namedProperties) {
pw.item(property.name, property.property)
}
pw.item("window", window.toString)
}

override def copy(
traitSet: RelTraitSet,
input: RelNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter}
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
Expand All @@ -50,6 +50,14 @@ class FlinkLogicalWindowTableAggregate(

def getNamedProperties: Seq[NamedWindowProperty] = namedProperties

override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
for (property <- namedProperties) {
pw.item(property.name, property.property)
}
pw.item("window", window.toString)
}

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = {
new FlinkLogicalWindowTableAggregate(
window,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,63 @@ class GroupWindowTest extends TableTestBase {

util.verifySql(sqlQuery, expected)
}

@Test
def testWindowAggregateWithDifferentWindows() = {
// This test ensures that the LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes'
// digests contain the window specs. This allows the planner to make the distinction between
// similar aggregations using different windows (see FLINK-15577).
val util = batchTestUtil()
val table = util.addTable[(Timestamp)]("MyTable", 'rowtime)

val sql =
"""
|WITH window_1h AS (
| SELECT 1
| FROM MyTable
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
|),
|
|window_2h AS (
| SELECT 1
| FROM MyTable
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
|)
|
|(SELECT * FROM window_1h)
|UNION ALL
|(SELECT * FROM window_2h)
|""".stripMargin

val expected =
binaryNode(
"DataSetUnion",
unaryNode(
"DataSetCalc",
unaryNode(
"DataSetWindowAggregate",
batchTableNode(table),
// This window is the 1hr window
term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"),
term("select")
),
term("select", "1 AS EXPR$0")
),
unaryNode(
"DataSetCalc",
unaryNode(
"DataSetWindowAggregate",
batchTableNode(table),
// This window is the 2hr window
term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"),
term("select")
),
term("select", "1 AS EXPR$0")
),
term("all", "true"),
term("union", "EXPR$0")
)

util.verifySql(sql, expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,68 @@ class GroupWindowTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}

@Test
def testWindowAggregateWithDifferentWindows() = {
// This test ensures that the LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes'
// digests contain the window specs. This allows the planner to make the distinction between
// similar aggregations using different windows (see FLINK-15577).
val sql =
"""
|WITH window_1h AS (
| SELECT 1
| FROM MyTable
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
|),
|
|window_2h AS (
| SELECT 1
| FROM MyTable
| GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
|)
|
|(SELECT * FROM window_1h)
|UNION ALL
|(SELECT * FROM window_2h)
|""".stripMargin

val expected =
binaryNode(
"DataStreamUnion",
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(table),
term("select", "rowtime")
),
// This window is the 1hr window
term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"),
term("select")
),
term("select", "1 AS EXPR$0")
),
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(table),
term("select", "rowtime")
),
// This window is the 2hr window
term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"),
term("select")
),
term("select", "1 AS EXPR$0")
),
term("all", "true"),
term("union all", "EXPR$0")
)

streamUtil.verifySql(sql, expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,63 @@ class GroupWindowTableAggregateTest extends TableTestBase {

util.verifyTable(windowedTable, expected)
}

@Test
def testWindowAggregateWithDifferentWindows(): Unit = {
// This test ensures that the LogicalWindowTableAggregate and FlinkLogicalWindowTableAggregate
// nodes'v digests contain the window specs. This allows the planner to make the distinction
// between similar aggregations using different windows (see FLINK-15577).
val tableWindow1hr = table
.window(Slide over 1.hour every 1.hour on 'd as 'w1)
.groupBy('w1)
.flatAggregate(emptyFunc('a, 'b))
.select(1 as 'a)

val tableWindow2hr = table
.window(Slide over 2.hour every 1.hour on 'd as 'w1)
.groupBy('w1)
.flatAggregate(emptyFunc('a, 'b))
.select(1 as 'b)

val joinTable = tableWindow1hr.fullOuterJoin(tableWindow2hr, 'a === 'b)

val expected =
binaryNode(
"DataStreamJoin",
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamGroupWindowTableAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(table),
term("select", "a", "b", "d")
),
// This window is the 1hr window
term("window", "SlidingGroupWindow('w1, 'd, 3600000.millis, 3600000.millis)"),
term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
),
term("select", "1 AS a")
),
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamGroupWindowTableAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(table),
term("select", "a", "b", "d")
),
// This window is the 2hr window
term("window", "SlidingGroupWindow('w1, 'd, 7200000.millis, 3600000.millis)"),
term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
),
term("select", "1 AS b")
),
term("where", "=(a, b)"),
term("join", "a", "b"),
term("joinType", "FullOuterJoin")
)
util.verifyTable(joinTable, expected)
}
}

0 comments on commit 578a709

Please sign in to comment.