Skip to content

Commit

Permalink
Use double locking
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed May 10, 2024
1 parent a499690 commit ade727e
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -963,15 +963,22 @@ class AsofJoinNode : public ExecNode {
// If LHS is finished or empty then there's nothing we can do here
if (lhs.Finished() || lhs.Empty()) break;

// Advance each of the RHS as far as possible to be up to date for the LHS timestamp
ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs());

// If we have received enough inputs to produce the next output batch
// (decided by IsUpToDateWithLhsRow), we will perform the join and
// materialize the output batch. The join is done by advancing through
// the LHS and adding joined row to rows_ (done by Emplace). Finally,
// input batches that are no longer needed are removed to free up memory.
if (IsUpToDateWithLhsRow()) {
bool any_rhs_advanced{};
bool is_up_to_date_with_lhs_row{};
{
std::lock_guard<std::mutex> guard(push_gate_);
// Advance each of the RHS as far as possible to be up to date for the LHS
// timestamp
ARROW_ASSIGN_OR_RAISE(any_rhs_advanced, UpdateRhs());

// If we have received enough inputs to produce the next output batch
// (decided by IsUpToDateWithLhsRow), we will perform the join and
// materialize the output batch. The join is done by advancing through
// the LHS and adding joined row to rows_ (done by Emplace). Finally,
// input batches that are no longer needed are removed to free up memory.
is_up_to_date_with_lhs_row = IsUpToDateWithLhsRow();
}
if (is_up_to_date_with_lhs_row) {
dst.Emplace(state_, tolerance_);
ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance());
if (!advanced) break; // if we can't advance LHS, we're done for this batch
Expand Down Expand Up @@ -1032,7 +1039,7 @@ class AsofJoinNode : public ExecNode {
}

bool Process() {
std::lock_guard<std::mutex> guard(gate_);
std::lock_guard<std::mutex> guard(finish_gate_);
if (!CheckEnded()) {
return false;
}
Expand Down Expand Up @@ -1395,7 +1402,7 @@ class AsofJoinNode : public ExecNode {
rb->ToString(), DEBUG_MANIP(std::endl));

{
std::lock_guard<std::mutex> guard(gate_);
std::lock_guard<std::mutex> guard(push_gate_);
ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
}
process_.Push(true);
Expand All @@ -1404,7 +1411,7 @@ class AsofJoinNode : public ExecNode {

Status InputFinished(ExecNode* input, int total_batches) override {
{
std::lock_guard<std::mutex> guard(gate_);
std::lock_guard<std::mutex> guard(finish_gate_);
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();
state_.at(k)->set_total_batches(total_batches);
Expand Down Expand Up @@ -1458,7 +1465,8 @@ class AsofJoinNode : public ExecNode {
// InputStates
// Each input state corresponds to an input table
std::vector<std::unique_ptr<InputState>> state_;
std::mutex gate_;
std::mutex finish_gate_;
std::mutex push_gate_;
TolType tolerance_;
#ifndef NDEBUG
std::ostream* debug_os_;
Expand Down

0 comments on commit ade727e

Please sign in to comment.