Skip to content

Commit

Permalink
GH-15243: [C++] fix for potential deadlock in the group-by node (#33700)
Browse files Browse the repository at this point in the history
* Closes: #15243

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace authored and raulcd committed Jan 18, 2023
1 parent 461c17d commit fa52bb0
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ class GroupByNode : public ExecNode {
outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
}

Status OutputResult() {
Status DoOutputResult() {
// To simplify merging, ensure that the first grouper is nonempty
for (size_t i = 0; i < local_states_.size(); i++) {
if (local_states_[i].grouper) {
Expand All @@ -500,11 +500,28 @@ class GroupByNode : public ExecNode {

int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
num_output_batches));
Status st =
plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches);
if (st.IsCancelled()) {
// This means the user has cancelled/aborted the plan. We will not send any batches
// and end immediately.
finished_.MarkFinished();
return Status::OK();
} else {
return st;
}
return Status::OK();
}

void OutputResult() {
// If something goes wrong outputting the result we need to make sure
// we still mark finished.
Status st = DoOutputResult();
if (!st.ok()) {
finished_.MarkFinished(st);
}
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
Expand All @@ -521,7 +538,7 @@ class GroupByNode : public ExecNode {
if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return;

if (input_counter_.Increment()) {
ErrorIfNotOk(OutputResult());
OutputResult();
}
}

Expand All @@ -542,7 +559,7 @@ class GroupByNode : public ExecNode {
DCHECK_EQ(input, inputs_[0]);

if (input_counter_.SetTotal(total_batches)) {
ErrorIfNotOk(OutputResult());
OutputResult();
}
}

Expand All @@ -551,7 +568,6 @@ class GroupByNode : public ExecNode {
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});

local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}
Expand All @@ -570,7 +586,9 @@ class GroupByNode : public ExecNode {
EVENT(span_, "StopProducing");
DCHECK_EQ(output, outputs_[0]);

if (input_counter_.Cancel()) finished_.MarkFinished();
if (input_counter_.Cancel()) {
finished_.MarkFinished();
}
inputs_[0]->StopProducing(this);
}

Expand Down

0 comments on commit fa52bb0

Please sign in to comment.