[WIP][SPARK-32268][SQL] Bloom Filter Join #29065

Closed
wants to merge 4 commits into from

wangyum
Member

@wangyum wangyum commented Jul 10, 2020

What changes were proposed in this pull request?

Reducing shuffle data can significantly improve query performance and increase Spark cluster stability. There is a DPP-like way to filter out shuffle data; the main difference is that a Bloom filter is used to filter the data (see a simple Bloom filter benchmark). This PR adds support for this feature. The design document can be found here.
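
To make the idea concrete, here is a minimal sketch of filtering probe-side rows with a Bloom filter built from the build side's join keys, before those rows would be shuffled. It is an illustration only: `SimpleBloomFilter`, `pruneBeforeShuffle`, the bit count, and the number of hash functions are all hypothetical and are not taken from this PR or from Spark's own Bloom filter implementation.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

object BloomFilterJoinSketch {
  // Hand-rolled Bloom filter for illustration only (not Spark's implementation).
  final class SimpleBloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new BitSet(numBits)

    // Derive the i-th bit position by hashing the key with seed i.
    private def position(key: Long, i: Int): Int =
      Math.floorMod(MurmurHash3.stringHash(key.toString, i), numBits)

    def put(key: Long): Unit =
      (0 until numHashes).foreach(i => bits.set(position(key, i)))

    // May return true for a key that was never added (false positive),
    // but never returns false for a key that was added.
    def mightContain(key: Long): Boolean =
      (0 until numHashes).forall(i => bits.get(position(key, i)))
  }

  // Build the filter from the small (build) side's join keys, then drop
  // probe-side rows that cannot possibly match before they are shuffled.
  def pruneBeforeShuffle(
      buildKeys: Seq[Long],
      probeRows: Seq[(Long, String)]): Seq[(Long, String)] = {
    val filter = new SimpleBloomFilter(numBits = 1 << 16, numHashes = 3)
    buildKeys.foreach(k => filter.put(k))
    probeRows.filter { case (key, _) => filter.mightContain(key) }
  }
}
```

Because a Bloom filter can report false positives but not false negatives, the pruned rows are a superset of the rows that actually join, so the join result is unchanged.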

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Evaluated dynamic Bloom filter runtime filtering with TPC-DS.

| SQL | Shuffle stage data size (original) | Shuffle stage data size (pruned) | Shuffle stage records (original) | Shuffle stage records (pruned) | Shuffle pruning disabled (seconds) | Shuffle pruning enabled (seconds) |
| --- | --- | --- | --- | --- | --- | --- |
| q13 | 3.2 GiB | 93.1 MiB | 86,409,332 | 1,480,662 | 13 | 14 |
| q16 | 158.2 GiB | 270.7 MiB | 7,136,969,739 | 10,295,246 | 84 | 36 |
| q24a | 355.6 GiB | 39.7 GiB | 13,428,037,922 | 1,504,810,137 | 660 | 432 |
| q24b | 355.6 GiB | 39.7 GiB | 13,428,037,922 | 1,504,810,137 | 660 | 492 |
| q65 | 37.2 GiB | 37.2 GiB | 2,627,543,089 | 2,627,543,089 | 45 | 45 |
| q72 | 40.9 GiB | 1543.3 MiB | 1,418,327,817 | 47,248,271 | 276 | 38 |
| q80 | 8.3 GiB | 8.2 GiB | 295,853,928 | 292,353,065 | 37 | 36 |
| q85 | 12.8 GiB | 1508.2 MiB | 329,635,219 | 37,337,231 | 16 | 12 |
| q93 | 26.7 GiB | 435.9 MiB | 1,389,592,792 | 20,744,184 | 270 | 258 |
| q94 | 87.6 GiB | 1012.0 MiB | 3,598,433,079 | 34,538,210 | 58 | 27 |
| q95 | 640.5 GiB | 344.6 GiB | 872,596,314 | 793,654,526 | 414 | 402 |
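
As a reading aid for the timing columns, the following small sketch computes the speedup (seconds with pruning disabled divided by seconds with pruning enabled) for a few of the queries above. The object and function names are hypothetical; the numbers are copied from the table.

```scala
object ShufflePruningNumbers {
  // (query, seconds with pruning disabled, seconds with pruning enabled),
  // copied from the benchmark table in this PR description.
  val timings: Seq[(String, Int, Int)] = Seq(
    ("q13", 13, 14),
    ("q16", 84, 36),
    ("q72", 276, 38),
    ("q94", 58, 27))

  // A value above 1.0 means the query ran faster with pruning enabled.
  def speedup(disabledSec: Int, enabledSec: Int): Double =
    disabledSec.toDouble / enabledSec

  def main(args: Array[String]): Unit =
    timings.foreach { case (query, off, on) =>
      println(f"$query%-4s ${speedup(off, on)}%.2fx")
    }
}
```

Note that q13 comes out slightly below 1.0 (13 s versus 14 s), so the filter is not free when the shuffle is already small.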

@SparkQA

SparkQA commented Jul 10, 2020

Test build #125627 has finished for PR 29065 at commit a47485b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • case class RuntimeBloomFilterPruningSubquery(
      • case class BuildBloomFilter(
      • case class InBloomFilter(bloomFilterExp: Expression, value: Expression)
      • case class BloomFilterSubqueryExec(

@jovany-wang

jovany-wang commented Jul 22, 2020

Hi @wangyum, this looks like a nice PR to me, but I would like to raise a few points.

I haven't benchmarked MinMax against Bloom filters, but my sense is that which one performs better may depend on the case.
So how about making this more general? For example:

                          DynamicFilter
                                |
               Is the filtering key partitioned?
                        /                  \
                      Y                     N
                     /                       \
              DPP filter         Choose a best filter for it. (from MinMax, Bloom or other filters such as index filter, etc)
                                       Note: Not all of the filters can be pushed to scan.

That is just a rough idea, but the key point is to make DynamicFilter (or call it RuntimeFilter) more general, meaning that MinMaxFilter, BloomFilter, and DPPFilter are all DynamicFilters, so that it is easy to extend.

I have seen another proposal for a RuntimeFilter (MinMax) before, so making this easy to extend should be as important as the performance results. That said, it may be hard to make it more extensible.

Feel free to point out anything I have misunderstood. Thanks.
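
The decision tree in this comment could be sketched roughly as follows. This is only one possible reading of the suggestion: the trait and object names, the `BuildSideStats` type, and in particular the rule used to pick between MinMax and Bloom are assumptions made for illustration, not something proposed in the thread.

```scala
object DynamicFilterSketch {
  // Common parent so that new filter kinds can be added later.
  sealed trait DynamicFilter { def name: String }
  case object DppFilter extends DynamicFilter { val name = "DPP" }
  case object MinMaxFilter extends DynamicFilter { val name = "MinMax" }
  case object BloomFilterKind extends DynamicFilter { val name = "Bloom" }

  // Hypothetical build-side statistic used to pick a non-DPP filter.
  final case class BuildSideStats(keysAreDenseRange: Boolean)

  // Partitioned filtering key -> DPP; otherwise choose among the rest.
  // The MinMax-vs-Bloom rule here is an assumed placeholder heuristic.
  def choose(keyIsPartitionColumn: Boolean, stats: BuildSideStats): DynamicFilter =
    if (keyIsPartitionColumn) DppFilter
    else if (stats.keysAreDenseRange) MinMaxFilter
    else BloomFilterKind
}
```

Using a sealed trait means the compiler can warn when a pattern match over `DynamicFilter` misses a newly added filter kind.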

@wangyum
Member Author

wangyum commented Jul 28, 2020

@jovany-wang Thank you very much for your suggestion. I appreciate the time and effort you have spent to share your insightful comments, which will be seriously considered.

@SparkQA

SparkQA commented Jul 28, 2020

Test build #126700 has finished for PR 29065 at commit da0a420.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum closed this Jul 28, 2020
@wangyum wangyum deleted the SPARK-32268 branch July 28, 2020 07:06
@wangyum wangyum restored the SPARK-32268 branch July 28, 2020 07:06
@wangyum wangyum reopened this Jul 28, 2020
@SparkQA

SparkQA commented Jul 28, 2020

Test build #126705 has finished for PR 29065 at commit 94bfb36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId)
with DynamicPruning
with Unevaluable {
@dongjoon-hyun dongjoon-hyun Aug 10, 2020

@dongjoon-hyun
Member

Hi, @wangyum. The doc and PR look reasonable. Is there a plan for further updates, since the PR is still marked [WIP]?

@SparkQA

SparkQA commented Aug 10, 2020

Test build #127281 has finished for PR 29065 at commit 94bfb36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2020

Test build #128383 has finished for PR 29065 at commit 1a8cc9b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Oct 5, 2020

What's the current status of this PR? Waiting for reviews?

@maropu
Member

maropu commented Oct 5, 2020

Reduce the shuffle data can significantly improve the query performance

By the way, IMHO the name ShufflePruning is a bit misleading. At first I thought this PR aimed to remove shuffle exchanges by using runtime filters.

}
}

case class DynamicShufflePruningSubquery(
It looks like DynamicPartitionPruningSubquery and DynamicShufflePruningSubquery are almost the same, so do we need this new predicate? Could we instead add a field to DynamicPruningSubquery that represents the pruning type, like this?

case class DynamicPruningSubquery(
    pruningKey: Expression,
    buildQuery: LogicalPlan,
    buildKeys: Seq[Expression],
    broadcastKeyIndex: Int,
    onlyInBroadcast: Boolean,
    exprId: ExprId,
    pruningType: PruningType) // <-- add this field?
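
A self-contained sketch of what such a `pruningType` field might look like is below. It is not Spark code: `PruningType` and its two cases are hypothetical, and plain `String` and `Long` values stand in for Spark's `Expression`, `LogicalPlan`, and `ExprId` so that the example compiles on its own.

```scala
object PruningTypeSketch {
  // Hypothetical discriminator for the two pruning strategies.
  sealed trait PruningType
  case object PartitionPruning extends PruningType
  case object ShufflePruning extends PruningType

  // Simplified stand-in: String/Long replace Expression, LogicalPlan, ExprId.
  final case class DynamicPruningSubquery(
      pruningKey: String,
      buildQuery: String,
      buildKeys: Seq[String],
      broadcastKeyIndex: Int,
      onlyInBroadcast: Boolean,
      exprId: Long,
      pruningType: PruningType)

  // Rules that handle the subquery can branch on the field instead of
  // matching on two nearly identical case classes.
  def describe(p: DynamicPruningSubquery): String = p.pruningType match {
    case PartitionPruning => s"prune partitions on ${p.pruningKey}"
    case ShufflePruning   => s"prune shuffle input on ${p.pruningKey}"
  }
}
```

With this shape, existing rules that pattern match on `DynamicPruningSubquery` would keep working and only the places that care about the strategy would inspect `pruningType`.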

val hasBenefit = pruningHasBenefit(r, partScan, l, left)
newRight = insertPartitionPredicate(r, newRight, l, left, leftKeys, hasBenefit)
// shuffle pruning
case None if conf.dynamicShufflePruningEnabled && canPruneRight(joinType) &&
Is this new feature enabled only if both dynamicPartitionPruningEnabled and dynamicShufflePruningEnabled are true?

DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
val broadcastValues = SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
if (preferBloomFilter(buildKeys(broadcastKeyIndex), buildPlan)) {
DynamicPruningExpression(BloomFilterSubqueryExec(value, broadcastValues, exprId))
Does this PR propose two things: (1) improving the existing partition pruning with Bloom filters, and (2) implementing a new dynamic pruning strategy (shuffle pruning)?

s"Bloom filter only supports atomic types, but got ${colType.catalogString}.")

val updater: (BloomFilter, InternalRow) => Unit =
(filter, row) => BloomFilterUtils.putValue(filter, row.get(0, colType))
I think this change can cause a performance regression, because the pattern match on colType happens every time updater is called.
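
The concern can be illustrated with a small stand-alone sketch that contrasts matching on the column type inside the per-row closure with matching once up front and returning a specialized closure. Everything here is a simplified stand-in: `ColType`, `RecordingFilter`, and both updater functions are hypothetical and do not reproduce `BloomFilterUtils.putValue` from the PR.

```scala
import scala.collection.mutable.ArrayBuffer

object UpdaterSketch {
  sealed trait ColType
  case object LongCol extends ColType
  case object StringCol extends ColType

  // Stand-in for a Bloom filter: it only records what was put into it.
  final class RecordingFilter {
    val longs: ArrayBuffer[Long] = ArrayBuffer.empty
    val strings: ArrayBuffer[String] = ArrayBuffer.empty
    def putLong(v: Long): Unit = { longs += v; () }
    def putString(v: String): Unit = { strings += v; () }
  }

  // Per-row dispatch: the match on colType runs on every invocation.
  def perRowUpdater(colType: ColType): (RecordingFilter, Any) => Unit =
    (filter, value) =>
      colType match {
        case LongCol   => filter.putLong(value.asInstanceOf[Long])
        case StringCol => filter.putString(value.asInstanceOf[String])
      }

  // Hoisted dispatch: match once, then return a closure for that type only.
  def hoistedUpdater(colType: ColType): (RecordingFilter, Any) => Unit =
    colType match {
      case LongCol =>
        (filter, value) => filter.putLong(value.asInstanceOf[Long])
      case StringCol =>
        (filter, value) => filter.putString(value.asInstanceOf[String])
    }
}
```

Both versions produce the same filter contents; the hoisted form simply moves the type dispatch out of the per-row path. Whether the difference is measurable in practice would need a benchmark.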

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 14, 2021
@github-actions github-actions bot closed this Jan 15, 2021
@wangyum
Member Author

wangyum commented Aug 15, 2021

Some real cases from our cluster.

Case 1:

[Images: before this PR | after this PR]

Case 2:

[Images: before this PR | after this PR]
