Skip to content

Commit

Permalink
[FLINK-16070] [table-planner-blink] Stream planner supports remove co…
Browse files Browse the repository at this point in the history
…nstant keys from an aggregate


This closes #11158
  • Loading branch information
godfreyhe committed Mar 30, 2020
1 parent ae3b0ff commit 179d7a6
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ object FlinkStreamRuleSets {
REWRITE_COALESCE_RULES.asScala ++
REDUCE_EXPRESSION_RULES.asScala ++
List(
//removes constant keys from an Agg
AggregateProjectPullUpConstantsRule.INSTANCE,
StreamLogicalWindowAggregateRule.INSTANCE,
// slices a project into sections which contain window agg functions
// and sections which do not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,80 @@ HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0, Partial_COUNT(c) AS count$1])
+- Calc(select=[a, b, c], where=[=(a, 1)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- HashAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,85 @@ SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1
+- Sort(orderBy=[a ASC])
+- Calc(select=[a, b, c], where=[=(a, 1)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=AUTO]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+- Sort(orderBy=[a ASC])
+- Exchange(distribution=[hash[a]])
+- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+- Sort(orderBy=[a ASC])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=ONE_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- SortAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+- Sort(orderBy=[a ASC])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey[aggStrategy=TWO_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM MyTable1) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1])
+- Sort(orderBy=[a ASC])
+- Exchange(distribution=[hash[a]])
+- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
+- Sort(orderBy=[a ASC])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ LogicalProject(a=[$0], b=[$1], count_c=[$2], rank_num=[$3])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], orderBy=[count_c DESC], select=[a, b, count_c, $3, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[single], updateAsRetraction=[false], accMode=[Acc])
+- Calc(select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, $3], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
]]>
Expand Down Expand Up @@ -422,68 +422,6 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
+- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
<TestCase name="testTopNWithoutRowNumber2">
<Resource name="sql">
<![CDATA[
SELECT
CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
seller_id,
sku_id,
venture,
stat_date,
amt_dtr,
byr_cnt_dtr,
pv_dtr,
uv_dtr
FROM (
SELECT
seller_id,
sku_id,
venture,
stat_date,
amt_dtr,
byr_cnt_dtr,
pv_dtr,
uv_dtr,
ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
ORDER BY amt_dtr DESC) AS rownum
FROM (
SELECT
seller_id
,sku_id
,venture
,stat_date
,incr_sum(trd_amt) AS amt_dtr
,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
,SUM(log_pv) AS pv_dtr
,COUNT(DISTINCT log_visitor_id) AS uv_dtr
FROM stream_source
GROUP BY seller_id,sku_id,venture,stat_date
)
)
WHERE rownum <= 10
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(rowkey=[CONCAT($0, $2, $3, $1)], seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7])
+- LogicalFilter(condition=[<=($8, 10)])
+- LogicalProject(seller_id=[$0], sku_id=[$1], venture=[$2], stat_date=[$3], amt_dtr=[$4], byr_cnt_dtr=[$5], pv_dtr=[$6], uv_dtr=[$7], rownum=[ROW_NUMBER() OVER (PARTITION BY $0, $2, $3 ORDER BY $4 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+- LogicalAggregate(group=[{0, 1, 2, 3}], amt_dtr=[INCR_SUM($4)], byr_cnt_dtr=[COUNT(DISTINCT $5)], pv_dtr=[SUM($6)], uv_dtr=[COUNT(DISTINCT $7)])
+- LogicalTableScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[CONCAT(seller_id, venture, stat_date, sku_id) AS rowkey, seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,1,2,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[seller_id, venture, stat_date], orderBy=[amt_dtr DESC], select=[seller_id, sku_id, venture, stat_date, amt_dtr, byr_cnt_dtr, pv_dtr, uv_dtr], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[seller_id, venture, stat_date]], updateAsRetraction=[false], accMode=[Acc])
+- GroupAggregate(groupBy=[seller_id, sku_id, venture, stat_date], select=[seller_id, sku_id, venture, stat_date, INCR_SUM(trd_amt) AS amt_dtr, COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr, SUM(log_pv) AS pv_dtr, COUNT(DISTINCT log_visitor_id) AS uv_dtr], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[seller_id, sku_id, venture, stat_date]], updateAsRetraction=[true], accMode=[Acc])
+- TableSourceScan(table=[[default_catalog, default_database, stream_source, source: [TestTableSource(seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id)]]], fields=[seller_id, sku_id, venture, stat_date, trd_amt, trd_buyer_id, log_pv, log_visitor_id], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -737,46 +675,6 @@ Calc(select=[a, b, c, 10:BIGINT AS row_num], updateAsRetraction=[false], accMode
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- Calc(select=[a, b, c], updateAsRetraction=[false], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
</TestCase>
<TestCase name="testTopNWithGroupByConstantKey">
<Resource name="sql">
<![CDATA[
SELECT *
FROM (
SELECT a, b, count_c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
FROM (
SELECT a, b, COUNT(*) AS count_c
FROM (
SELECT *, 'cn' AS cn
FROM MyTable
)
GROUP BY a, b, cn
))
WHERE row_num <= 10
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
+- LogicalFilter(condition=[<=($3, 10)])
+- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Rank(strategy=[UpdateFastStrategy[0,1,2]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, cn, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- GroupAggregate(groupBy=[a, b, cn], select=[a, b, cn, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a, b, cn]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -844,6 +742,46 @@ Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], updateAsRetraction=[false],
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- Calc(select=[a, b, c], where=[>(c, 1000)], updateAsRetraction=[false], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[false], accMode=[Acc])
]]>
</Resource>
</TestCase>
<TestCase name="testTopNWithGroupByConstantKey">
<Resource name="sql">
<![CDATA[
SELECT *
FROM (
SELECT a, b, count_c,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
FROM (
SELECT a, b, COUNT(*) AS count_c
FROM (
SELECT *, 'cn' AS cn
FROM MyTable
)
GROUP BY a, b, cn
))
WHERE row_num <= 10
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
+- LogicalFilter(condition=[<=($3, 10)])
+- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a]], updateAsRetraction=[false], accMode=[Acc])
+- Calc(select=[a, b, count_c], updateAsRetraction=[false], accMode=[Acc])
+- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], updateAsRetraction=[false], accMode=[Acc])
+- Exchange(distribution=[hash[a, b]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, _UTF-16LE'cn' AS cn], updateAsRetraction=[true], accMode=[Acc])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,30 @@ GroupAggregate(select=[AVG_RETRACT(a) AS EXPR$0], updateAsRetraction=[false], ac
+- Exchange(distribution=[hash[b]], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[b, a], updateAsRetraction=[true], accMode=[Acc])
+- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
<TestCase name="testGroupByWithConstantKey">
<Resource name="sql">
<![CDATA[
SELECT a, MAX(b), c FROM (SELECT a, 'test' AS c, b FROM T) t GROUP BY a, c
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)])
+- LogicalProject(a=[$0], c=[_UTF-16LE'test'], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
+- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, _UTF-16LE'test' AS c, b])
+- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit 179d7a6

Please sign in to comment.