Skip to content

Commit

Permalink
apacheGH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasic…
Browse files Browse the repository at this point in the history
…Test (apache#36499)

### What changes are included in this PR?

The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving- and processing- threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: apache#36482

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
rtpsw authored and chelseajonesr committed Jul 20, 2023
1 parent 809f028 commit 8edde38
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ class KeyHasher {
size_t index_;
std::vector<col_index_t> indices_;
std::vector<KeyColumnMetadata> metadata_;
const RecordBatch* batch_;
std::atomic<const RecordBatch*> batch_;
std::vector<HashType> hashes_;
LightContext ctx_;
std::vector<KeyColumnArray> column_arrays_;
Expand Down Expand Up @@ -819,7 +819,6 @@ class InputState {
have_active_batch &= !queue_.TryPop();
if (have_active_batch) {
DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed
key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed
}
}
Expand Down Expand Up @@ -897,7 +896,8 @@ class InputState {

Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
if (rb->num_rows() > 0) {
queue_.Push(rb); // only after above updates - push batch for processing
key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
queue_.Push(rb); // only now push batch for processing
} else {
++batches_processed_; // don't enqueue empty batches, just record as processed
}
Expand Down

0 comments on commit 8edde38

Please sign in to comment.