
Long time spent in function 'listJoinResults' for array mode join probe #9078

Closed
zhli1142015 opened this issue Mar 14, 2024 · 17 comments
Labels: performance, triage (Newly created issue that needs attention)

Comments

@zhli1142015 (Contributor) commented Mar 14, 2024

Bug description

We observed that one join operator in TPCDS query 72 (CBO off, 1 TB TPCDS, 8 * 8 cores) with Gluten/Velox is very slow compared with vanilla Spark runs.

(19) ShuffledHashJoinExecTransformer
Left keys [1]: [cs_item_sk#14]
Right keys [1]: [inv_item_sk#122]
Join type: Inner
Join condition: (inv_quantity_on_hand#124 < cs_quantity#17)
-- HashJoin[INNER n0_0=n1_3, filter: lessthan("n0_2","n1_6")] -> n0_0:INTEGER, n0_1:INTEGER, n0_2:INTEGER, n0_3:INTEGER, n1_0:INTEGER, n1_1:INTEGER, n1_2:INTEGER, n1_3:INTEGER, n1_4:INTEGER, n1_5:BIGINT, n1_6:INTEGER, n1_7:INTEGER

[Screenshot: Shuffle Join in TPCDS Query72]

Through investigation, we found that half of the time in the join probe is spent in the function listJoinResults. In this join, there is a significant number of duplicate rows on the build side, and in the current implementation duplicate rows are linked together via the nextRow field. To traverse all duplicate rows, we need to read the nextRow field of each row to obtain the address of the next duplicate row. This pointer chasing is not very efficient.
This join operator can be reproduced with the simple query below:

select count(*)
from catalog_sales
join inventory on (catalog_sales.cs_item_sk = inventory.inv_item_sk)
where cs_sold_date_sk >= 2452640
group by cs_item_sk

Proposed solution
We suggest storing the addresses of duplicate rows in a vector, which would speed up access to them.

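A minimal, self-contained sketch contrasting the two layouts is shown below; the struct and field names (Row, nextRow, duplicates) are illustrative only and do not match the actual Velox RowContainer.

```cpp
#include <cstddef>
#include <vector>

// Current layout: rows with the same key form a singly linked list via
// 'nextRow', so visiting each duplicate costs a dependent pointer load that
// cannot be prefetched effectively.
struct Row {
  Row* nextRow{nullptr};
  // ... key and payload columns ...
};

std::size_t countMatchesLinked(const Row* first) {
  std::size_t count = 0;
  for (const Row* row = first; row != nullptr; row = row->nextRow) {
    ++count;
  }
  return count;
}

// Proposed layout: the first row for a key keeps the addresses of all its
// duplicates in a contiguous vector, so the probe scans an array instead of
// chasing pointers.
struct RowWithDuplicates {
  std::vector<const RowWithDuplicates*> duplicates;
  // ... key and payload columns ...
};

std::size_t countMatchesVector(const RowWithDuplicates& first) {
  return 1 + first.duplicates.size();
}
```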

Below is the same join after applying the above proposal; we can see the probe latency is reduced by ~20 seconds.
[Screenshot: Shuffle join in TPCDS Query72 after applying the proposal]

System information

Velox System Info v0.0.2
Commit: 874f1dd
CMake Version: 3.22.1
System: Linux-5.15.146.1-microsoft-standard-WSL2
Arch: x86_64
C++ Compiler: /usr/bin/c++
C++ Compiler Version: 11.4.0
C Compiler: /usr/bin/cc
C Compiler Version: 11.4.0
CMake Prefix Path: /usr/local;/usr;/;/usr;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

zhli1142015 added the bug and triage labels on Mar 14, 2024
@zhli1142015 (Contributor, Author) commented:

Hello @mbasmanova, could you help check this? Thanks.

@mbasmanova (Contributor) commented:

@zhli1142015 Thank you for reporting and investigating this issue. I have some questions.

What is the performance of vanilla Spark on this query?

Within this join, there is a significant number of duplicate rows on the build side.

How many build-side rows are there per unique key? Is there a skew where some of the keys have a lot of build-side rows while others have only a few?

I see that the number of join output rows is ~2x the number of probe rows. Is it the case that only a small subset of probe rows match, but they match with many build-side rows? I don't have easy access to a TPC-DS dataset, so I cannot easily check these cardinalities myself.

Thank you also for proposing a solution. I see that the solution applies only to kArrayMode, but isn't this problem more generic, and might it happen in any hash mode? If that's the case, let's make sure the solution is generic as well.

To help evaluate the proposed solution and, perhaps, iterate on other options, it would be helpful to create a benchmark that reproduces the issue. Would you be willing to help with that?

std::unordered_map<char*, std::shared_ptr<std::vector<char*>>> duplicateRows_;

I see you are using std::unordered_map. It might be more efficient to use folly::F14FastMap.

I also noticed that memory for duplicateRows_ is allocated via malloc directly and therefore is not accounted for in Velox memory pools. Let's make sure we allocate memory from a pool. See StlAllocator and AlignedStlAllocator in velox/common/memory/HashStringAllocator.h

I also noticed that you use std::shared_ptr over the vector. What's the motivation for doing that? Why not use std::vector directly?
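
To make these suggestions concrete, here is a minimal sketch, under assumptions: folly::F14FastMap in place of std::unordered_map, the vector stored by value instead of behind std::shared_ptr, and the vector's memory drawn from a HashStringAllocator so it is accounted for in a Velox memory pool. The helper name appendDuplicate is illustrative and not actual Velox code.

```cpp
#include <folly/container/F14Map.h>

#include <vector>

#include "velox/common/memory/HashStringAllocator.h"

using facebook::velox::HashStringAllocator;
using facebook::velox::StlAllocator;

// Addresses of duplicate build-side rows, allocated via the
// HashStringAllocator-backed STL allocator.
using NextRowVector = std::vector<char*, StlAllocator<char*>>;

// Map from the first row with a given key to the vector of its duplicates.
// The map's own memory could likewise come from the pool via
// AlignedStlAllocator; the exact template arguments are omitted here.
using DuplicateRowMap = folly::F14FastMap<char*, NextRowVector>;

// Record 'duplicateRow' as a duplicate of 'firstRow'. The vector is created
// lazily on the first duplicate.
void appendDuplicate(
    DuplicateRowMap& duplicateRows,
    HashStringAllocator* allocator,
    char* firstRow,
    char* duplicateRow) {
  auto it = duplicateRows.find(firstRow);
  if (it == duplicateRows.end()) {
    it = duplicateRows
             .emplace(firstRow, NextRowVector(StlAllocator<char*>(allocator)))
             .first;
  }
  it->second.push_back(duplicateRow);
}
```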

Once we have a benchmark, it would be nice to check whether this optimization always works or whether there is a regression in some cases, e.g. when the number of duplicates is low (2).

Let me know how you'd like to proceed.

CC: @Yuhta @xiaoxmeng

mbasmanova added the performance label and removed the bug label on Mar 14, 2024
@zhli1142015 (Contributor, Author) commented Mar 14, 2024

Thanks @mbasmanova for your suggestions.

What is the performance of vanilla Spark on this query?

In our test, when using Spark, the latency is 39 seconds, and when using Velox, the latency is 60 seconds. After applying this fix, the latency is reduced to 40 seconds.

Is it the case that only small subset of probe rows match, but they match with many build side rows?

Yes, this is the scenario I observed. I collected the sizes of all duplicate-row vectors via logging. The average size is over sixty.

I see that the solution applies only to kArrayMode, but isn't this problem more generic and may happen in any hash mode?

I believe this problem is common to all hash modes as well. I fixed it only for array mode, as this is the only pattern we observed in TPCDS. I can try different modes with a benchmark to see whether this is a common issue.

Do you think it's OK to address your comments and include a benchmark in the same pull request?

Some comments from you:

  1. Use folly::F14FastMap to replace std::unordered_map.
  2. Allocate memory for duplicateRows_ from a memory pool.
  3. Use std::vector instead of a shared pointer.

The scenarios to verify would include: 1) when the number of duplicates is low (e.g., 2), and 2) when the number of duplicates is high (e.g., 100 or more). Please let me know if there are any other cases I should cover in the benchmark.
Thanks.

@Yuhta (Contributor) commented Mar 14, 2024

We also need to update the address list when we erase rows. I would suggest we put the list inside the row container and avoid a second probe for duplicates.

@mbasmanova (Contributor) commented:

The average size is over sixty.

Got it. Would you clarify a bit further about the distribution? For example, can you tell what are p25, p50, p90, p95 and max or describe the distribution in some other way?

I can try different modes with benchmark to see if this is a common issue.

That would be great. Thanks.

Do you think if it's ok to address your comments and include a benchmark in the same pull request?

I suggest working with a single PR for now. Once we have the full solution, we can decide whether it needs to be split into multiple PRs. The first step is to figure out in which cases we have a problem and what's the best way to address all these cases without regressing in other cases.

  1. when the number of duplicates is low (e.g., 2), and 2) when there are a high number of duplicates (e.g., 100 or more).

It would be nice to write the benchmark in a way that allows us to easily test different distributions, e.g. 50% of rows have 2 dups, 35% have 10 dups, 10% have 50 dups, 5% have 100 dups (or something along these lines).
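
A hedged sketch of how such a distribution could be expressed when generating build-side keys for the benchmark; this is not the actual HashJoinListResultBenchmark code, and the helpers (DupBucket, makeBuildKeys) as well as the interpretation of the percentages as fractions of distinct keys are assumptions.

```cpp
#include <algorithm>
#include <cstdint>
#include <random>
#include <vector>

// One bucket of the duplicate-count distribution, e.g. {0.5, 2} means 50% of
// the distinct keys appear twice on the build side.
struct DupBucket {
  double fraction;
  int32_t numDups;
};

// Generate a flat build-side key column for the given distribution. The keys
// are shuffled so duplicates are not adjacent, mimicking an unsorted build
// input.
std::vector<int64_t> makeBuildKeys(
    int64_t numDistinctKeys,
    const std::vector<DupBucket>& buckets,
    uint64_t seed = 42) {
  std::vector<int64_t> keys;
  std::mt19937_64 rng(seed);
  int64_t nextKey = 0;
  for (const auto& bucket : buckets) {
    const auto keysInBucket =
        static_cast<int64_t>(bucket.fraction * numDistinctKeys);
    for (int64_t i = 0; i < keysInBucket; ++i) {
      const int64_t key = nextKey++;
      for (int32_t d = 0; d < bucket.numDups; ++d) {
        keys.push_back(key);
      }
    }
  }
  std::shuffle(keys.begin(), keys.end(), rng);
  return keys;
}

// Example: 50% of keys with 2 dups, 35% with 10, 10% with 50, 5% with 100.
// auto keys =
//     makeBuildKeys(100'000, {{0.5, 2}, {0.35, 10}, {0.1, 50}, {0.05, 100}});
```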

@Yuhta (Contributor) commented Mar 14, 2024

It would also be nice to have a benchmark for erase performance. This list will be faster to traverse but slower to update.

@zhli1142015 (Contributor, Author) commented Mar 18, 2024

Hello,
I updated the PR according to @Yuhta's comment. I reuse the nextOffset_ slot to store the pointer to the next-row-vector, which eliminates the need to maintain an additional hash table (std::unordered_map).

[This is solved.] Additionally, I noticed that multiple threads may simultaneously allocate memory for different next-row-vectors when parallel table building is enabled. Therefore, it's essential to add a mutex to protect HashStringAllocator access during next-row-vector memory allocation.

Regarding row erasing, I believe there's no need to update the contents of next-row-vectors, as rows with identical keys are always assigned to the same partition. I added logic to release references to the next-row-vectors during erasing.
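
For clarity, here is a minimal sketch, under assumptions, of the next-row-vector layout described above; it is not the actual RowContainer code. In particular, the real change allocates the vector from the row container's HashStringAllocator, whereas plain new and the default allocator are used here to keep the sketch short, and the nextOffset_ slot is assumed to be initialized to nullptr when a row is created.

```cpp
#include <cstdint>
#include <vector>

using NextRowVector = std::vector<char*>;

// The slot at byte offset 'nextOffset' inside a row, which previously held
// the 'nextRow' pointer, now stores a pointer to a NextRowVector shared by
// all rows with the same key.
inline NextRowVector*& nextRowVector(char* row, int32_t nextOffset) {
  return *reinterpret_cast<NextRowVector**>(row + nextOffset);
}

// appendNextRow: on the first duplicate, create the vector and seed it with
// the existing row; every duplicate row then points at the same vector, so
// listJoinResults can iterate a contiguous array instead of chasing pointers.
void appendNextRow(char* existingRow, char* newRow, int32_t nextOffset) {
  auto*& rows = nextRowVector(existingRow, nextOffset);
  if (rows == nullptr) {
    rows = new NextRowVector();
    rows->push_back(existingRow);
  }
  rows->push_back(newRow);
  nextRowVector(newRow, nextOffset) = rows;
}
```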

Below is the benchmark comparison:
Before this PR:

============================================================================
[...]marks/HashJoinListResultBenchmark.cpp     relative  time/iter   iters/s
============================================================================
array_1_20%:1;80%:0;                                         1.68s   595.44m
array_1_20%:5;80%:0;                                         2.94s   339.57m
array_1_20%:10;80%:0;                                        5.78s   173.03m
array_1_20%:20;80%:0;                                       18.78s    53.24m
array_1_20%:50;80%:0;                                      1.90min     8.79m
array_1_10%:5;10%:1;80%:0;                                   2.28s   438.36m
array_1_10%:10;10%:5;10%:1;70%:0;                            4.80s   208.17m
array_1_10%:20;10%:10;10%:5;10%:1;60%:0;                    12.97s    77.13m
array_1_10%:50;10%:20;10%:10;10%:5;10%:1;50%:0;            1.21min    13.77m
normalized key_1_20%:1;80%:0;                                3.24s   308.64m
normalized key_1_20%:5;80%:0;                                5.18s   192.90m
normalized key_1_20%:10;80%:0;                               7.74s   129.18m
normalized key_1_20%:20;80%:0;                              19.83s    50.43m
normalized key_1_20%:50;80%:0;                             2.00min     8.35m
normalized key_1_10%:5;10%:1;80%:0;                          3.92s   254.99m
normalized key_1_10%:10;10%:5;10%:1;70%:0;                   6.66s   150.18m
normalized key_1_10%:20;10%:10;10%:5;10%:1;60%:             16.18s    61.79m
normalized key_1_10%:50;10%:20;10%:10;10%:5;10%            1.30min    12.85m
hash_1_20%:1;80%:0;                                          5.05s   197.96m
hash_1_20%:5;80%:0;                                          6.92s   144.52m
hash_1_20%:10;80%:0;                                        10.37s    96.40m
hash_1_20%:20;80%:0;                                        23.18s    43.15m
hash_1_20%:50;80%:0;                                       1.93min     8.64m
hash_1_10%:5;10%:1;80%:0;                                    5.49s   182.30m
hash_1_10%:10;10%:5;10%:1;70%:0;                             7.97s   125.52m
hash_1_10%:20;10%:10;10%:5;10%:1;60%:0;                     15.85s    63.10m
hash_1_10%:50;10%:20;10%:10;10%:5;10%:1;50%:0;             1.18min    14.10m

After this PR:

============================================================================
[...]marks/HashJoinListResultBenchmark.cpp     relative  time/iter   iters/s
============================================================================
array_1_20%:1;80%:0;                                         1.86s   536.34m
array_1_20%:5;80%:0;                                         2.42s   413.87m
array_1_20%:10;80%:0;                                        2.58s   387.93m
array_1_20%:20;80%:0;                                        3.19s   313.91m
array_1_20%:50;80%:0;                                        4.14s   241.31m
array_1_10%:5;10%:1;80%:0;                                   2.06s   484.59m
array_1_10%:10;10%:5;10%:1;70%:0;                            2.87s   348.62m
array_1_10%:20;10%:10;10%:5;10%:1;60%:0;                     3.98s   251.33m
array_1_10%:50;10%:20;10%:10;10%:5;10%:1;50%:0;              5.52s   181.20m
normalized key_1_20%:1;80%:0;                                3.47s   288.24m
normalized key_1_20%:5;80%:0;                                4.50s   222.15m
normalized key_1_20%:10;80%:0;                               4.65s   215.19m
normalized key_1_20%:20;80%:0;                               5.65s   177.02m
normalized key_1_20%:50;80%:0;                               6.85s   145.95m
normalized key_1_10%:5;10%:1;80%:0;                          3.85s   259.83m
normalized key_1_10%:10;10%:5;10%:1;70%:0;                   5.07s   197.23m
normalized key_1_10%:20;10%:10;10%:5;10%:1;60%:              6.48s   154.24m
normalized key_1_10%:50;10%:20;10%:10;10%:5;10%              8.17s   122.33m
hash_1_20%:1;80%:0;                                          5.12s   195.24m
hash_1_20%:5;80%:0;                                          5.98s   167.21m
hash_1_20%:10;80%:0;                                         6.34s   157.69m
hash_1_20%:20;80%:0;                                         7.30s   137.01m
hash_1_20%:50;80%:0;                                         8.41s   118.86m
hash_1_10%:5;10%:1;80%:0;                                    5.53s   180.85m
hash_1_10%:10;10%:5;10%:1;70%:0;                             6.72s   148.79m
hash_1_10%:20;10%:10;10%:5;10%:1;60%:0;                      8.11s   123.33m
hash_1_10%:50;10%:20;10%:10;10%:5;10%:1;50%:0;               9.23s   108.29m

Based on the observations above, listJoinResults is expected to perform faster in most scenarios, barring cases where all rows are duplicated very few times (e.g., array_1_20%:1;80%:0).

Please let me know if you have any more comments on this.

@zhli1142015 (Contributor, Author) commented:

The average size is over sixty.

Got it. Would you clarify a bit further about the distribution? For example, can you tell what are p25, p50, p90, p95 and max or describe the distribution in some other way?

The distribution statistics of build-key (cs_item_sk) duplicate counts are listed below; please let me know if any other information is needed.
[Screenshot: duplicate-count distribution statistics]

@mbasmanova (Contributor) commented:

@zhli1142015

Build key (cs_item_sk) duplicate times distribution statistics are listed below

Thank you for sharing. Apparently all keys repeat quite a bit: 34 times at minimum and 60 on average.

@mbasmanova (Contributor) commented:

Additionally, I've noticed that multiple threads may simultaneously allocate memory for different next row vectors when parallel table building is enabled.

I assume each thread processes rows from a single "partition", i.e. there is no key overlap between threads. If that's the case, perhaps each thread can create its own set of next-row-vectors.

@mbasmanova (Contributor) commented:

array_1_20%:1;80%:0;

Would you clarify how the benchmark name should be interpreted? What do the different parts of the name mean?

@zhli1142015 (Contributor, Author) commented:

Additionally, I've noticed that multiple threads may simultaneously allocate memory for different next row vectors when parallel table building is enabled.

I assume each thread processes rows from a single "partition", e.g. there is no key overlap between threads. If that's the case, perhaps, each thread can create its own set of next-row-vectors.

I created the next-row-vector using std::vector<char*, StlAllocator<char*>>(StlAllocator<char*>(stringAllocator_.get())), and below is the error I encountered in unit tests. I think the issue is that HashStringAllocator does not support simultaneous allocation from multiple threads. Please advise if there is a better way to solve this problem.

E0319 20:25:07.423246 1806799 Exceptions.h:69] Line: /var/git/velox/velox/common/memory/HashStringAllocator.cpp:389, Function:allocateFromFreeLists, Expression: header != nullptr , Source: RUNTIME, ErrorCode: INVALID_STATE
E0319 20:25:07.423822 1806800 HashTable.cpp:840] Error in async hash build: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Retriable: False
Expression: header != nullptr
Function: allocateFromFreeLists
File: /var/git/velox/velox/common/memory/HashStringAllocator.cpp
Line: 389
Stack trace:
# 0  facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 1  void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, facebook::velox::detail::CompileTimeEmptyString>(facebook::velox::detail::VeloxCheckFailArgs const&, facebook::velox::detail::CompileTimeEmptyString)
# 2  facebook::velox::HashStringAllocator::allocateFromFreeLists(int, bool, bool)
# 3  facebook::velox::HashStringAllocator::allocate(int, bool)
# 4  void std::vector<char*, facebook::velox::StlAllocator<char*> >::_M_realloc_insert<char*&>(__gnu_cxx::__normal_iterator<char**, std::vector<char*, facebook::velox::StlAllocator<char*> > >, char*&)
# 5  facebook::velox::exec::RowContainer::appendNextRow(char*, char*, bool)
# 6  char* facebook::velox::exec::ProbeState::fullProbe<(facebook::velox::exec::ProbeState::Operation)1, facebook::velox::exec::HashTable<true>::buildFullProbe(facebook::velox::exec::ProbeState&, unsigned long, char*, bool, facebook::velox::exec::TableInsertPartitionInfo*)::{lambda(char*, int)#2}, facebook::velox::exec::HashTable<true>::buildFullProbe(facebook::velox::exec::ProbeState&, unsigned long, char*, bool, facebook::velox::exec::TableInsertPartitionInfo*)::{lambda(int, long)#1}, facebook::velox::exec::HashTable<true> >(facebook::velox::exec::HashTable<true>&, int, facebook::velox::exec::HashTable<true>::buildFullProbe(facebook::velox::exec::ProbeState&, unsigned long, char*, bool, facebook::velox::exec::TableInsertPartitionInfo*)::{lambda(char*, int)#2}, facebook::velox::exec::HashTable<true>::buildFullProbe(facebook::velox::exec::ProbeState&, unsigned long, char*, bool, facebook::velox::exec::TableInsertPartitionInfo*)::{lambda(int, long)#1}, long&, bool, facebook::velox::exec::TableInsertPartitionInfo*)
# 7  facebook::velox::exec::HashTable<true>::insertForJoin(char**, unsigned long*, int, facebook::velox::exec::TableInsertPartitionInfo*)
# 8  facebook::velox::exec::HashTable<true>::buildJoinPartition(unsigned char, std::vector<std::unique_ptr<facebook::velox::exec::RowPartitions, std::default_delete<facebook::velox::exec::RowPartitions> >, std::allocator<std::unique_ptr<facebook::velox::exec::RowPartitions, std::default_delete<facebook::velox::exec::RowPartitions> > > > const&, std::vector<char*, std::allocator<char*> >&)
# 9  std::_Function_handler<std::unique_ptr<bool, std::default_delete<bool> > (), facebook::velox::exec::HashTable<true>::parallelJoinBuild()::{lambda()#5}>::_M_invoke(std::_Any_data const&)
# 10 facebook::velox::AsyncSource<bool>::prepare()
# 11 folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&)
# 12 folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)
# 13 void folly::detail::function::FunctionTraits<void ()>::callSmall<std::_Bind<void (folly::ThreadPoolExecutor::*(folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)> >(folly::detail::function::Data&)
# 14 0x00000000000dc252
# 15 0x0000000000094ac2
# 16 0x000000000012684f

@mbasmanova (Contributor) commented:

Please advise if there are other better ways to solve this problem.

See #9078 (comment)

@zhli1142015 (Contributor, Author) commented Mar 19, 2024

array_1_20%:1;80%:0;

Would you clarify how the benchmark name should be interpreted? What do the different parts of the name mean?

{hash mode}_{number of fields (keys + dependent fields)}_{key repetition distribution}. For example, array_1_20%:1;80%:0 means the join uses array hash mode and each build row consists of only one field; in the build-side row vector, 20% of the rows have one duplicate, while 80% of the rows have no duplicates.

@zhli1142015 (Contributor, Author) commented:

Please advise if there are other better ways to solve this problem.

See #9078 (comment)

each thread can create its own set of next-row-vectors.

I think even if we do this, they would still allocate memory from the same HashStringAllocator instance, which would still trigger weird errors like Function: allocateFromFreeLists, Expression: header != nullptr, Source: RUNTIME, ErrorCode: INVALID_STATE.
Or maybe I don't get your point; could you describe it in more detail?
Thanks.

@mbasmanova (Contributor) commented:

@zhli1142015 When building the hash table in parallel, each HashTable has its own HSA, no?

@zhli1142015 (Contributor, Author) commented:

Thanks, I think I get your point. I updated the PR to remove the mutex for the HSA.

yanngyoung pushed a commit to yanngyoung/velox that referenced this issue Apr 12, 2024
…or#9079)

Summary:
Problem
When there are a large number of rows with the same key in the build side,
the `listJoinResults` function becomes very time-consuming.

Design
`appendNextRow`
Create a next-row-vector if it doesn't exist. Append the row address to
the next-row-vector, and store the address of the next-row-vector in the
`nextOffset_` slot for all duplicate rows.

`listJoinResults`
To retrieve the addresses of all rows with the same keys, we first obtain the
address of the first row using the hash function; then, via the `nextOffset_` slot,
we retrieve the address of the next-row-vector. Then, we iterate through the
next-row-vector to obtain the addresses of the remaining rows. We can utilize
SIMD instructions to accelerate the next-row-vector access.

When a row needs to be erased, if the value in its `nextOffset_` slot is not
null, the row will be removed from the corresponding next-row-vector and its
`nextOffset_` slot will be set to null.

The current design is applicable to all hash modes.

Benchmark
The results indicate that this PR can accelerate the `listJoinResults` function,
with the acceleration effect becoming more pronounced as the proportion of
rows with the same key increases.

Fixes facebookincubator#9078

Pull Request resolved: facebookincubator#9079

Reviewed By: mbasmanova

Differential Revision: D55428528

Pulled By: Yuhta

fbshipit-source-id: dfce20c1ecad3eaddc6c5e024a3b21a800d54965
Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this issue Jun 7, 2024