Skip to content

Commit

Permalink
Fix arrow::util::AsyncTaskScheduler::AddAsyncGenerator to avoid stack overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Nov 1, 2022
1 parent e9f0b3f commit c6fa63f
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions cpp/src/arrow/util/async_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/thread_pool.h"

#include <memory>

Expand Down Expand Up @@ -222,23 +223,51 @@ class ARROW_EXPORT AsyncTaskScheduler {
// NOTE(review): this block is a rendered commit diff without +/- markers, so
// lines removed by the commit and lines added by it are interleaved below and
// the text is not compilable as shown. Lines believed to be removed are flagged
// with "(diff: presumably removed)"; confirm against the actual commit.
//
// SubmitTask is a Task that pulls items from state_holder->generator() and
// hands each one to state_holder->visitor until the end-of-iteration marker
// is seen or an error occurs.
struct SubmitTask : public Task {
// Takes ownership of the state object.
explicit SubmitTask(std::unique_ptr<State> state_holder)
: state_holder(std::move(state_holder)) {}

// Callback attached to a generator future that had no result available yet.
struct SubmitTaskCallback {
// (diff: presumably removed) Old constructor and old call operator, which
// received the item directly and reported its outcome as a Status.
explicit SubmitTaskCallback(std::unique_ptr<State> state_holder)
: state_holder(std::move(state_holder)) {}
Status operator()(const T& item) {
// New constructor: additionally stores the future that is marked finished
// once this callback has handled its item.
SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion)
: state_holder(std::move(state_holder)),
task_completion(std::move(task_completion)) {}
// New call operator: receives the Result of the generator future and reports
// the outcome through task_completion instead of a return value.
void operator()(const Result<T>& maybe_item) {
// The generator failed: finish the task with that error status.
if (!maybe_item.ok()) {
task_completion.MarkFinished(maybe_item.status());
return;
}
const auto& item = *maybe_item;
// End of iteration: finish the task without scheduling anything further.
if (IsIterationEnd(item)) {
// (diff: presumably removed) old return value for this branch.
return Status::OK();
task_completion.MarkFinished();
return;
}
// Run the visitor; a failing status finishes the task with that status.
Status visit_st = state_holder->visitor(item);
if (!visit_st.ok()) {
task_completion.MarkFinished(std::move(visit_st));
return;
}
// (diff: presumably removed) old form of the visitor call above.
ARROW_RETURN_NOT_OK(state_holder->visitor(item));
// Hand the state to a new SubmitTask added to the scheduler so the
// remaining items are processed by that task.
state_holder->scheduler->AddTask(
std::make_unique<SubmitTask>(std::move(state_holder)));
// (diff: presumably removed) old return value on success.
return Status::OK();
task_completion.MarkFinished();
}
std::unique_ptr<State> state_holder;
Future<> task_completion;
};

// Entry point invoked with the scheduler; returns a future for this task or
// an error.
Result<Future<>> operator()(AsyncTaskScheduler* scheduler) {
// (diff: presumably removed) Old implementation: always chained the callback
// onto the generator future via Then().
Future<T> next = state_holder->generator();
return next.Then(SubmitTaskCallback(std::move(state_holder)));
// New implementation: `task` is the future returned to the caller.
Future<> task = Future<>::Make();
// Consume as many items as we can (those that are already finished)
// synchronously to avoid recursion / stack overflow.
while (true) {
Future<T> next = state_holder->generator();
// When TryAddCallback returns true, return `task`; it is completed later by
// the SubmitTaskCallback built in the lambda. Presumably it returns false
// (without invoking the lambda, so state_holder is not moved) when `next`
// is already finished -- confirm against the Future API.
if (next.TryAddCallback(
[&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
return task;
}
// Read the item from next.result(); the macro presumably returns the error
// to the caller if the result is not ok -- confirm.
ARROW_ASSIGN_OR_RAISE(T item, next.result());
if (IsIterationEnd(item)) {
task.MarkFinished();
return task;
}
// Visit the item inline, then loop to fetch the next one.
ARROW_RETURN_NOT_OK(state_holder->visitor(item));
}
}
std::unique_ptr<State> state_holder;
};
Expand Down

0 comments on commit c6fa63f

Please sign in to comment.