[GLUTEN-4213][CORE] Refactoring insertion process of pre/post projection #4245
liujiayi771 wants to merge 12 commits into apache:main
@zhztheplayer @rui-mo Could you help review? I have already validated this modification on TPCDS. Using this framework to insert pre/post projection can eliminate a significant amount of redundant code in the transformer. The previous approach required many if-else branches based on whether to insert projection and whether it was for validation. It also eliminated the need to construct projection based on an index. |
|
@waitinfuture Could you help review? |
|
Design doc #4213 (comment) |
rui-mo left a comment
Thanks for the nice refactor. In the meantime, could you also check if metrics work well? Some relevant code to handle the metrics of pre/post projection could be removed.
  expr =>
    expr.filter match {
-     case None | Some(_: Attribute) | Some(_: Literal) =>
+     case None | Some(_: Attribute) =>
Why is Literal removed here?
There are two reasons for this:
- If the filter condition is a Literal, it can only be of boolean type. Such filters are typically removed during optimization in Spark; see the EliminateAggregateFilter rule in Spark for more information.
- The filter in Velox only supports FieldAccessTypedExpr, although it is possible that CK supports a Literal filter.
|
It seems PR #3649 proposes a similar refactor. @ulysses-you @liujiayi771 Could you help check on that? Thanks. |
|
The work done by these two PRs is essentially the same. The difference is that #3649 modifies the logical plan, while my PR modifies the physical plan and also supports post-projection for agg. For the sort operator, both pre- and post-projection can be handled in the logical plan. For agg, the pre-projection can be handled in the logical plan, but the post-projection can only be handled in the physical plan if the native output doesn't match the resultExpressions in Spark's output. Initially, I also considered doing it in the logical plan to avoid impacting validation and AQE. I think we can combine both approaches: do the parts that can be done in the logical plan in the logical rule, but handle the sort in TakeOrderedAndProjectExecTransformer only in the physical plan. @ulysses-you I didn't notice your PR before. I searched for issues related to project but did not check the pull requests. I would like to hear your opinion, as our approaches are fairly similar. |
|
I think the main goal of pulling out pre/post project is:
One option is to pull out the pre-project on the logical side and the post-project on the columnar side. |
|
@ulysses-you Agree with you. I can continue to move the pre-projection part into a logical rule if you'd like, or would you prefer to continue working on #3649? |
|
@liujiayi771 it's fine to go ahead in this pr, thank you |
|
@ulysses-you I have identified an issue: if we modify the logical plan, the extendedOperatorOptimizationRules we insert are placed before DecimalAggregates. DecimalAggregates converts sum/avg(decimal attr) into sum/decimal(unscaledValue(decimal attr)), but the unscaledValue cannot be seen in our rule. As a result, the required pre-project is not added. Maybe we should use ExperimentalMethods.extraOptimizations or postHocOptimizationBatches? I currently do not know how to use postHocOptimizationBatches. |
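The ordering problem described in this comment can be reproduced in a toy model. The sketch below is only an illustration under stated assumptions: `Expr`, `Agg`, `decimalAggregates`, and `insertPreProject` are hypothetical stand-ins written for this example, not Spark's or Gluten's classes, and `decimalAggregates` mimics only the rewrite of `sum` over a decimal attribute.

```scala
object RuleOrderSketch {
  // Toy expression model; these are NOT Spark classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class UnscaledValue(child: Expr) extends Expr
  case class Sum(child: Expr) extends Expr
  case class MakeDecimal(child: Expr) extends Expr

  // Toy aggregate: its aggregate expressions plus the named expressions
  // computed by a pre-project placed beneath it.
  case class Agg(aggExprs: Seq[Expr], preProject: Seq[(String, Expr)] = Nil)

  // Mimics the described rewrite: sum(attr) becomes a decimal built from
  // sum(unscaledValue(attr)).
  def decimalAggregates(agg: Agg): Agg =
    agg.copy(aggExprs = agg.aggExprs.map {
      case Sum(a: Attr) => MakeDecimal(Sum(UnscaledValue(a)))
      case other => other
    })

  // Hypothetical pre-project rule: any non-attribute input of Sum is moved
  // into the pre-project and replaced by a reference to it.
  def insertPreProject(agg: Agg): Agg = {
    var pulled = agg.preProject
    def rewrite(e: Expr): Expr = e match {
      case Sum(child) if !child.isInstanceOf[Attr] =>
        val name = s"_pre_${pulled.size}"
        pulled = pulled :+ (name -> child)
        Sum(Attr(name))
      case MakeDecimal(child) => MakeDecimal(rewrite(child))
      case other => other
    }
    val newExprs = agg.aggExprs.map(rewrite)
    Agg(newExprs, pulled)
  }
}
```

Running `insertPreProject` before `decimalAggregates` leaves `UnscaledValue` inline in the aggregate, while the reverse order moves it into the pre-project, which is the behavior the comment is asking for.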
|
|
Yes, this approach will work, but do you think it is reasonable to use |
|
@ulysses-you One method I can think of is to add a rule through injectCheckRule, before the optimization step. This rule would only perform the modifications on the sparkSession. However, this approach might be considered as a hack.
case class AddExtraOptimizations(sparkSession: SparkSession) extends (LogicalPlan => Unit) {
  override def apply(plan: LogicalPlan): Unit = {
    sparkSession.experimental.extraOptimizations =
      sparkSession.experimental.extraOptimizations ++ Seq(InsertPreProject)
  }
} |
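One caveat with the snippet above, assuming a check rule is invoked for every analyzed plan: appending unconditionally would add the rule to extraOptimizations again on each query. A minimal sketch of a guarded registration follows. `Rule`, `Experimental`, and `register` are hypothetical stand-ins that model only a mutable rule list; they are not Spark's classes.

```scala
object ExtraOptimizationsSketch {
  // Stand-in for an optimizer rule; only its identity matters here.
  case class Rule(name: String)

  // Stand-in for the object holding the mutable list of extra rules.
  class Experimental {
    var extraOptimizations: Seq[Rule] = Nil
  }

  val InsertPreProject: Rule = Rule("InsertPreProject")

  // Append the rule only if it is not registered yet, so that repeated
  // invocations leave the list unchanged.
  def register(experimental: Experimental): Unit =
    if (!experimental.extraOptimizations.contains(InsertPreProject)) {
      experimental.extraOptimizations = experimental.extraOptimizations :+ InsertPreProject
    }
}
```

The guard is not thread-safe; it only shows the idempotence check that the original snippet lacks.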
|
I think it's OK; Spark won't remove a public interface in general. We can argue against it if someone creates a PR to remove it. |
zhztheplayer left a comment
Thanks!
And would you like to update the PR description to add some words to summarize the newly added rules? This would help others understand the changes.
(probably including)
- ColumnarPullOutProject (ColumnarPullOutPostProject + ColumnarPullOutPreProject)
- GlutenPlanPullOutProject
- PullOutProject
/**
 * Merge the results of two ValidationResult objects, including combining the reasons message for
 * invalid ValidationResult.
 *   - valid merge valid = valid
 *   - invalid merge valid = invalid
 *   - invalid merge invalid = invalid
 */
def merge(first: ValidationResult, second: ValidationResult): ValidationResult = {
  if (first.isValid && second.isValid) {
    ok
  } else {
    val reasonStr = first.reason.getOrElse("") + second.reason.getOrElse("")
    notOk(reasonStr)
  }
}
I didn't find any usage of this method. Am I missing something?
Yes, we can remove it now.
val pulledOutSortExec =
  ColumnarPullOutProject.getPulledOutPlanLocally[SortExec](sortExec)
Needing this kind of statement means that the AddTransformHintRule rule is coupled with ColumnarPullOutProject.
Do we have a chance to make ColumnarPullOutProject more independent? I think our design allows a rule to rely on tags generated by AddTransformHintRule, but we should probably avoid having AddTransformHintRule depend on other rules.
ColumnarPullOutProject will pull out ProjectExec and need to verify if ProjectExec can be converted to native plan. If we want to decouple ColumnarPullOutProject from TransformHintRule, we can place it before TransformHintRule, which is also feasible. Initially, I implemented it this way, but I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed. It may require ClickHouse's assistance to redesign the API for custom agg and not rely on throwing exceptions in getAttrsIndexForExtensionAggregateExpr for fallback, but instead trigger the fallback logic in doValidationInternal.
We can proceed with the modifications step by step. For now, let's place the validation of ProjectExec within the rule itself. ColumnarPullOutProject will only validate ProjectExec. I understand that there are similar codes in other places in Gluten that tag hints, and we can handle them together later.
> I understand that there are similar codes in other places in Gluten that tag hints, and we can handle them together later.
Actually, I think we may allow adding validation tags to the original physical plan in rules other than AddTransformHintRule for now, just as if multiple rules were doing validation together. However, I am not sure whether the code you mentioned above is a good case, so it might still need to be optimized. I'll take a look as well.
> I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed.
This is interesting... My feeling is that we may have to rethink how we handle backend-specific pre/post project creation code when doing the refactor. Say backend A has some specialized conditions to decide whether a project should be pulled out from a plan node; then we'd provide extensibility to have it customized?
Also, in the patch, the code for the new feature is currently located in several places, including logical optimization, validation (transform hint), and physical optimization (the actual pulling logic). So I feel that the complexity added to Gluten is a bit higher than we expected. Do we have a chance to reduce it? At the same time, I am more worried about the coupling of the rules in this PR. Do you think we can add some new methods to the backend API to deal with the CH agg issue you mentioned?
The part about "custom agg" may not have been expressed clearly. Currently, both CH and Velox require a "post project" process. The logic is the same, which is to convert the output on the native side to a consistent output for Spark. What I mean is that CH throws an exception when retrieving the native output, which requires validation before pulling it out. It doesn't mean that CH "post project" process is different from Velox's and has custom requirements. In fact, it is about the getAttrForAggregateExprs method that retrieves the actual output of the aggregation. Based on this output, a "post-project" is constructed. Velox also has validation logic that throws exceptions. I also hope to include this logic in the doValidationInternal method like this. CH has custom aggregation requirements, and validation is also performed when retrieving the output for custom agg. For example, the CustomSum only supports Final code. Since I am not familiar with the specific logic of other custom agg in CH, I cannot move the validation logic into doValidationInternal like in Velox. It may require CH's developers to redesign this part. However, this part is not essential and can be improved in future development.
> Initially, I implemented it this way, but I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed.
@liujiayi771 Maybe we could try to call getAttrForAggregateExprs method in doValidationInternal for CH backend. With this issue solved, can we make ColumnarPullOutProject independent?
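The idea of surfacing the failure during validation rather than while pulling out the project can be sketched in a small model. This is only an illustration: `Validation`, `nativeOutputFor`, and `validate` are hypothetical names, and the "only Final is supported" condition is an assumed example of a lookup that throws, not the actual ClickHouse or Velox logic.

```scala
import scala.util.{Failure, Success, Try}

object ValidationSketch {
  // Simplified stand-in for a validation result; not Gluten's ValidationResult.
  sealed trait Validation
  case object Ok extends Validation
  case class NotOk(reason: String) extends Validation

  // Hypothetical lookup of the native aggregate output that throws for an
  // unsupported combination, like the behavior described in the thread.
  def nativeOutputFor(function: String, mode: String): Seq[String] =
    if (function == "custom_sum" && mode != "Final") {
      throw new UnsupportedOperationException(s"$function only supports Final mode, got $mode")
    } else {
      Seq(s"${function}_result")
    }

  // Validation step: perform the lookup up front and turn an exception into
  // a fallback reason, so later rules can call the lookup safely.
  def validate(function: String, mode: String): Validation =
    Try(nativeOutputFor(function, mode)) match {
      case Success(_) => Ok
      case Failure(e) => NotOk(e.getMessage)
    }
}
```

With the exception converted into a validation result, a pull-out rule that runs after validation would not need to validate the plan itself.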
|
@JkSelf Could you also help to take a look? |
|
- override def outputOrdering: Seq[SortOrder] = sortOrder
+ override def outputOrdering: Seq[SortOrder] = child match {
+   case project: ProjectExecTransformer if ProjectTypeHint.isPreProject(project) =>
@JkSelf The outputOrdering issue encountered earlier is now resolved this way.
      true
    case _ => false
  }.isDefined)
case Sort(order, _, _) =>
@liujiayi771 We have added a sort check in the needPreProject method. However, it appears that the logic for handling sort operators has not been added here.
I used to pull out the project for Sort in the LogicalPlan rule. But that way, the outputOrdering issue cannot be solved easily. So I moved this logic to the SparkPlan rule, and this will not have performance issues like agg. We can remove the Sort case here.
Could you provide more information about the outputOrdering issue you mentioned? Maybe I missed some context. Thanks.
    // post-projection is needed.
    true
  }
case _ => false
Do we need the sort check here?
Sort is different from Agg: if it has a pre-project, it always needs a post-project, so I pull out the pre- and post-project together in ColumnarPullOutPreProject for SortExec.
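Why a pre-project on a sort always implies a post-project can be seen in a small, self-contained model. The sketch below is not Gluten code: `Expr`, `Plan`, `pullOutSort`, and the `_pre_N` naming are hypothetical stand-ins. The pre-project appends the computed sort keys to the child's output, so a post-project is needed to restore the original output schema.

```scala
object SortPullOutSketch {
  // Toy expression and plan model; these are NOT Spark or Gluten classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  sealed trait Plan { def output: Seq[Attr] }
  case class Scan(output: Seq[Attr]) extends Plan
  case class Project(exprs: Seq[Expr], child: Plan) extends Plan {
    def output: Seq[Attr] = exprs.map {
      case a: Attr => a
      case Alias(_, name) => Attr(name)
      case other => sys.error(s"unnamed expression: $other")
    }
  }
  case class Sort(order: Seq[Expr], child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  def pullOutSort(sort: Sort): Plan = {
    // Sort keys that are not plain attributes must be computed in advance.
    val toCompute = sort.order.filterNot(_.isInstanceOf[Attr]).distinct
    if (toCompute.isEmpty) {
      sort
    } else {
      val aliases = toCompute.zipWithIndex.map { case (e, i) => Alias(e, s"_pre_$i") }
      val replacement: Map[Expr, Attr] = aliases.map(a => a.child -> Attr(a.name)).toMap
      // Pre-project: original columns plus the computed sort keys.
      val preProject = Project(sort.child.output ++ aliases, sort.child)
      val newSort = Sort(sort.order.map(e => replacement.getOrElse(e, e)), preProject)
      // Post-project: drop the extra columns so the output matches the original sort.
      Project(sort.child.output, newSort)
    }
  }
}
```

For example, sorting a scan of `a, b` by `a + b` yields a plan whose output is still `a, b`, while the inner sort orders by `_pre_0`.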
rui-mo left a comment
Thanks for your work. Added several comments.
 * This rule will insert a pre-project in the child of operators such as Aggregate, Sort, Join,
 * etc., when they involve expressions that need to be evaluated in advance.
 */
case class PullOutProject(session: SparkSession)
Why is this class named PullOutProject if it aims to insert a pre-project? I also feel we are lacking some key information in the class descriptions, e.g. that PullOutProject works at the logical plan level, which cases are covered by this rule, and what the steps to insert a project are.
  }
}

object ColumnarPullOutProject extends Rule[SparkPlan] {
For the newly introduced rules, maybe we can provide more information about their functionality and usage. For this one, especially the difference with PullOutPreProject.
  override def apply(plan: SparkPlan): SparkPlan = applyPullOutColumnarPreRules(plan)
}

case class ColumnarPullOutPostProject(validation: Boolean = false)
      child = preProject
    )
    newSort.copyTagsFrom(sort)
    ProjectExecTransformer(sort.child.output, newSort).fallbackIfInvalid
Should we throw for other plans?
}

/** This rule only used for situation that directly create GlutenPlan. */
object GlutenPlanPullOutProject extends Rule[SparkPlan] with PullOutProjectHelper {
What does "directly create GlutenPlan" mean? Better to clarify a bit. It seems only Sort is covered in this rule; can we add the reason?
override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
  copy(child = newChild)

def fallbackIfInvalid: SparkPlan = {
> If validation fails, fallback to vanilla Spark and add NotTransformable tag.
The same functionality should be covered by existing rules. Is it possible to remove the duplicate check here?
    .isDefined && plan.getTagValue(TAG).get.isInstanceOf[PRE_PROJECT]
}

def tagPostProject(plan: SparkPlan): Unit = {
What kind of project is regarded as post-project? Maybe add a clear definition here. Same for pre-project.
|
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
|
This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it is still valid. Thanks. |
What changes were proposed in this pull request?
Implement #4213.
Introduced three kinds of rules:
- `PullOutProject`. Pulls out the pre-project at the `LogicalPlan` level. Currently, it only supports the Velox backend and can reduce the number of pre-projects when agg includes distinct.
- `ColumnarPullOutProject` (`ColumnarPullOutPostProject` + `ColumnarPullOutPreProject`). Pulls out the pre/post-project at the `SparkPlan` level. `PullOutProject` cannot handle all scenarios (e.g., an `Aggregate` introduced by `InjectRuntimeFilter` will be executed before `PullOutProject`, and some `Expression`s will be generated in `Strategy`). The missing parts will be handled completely by `ColumnarPullOutProject`. Some information required for the post-project is more easily obtained at the physical plan level, hence it is handled there.
- `GlutenPlanPullOutProject`. Handles the case of constructing a Gluten transformer directly in `TakeOrderedAndProjectExecTransformer`.
Currently, only agg and sort have been incorporated into this framework. In the future, support for operators such as join and window that require pre/post projection will be added.
Next steps:
How was this patch tested?
Existing CI.