Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ class BatchPhysicalWindowAggregateRule
case (udf, aggIndex) =>
aggBufferFieldNames(aggIndex) = udf match {
case _: AggregateFunction[_, _] =>
Array(aggNames(aggIndex))
Array(aggNames(aggIndex + auxGroupSet.length))
case agf: DeclarativeAggregateFunction =>
agf.aggBufferAttributes.map {
attr =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ object RelExplainUtil {

val inNames = grouping.map(inFields(_)) ++ auxGrouping.map(inFields(_)) ++ aggStrings
val outNames = grouping.indices.map(outFields(_)) ++
(grouping.length + 1 until grouping.length + 1 + auxGrouping.length).map(outFields(_)) ++
(grouping.length until grouping.length + auxGrouping.length).map(outFields(_)) ++
outFieldNames
inNames
.zip(outNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2, COUNT(c4) AS EXPR$2])
HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4, COUNT(c4) AS EXPR$2])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that if we have a test like

  1. unparse optimized version to SQL
  2. validate SQL
  3. apply for all (at least compiled plan related) tests then we could potentially spot more issues like that
    ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it easy/tractable to unparse from the optimized version back to SQL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, we don't currently have automated validation for plans, but this feature looks quite valuable, though the implementation might be challenging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @snuyanzin
Are there any other concerns? If not, this should be ready to merge.

+- Exchange(distribution=[hash[a4]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
Expand All @@ -401,7 +401,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4, COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+- Exchange(distribution=[hash[a4]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
Expand All @@ -428,7 +428,7 @@ Calc(select=[a4, c4, s, EXPR$3])
+- Exchange(distribution=[hash[a4, s]])
+- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$start AS s, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
Expand Down Expand Up @@ -457,7 +457,7 @@ Calc(select=[a4, c4, e, EXPR$3])
+- Exchange(distribution=[hash[a4, e]])
+- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$end AS e, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
Expand Down Expand Up @@ -485,7 +485,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4
+- Exchange(distribution=[hash[a4, b4]])
+- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a4, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4, c4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
Expand Down