diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 98e5918ebbf55..b7f5d878e5881 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -524,7 +524,7 @@ class KeyHasher { size_t index_; std::vector indices_; std::vector metadata_; - const RecordBatch* batch_; + std::atomic batch_; std::vector hashes_; LightContext ctx_; std::vector column_arrays_; @@ -819,7 +819,6 @@ class InputState { have_active_batch &= !queue_.TryPop(); if (have_active_batch) { DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed - key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed } } @@ -897,7 +896,8 @@ class InputState { Status Push(const std::shared_ptr& rb) { if (rb->num_rows() > 0) { - queue_.Push(rb); // only after above updates - push batch for processing + key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache + queue_.Push(rb); // only now push batch for processing } else { ++batches_processed_; // don't enqueue empty batches, just record as processed }