[SPARK-32628][SQL] Use bloom filter to improve dynamic partition pruning #38464

Closed
wants to merge 4 commits

Conversation

wangyum (Member) commented Nov 1, 2022

What changes were proposed in this pull request?

This PR enhances DPP to use a bloom filter when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is disabled, the build plan cannot be broadcast by size, and an existing shuffle exchange can be reused.
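For reference, the flag named above is an existing Spark SQL configuration; a minimal spark-shell snippet (not part of the PR) that puts a session on this code path:

```scala
// In spark-shell, `spark` is the active SparkSession.
// Default is true, i.e. DPP only reuses broadcast exchange results; disabling it
// allows DPP to build a pruning subquery from a non-broadcast build plan.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
```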

Why are the changes needed?

Avoid job failures when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is disabled. For example:

```sql
select catalog_sales.* from catalog_sales join catalog_returns
where cr_order_number = cs_sold_date_sk and cr_returned_time_sk < 40000;
```

```
20/08/16 06:44:42 ERROR TaskSetManager: Total size of serialized results of 494 tasks (1225.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
```
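A conceptual sketch of why a bloom filter avoids this failure (this is not the PR's code; it uses Spark's public org.apache.spark.util.sketch.BloomFilter API, and the sizing numbers are illustrative): instead of collecting every distinct join-key value to the driver, the build side is condensed into a fixed-size probabilistic filter that the pruning side only probes.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.sketch.BloomFilter

val spark = SparkSession.builder().getOrCreate()

// Build side: the filtered table from the failing query above.
val buildSide = spark.table("catalog_returns").where("cr_returned_time_sk < 40000")

// Condense the join keys into a compact bloom filter instead of collecting all
// distinct values to the driver. The 1M expected items and 3% false-positive
// rate are illustrative, not values chosen by the PR.
val bf: BloomFilter = buildSide.stat.bloomFilter("cr_order_number", 1000000L, 0.03)

// A key that "might" be contained must still be scanned; a key that is
// definitely absent can be pruned. False positives only cost extra scanning,
// they never drop correct results.
val shouldScan: Boolean = bf.mightContainLong(2452600L)
```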

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

wangyum (Member, Author) commented Nov 1, 2022

github-actions bot added the SQL label on Nov 1, 2022

dongjoon-hyun (Member) commented:

Thank you for pinging me, @wangyum.

```diff
@@ -65,7 +70,7 @@ case class PlanAdaptiveDynamicPruningFilters(
         DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
       } else if (onlyInBroadcast) {
         DynamicPruningExpression(Literal.TrueLiteral)
-      } else {
+      } else if (canBroadcastBySize(buildPlan, conf)) {
```

A Contributor commented on this change:
This can be overestimated: the final plan has an Aggregate, which may dramatically reduce the data size.

```scala
      } else {
        val childPlan = adaptivePlan.executedPlan
        val reusedShuffleExchange = collectFirst(rootPlan) {
          case s: ShuffleExchangeExec if s.child.sameResult(childPlan) => s
```

A Contributor commented on this change:
This is another tricky part: is reusing shuffle always better than starting a new query with column pruning?

cloud-fan (Contributor) commented:

I agree with using bloom filters, as the size estimation can be wrong and the build size can be so large that InSubquery can't work. However, this PR contains another optimization: it forces shuffle reuse when building the subquery that builds the bloom filter. Can we do that later, with more discussion? It is a general optimization that can apply in other places as well: InSubquery DPP and bloom filter join.
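For context on the "bloom filter join" mentioned here: Spark 3.3 added an optimizer-injected runtime bloom filter for joins behind its own flag. A hedged example of enabling it (the configuration name is quoted from memory of the Spark 3.3+ docs; verify against your version):

```scala
// Injects a bloom filter built from one join side to filter the other side at
// runtime; introduced in Spark 3.3 and disabled by default in that release.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
```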

github-actions bot commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label on Feb 25, 2023
github-actions bot closed this on Feb 26, 2023