Skip to content

Commit

Permalink
ARROW-17350: [C++] Create a scheduler for asynchronous work (apache#1…
Browse files Browse the repository at this point in the history
…3912)

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace authored and fatemehp committed Oct 17, 2022
1 parent 344ac14 commit 88d710b
Show file tree
Hide file tree
Showing 11 changed files with 1,166 additions and 704 deletions.
29 changes: 17 additions & 12 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,21 @@ struct ExecPlanImpl : public ExecPlan {

Result<Future<>> BeginExternalTask() {
Future<> completion_future = Future<>::Make();
ARROW_ASSIGN_OR_RAISE(bool task_added,
task_group_.AddTaskIfNotEnded(completion_future));
if (task_added) {
return std::move(completion_future);
if (async_scheduler_->AddSimpleTask(
[completion_future] { return completion_future; })) {
return completion_future;
}
// Return an invalid future if we were already finished to signal to the
// caller that they should not begin the task
return Future<>{};
}

Status ScheduleTask(std::function<Status()> fn) {
auto executor = exec_context_->executor();
if (!executor) return fn();
// Adds a task which submits fn to the executor and tracks its progress. If we're
// already stopping then the task is ignored and fn is not executed.
return task_group_
.AddTaskIfNotEnded([executor, fn]() { return executor->Submit(std::move(fn)); })
.status();
// aborted then the task is ignored and fn is not executed.
async_scheduler_->AddSimpleTask(
[executor, fn]() { return executor->Submit(std::move(fn)); });
return Status::OK();
}

Status ScheduleTask(std::function<Status(size_t)> fn) {
Expand All @@ -113,6 +110,8 @@ struct ExecPlanImpl : public ExecPlan {
return task_scheduler_->StartTaskGroup(GetThreadIndex(), task_group_id, num_tasks);
}

util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }

Status Validate() const {
if (nodes_.empty()) {
return Status::Invalid("ExecPlan has no node");
Expand Down Expand Up @@ -197,7 +196,8 @@ struct ExecPlanImpl : public ExecPlan {
void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
task_group_.End().AddCallback([this](const Status& st) {
async_scheduler_->End();
async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
MARK_SPAN(span_, error_st_ & st);
END_SPAN(span_);
finished_.MarkFinished(error_st_ & st);
Expand Down Expand Up @@ -328,7 +328,8 @@ struct ExecPlanImpl : public ExecPlan {

ThreadIndexer thread_indexer_;
std::atomic<bool> group_ended_{false};
util::AsyncTaskGroup task_group_;
std::unique_ptr<util::AsyncTaskScheduler> async_scheduler_ =
util::AsyncTaskScheduler::Make();
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
};

Expand Down Expand Up @@ -386,6 +387,10 @@ Status ExecPlan::StartTaskGroup(int task_group_id, int64_t num_tasks) {
return ToDerived(this)->StartTaskGroup(task_group_id, num_tasks);
}

util::AsyncTaskScheduler* ExecPlan::async_scheduler() {
return ToDerived(this)->async_scheduler();
}

Status ExecPlan::Validate() { return ToDerived(this)->Validate(); }

Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
/// \param num_tasks The number of times to run the task
Status StartTaskGroup(int task_group_id, int64_t num_tasks);

util::AsyncTaskScheduler* async_scheduler();

/// The initial inputs
const NodeVector& sources() const;

Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/compute/exec/tpch_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/compute/exec/tpch_node.h"
#include "arrow/buffer.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/util/async_util.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
Expand Down Expand Up @@ -3374,13 +3375,18 @@ class TpchNode : public ExecNode {
[[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }

Status StartProducing() override {
return generator_->StartProducing(
num_running_++;
ARROW_RETURN_NOT_OK(generator_->StartProducing(
plan_->max_concurrency(),
[this](ExecBatch batch) { this->OutputBatchCallback(std::move(batch)); },
[this](int64_t num_batches) { this->FinishedCallback(num_batches); },
[this](std::function<Status(size_t)> func) -> Status {
return this->ScheduleTaskCallback(std::move(func));
});
}));
if (--num_running_ == 0) {
finished_.MarkFinished(Status::OK());
}
return Status::OK();
}

void PauseProducing(ExecNode* output, int32_t counter) override {
Expand Down Expand Up @@ -3408,23 +3414,29 @@ class TpchNode : public ExecNode {

void FinishedCallback(int64_t total_num_batches) {
outputs_[0]->InputFinished(this, static_cast<int>(total_num_batches));
finished_.MarkFinished();
finished_generating_.store(true);
}

Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
if (finished_.is_finished()) return Status::OK();
if (finished_generating_.load()) return Status::OK();
num_running_++;
return plan_->ScheduleTask([this, func](size_t thread_index) {
Status status = func(thread_index);
if (!status.ok()) {
StopProducing();
ErrorIfNotOk(status);
}
if (--num_running_ == 0) {
finished_.MarkFinished(Status::OK());
}
return status;
});
}

const char* name_;
std::unique_ptr<TpchTableGenerator> generator_;
std::atomic<bool> finished_generating_{false};
std::atomic<int> num_running_{0};
};

class TpchGenImpl : public TpchGen {
Expand Down
Loading

0 comments on commit 88d710b

Please sign in to comment.