Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable #29079

Closed
wants to merge 7 commits into from

Conversation

c21
Copy link
Contributor

@c21 c21 commented Jul 12, 2020

What changes were proposed in this pull request?

Based on a follow up comment in #28123, where we can coalesce buckets for shuffled hash join as well. The note here is we only coalesce the buckets from shuffled hash join stream side (i.e. the side not building hash map), so we don't need to worry about OOM when coalescing multiple buckets in one task for building hash map.

If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.

Refactor existing physical plan rule CoalesceBucketsInSortMergeJoin to CoalesceBucketsInJoin, for covering shuffled hash join as well.
Refactor existing unit test CoalesceBucketsInSortMergeJoinSuite to CoalesceBucketsInJoinSuite, for covering shuffled hash join as well.

Why are the changes needed?

Avoid shuffle for joining different bucketed tables, is also useful for shuffled hash join. In production, we are seeing users to use shuffled hash join to join bucketed tables (set spark.sql.join.preferSortMergeJoin=false, to avoid sort), and this can help avoid shuffle if number of buckets are not same.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit tests in CoalesceBucketsInJoinSuite for verifying shuffled hash join physical plan.

Performance number per request from @maropu

I was looking at TPCDS per suggestion from @maropu. But I found most of queries from TPCDS are doing aggregate, and only several ones are doing join. None of input tables are bucketed. So I took the approach to test a modified version of TPCDS q93 as

SELECT ss_ticket_number, sr_ticket_number
FROM store_sales
JOIN store_returns
ON ss_ticket_number = sr_ticket_number

And make store_sales and store_returns to be bucketed tables.

Physical query plan without coalesce:

ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- Exchange hashpartitioning(ss_ticket_number#109L, 4), true, [id=#67]
:  +- *(1) Project [ss_ticket_number#109L]
:     +- *(1) Filter isnotnull(ss_ticket_number#109L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4

Physical query plan with coalesce:

ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- *(1) Project [ss_ticket_number#109L]
:  +- *(1) Filter isnotnull(ss_ticket_number#109L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 (Coalesced to 2)

Run time improvement as 50% of wall clock time:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join coalesce bucket off              1541           1664         106          1.9         535.1       1.0X
shuffle hash join coalesce bucket on               1060           1169          81          2.7         368.1       1.5X

@c21
Copy link
Contributor Author

c21 commented Jul 12, 2020

@maropu, @cloud-fan @gatorsmile @sameeragarwal Could you help check this PR? Thanks.

@maropu
Copy link
Member

maropu commented Jul 12, 2020

Could you show us performance numbers in the PR description, first? I think we need to check the trade-off between #parallelism and shuffle I/O.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comparing to SortMergeJoin, I think to coalesce bucketed tables on HashJoin has higher risk to fail due to OOM. Although this limits only to coalesce on stream side, we build hash map for each bucket on other side and it also sounds to OOM easily.

This feature is disabled by a config by default, so it may be okay. But we should be careful not to enable it by default later.

@c21
Copy link
Contributor Author

c21 commented Jul 13, 2020

We build hash map for each bucket on other side and it also sounds to OOM easily. This feature is disabled by a config by default, so it may be okay. But we should be careful not to enable it by default later.

@viirya, thanks for comment. I agree this feature should be selectively enabled, but sorry I don't see OOM has anything to do with this feature.

You are saying OOM is an issue for shuffled hash join on bucketed table, which I agree. This feature is coalescing on stream side (not touch build side at all), so I don't think it's adding any more risk for OOM on build side. As sort merge join is by default preferred over shuffled hash join, so when users enable shuffled hash join by config explicitly, they should already pay attention to OOM problem.

Am I miss anything? Thanks.

@viirya
Copy link
Member

viirya commented Jul 13, 2020

Assume you are joining two tables with 512 and 256 buckets. Without coalescing table, two tables might be shuffled to 1024 or more partitions. Building hash map is okay. When coalescing table, now you build hash map on each bucket. Each bucket now has much more data than shuffling case. It sounds more likely to OOM.

@c21
Copy link
Contributor Author

c21 commented Jul 13, 2020

@viirya, I see your point for coalescing reduces parallelism to cause more OOM on build side. I agree this can happen. All in all, this is a disable-by-default feature, and user can selectively enable it depending on their table size. But I think it's worth to have as it indeed helped our users in production for using shuffled hash join on bucketed tables.

Re OOM issue in shuffled hash join - I think we can add a fallback mechanism when building hash map and fall back to sort merge join if the size of hash map being too big to OOM (i.e. rethink https://issues.apache.org/jira/browse/SPARK-21505), we have been running this feature in production for years, and it works well.

@c21
Copy link
Contributor Author

c21 commented Jul 13, 2020

Could you show us performance numbers in the PR description, first? I think we need to check the trade-off between #parallelism and shuffle I/O.

@maropu, update PR description for one test query in TPCDS (with modification). Thanks.

@SparkQA
Copy link

SparkQA commented Jul 13, 2020

Test build #125736 has finished for PR 29079 at commit 64a95d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member

cc @cloud-fan

@c21 c21 force-pushed the split-bucket branch 2 times, most recently from d0e12d6 to 43a59b9 Compare July 15, 2020 17:06
@SparkQA
Copy link

SparkQA commented Jul 15, 2020

Test build #125889 has finished for PR 29079 at commit 43a59b9.

  • This patch fails PySpark pip packaging tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 16, 2020

Test build #125903 has finished for PR 29079 at commit 11d138b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Copy link
Member

maropu commented Jul 16, 2020

The PR itself looks okay (Note that this is disabled by default and we can control this behaviour by a new separate config.) @cloud-fan @viirya @imback82

@maropu
Copy link
Member

maropu commented Jul 16, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jul 16, 2020

Test build #125955 has finished for PR 29079 at commit 11d138b.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Jul 17, 2020

Addressed all comments besides the only one that - I am still keeping two ratio configs separately (SMJ and SHJ). Let me know if I need to change this. cc @maropu and @viirya, thanks.

@SparkQA
Copy link

SparkQA commented Jul 17, 2020

Test build #126032 has finished for PR 29079 at commit d620940.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Jul 19, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Jul 19, 2020

Test build #126126 has finished for PR 29079 at commit 4c65c7f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Jul 20, 2020

@cloud-fan - all comments are addressed, wondering is there any other things needed for this PR? Thanks.

Copy link
Member

@maropu maropu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks okay to me and I'll leave this to the others: @cloud-fan @viirya

@viirya
Copy link
Member

viirya commented Jul 21, 2020

It's too late today. I will take another look tomorrow if this is not merged yet.

@maropu
Copy link
Member

maropu commented Jul 21, 2020

Good night~, @viirya

@cloud-fan
Copy link
Contributor

LGTM except for some comments in the test

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@SparkQA
Copy link

SparkQA commented Jul 21, 2020

Test build #126263 has finished for PR 29079 at commit 7b20049.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 21, 2020

Test build #126266 has finished for PR 29079 at commit b1a8a92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 39181ff Jul 21, 2020
@maropu
Copy link
Member

maropu commented Jul 21, 2020

Thanks, all! Merged to master.

@c21
Copy link
Contributor Author

c21 commented Jul 21, 2020

Thank you all @maropu, @cloud-fan, @viirya and @imback82 for review!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
7 participants