ARROW-17509: [C++] Simplify async scheduler by removing the need to call End #14524

Merged
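To summarize the shape of the change: previously ExecPlanImpl owned a std::unique_ptr<util::AsyncTaskScheduler> and had to call End() (and then wait on OnFinished()) once every node's finished future completed. With this PR the scheduler is created through util::AsyncTaskScheduler::Make, which runs a callback, keeps the scheduler alive until that callback and every task added to it have completed, and reports completion through the returned future, so no explicit End() call is needed. A minimal sketch of that pattern, assuming the arrow/util/async_util.h header and inferring signatures from the usage in the diff below (this block is illustrative, not part of the diff):

// Callback-scoped scheduler: Make() keeps the scheduler alive until the
// callback has returned and every task added to it has finished; the returned
// future completes at that point, with no End()/OnFinished() pair to manage.
#include <arrow/status.h>
#include <arrow/util/async_util.h>
#include <arrow/util/future.h>

arrow::Future<> RunWithScheduler() {
  return arrow::util::AsyncTaskScheduler::Make(
      [](arrow::util::AsyncTaskScheduler* scheduler) -> arrow::Status {
        scheduler->AddSimpleTask([] {
          // Each simple task hands back a Future<> the scheduler will wait on.
          return arrow::Future<>::MakeFinished();
        });
        return arrow::Status::OK();  // an error here fails the whole scheduler
      });
}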
3 changes: 2 additions & 1 deletion cpp/examples/arrow/engine_substrait_consumption.cc
@@ -32,7 +32,8 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
explicit IgnoringConsumer(size_t tag) : tag_{tag} {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
cp::BackpressureControl* backpressure_control) override {
cp::BackpressureControl* backpressure_control,
cp::ExecPlan* plan) override {
return arrow::Status::OK();
}

3 changes: 2 additions & 1 deletion cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -583,7 +583,8 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
: batches_seen(batches_seen), finish(std::move(finish)) {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
cp::BackpressureControl* backpressure_control) override {
cp::BackpressureControl* backpressure_control,
cp::ExecPlan* plan) override {
return arrow::Status::OK();
}

158 changes: 80 additions & 78 deletions cpp/src/arrow/compute/exec/exec_plan.cc
@@ -114,7 +114,7 @@ struct ExecPlanImpl : public ExecPlan {
return task_scheduler_->StartTaskGroup(GetThreadIndex(), task_group_id, num_tasks);
}

util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); }
util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_; }

Status Validate() const {
if (nodes_.empty()) {
@@ -127,85 +127,89 @@ struct ExecPlanImpl : public ExecPlan {
}

Status StartProducing() {
START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
auto pairs = metadata().get()->sorted_pairs();
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
::arrow::internal::tracing::UnwrapSpan(span_.details.get());
std::for_each(std::begin(pairs), std::end(pairs),
[span](std::pair<std::string, std::string> const& pair) {
span->SetAttribute(pair.first, pair.second);
});
}
#endif
if (started_) {
return Status::Invalid("restarted ExecPlan");
}

std::vector<Future<>> futures;
for (auto& n : nodes_) {
RETURN_NOT_OK(n->Init());
futures.push_back(n->finished());
}

AllFinished(futures).AddCallback([this](const Status& st) {
error_st_ = st;
EndTaskGroup();
});

task_scheduler_->RegisterEnd();
int num_threads = 1;
bool sync_execution = true;
if (auto executor = exec_context()->executor()) {
num_threads = executor->GetCapacity();
sync_execution = false;
}
RETURN_NOT_OK(task_scheduler_->StartScheduling(
0 /* thread_index */,
[this](std::function<Status(size_t)> fn) -> Status {
return this->ScheduleTask(std::move(fn));
},
/*concurrent_tasks=*/2 * num_threads, sync_execution));

started_ = true;
// producers precede consumers
sorted_nodes_ = TopoSort();

Status st = Status::OK();

using rev_it = std::reverse_iterator<NodeVector::iterator>;
for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) {
auto node = *it;

EVENT(span_, "StartProducing:" + node->label(),
{{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
st = node->StartProducing();
EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
if (!st.ok()) {
// Stop nodes that successfully started, in reverse order
stopped_ = true;
StopProducingImpl(it.base(), sorted_nodes_.end());
for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != it.base();
++fw_it) {
Future<> fut = (*fw_it)->finished();
if (!fut.is_finished()) fut.MarkFinished();
}
return st;
}
}
return st;
}

void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
async_scheduler_->End();
async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
MARK_SPAN(span_, error_st_ & st);
END_SPAN(span_);
finished_.MarkFinished(error_st_ & st);
});
// We call StartProducing on each of the nodes. The source nodes should generally
// start scheduling some tasks during this call.
//
// If no source node schedules any tasks (e.g. they do all their work synchronously as
// part of StartProducing) then the plan may be finished before we return from this
// call.
Future<> scheduler_finished =
util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {
Contributor comment: Could we move this giant lambda into its own function? The extra layer of indentation is pretty confusing.
this->async_scheduler_ = async_scheduler;
START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
auto pairs = metadata().get()->sorted_pairs();
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
::arrow::internal::tracing::UnwrapSpan(span_.details.get());
std::for_each(std::begin(pairs), std::end(pairs),
[span](std::pair<std::string, std::string> const& pair) {
span->SetAttribute(pair.first, pair.second);
});
}
#endif
// TODO(weston) The entire concept of ExecNode::finished() will hopefully go
// away soon (or at least be replaced by a sub-scheduler to facilitate OT)

Contributor comment: Yep, my simplification PR gets rid of those finished futures.
for (auto& n : nodes_) {
RETURN_NOT_OK(n->Init());
async_scheduler->AddSimpleTask([&] { return n->finished(); });
}

task_scheduler_->RegisterEnd();
int num_threads = 1;
bool sync_execution = true;
if (auto executor = exec_context()->executor()) {
num_threads = executor->GetCapacity();
sync_execution = false;
}
RETURN_NOT_OK(task_scheduler_->StartScheduling(
0 /* thread_index */,
[this](std::function<Status(size_t)> fn) -> Status {
return this->ScheduleTask(std::move(fn));
},
/*concurrent_tasks=*/2 * num_threads, sync_execution));

// producers precede consumers
sorted_nodes_ = TopoSort();

Status st = Status::OK();

using rev_it = std::reverse_iterator<NodeVector::iterator>;
for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end;
++it) {
auto node = *it;

EVENT(span_, "StartProducing:" + node->label(),
{{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
st = node->StartProducing();
EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
if (!st.ok()) {
// Stop nodes that successfully started, in reverse order
stopped_ = true;
StopProducingImpl(it.base(), sorted_nodes_.end());
for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != it.base();
++fw_it) {
Future<> fut = (*fw_it)->finished();
if (!fut.is_finished()) fut.MarkFinished();
}
return st;
}
}
return st;
});
scheduler_finished.AddCallback(
[this](const Status& st) { finished_.MarkFinished(st); });
// TODO(weston) Do we really need to return status here? Could we change this return
// to void?
if (finished_.is_finished()) {
return finished_.status();
} else {
return Status::OK();
}
}
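
A consequence worth calling out: with a synchronous executor, StartProducing may run the entire plan before it returns (as the new comment above notes), so callers should treat StartProducing and finished() as one unit. An illustrative driver, not taken from the diff (cp is arrow::compute as in the files above, and RETURN_NOT_OK comes from arrow/status.h):

// Illustrative only: start a plan and wait for its terminal status.
arrow::Status RunPlanToCompletion(const std::shared_ptr<cp::ExecPlan>& plan) {
  // With a synchronous executor this call may already run the plan to completion.
  RETURN_NOT_OK(plan->StartProducing());
  // finished().status() waits for (or immediately reports) the final status.
  return plan->finished().status();
}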

@@ -331,9 +335,7 @@ struct ExecPlanImpl : public ExecPlan {
std::shared_ptr<const KeyValueMetadata> metadata_;

ThreadIndexer thread_indexer_;
std::atomic<bool> group_ended_{false};
std::unique_ptr<util::AsyncTaskScheduler> async_scheduler_ =
util::AsyncTaskScheduler::Make();
util::AsyncTaskScheduler* async_scheduler_ = nullptr;
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
};

3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec/options.h
@@ -213,8 +213,9 @@ class ARROW_EXPORT SinkNodeConsumer {
/// This will be run once the schema is finalized as the plan is starting and
/// before any calls to Consume. A common use is to save off the schema so that
/// batches can be interpreted.
/// TODO(ARROW-17837) Move ExecPlan* plan to query context
virtual Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) = 0;
BackpressureControl* backpressure_control, ExecPlan* plan) = 0;
/// \brief Consume a batch of data
virtual Status Consume(ExecBatch batch) = 0;
/// \brief Signal to the consumer that the last batch has been delivered
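The consumers in plan_test.cc further down show how implementations adapt to the widened Init; condensed into one self-contained sketch (namespaces and header paths here are assumptions, while the override signatures match this diff):

// Sketch modeled on the SchemaKeepingConsumer in plan_test.cc below.
// Assumed headers: arrow/compute/exec/options.h, arrow/compute/exec/exec_plan.h.
class SchemaSavingConsumer : public arrow::compute::SinkNodeConsumer {
 public:
  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                     arrow::compute::BackpressureControl* backpressure_control,
                     arrow::compute::ExecPlan* plan) override {
    // Save the schema so later batches can be interpreted; consumers that do
    // not need the plan pointer can simply ignore it.
    schema_ = schema;
    return arrow::Status::OK();
  }
  arrow::Status Consume(arrow::compute::ExecBatch batch) override {
    return arrow::Status::OK();  // inspect or forward the batch here
  }
  arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }

 private:
  std::shared_ptr<arrow::Schema> schema_;
};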
27 changes: 8 additions & 19 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -519,7 +519,7 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
: batches_seen(batches_seen), finish(std::move(finish)) {}

Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override {
BackpressureControl* backpressure_control, ExecPlan* plan) override {
return Status::OK();
}

@@ -593,7 +593,7 @@ TEST(ExecPlanExecution, ConsumingSinkNames) {
struct SchemaKeepingConsumer : public SinkNodeConsumer {
std::shared_ptr<Schema> schema_;
Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override {
BackpressureControl* backpressure_control, ExecPlan* plan) override {
schema_ = schema;
return Status::OK();
}
@@ -631,23 +631,23 @@ TEST(ExecPlanExecution, ConsumingSinkNames) {
TEST(ExecPlanExecution, ConsumingSinkError) {
struct InitErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override {
BackpressureControl* backpressure_control, ExecPlan* plan) override {
return Status::Invalid("XYZ");
}
Status Consume(ExecBatch batch) override { return Status::OK(); }
Future<> Finish() override { return Future<>::MakeFinished(); }
};
struct ConsumeErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override {
BackpressureControl* backpressure_control, ExecPlan* plan) override {
return Status::OK();
}
Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
Future<> Finish() override { return Future<>::MakeFinished(); }
};
struct FinishErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override {
BackpressureControl* backpressure_control, ExecPlan* plan) override {
return Status::OK();
}
Status Consume(ExecBatch batch) override { return Status::OK(); }
@@ -665,20 +665,9 @@ TEST(ExecPlanExecution, ConsumingSinkError) {
SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
{"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
.AddToPlan(plan.get()));
ASSERT_OK_AND_ASSIGN(
auto source,
MakeExecNode("source", plan.get(), {},
SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
ConsumingSinkNodeOptions(consumer)));
// If we fail at init we see it during StartProducing. Other
// failures are not seen until we start running.
if (std::dynamic_pointer_cast<InitErrorConsumer>(consumer)) {
ASSERT_RAISES(Invalid, plan->StartProducing());
} else {
ASSERT_OK(plan->StartProducing());
ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}
// Since the source node is not parallel the entire plan is run during StartProducing
ASSERT_RAISES(Invalid, plan->StartProducing());
ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}
}

18 changes: 13 additions & 5 deletions cpp/src/arrow/compute/exec/sink_node.cc
@@ -303,7 +303,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
}
output_schema = schema(std::move(fields));
}
RETURN_NOT_OK(consumer_->Init(output_schema, this));
RETURN_NOT_OK(consumer_->Init(output_schema, this, plan_));
return Status::OK();
}

@@ -376,10 +376,18 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {

protected:
void Finish(const Status& finish_st) {
consumer_->Finish().AddCallback([this, finish_st](const Status& st) {
// Prefer the plan error over the consumer error
Status final_status = finish_st & st;
finished_.MarkFinished(std::move(final_status));
plan_->async_scheduler()->AddSimpleTask([this, &finish_st] {
return consumer_->Finish().Then(
[this, finish_st]() {
finished_.MarkFinished(finish_st);
return finish_st;
},
[this, finish_st](const Status& st) {
// Prefer the plan error over the consumer error
Status final_status = finish_st & st;
finished_.MarkFinished(final_status);
return final_status;
});
});
}

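The new Finish path above combines two pieces of the future API that appear in this diff: Future<>::Then with separate success and failure continuations, and Status::operator& for merging statuses while keeping the first error. A standalone sketch of that combination (the function name is illustrative; the behavior mirrors ConsumingSinkNode::Finish):

// Combine an outer status with the eventual result of `inner`,
// preferring the outer (plan) error over the inner (consumer) error.
arrow::Future<> FinishPreferringOuterError(arrow::Future<> inner,
                                           arrow::Status outer_status) {
  return inner.Then(
      [outer_status]() { return outer_status; },  // inner future succeeded
      [outer_status](const arrow::Status& st) { return outer_status & st; });
}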
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec/util.cc
Expand Up @@ -384,7 +384,8 @@ size_t ThreadIndexer::Check(size_t thread_index) {
}

Status TableSinkNodeConsumer::Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) {
BackpressureControl* backpressure_control,
ExecPlan* plan) {
// If the user is collecting into a table then backpressure is meaningless
ARROW_UNUSED(backpressure_control);
schema_ = schema;
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/util.h
@@ -350,7 +350,7 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
: out_(out), pool_(pool) {}
Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) override;
BackpressureControl* backpressure_control, ExecPlan* plan) override;
Status Consume(ExecBatch batch) override;
Future<> Finish() override;
