diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index e0b6cfc699127..fd04d89f15830 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -25,6 +25,7 @@ #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/mutex.h" +#include "arrow/util/thread_pool.h" #include @@ -222,23 +223,51 @@ class ARROW_EXPORT AsyncTaskScheduler { struct SubmitTask : public Task { explicit SubmitTask(std::unique_ptr state_holder) : state_holder(std::move(state_holder)) {} + struct SubmitTaskCallback { - explicit SubmitTaskCallback(std::unique_ptr state_holder) - : state_holder(std::move(state_holder)) {} - Status operator()(const T& item) { + SubmitTaskCallback(std::unique_ptr state_holder, Future<> task_completion) + : state_holder(std::move(state_holder)), + task_completion(std::move(task_completion)) {} + void operator()(const Result& maybe_item) { + if (!maybe_item.ok()) { + task_completion.MarkFinished(maybe_item.status()); + return; + } + const auto& item = *maybe_item; if (IsIterationEnd(item)) { - return Status::OK(); + task_completion.MarkFinished(); + return; + } + Status visit_st = state_holder->visitor(item); + if (!visit_st.ok()) { + task_completion.MarkFinished(std::move(visit_st)); + return; } - ARROW_RETURN_NOT_OK(state_holder->visitor(item)); state_holder->scheduler->AddTask( std::make_unique(std::move(state_holder))); - return Status::OK(); + task_completion.MarkFinished(); } std::unique_ptr state_holder; + Future<> task_completion; }; + Result> operator()(AsyncTaskScheduler* scheduler) { - Future next = state_holder->generator(); - return next.Then(SubmitTaskCallback(std::move(state_holder))); + Future<> task = Future<>::Make(); + // Consume as many items as we can (those that are already finished) + // synchronously to avoid recursion / stack overflow. + while (true) { + Future next = state_holder->generator(); + if (next.TryAddCallback( + [&] { return SubmitTaskCallback(std::move(state_holder), task); })) { + return task; + } + ARROW_ASSIGN_OR_RAISE(T item, next.result()); + if (IsIterationEnd(item)) { + task.MarkFinished(); + return task; + } + ARROW_RETURN_NOT_OK(state_holder->visitor(item)); + } } std::unique_ptr state_holder; };