[GLUTEN-4668][CH] Merge two phase hash-based aggregate into one aggregate in the spark plan when there is no shuffle #4669
Conversation
Run Gluten Clickhouse CI
@liujiayi771 @lgbo-ustc please help to review, thanks.
Run Gluten Clickhouse CI
  }
}

override def apply(plan: SparkPlan): SparkPlan = {
It is better to use PhysicalPlanSelector.maybe to check whether Gluten is enabled.
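A minimal sketch of how the rule body could be wrapped this way; the package path and signature of PhysicalPlanSelector.maybe, the rule name, and the mergeIfPossible helper below are assumptions for illustration, not code from this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// NOTE: package path and exact signature of PhysicalPlanSelector.maybe are assumed here.
import io.glutenproject.utils.PhysicalPlanSelector

case class MergeTwoPhasesAggregateSketch(session: SparkSession) extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan =
    PhysicalPlanSelector.maybe(session, plan) {
      // Runs only when Gluten is enabled for this session; otherwise the original plan is returned.
      plan.transformDown { case p => mergeIfPossible(p) }
    }

  // Placeholder for the merge logic discussed in this PR.
  private def mergeIfPossible(plan: SparkPlan): SparkPlan = plan
}
```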
Perhaps you should check whether the SparkPlan has been tagged as TRANSFORM_UNSUPPORTED. There are some earlier rules that will tag the SparkPlan; for example, FallbackOnANSIMode will tag all SparkPlans as TRANSFORM_UNSUPPORTED when ANSI mode is enabled. In this case, we can also avoid merging the aggregations.
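For illustration only, a hedged sketch of such a check built on Spark's TreeNodeTag mechanism; the tag key and value below are assumptions, not Gluten's actual hint API:

```scala
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

object FallbackTagCheck {
  // Assumed tag key for illustration; earlier fallback rules (e.g. FallbackOnANSIMode)
  // would attach something equivalent to plan nodes they cannot offload.
  private val hintTag = TreeNodeTag[String]("gluten.transform.hint")

  // If the node is already marked TRANSFORM_UNSUPPORTED, skip merging its aggregates too.
  def taggedAsUnsupported(plan: SparkPlan): Boolean =
    plan.getTagValue(hintTag).contains("TRANSFORM_UNSUPPORTED")
}
```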
done
Run Gluten Clickhouse CI
if (isPartialAgg(child, hashAgg)) {
  // convert to complete mode aggregate expressions
  val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
  HashAggregateExec(
Can we use the following instead?

hashAgg.copy(
  aggregateExpressions = completeAggregateExpressions,
  child = child.child
)
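For context, this is roughly how the branch quoted above could read after applying the suggestion; a sketch reusing the names from the snippet, not the final code of this PR:

```scala
if (isPartialAgg(child, hashAgg)) {
  // convert to complete mode aggregate expressions
  val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
  // Copy the final-stage node, changing only the fields that differ,
  // and skip the partial aggregate by reading its input directly.
  hashAgg.copy(
    aggregateExpressions = completeAggregateExpressions,
    child = child.child)
} else {
  hashAgg
}
```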
done
Run Gluten Clickhouse CI
Force-pushed from 7aa0f55 to 35c2555.
Run Gluten Clickhouse CI
Force-pushed from 46d6a6d to 17c9586.
Run Gluten Clickhouse CI
@liujiayi771 @lgbo-ustc @ulysses-you @PHILO-HE @rui-mo please help to review again, thanks.
Force-pushed from 17c9586 to f5e1689.
Run Gluten Clickhouse CI
      resultExpressions,
      child: HashAggregateExec)
    if !isStreaming && isTransformable(hashAgg) && isTransformable(child) =>
  if (isPartialAgg(child, hashAgg)) {
Why not put this if into the previous line?
done
  } else {
    objectHashAgg
  }
case plan: SparkPlan => plan
Can we also handle SortAggregate? It is possible that there is no shuffle and no sort between two SortAggregates.
added SortAggregate
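A hedged sketch of what the SortAggregate case could look like; isPartialAgg mirrors the helper already used in this PR but is stubbed out here, and the transformability checks are omitted, so this is illustrative rather than the merged code:

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.SortAggregateExec

object MergeSortAggregatesSketch {

  // Stub of the isPartialAgg helper used in this PR, simplified for illustration.
  private def isPartialAgg(partialAgg: SortAggregateExec, finalAgg: SortAggregateExec): Boolean =
    partialAgg.aggregateExpressions.forall(_.mode == Partial) &&
      finalAgg.aggregateExpressions.forall(_.mode == Final) &&
      partialAgg.groupingExpressions == finalAgg.groupingExpressions

  // Merge a final SortAggregateExec sitting directly on top of its partial stage,
  // mirroring the hash-aggregate case.
  def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
    case finalAgg: SortAggregateExec if finalAgg.child.isInstanceOf[SortAggregateExec] =>
      val partialAgg = finalAgg.child.asInstanceOf[SortAggregateExec]
      if (isPartialAgg(partialAgg, finalAgg)) {
        finalAgg.copy(
          aggregateExpressions = finalAgg.aggregateExpressions.map(_.copy(mode = Complete)),
          child = partialAgg.child)
      } else {
        finalAgg
      }
  }
}
```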
// to handle outputs according to the AggregateMode
for (attr <- child.output) {
  typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
  nameList.add(ConverterUtils.genColumnNameWithExprId(attr))
  nameList.addAll(ConverterUtils.collectStructFieldNames(attr.dataType))
}
(child.output, output)
- } else if (!modes.contains(Partial)) {
+ } else if (modes.forall(_ == Partial)) {
I'm not sure how the CH backend transforms aggregates, but this code seems different from before: Partial can appear together with PartialMerge when there is a distinct aggregate.
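To make the difference concrete, a small self-contained check of the two predicates from the diff above; the mixed case is the distinct-aggregate shape this comment refers to:

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Final, Partial, PartialMerge}

object ModeCheckDemo extends App {
  def oldCheck(modes: Seq[AggregateMode]): Boolean = !modes.contains(Partial)
  def newCheck(modes: Seq[AggregateMode]): Boolean = modes.forall(_ == Partial)

  // Final-only node: the predicates disagree.
  println(s"${oldCheck(Seq(Final))} ${newCheck(Seq(Final))}")                 // true false
  // Partial-only node: they disagree the other way.
  println(s"${oldCheck(Seq(Partial))} ${newCheck(Seq(Partial))}")             // false true
  // Distinct aggregate: Partial and PartialMerge appear together in one node,
  // so neither predicate can assume the modes are homogeneous.
  println(s"${oldCheck(Seq(PartialMerge, Partial))} ${newCheck(Seq(PartialMerge, Partial))}") // false false
}
```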
reverted
Could you add some description of the motivation for this PR? Is it a usual case to have partial + final without a shuffle in between? Thanks.
Force-pushed from f5e1689 to 51e5270.
Run Gluten Clickhouse CI
Updated, please review again, thanks. @ulysses-you @rui-mo
  agg.resultExpressions,
  agg.child
)
transformer.doValidate().isValid
Here we have not pulled out the pre/post projects, so this validation seems very likely to fail. Do we need this check? I think it should be fine even if we fall back to vanilla Spark after merging the aggregates, since there already exists a rule, ReplaceHashWithSortAgg, that does a similar thing.
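For reference, vanilla Spark's ReplaceHashWithSortAgg rule is, as far as I know, gated behind a configuration flag that is off by default, so a merged plan that falls back would only get that analogous rewrite when the flag is enabled; a hedged sketch:

```scala
import org.apache.spark.sql.SparkSession

// Assumed config key for Spark's ReplaceHashWithSortAgg rule (disabled by default).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.execution.replaceHashWithSortAgg", "true")
  .getOrCreate()
```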
I am not sure that merging sort aggregates also works for vanilla Spark; do you have any idea about this? The hash-based aggregates may still work even after fallback.
removed
Force-pushed from 51e5270 to 580338f.
Run Gluten Clickhouse CI
LGTM if tests pass, cc @rui-mo if you have other comments.
…gate in the spark plan when there is no shuffle

Examples:

HashAggregate(t1.i, SUM, final)
              |                    =>  HashAggregate(t1.i, SUM, complete)
HashAggregate(t1.i, SUM, partial)

Now this feature is only supported for the CH backend.

Close apache#4668.

Co-authored-by: lgbo <lgbo.ustc@gmail.com>
Force-pushed from 580338f to 66bf015.
Run Gluten Clickhouse CI
LGTM
===== Performance report for TPCH SF2000 with Velox backend, for reference only =====
What changes were proposed in this pull request?
Merge two-phase hash-based aggregates into one aggregate in the Spark plan when there is no shuffle between them.

Examples:

HashAggregate(t1.i, SUM, final)
              |                    =>  HashAggregate(t1.i, SUM, complete)
HashAggregate(t1.i, SUM, partial)

For example, for TPCH Q18 with bucket tables, before this PR there are two HashAggregateTransformers in one whole stage; after this PR there is only one HashAggregateTransformer in one whole stage, which reduces the time spent in the second HashAggregateTransformer.

Now this feature is only supported for the CH backend.
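A hedged way to reproduce the pattern the description refers to (the table name and schema below are illustrative): grouping a bucketed table on its bucket column needs no exchange, so the partial and final aggregates land in the same stage and can be merged into a single Complete-mode aggregate.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Bucketed table: grouping by the bucket column avoids the shuffle between the
// partial and final aggregate, which is exactly the shape this PR merges.
spark.sql("CREATE TABLE t1 (i INT, v BIGINT) USING parquet CLUSTERED BY (i) INTO 8 BUCKETS")
spark.sql("INSERT INTO t1 VALUES (1, 10), (1, 20), (2, 30)")
spark.sql("SELECT i, SUM(v) FROM t1 GROUP BY i").explain()

// Schematically (notation from this PR's description):
//   HashAggregate(t1.i, SUM, final)
//                 |                   =>  HashAggregate(t1.i, SUM, complete)
//   HashAggregate(t1.i, SUM, partial)
```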
Close #4668.
How was this patch tested?