
[SPARK-40193][SQL] Merge subquery plans with different filters #37630

Open · wants to merge 7 commits into master
Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Aug 23, 2022

What changes were proposed in this pull request?

After #32298 we were able to merge scalar subquery plans. This PR is a follow-up that improves the merging logic to combine Filter nodes with different conditions when those conditions can be merged in an ancestor Aggregate node.

Consider the following query with 2 subqueries:

SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(b) FROM t WHERE c = 2)

where the subqueries can be merged to:

SELECT
  avg(a) FILTER (WHERE c = 1),
  sum(b) FILTER (WHERE c = 2)
FROM t
WHERE c = 1 OR c = 2

After this PR the 2 subqueries are merged to this optimized form:

== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
:  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
:  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation

and physical form:

== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
:  :- Subquery scalar-subquery#260, [id=#148]
:  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
:  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
:  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
:  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :                    +- *(1) ColumnarToRow
:  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
:  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]
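
The rewrite shown above can be sketched as a plain function. This is a simplified, illustrative model only — `SubqueryAgg` and `mergeSubqueries` are hypothetical names, not Spark classes; the real rule works on logical plan nodes, not SQL strings:

```scala
// Simplified model of the filter-propagation merge: each single-aggregate
// subquery keeps its own condition as a per-aggregate FILTER clause, while
// the shared scan keeps the OR of all conditions.
case class SubqueryAgg(aggregate: String, condition: String)

def mergeSubqueries(relation: String, subqueries: Seq[SubqueryAgg]): String = {
  // The merged scan must return every row that any subquery needs.
  val scanFilter = subqueries.map(s => s"(${s.condition})").mkString(" OR ")
  // Each aggregate only consumes the rows its original subquery filtered for.
  val aggs = subqueries
    .map(s => s"${s.aggregate} FILTER (WHERE ${s.condition})")
    .mkString(",\n  ")
  s"SELECT\n  $aggs\nFROM $relation\nWHERE $scanFilter"
}

val merged = mergeSubqueries("t", Seq(
  SubqueryAgg("avg(a)", "c = 1"),
  SubqueryAgg("sum(b)", "c = 2")))
println(merged)
```

Running this prints the merged query from the example above (modulo extra parentheses around the conditions).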

The PR introduces 2 configs:

  • spark.sql.planMerge.filterPropagation.enabled to disable filter merge and
  • spark.sql.planMerge.filterPropagation.maxCost to control how complex plans are allowed to be merged.
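
If the PR gets merged, these could presumably be set like any other SQL conf. A sketch only — it assumes a running SparkSession named `spark`, the keys are the ones proposed in this PR (not available in any released Spark version), and the maxCost value is an arbitrary example:

```scala
// Toggle filter-propagation merging:
spark.conf.set("spark.sql.planMerge.filterPropagation.enabled", "false")

// Cap how costly a merged plan may be relative to the original plans
// (example value; the PR defines the actual default):
spark.conf.set("spark.sql.planMerge.filterPropagation.maxCost", "100.0")
```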

Why are the changes needed?

Performance improvement.

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off                   9526           9634          97          0.0   244257993.6       1.0X
[info] q9 - Merge different filters on                    3798           3881         133          0.0    97381735.1       2.5X

The performance improvement in case of q9 comes from merging 15 subqueries into 1 subquery (#32298 was able to merge 15 subqueries into 5).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing and new UTs.

@peter-toth
Contributor Author

cc @cloud-fan, @sigmod

@@ -78,6 +77,9 @@ class SparkOptimizer(
PushPredicateThroughNonJoin,
RemoveNoopOperators) :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) :+
Batch("Merge Scalar Subqueries", Once,
Contributor Author

I've moved the MergeScalarSubqueries rule to the end of the optimization phase, just before ReplaceCTERefWithRepartition. This is needed because the rule needs to peek into the physical plans.

@@ -0,0 +1,627 @@
/*
Contributor Author

I moved this file from catalyst to sql in the first commit: f53cddd, but unfortunately Git doesn't recognize the move as there are too many additions to the file: 269c75b

private def checkIdenticalPlans(
    newPlan: LogicalPlan,
    cachedPlan: LogicalPlan): Option[AttributeMap[Attribute]] = {
  if (newPlan.canonicalized == cachedPlan.canonicalized) {


[doubt] Does this work with V2 sources as well, considering earlyScanPushDownRules changes the scan and hence the canonicalization of the scalar subqueries?

Contributor Author

No, this doesn't work with DSv2 sources (nor did the original #32298).

I'm planning to add DSv2 support in another follow-up PR, probably by introducing a SupportsMerge interface that Scans can implement to merge with another Scan.

Contributor Author

I opened #37711 to add support for DSv2 sources (only Parquet first).

@beliefer
Contributor

beliefer commented Nov 7, 2022

@peter-toth Could you fix these conflicts? I want to test this PR. Thank you!

@peter-toth
Contributor Author

@peter-toth Could you fix these conflicts? I want to test this PR. Thank you!

I've updated the PR with the latest master.

@peter-toth
Contributor Author

@beliefer, I made a mistake previously with merging master (with SPARK-40618 changes) into this PR so I had to force-push to 56c287f. Please check the latest version.

@beliefer
Contributor

We tested this PR and the results are:
[image: benchmark results]

cc @sigmod too.

@beliefer
Contributor

@peter-toth Could you fix the conflicts again?

@peter-toth
Contributor Author

@peter-toth Could you fix the conflicts again?

Sure, done.

@LuciferYang
Contributor

LuciferYang commented Apr 18, 2023

I tested this PR using 10TB TPC-DS; the latency of q9 has been reduced by 83.39% in my production environment.

       Master           SPARK-40193      Percentage
q9     88895.32683 ms   14766.8049 ms    83.39%

also cc @wangyum FYI

@peter-toth
Contributor Author

peter-toth commented Apr 24, 2023

I extracted the first commit of this PR, which just moves MergeScalarSubqueries from spark-catalyst to spark-sql, into #40932 to make the actual change of this PR clearer once that PR has been merged.

@peter-toth peter-toth changed the title [SPARK-40193][SQL] Merge subquery plans with different filters [WIP][SPARK-40193][SQL] Merge subquery plans with different filters Apr 24, 2023
@peter-toth peter-toth force-pushed the SPARK-40193-merge-filters branch 3 times, most recently from ebbe9d6 to 02e3a68 Compare August 2, 2023 11:54
@peter-toth peter-toth changed the title [WIP][SPARK-40193][SQL] Merge subquery plans with different filters [SPARK-40193][SQL] Merge subquery plans with different filters Aug 22, 2023
@@ -62,7 +62,7 @@ class SparkOptimizer(
RewriteDistinctAggregates) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
Batch("Cleanup filters that cannot be pushed down", FixedPoint(1),
Contributor Author

This is because BooleanSimplification is not idempotent: a Once batch is expected to be idempotent, while FixedPoint(1) also runs the batch a single time but without that expectation.

@peter-toth
Contributor Author

peter-toth commented Aug 22, 2023

I've updated this PR; the latest version contains the changes discussed in the threads of #42223.

cc @beliefer, @cloud-fan

@benjamin-j-c

Hey, is this part of generalized subquery fusion? https://www.usenix.org/conference/osdi20/presentation/sarthi

@peter-toth
Contributor Author

Hey, is this part of generalized subquery fusion? https://www.usenix.org/conference/osdi20/presentation/sarthi

No, this PR is not based on the above paper, but our goals seem to be similar.
This PR merges scalar subquery plans only, but unfortunately it got stuck due to lack of reviews. But if it ever gets accepted, I would like to take the approach further and do full common subplan elimination/merge...

@unigof

unigof commented Dec 15, 2023

@peter-toth So exciting to see that you're still updating this PR!!

Is this PR based on Spark 3.5? Does it support DataSource V2?
Could you help merge this PR to master?

        val mergeCost = if (filterPropagationSupported) Some(0d) else None

        (cachedPlan, outputMap, None, None, mergeCost)
      }.orElse(
        (newPlan, cachedPlan) match {

Hi, I remember there was a case matching FileSourceScanPlan in your old version to check whether two FileSourceScanPlans can be merged, like in the attached screenshot. Why is it not needed now?
[image: screenshot]

Contributor Author

This is a logical plan optimization rule, and in the previous version of this PR I was trying to peek into the physical plan by moving this rule to the end of the optimization phase and generating the physical plan of the scans + the adjacent projects/filters above them.
I did this to see if any of those projects/filters gets pushed down to the physical scan (as column pruning or pushed partition or data filters). I prevented merging if the 2 physical scans differed (actually there was this PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS config to still allow merging if only pushed data filters differed) to avoid cases that could cause performance degradation due to merging non-overlapping scans.
The problems with this approach were:

  • The code was pretty complicated.
  • Most physical scans (e.g. Parquet/ORC) allow pushing down data filters, so the default of PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS was true. But actually even a data filter difference could cause non-overlapping scans in some physical scans.
  • This approach didn't work well with DSv2, as DSv2 physical scans can't be compared (they don't have comparable partition and data filters). To solve this I suggested a new SupportsMerge interface that DSv2 scans could implement to decide if merging makes sense. This was in a separate PR: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge #37711, and I implemented the interface for DSv2 Parquet only.

The new version of this PR dropped the physical plan comparison as mentioned here: #37630 (comment) and decides about merging based on costs. If the sum of the cost differences between the original plans and the merged plan is lower than PLAN_MERGE_FILTER_PROPAGATION_MAX_COST then merging is enabled. The cost function might need some refinement: https://github.com/apache/spark/pull/37630/files#diff-5096416449daefcb91637508ae3e98a11c8ac66cae5b146b0937370115c1cbb1R734-R742 to support more expressions, but it already works for TPCDS q9.
This new cost-based approach might also need some follow-up changes to make it work with DSv2, but it definitely doesn't require huge changes in the DSv2 scans (like SupportsMerge did previously).
This PR targets Spark 4.0 as new features are not backported to already released versions, but it could work with Spark 3.5 too.
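
The cost-based decision described here can be sketched roughly as follows. This is a toy model under stated assumptions — `Plan`, `planCost`, and the exact threshold semantics are illustrative, not the PR's actual implementation:

```scala
// Toy cost model: merging is allowed when the summed cost increase of the
// merged plan over each original plan stays within the configured maximum
// (PLAN_MERGE_FILTER_PROPAGATION_MAX_COST in the PR).
case class Plan(numFilterConditions: Int, numAggregates: Int)

// Hypothetical per-operator weights for illustration only.
def planCost(p: Plan): Double =
  p.numFilterConditions * 1.0 + p.numAggregates * 2.0

def shouldMerge(originals: Seq[Plan], merged: Plan, maxCost: Double): Boolean = {
  // Each original subquery now pays the cost of the merged plan instead of
  // its own; sum up the differences and compare to the threshold.
  val extraCost = originals.map(o => planCost(merged) - planCost(o)).sum
  extraCost <= maxCost
}

// Two single-aggregate, single-filter subqueries vs. a merged plan that
// evaluates both conditions and both aggregates:
val originals = Seq(Plan(1, 1), Plan(1, 1))
val mergedPlan = Plan(2, 2)
println(shouldMerge(originals, mergedPlan, maxCost = 10.0))
```

With these weights each original costs 3 and the merged plan costs 6, so the summed extra cost is 6 and merging is allowed for any threshold of at least 6.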

unigof commented Dec 15, 2023

Could you add DSv2 support (especially Parquet) to this PR?
I can test its performance in our production env, thank you very much.

Contributor Author

Actually, I realized that DSv2 support is still not simple to do with this new cost-based approach. Also, I don't want to include that feature in this PR as this PR is already complicated enough.
But I rebased #37711 on top of this PR at: https://github.com/peter-toth/spark/commits/SPARK-40259-support-parquet-dsv2-in-plan-merge/ so you can test it there.

@peter-toth
Contributor Author

@cloud-fan, @beliefer do you think we can move forward with this PR?

@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression}
Contributor

Why change this line?

Contributor Author

I don't know what to do with this PR. There doesn't seem to be much interest in this improvement from the community, but I'm happy to fix this if we can move forward somehow...

My plan was:

  1. to allow filter merging for subqueries in this PR,
  2. and then extract the merging logic to be able to apply it on other areas of the plan,
  3. and then apply it on other areas like [SPARK-43025][SQL] Eliminate Union if filters have the same child plan #40661.

Contributor

I like this optimization, and I have already migrated it into our company's internal branch. Use cases similar to the TPCDS q9 scenario stand to benefit.

Contributor

Yes. I merged this PR into our private repository half a year ago. I also want to promote this PR.

Contributor Author

Fixed in 563cef9.
