
ARROW-13268 [C++][Compute] Add ExecNode for semi and anti-semi join #10845

Closed
wants to merge 43 commits

Conversation

nirandaperera
Contributor

Adding a preliminary implementation of semi joins

@nirandaperera nirandaperera marked this pull request as draft July 30, 2021 20:06

@michalursa
Contributor

michalursa commented Jul 30, 2021

  1. The comment about UINT32_MAX probably needs to be updated

  2. It seems to me that ConsumeCachedProbeBatches is only called for a single thread index - the one for the thread that reaches completion of build_counter_.

  3. Because in StopProducing calls to Cancel() on two AtomicCounters are connected with ||, finished_.MarkFinished() can be called twice (first thread gets true from first counter Cancel() call and some other thread from the second Cancel() call). Also, shouldn't we always Cancel() both counters?

  4. I wonder what happens with an empty input on build side?

  5. I think we will only have one class for hash join. It should be fine to call it HashJoinNode and throw Status::NotImplemented() for join types outside of semi, anti-semi. JoinType enum could have elements from all other types as well. Also JoinType enum would be better as "enum class" although Arrow C++ probably has some policy about enums.

  6. I would rename build_side_complete_ to hash_table_built_ or hash_table_build_complete_. Currently I get it confused with build_counter_ checks, where one means all build side input batches consumed by local state hash tables, and the other means hash table merge is complete.

  7. Also it would be nice to tie these two conditions above to futures, so that a merge task and tasks to process cached probe-side batches could be generated and scheduled to execute once these futures are complete. But the futures are not critical at this point, just something nice to have.

  8. Status returned from CacheProbeBatch is always OK()

  9. We probably don't support DictionaryArray in key columns in the code as it is right now, we should check and return Status::NotImplemented() when making hash join node (or make sure it works). Also there could be a scenario where one side of the join uses DictionaryArray while the other uses Array with the same underlying type for keys to compare.

  10. In BuildSideMerge() ARROW_DCHECK(state->grouper). Perhaps it is a copy-paste from group by node, but it would be good to have a comment about why it is not possible to have states 0 and 2 initialized but not 1. This is not obvious. And maybe it should just be relaxed to skip processing if the local thread state with a given index is not initialized.

  11. TotalReached() method added to AtomicCounter is not used anywhere.

  12. There is a problem with null key. I believe in hash join with equality condition it should be that "null != null" (and there is usually a separate comparison operator that treats nulls as equal), while in group by "null==null" when matching groups. We should have a comment about it and document it for the users (maybe we don't have documentation strings for exec nodes yet). If needed we would have to filter out null keys separately from Grouper.

@nirandaperera
Contributor Author

nirandaperera commented Aug 2, 2021

2. It seems to me that ConsumeCachedProbeBatches is only called for a single thread index - the one for the thread that reaches completion of build_counter_.

Yes, thanks @michalursa. I missed this!

3. Because in StopProducing calls to Cancel() on two AtomicCounters are connected with ||, finished_.MarkFinished() can be called twice (first thread gets true from first counter Cancel() call and some other thread from the second Cancel() call). Also, shouldn't we always Cancel() both counters?

I see... The GroupByNode had this,

    if (input_counter_.Cancel()) {
      finished_.MarkFinished();
    } else if (output_counter_.Cancel()) {
      finished_.MarkFinished();
    }

and I was wondering why both cases had the same code path. I thought it could be combined into a single statement.

So, do you mean to say that finished_.MarkFinished() should be called if build_counter_.Cancel() && out_counter_.Cancel()?
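For illustration, something like this is what I'm picturing (just a sketch; stopped_ would be a hypothetical new member, and how it combines with the existing counters is an open question):

    // Sketch only: assumes the existing members
    //   AtomicCounter build_counter_, out_counter_;
    //   Future<> finished_;
    // plus a hypothetical new flag:
    //   std::atomic<bool> stopped_{false};   // needs <atomic>
    void StopProducing() override {
      // Always cancel both counters, regardless of which one is cancelled first.
      build_counter_.Cancel();
      out_counter_.Cancel();
      // Guard MarkFinished() with a separate flag so it runs exactly once,
      // even when several threads race into StopProducing().
      if (!stopped_.exchange(true)) {
        finished_.MarkFinished();
      }
    }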

4. I wonder what happens with an empty input on build side?

This was my thought process: build_counter_ initially has -1 as its total, so until the build input signals InputFinished with 0, probe batches will be cached. Once it receives 0, it toggles build_side_complete_ and probe batches will be queried against an empty hash map.
We could actually return a NullArray from the Grouper::Find method early, if the hash map is empty (rough sketch below). WDYT?
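Something like this, maybe (just a sketch; the Find signature, num_groups_, and FindImpl are assumptions on my part):

    // Sketch only: early exit in Grouper::Find when the build side was empty.
    // Every probe key is a miss, so return an all-null group-id array.
    Result<Datum> Find(const ExecBatch& batch) {
      if (num_groups_ == 0) {
        ARROW_ASSIGN_OR_RAISE(auto null_ids,
                              MakeArrayOfNull(uint32(), batch.length));
        return Datum(std::move(null_ids));
      }
      return FindImpl(batch);  // regular hash lookup (hypothetical helper)
    }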

5. I think we will only have one class for hash join. It should be fine to call it HashJoinNode and throw Status::NotImplemented() for join types outside of semi, anti-semi. JoinType enum could have elements from all other types as well. Also JoinType enum would be better as "enum class" although Arrow C++ probably has some policy about enums.

Sure!
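Something along these lines, perhaps (just a sketch; the enum members and the helper name are placeholders):

    // Sketch only: JoinType as an enum class, with unsupported types rejected
    // when the HashJoinNode is created.
    enum class JoinType {
      LEFT_SEMI,
      LEFT_ANTI,
      INNER,
      LEFT_OUTER,
      RIGHT_OUTER,
      FULL_OUTER,
    };

    Status ValidateJoinType(JoinType join_type) {
      switch (join_type) {
        case JoinType::LEFT_SEMI:
        case JoinType::LEFT_ANTI:
          return Status::OK();
        default:
          return Status::NotImplemented("only semi and anti-semi joins are supported");
      }
    }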

6. I would rename build_side_complete_ to hash_table_built_ or hash_table_build_complete_. Currently I get it confused with build_counter_ checks, where one means all build side input batches consumed by local state hash tables, and the other means hash table merge is complete.

Sure!

7. Also it would be nice to tie these two conditions above to futures, so that a merge task and tasks to process cached probe-side batches could be generated and scheduled to execute once these futures are complete. But the futures are not critical at this point, just something nice to have.

I will think about this one! :-)

8. Status returned from CacheProbeBatch is always OK()

I'll make this void!

9. We probably don't support DictionaryArray in key columns in the code as it is right now, we should check and return Status::NotImplemented() when making hash join node (or make sure it works). Also there could be a scenario where one side of the join uses DictionaryArray while the other uses Array with the same underlying type for keys to compare.

Sure!
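For the check at node-creation time, maybe something like this (just a sketch; the helper name is a placeholder):

    // Sketch only: reject dictionary-encoded key columns until they are
    // actually supported by the hash join node.
    Status CheckKeyTypes(const Schema& schema, const std::vector<int>& key_indices) {
      for (int i : key_indices) {
        const auto& field = *schema.field(i);
        if (field.type()->id() == Type::DICTIONARY) {
          return Status::NotImplemented("dictionary-encoded join key column: ",
                                        field.name());
        }
      }
      return Status::OK();
    }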

10. In BuildSideMerge() ARROW_DCHECK(state->grouper). Perhaps it is a copy-paste from group by node, but it would be good to have a comment about why it is not possible to have states 0 and 2 initialized but not 1. This is not obvious. And maybe it should just be relaxed to skip processing if the local thread state with a given index is not initialized.

Yes, it is a copy from the GroupBy impl.
Ah, good catch! That is something I didn't think about! Are we talking about a case like this?
Ex: 4 threads, but only 1 input batch. So, before/while the other thread-local states are initialized, thread 0 receives the batch and calls BuildSideMerge(). The other states could then be null, and ideally we could just continue the loop in that case (because it is guaranteed that those states won't receive any more batches, since build_counter_ has already completed), roughly as in the sketch below.
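Roughly (just a sketch; local_states_, merged_grouper_ and MergeInto are placeholder names):

    // Sketch only: skip thread-local states that never received a build-side
    // batch. Once build_counter_ has completed, those states cannot receive
    // any more input, so there is nothing to merge from them.
    Status BuildSideMerge() {
      for (auto& state : local_states_) {
        if (!state.grouper) continue;  // this thread never saw a build batch
        ARROW_RETURN_NOT_OK(MergeInto(merged_grouper_.get(), state.grouper.get()));
      }
      return Status::OK();
    }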

11. TotalReached() method added to AtomicCounter is not used anywhere.

12. There is a problem with null key. I believe in hash join with equality condition it should be that "null != null" (and there is usually a separate comparison operator that treats nulls as equal), while in group by "null==null" when matching groups. We should have a comment about it and document it for the users (maybe we don't have documentation strings for exec nodes yet). If needed we would have to filter out null keys separately from Grouper.

I see... but it looks like pandas treats null/NaN/NA as a valid key, and users have to explicitly drop NA values if they want them excluded.
https://stackoverflow.com/questions/23940181/pandas-merging-with-missing-values
I started a thread on this in Zulip https://ursalabs.zulipchat.com/#narrow/stream/180245-dev/topic/Null.20values.20as.20keys
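If we do end up excluding null keys on the probe side, a naive way to flag the affected rows could look like this (just a sketch; a validity-bitmap scan would be faster in practice):

    // Sketch only: mark probe rows whose key columns contain any null, so they
    // can be treated as non-matching under "null != null" join semantics.
    std::vector<bool> RowsWithNullKeys(const RecordBatch& batch,
                                       const std::vector<int>& key_indices) {
      std::vector<bool> has_null(batch.num_rows(), false);
      for (int col : key_indices) {
        const auto& key = *batch.column(col);
        if (key.null_count() == 0) continue;
        for (int64_t row = 0; row < batch.num_rows(); ++row) {
          if (key.IsNull(row)) has_null[row] = true;
        }
      }
      return has_null;
    }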

@nirandaperera nirandaperera marked this pull request as ready for review August 4, 2021 23:00
# Conflicts:
#	cpp/src/arrow/CMakeLists.txt
#	cpp/src/arrow/compute/exec/exec_plan.cc
#	cpp/src/arrow/compute/exec/exec_plan.h
#	cpp/src/arrow/compute/exec/plan_test.cc
@nirandaperera
Contributor Author

@lidavidm I added a simple verification to the tests and added the changes discussed.

Member

@lidavidm lidavidm left a comment


Thanks for this! Just one question about the input parameter validation.

@@ -39,7 +39,7 @@ Status ValidateJoinInputs(const std::shared_ptr<Schema>& left_schema,
                           const std::shared_ptr<Schema>& right_schema,
                           const std::vector<int>& left_keys,
                           const std::vector<int>& right_keys) {
-  if (left_keys.size() != right_keys.size()) {
+  if (left_keys.size() != right_keys.size() && left_keys.size() > 0) {
Member


Is it valid to join with no keys?

Contributor Author


AFAIK I don't think it's valid. We'd need some indexer if no key columns are specified

Member


Sorry I replied by email and it seems to have gotten messed up. Then maybe this should be

    if (left_keys.size() == 0) { return Status::Invalid(...); }
    if (left_keys.size() != right_keys.size()) { ... }

?

Contributor Author


Sorry I missed this. I added the changes now

cpp/src/arrow/compute/exec/hash_join_node.cc (outdated; resolved)
@lidavidm
Member

lidavidm commented Aug 28, 2021 via email

@lidavidm
Member

Thanks for this @nirandaperera. @westonpace and @michalursa any other comments?

Member

@westonpace westonpace left a comment


A few nits

@@ -19,6 +19,8 @@

#include <atomic>
#include <cstdint>
#include <thread>
Member


Hmm, I hate to add this in late but I didn't notice it earlier. So far we have managed to keep <thread> out of the public API surface. Is there any way you can push this into util.cc? This utility (uint64_t GetThreadId();) can probably prevent you from having to resort to pimpl.

Contributor Author


sure. that can be done :-)
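Roughly like this, I suppose (just a sketch; the actual hashing of the thread id is open):

    // util.h -- no <thread> in the public header
    #include <cstdint>

    uint64_t GetThreadId();

    // util.cc -- <thread> stays in the .cc file
    #include <functional>
    #include <thread>

    uint64_t GetThreadId() {
      // Hash the opaque std::thread::id into a 64-bit value.
      return static_cast<uint64_t>(
          std::hash<std::thread::id>()(std::this_thread::get_id()));
    }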

Comment on lines 2182 to 2185
// TODO(niranda) re-enable this!
// if (GrouperFastImpl::CanUse(descrs)) {
// return GrouperFastImpl::Make(descrs, ctx);
// }
Member


Can you make a JIRA for this?

Contributor Author


Oh!!! I completely forgot about this, TBH! :-( This needs to be added before merging this PR. I was waiting for PR #10858 to be merged before adding this change!

Comment on lines +447 to +449
g.ExpectFind("[[3], [3]]", "[0, 0]");

g.ExpectFind("[[3], [3]]", "[0, 0]");
Member


Did you mean to run an identical test twice? Not sure if this is a copy/paste or you are testing for idempotence/deterministic behavior.

Contributor Author


I believe that was the intent. I was following the tests for the Consume method.

g.ExpectConsume("[[3], [3]]", "[0, 0]");
g.ExpectConsume("[[3], [3]]", "[0, 0]");

@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
Member


Remove this file?

Contributor Author


yes, and I will add a JIRA for this

michalursa added a commit to michalursa/arrow that referenced this pull request Sep 15, 2021
michalursa added a commit to michalursa/arrow that referenced this pull request Sep 15, 2021
michalursa added a commit to michalursa/arrow that referenced this pull request Sep 16, 2021
nealrichardson pushed a commit to michalursa/arrow that referenced this pull request Sep 23, 2021
@nirandaperera
Contributor Author

I see that PR #10858 is merged now. I will add the changes and rebase this ASAP.

michalursa added a commit to michalursa/arrow that referenced this pull request Sep 29, 2021
@nirandaperera
Contributor Author

nirandaperera commented Sep 30, 2021

I added GrouperFastImpl::Find. I tried reusing the GrouperFastImpl::ConsumeImpl method, but it looks like parallel test cases are failing. Here is a local stack trace I get:

__GI_raise 0x00007fa593a0818b
__GI_abort 0x00007fa5939e7859
arrow::util::CerrLog::~CerrLog logging.cc:72
arrow::util::CerrLog::~CerrLog logging.cc:74
arrow::util::ArrowLog::~ArrowLog logging.cc:250
arrow::util::TempVectorStack::release util.h:101
arrow::util::TempVectorHolder<unsigned char>::~TempVectorHolder util.h:119
arrow::compute::Hashing::HashMultiColumn key_hash.cc:274
arrow::compute::internal::(anonymous namespace)::GrouperFastImpl::ConsumeImpl<true> hash_aggregate.cc:746
arrow::compute::internal::(anonymous namespace)::GrouperFastImpl::Find hash_aggregate.cc:804
arrow::compute::HashSemiJoinNode<false>::ConsumeProbeBatch hash_join_node.cc:373
arrow::compute::HashSemiJoinNode<false>::ConsumeCachedProbeBatches()::{lambda()#1}::operator()() hash_join_node.cc:314
arrow::internal::FnOnce<void ()>::FnImpl<arrow::compute::HashSemiJoinNode<false>::ConsumeCachedProbeBatches()::{lambda()#1}>::invoke() functional.h:152
arrow::internal::FnOnce<void ()>::operator()() && functional.h:140
arrow::internal::WorkerLoop thread_pool.cc:176
arrow::internal::ThreadPool::<lambda()>::operator()(void) const thread_pool.cc:336
std::__invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> >(std::__invoke_other, arrow::internal::ThreadPool::<lambda()> &&) invoke.h:60
std::__invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> >(arrow::internal::ThreadPool::<lambda()> &&) invoke.h:95
std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::_M_invoke<0>(std::_Index_tuple<0>) thread:244
std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::operator()(void) thread:251
std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > > >::_M_run(void) thread:195
<unknown> 0x00007fa594612de4
start_thread 0x00007fa593995609
clone 0x00007fa593ae4293

@michalursa I see that the new PR #11150 contains these test cases. I am wondering if this is something you encountered previously.

@lidavidm
Member

Are you able to merge the other PR on top of this one and see if that makes any difference? If so, we could merge the two, one immediately after the other.

@lidavidm
Member

Some of this ended up being pulled into ARROW-13642/#11047 which is now merged, so closing this PR. Thanks @nirandaperera!
