From 88d710be5fc7360df6b56e289249440fc1156eb6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 1 Sep 2022 14:33:25 -1000 Subject: [PATCH] ARROW-17350: [C++] Create a scheduler for asynchronous work (#13912) Authored-by: Weston Pace Signed-off-by: Weston Pace --- cpp/src/arrow/compute/exec/exec_plan.cc | 29 +- cpp/src/arrow/compute/exec/exec_plan.h | 2 + cpp/src/arrow/compute/exec/tpch_node.cc | 20 +- cpp/src/arrow/dataset/dataset_writer.cc | 235 +++---- cpp/src/arrow/dataset/dataset_writer.h | 7 +- cpp/src/arrow/dataset/dataset_writer_test.cc | 93 +-- cpp/src/arrow/dataset/file_base.cc | 65 +- cpp/src/arrow/testing/gtest_util.cc | 8 +- cpp/src/arrow/util/async_util.cc | 452 +++++++++----- cpp/src/arrow/util/async_util.h | 348 ++++++----- cpp/src/arrow/util/async_util_test.cc | 611 ++++++++++++------- 11 files changed, 1166 insertions(+), 704 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 15d95690076a1..b6a3916de1f63 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -76,13 +76,10 @@ struct ExecPlanImpl : public ExecPlan { Result> BeginExternalTask() { Future<> completion_future = Future<>::Make(); - ARROW_ASSIGN_OR_RAISE(bool task_added, - task_group_.AddTaskIfNotEnded(completion_future)); - if (task_added) { - return std::move(completion_future); + if (async_scheduler_->AddSimpleTask( + [completion_future] { return completion_future; })) { + return completion_future; } - // Return an invalid future if we were already finished to signal to the - // caller that they should not begin the task return Future<>{}; } @@ -90,10 +87,10 @@ struct ExecPlanImpl : public ExecPlan { auto executor = exec_context_->executor(); if (!executor) return fn(); // Adds a task which submits fn to the executor and tracks its progress. If we're - // already stopping then the task is ignored and fn is not executed. - return task_group_ - .AddTaskIfNotEnded([executor, fn]() { return executor->Submit(std::move(fn)); }) - .status(); + // aborted then the task is ignored and fn is not executed. 
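+    // (AddSimpleTask returns false in that case and fn is simply dropped, so the
+    // boolean result can safely be ignored here.)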
+ async_scheduler_->AddSimpleTask( + [executor, fn]() { return executor->Submit(std::move(fn)); }); + return Status::OK(); } Status ScheduleTask(std::function fn) { @@ -113,6 +110,8 @@ struct ExecPlanImpl : public ExecPlan { return task_scheduler_->StartTaskGroup(GetThreadIndex(), task_group_id, num_tasks); } + util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_.get(); } + Status Validate() const { if (nodes_.empty()) { return Status::Invalid("ExecPlan has no node"); @@ -197,7 +196,8 @@ struct ExecPlanImpl : public ExecPlan { void EndTaskGroup() { bool expected = false; if (group_ended_.compare_exchange_strong(expected, true)) { - task_group_.End().AddCallback([this](const Status& st) { + async_scheduler_->End(); + async_scheduler_->OnFinished().AddCallback([this](const Status& st) { MARK_SPAN(span_, error_st_ & st); END_SPAN(span_); finished_.MarkFinished(error_st_ & st); @@ -328,7 +328,8 @@ struct ExecPlanImpl : public ExecPlan { ThreadIndexer thread_indexer_; std::atomic group_ended_{false}; - util::AsyncTaskGroup task_group_; + std::unique_ptr async_scheduler_ = + util::AsyncTaskScheduler::Make(); std::unique_ptr task_scheduler_ = TaskScheduler::Make(); }; @@ -386,6 +387,10 @@ Status ExecPlan::StartTaskGroup(int task_group_id, int64_t num_tasks) { return ToDerived(this)->StartTaskGroup(task_group_id, num_tasks); } +util::AsyncTaskScheduler* ExecPlan::async_scheduler() { + return ToDerived(this)->async_scheduler(); +} + Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index a07884b2231cf..263f3634a5aa1 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -120,6 +120,8 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { /// \param num_tasks The number of times to run the task Status StartTaskGroup(int task_group_id, int64_t num_tasks); + util::AsyncTaskScheduler* async_scheduler(); + /// The initial inputs const NodeVector& sources() const; diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc b/cpp/src/arrow/compute/exec/tpch_node.cc index d19f20eea7ca4..978a8fb1ff728 100644 --- a/cpp/src/arrow/compute/exec/tpch_node.cc +++ b/cpp/src/arrow/compute/exec/tpch_node.cc @@ -18,6 +18,7 @@ #include "arrow/compute/exec/tpch_node.h" #include "arrow/buffer.h" #include "arrow/compute/exec/exec_plan.h" +#include "arrow/util/async_util.h" #include "arrow/util/formatting.h" #include "arrow/util/future.h" #include "arrow/util/io_util.h" @@ -3374,13 +3375,18 @@ class TpchNode : public ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - return generator_->StartProducing( + num_running_++; + ARROW_RETURN_NOT_OK(generator_->StartProducing( plan_->max_concurrency(), [this](ExecBatch batch) { this->OutputBatchCallback(std::move(batch)); }, [this](int64_t num_batches) { this->FinishedCallback(num_batches); }, [this](std::function func) -> Status { return this->ScheduleTaskCallback(std::move(func)); - }); + })); + if (--num_running_ == 0) { + finished_.MarkFinished(Status::OK()); + } + return Status::OK(); } void PauseProducing(ExecNode* output, int32_t counter) override { @@ -3408,23 +3414,29 @@ class TpchNode : public ExecNode { void FinishedCallback(int64_t total_num_batches) { outputs_[0]->InputFinished(this, static_cast(total_num_batches)); - 
finished_.MarkFinished(); + finished_generating_.store(true); } Status ScheduleTaskCallback(std::function func) { - if (finished_.is_finished()) return Status::OK(); + if (finished_generating_.load()) return Status::OK(); + num_running_++; return plan_->ScheduleTask([this, func](size_t thread_index) { Status status = func(thread_index); if (!status.ok()) { StopProducing(); ErrorIfNotOk(status); } + if (--num_running_ == 0) { + finished_.MarkFinished(Status::OK()); + } return status; }); } const char* name_; std::unique_ptr generator_; + std::atomic finished_generating_{false}; + std::atomic num_running_{0}; }; class TpchGenImpl : public TpchGen { diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 36305eac730e8..0f81fad3071b9 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -27,10 +27,12 @@ #include "arrow/table.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" #include "arrow/util/map.h" #include "arrow/util/string.h" namespace arrow { +using internal::Executor; namespace dataset { namespace internal { @@ -112,18 +114,34 @@ struct DatasetWriterState { std::mutex visitors_mutex; }; -class DatasetWriterFileQueue : public util::AsyncDestroyable { +Result> OpenWriter( + const FileSystemDatasetWriteOptions& write_options, std::shared_ptr schema, + const std::string& filename) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_stream, + write_options.filesystem->OpenOutputStream(filename)); + return write_options.format()->MakeWriter(std::move(out_stream), std::move(schema), + write_options.file_write_options, + {write_options.filesystem, filename}); +} + +class DatasetWriterFileQueue { public: - explicit DatasetWriterFileQueue(const Future>& writer_fut, + explicit DatasetWriterFileQueue(const std::shared_ptr& schema, const FileSystemDatasetWriteOptions& options, DatasetWriterState* writer_state) - : options_(options), writer_state_(writer_state) { - // If this AddTask call fails (e.g. we're given an already failing future) then we - // will get the error later when we try and write to it. 
- ARROW_UNUSED(file_tasks_.AddTask([this, writer_fut] { - return writer_fut.Then( - [this](const std::shared_ptr& writer) { writer_ = writer; }); - })); + : options_(options), schema_(schema), writer_state_(writer_state) {} + + void Start(util::AsyncTaskScheduler* scheduler, const std::string& filename) { + scheduler_ = scheduler; + // Because the scheduler runs one task at a time we know the writer will + // be opened before any attempt to write + scheduler->AddSimpleTask([this, filename] { + Executor* io_executor = options_.filesystem->io_context().executor(); + return DeferNotOk(io_executor->Submit([this, filename]() { + ARROW_ASSIGN_OR_RAISE(writer_, OpenWriter(options_, schema_, filename)); + return Status::OK(); + })); + }); } Result> PopStagedBatch() { @@ -155,20 +173,27 @@ class DatasetWriterFileQueue : public util::AsyncDestroyable { return table->CombineChunksToBatch(); } - Status ScheduleBatch(std::shared_ptr batch) { - struct WriteTask { - Future<> operator()() { return self->WriteNext(std::move(batch)); } + void ScheduleBatch(std::shared_ptr batch) { + // TODO(ARROW-17110) This task is only here to std::move the batch + // It can be a lambda once we adopt C++14 + struct WriteTask : public util::AsyncTaskScheduler::Task { + WriteTask(DatasetWriterFileQueue* self, std::shared_ptr batch) + : self(self), batch(std::move(batch)) {} + Result> operator()(util::AsyncTaskScheduler*) { + return self->WriteNext(std::move(batch)); + } DatasetWriterFileQueue* self; std::shared_ptr batch; }; - return file_tasks_.AddTask(WriteTask{this, std::move(batch)}); + scheduler_->AddTask( + ::arrow::internal::make_unique(this, std::move(batch))); } Result PopAndDeliverStagedBatch() { ARROW_ASSIGN_OR_RAISE(std::shared_ptr next_batch, PopStagedBatch()); int64_t rows_popped = next_batch->num_rows(); rows_currently_staged_ -= next_batch->num_rows(); - ARROW_RETURN_NOT_OK(ScheduleBatch(std::move(next_batch))); + ScheduleBatch(std::move(next_batch)); return rows_popped; } @@ -188,12 +213,17 @@ class DatasetWriterFileQueue : public util::AsyncDestroyable { return Status::OK(); } - Future<> DoDestroy() override { + Status Finish() { writer_state_->staged_rows_count -= rows_currently_staged_; while (!staged_batches_.empty()) { RETURN_NOT_OK(PopAndDeliverStagedBatch()); } - return file_tasks_.End().Then([this] { return DoFinish(); }); + // At this point all write tasks have been added. Because the scheduler + // is a 1-task FIFO we know this task will run at the very end and can + // add it now. 
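Aside (not part of the patch): the "1-task FIFO" guarantee referred to above comes from the sub-scheduler being created with MakeThrottle(1) and the default FIFO queue in DatasetWriterDirectoryQueue::OpenFileQueue below, so tasks are submitted one at a time in insertion order. A minimal standalone sketch of that behavior, with StepAsync as a hypothetical placeholder for an asynchronous unit of work:

    #include "arrow/util/async_util.h"
    #include "arrow/util/future.h"

    arrow::Future<> StepAsync() { return arrow::Future<>::MakeFinished(); }

    arrow::Status RunThreeStepsInOrder() {
      // A cost-1 throttle plus the default FIFO queue makes the scheduler strictly
      // serial: each task is submitted only after the previous one has finished.
      auto serial = arrow::util::AsyncTaskScheduler::MakeThrottle(1);
      auto scheduler = arrow::util::AsyncTaskScheduler::Make(serial.get());
      scheduler->AddSimpleTask([] { return StepAsync(); });  // e.g. open the file
      scheduler->AddSimpleTask([] { return StepAsync(); });  // e.g. write a batch
      scheduler->AddSimpleTask([] { return StepAsync(); });  // e.g. finish the file
      scheduler->End();
      return scheduler->OnFinished().status();
    }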
+ scheduler_->AddSimpleTask([this] { return DoFinish(); }); + scheduler_->End(); + return Status::OK(); } private: @@ -225,13 +255,14 @@ class DatasetWriterFileQueue : public util::AsyncDestroyable { } const FileSystemDatasetWriteOptions& options_; + const std::shared_ptr& schema_; DatasetWriterState* writer_state_; std::shared_ptr writer_; // Batches are accumulated here until they are large enough to write out at which // point they are merged together and added to write_queue_ std::deque> staged_batches_; uint64_t rows_currently_staged_ = 0; - util::SerializedAsyncTaskGroup file_tasks_; + util::AsyncTaskScheduler* scheduler_ = nullptr; }; struct WriteTask { @@ -239,13 +270,14 @@ struct WriteTask { uint64_t num_rows; }; -class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { +class DatasetWriterDirectoryQueue { public: - DatasetWriterDirectoryQueue(std::string directory, std::string prefix, - std::shared_ptr schema, + DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory, + std::string prefix, std::shared_ptr schema, const FileSystemDatasetWriteOptions& write_options, DatasetWriterState* writer_state) - : directory_(std::move(directory)), + : scheduler_(scheduler), + directory_(std::move(directory)), prefix_(std::move(prefix)), schema_(std::move(schema)), write_options_(write_options), @@ -291,91 +323,93 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { Status FinishCurrentFile() { if (latest_open_file_) { + ARROW_RETURN_NOT_OK(latest_open_file_->Finish()); latest_open_file_ = nullptr; } rows_written_ = 0; return GetNextFilename().Value(¤t_filename_); } - Result> OpenWriter(const std::string& filename) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_stream, - write_options_.filesystem->OpenOutputStream(filename)); - return write_options_.format()->MakeWriter(std::move(out_stream), schema_, - write_options_.file_write_options, - {write_options_.filesystem, filename}); - } - - Result> OpenFileQueue( - const std::string& filename) { - Future> file_writer_fut = - init_future_.Then([this, filename] { - ::arrow::internal::Executor* io_executor = - write_options_.filesystem->io_context().executor(); - return DeferNotOk( - io_executor->Submit([this, filename]() { return OpenWriter(filename); })); - }); - auto file_queue = util::MakeSharedAsync( - file_writer_fut, write_options_, writer_state_); - RETURN_NOT_OK(task_group_.AddTask(file_queue->on_closed().Then( - [this] { writer_state_->open_files_throttle.Release(1); }, - [this](const Status& err) { - writer_state_->open_files_throttle.Release(1); - return err; - }))); - return file_queue; + Result OpenFileQueue(const std::string& filename) { + auto file_queue = ::arrow::internal::make_unique( + schema_, write_options_, writer_state_); + DatasetWriterFileQueue* file_queue_view = file_queue.get(); + std::unique_ptr throttle = + util::AsyncTaskScheduler::MakeThrottle(1); + struct FileFinishTask { + Status operator()() { + self->writer_state_->open_files_throttle.Release(1); + return Status::OK(); + } + DatasetWriterDirectoryQueue* self; + std::unique_ptr owned_file_queue; + std::unique_ptr throttle; + }; + util::AsyncTaskScheduler::Throttle* throttle_view = throttle.get(); + util::AsyncTaskScheduler* file_scheduler = scheduler_->MakeSubScheduler( + FileFinishTask{this, std::move(file_queue), std::move(throttle)}, throttle_view); + if (init_task_) { + file_scheduler->AddSimpleTask(init_task_); + init_task_ = {}; + } + file_queue_view->Start(file_scheduler, filename); + return 
file_queue_view; } uint64_t rows_written() const { return rows_written_; } void PrepareDirectory() { if (directory_.empty() || !write_options_.create_dir) { - init_future_ = Future<>::MakeFinished(); + return; + } + auto create_dir_cb = [this] { + return DeferNotOk(write_options_.filesystem->io_context().executor()->Submit( + [this]() { return write_options_.filesystem->CreateDir(directory_); })); + }; + if (write_options_.existing_data_behavior == + ExistingDataBehavior::kDeleteMatchingPartitions) { + init_task_ = [this, create_dir_cb] { + return write_options_.filesystem + ->DeleteDirContentsAsync(directory_, + /*missing_dir_ok=*/true) + .Then(create_dir_cb); + }; } else { - if (write_options_.existing_data_behavior == - ExistingDataBehavior::kDeleteMatchingPartitions) { - init_future_ = write_options_.filesystem->DeleteDirContentsAsync( - directory_, /*missing_dir_ok=*/true); - } else { - init_future_ = Future<>::MakeFinished(); - } - init_future_ = init_future_.Then([this] { - return DeferNotOk(write_options_.filesystem->io_context().executor()->Submit( - [this]() { return write_options_.filesystem->CreateDir(directory_); })); - }); + init_task_ = [create_dir_cb] { return create_dir_cb(); }; } } - static Result>> - Make(util::AsyncTaskGroup* task_group, - const FileSystemDatasetWriteOptions& write_options, - DatasetWriterState* writer_state, std::shared_ptr schema, - std::string directory, std::string prefix) { - auto dir_queue = util::MakeUniqueAsync( - std::move(directory), std::move(prefix), std::move(schema), write_options, - writer_state); - RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed())); + static Result> Make( + util::AsyncTaskScheduler* scheduler, + const FileSystemDatasetWriteOptions& write_options, + DatasetWriterState* writer_state, std::shared_ptr schema, + std::string directory, std::string prefix) { + auto dir_queue = ::arrow::internal::make_unique( + scheduler, std::move(directory), std::move(prefix), std::move(schema), + write_options, writer_state); dir_queue->PrepareDirectory(); ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, dir_queue->GetNextFilename()); // std::move required to make RTools 3.5 mingw compiler happy return std::move(dir_queue); } - Future<> DoDestroy() override { - latest_open_file_.reset(); - return task_group_.End(); + Status Finish() { + if (latest_open_file_) { + return latest_open_file_->Finish(); + } + return Status::OK(); } private: - util::AsyncTaskGroup task_group_; + util::AsyncTaskScheduler* scheduler_ = nullptr; std::string directory_; std::string prefix_; std::shared_ptr schema_; const FileSystemDatasetWriteOptions& write_options_; DatasetWriterState* writer_state_; - Future<> init_future_; + std::function()> init_task_; std::string current_filename_; - std::shared_ptr latest_open_file_; + DatasetWriterFileQueue* latest_open_file_ = nullptr; uint64_t rows_written_ = 0; uint32_t file_counter_ = 0; }; @@ -447,16 +481,17 @@ uint64_t CalculateMaxRowsStaged(uint64_t max_rows_queued) { } // namespace -class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { +class DatasetWriter::DatasetWriterImpl { public: - DatasetWriterImpl(FileSystemDatasetWriteOptions write_options, uint64_t max_rows_queued) - : write_options_(std::move(write_options)), + DatasetWriterImpl(FileSystemDatasetWriteOptions write_options, + util::AsyncTaskScheduler* scheduler, uint64_t max_rows_queued) + : scheduler_(scheduler), + write_options_(std::move(write_options)), writer_state_(max_rows_queued, write_options_.max_open_files, 
CalculateMaxRowsStaged(max_rows_queued)) {} Future<> WriteRecordBatch(std::shared_ptr batch, const std::string& directory, const std::string& prefix) { - RETURN_NOT_OK(CheckError()); if (batch->num_rows() == 0) { return Future<>::MakeFinished(); } @@ -469,6 +504,13 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } } + Status Finish() { + for (const auto& directory_queue : directory_queues_) { + ARROW_RETURN_NOT_OK(directory_queue.second->Finish()); + } + return Status::OK(); + } + protected: Status CloseLargestFile() { std::shared_ptr largest = nullptr; @@ -490,7 +532,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { ::arrow::internal::GetOrInsertGenerated( &directory_queues_, directory + prefix, [this, &batch, &directory, &prefix](const std::string& key) { - return DatasetWriterDirectoryQueue::Make(&task_group_, write_options_, + return DatasetWriterDirectoryQueue::Make(scheduler_, write_options_, &writer_state_, batch->schema(), directory, prefix); })); @@ -530,22 +572,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { return Future<>::MakeFinished(); } - void SetError(Status st) { - std::lock_guard lg(mutex_); - err_ = std::move(st); - } - - Status CheckError() { - std::lock_guard lg(mutex_); - return err_; - } - - Future<> DoDestroy() override { - directory_queues_.clear(); - return task_group_.End().Then([this] { return err_; }); - } - - util::AsyncTaskGroup task_group_; + util::AsyncTaskScheduler* scheduler_ = nullptr; FileSystemDatasetWriteOptions write_options_; DatasetWriterState writer_state_; std::unordered_map> @@ -555,16 +582,18 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { }; DatasetWriter::DatasetWriter(FileSystemDatasetWriteOptions write_options, + util::AsyncTaskScheduler* scheduler, uint64_t max_rows_queued) - : impl_(util::MakeUniqueAsync(std::move(write_options), - max_rows_queued)) {} + : impl_(::arrow::internal::make_unique( + std::move(write_options), scheduler, max_rows_queued)) {} Result> DatasetWriter::Make( - FileSystemDatasetWriteOptions write_options, uint64_t max_rows_queued) { + FileSystemDatasetWriteOptions write_options, util::AsyncTaskScheduler* scheduler, + uint64_t max_rows_queued) { RETURN_NOT_OK(ValidateOptions(write_options)); RETURN_NOT_OK(EnsureDestinationValid(write_options)); return std::unique_ptr( - new DatasetWriter(std::move(write_options), max_rows_queued)); + new DatasetWriter(std::move(write_options), scheduler, max_rows_queued)); } DatasetWriter::~DatasetWriter() = default; @@ -575,11 +604,7 @@ Future<> DatasetWriter::WriteRecordBatch(std::shared_ptr batch, return impl_->WriteRecordBatch(std::move(batch), directory, prefix); } -Future<> DatasetWriter::Finish() { - Future<> finished = impl_->on_closed(); - impl_.reset(); - return finished; -} +Status DatasetWriter::Finish() { return impl_->Finish(); } } // namespace internal } // namespace dataset diff --git a/cpp/src/arrow/dataset/dataset_writer.h b/cpp/src/arrow/dataset/dataset_writer.h index e41a0f8399596..a3ac531d5f1b7 100644 --- a/cpp/src/arrow/dataset/dataset_writer.h +++ b/cpp/src/arrow/dataset/dataset_writer.h @@ -50,7 +50,7 @@ class ARROW_DS_EXPORT DatasetWriter { /// \param max_rows_queued max # of rows allowed to be queued before the dataset_writer /// will ask for backpressure static Result> Make( - FileSystemDatasetWriteOptions write_options, + FileSystemDatasetWriteOptions write_options, util::AsyncTaskScheduler* scheduler, uint64_t max_rows_queued = 
kDefaultDatasetWriterMaxRowsQueued); ~DatasetWriter(); @@ -83,14 +83,15 @@ class ARROW_DS_EXPORT DatasetWriter { const std::string& directory, const std::string& prefix = ""); /// Finish all pending writes and close any open files - Future<> Finish(); + Status Finish(); protected: DatasetWriter(FileSystemDatasetWriteOptions write_options, + util::AsyncTaskScheduler* scheduler, uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued); class DatasetWriterImpl; - std::unique_ptr> impl_; + std::unique_ptr impl_; }; } // namespace internal diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index dc23c43a189e3..f4e7344cdb2f1 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -81,6 +81,13 @@ class DatasetWriterTestFixture : public testing::Test { }; std::shared_ptr format = std::make_shared(); write_options_.file_write_options = format->DefaultWriteOptions(); + scheduler_ = util::AsyncTaskScheduler::Make(); + } + + void EndWriterChecked(DatasetWriter* writer) { + ASSERT_OK(writer->Finish()); + scheduler_->End(); + ASSERT_FINISHES_OK(scheduler_->OnFinished()); } std::shared_ptr UseGatedFs() { @@ -130,6 +137,7 @@ class DatasetWriterTestFixture : public testing::Test { ipc::RecordBatchFileReader::Open(in_stream)); RecordBatchVector batches; *num_batches = reader->num_record_batches(); + EXPECT_GT(*num_batches, 0); for (int i = 0; i < reader->num_record_batches(); i++) { EXPECT_OK_AND_ASSIGN(std::shared_ptr next_batch, reader->ReadRecordBatch(i)); @@ -194,41 +202,46 @@ class DatasetWriterTestFixture : public testing::Test { std::shared_ptr schema_; std::vector pre_finish_visited_; std::vector post_finish_visited_; + std::unique_ptr scheduler_; FileSystemDatasetWriteOptions write_options_; uint64_t counter_ = 0; }; TEST_F(DatasetWriterTestFixture, Basic) { - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100}}); } TEST_F(DatasetWriterTestFixture, BasicFilePrefix) { - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "", "1_"); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertFilesCreated({"testdir/1_chunk-0.arrow"}); } TEST_F(DatasetWriterTestFixture, BasicFileDirectoryPrefix) { - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "a", "1_"); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertFilesCreated({"testdir/a/1_chunk-0.arrow"}); } TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { write_options_.max_rows_per_file = 10; write_options_.max_rows_per_group = 10; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto 
dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), ""); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 10}, {"testdir/chunk-1.arrow", 10, 10}, {"testdir/chunk-2.arrow", 20, 10}, @@ -238,21 +251,23 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) { write_options_.max_rows_per_file = 10; write_options_.max_rows_per_group = 10; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData( {{"testdir/chunk-0.arrow", 0, 10, 4}, {"testdir/chunk-1.arrow", 10, 8, 3}}); } TEST_F(DatasetWriterTestFixture, MinRowGroup) { write_options_.min_rows_per_group = 20; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); // Test hitting the limit exactly and inexactly ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(5), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(5), "")); @@ -264,32 +279,34 @@ TEST_F(DatasetWriterTestFixture, MinRowGroup) { ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(4), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(4), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(4), "")); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 46, 3}}); } TEST_F(DatasetWriterTestFixture, MaxRowGroup) { write_options_.max_rows_per_group = 10; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); // Test hitting the limit exactly and inexactly ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(15), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(15), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(20), "")); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 60, 7}}); } TEST_F(DatasetWriterTestFixture, MinAndMaxRowGroup) { write_options_.max_rows_per_group = 10; write_options_.min_rows_per_group = 10; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); // Test hitting the limit exactly and inexactly ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "")); 
ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(15), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(15), "")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(20), "")); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 60, 6}}); } @@ -298,14 +315,15 @@ TEST_F(DatasetWriterTestFixture, MinRowGroupBackpressure) { // enough data to form a min row group and we fill up the dataset writer (it should // auto-evict) write_options_.min_rows_per_group = 10; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_, 100)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get(), 100)); std::vector expected_files; for (int i = 0; i < 12; i++) { expected_files.push_back({"testdir/" + std::to_string(i) + "/chunk-0.arrow", static_cast(i * 9), 9, 1}); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(9), std::to_string(i))); } - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData(expected_files); } @@ -313,7 +331,8 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) { // Use a gated filesystem to queue up many writes behind a file open to make sure the // file isn't opened multiple times. auto gated_fs = UseGatedFs(); - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); for (int i = 0; i < 10; i++) { Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), ""); AssertFinished(queue_fut); @@ -321,7 +340,7 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) { } ASSERT_OK(gated_fs->WaitForOpenOutputStream(1)); ASSERT_OK(gated_fs->UnlockOpenOutputStream(1)); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100, 10}}); } @@ -330,7 +349,8 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) { constexpr int NBATCHES = 6; auto gated_fs = UseGatedFs(); std::vector expected_files; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); for (int i = 0; i < NBATCHES; i++) { std::string i_str = std::to_string(i); expected_files.push_back(ExpectedFile{"testdir/part" + i_str + "/chunk-0.arrow", @@ -341,14 +361,15 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) { } ASSERT_OK(gated_fs->WaitForOpenOutputStream(NBATCHES)); ASSERT_OK(gated_fs->UnlockOpenOutputStream(NBATCHES)); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData(expected_files); } TEST_F(DatasetWriterTestFixture, MaxOpenFiles) { auto gated_fs = UseGatedFs(); write_options_.max_open_files = 2; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0")); ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1")); @@ -366,7 +387,7 @@ TEST_F(DatasetWriterTestFixture, MaxOpenFiles) { // Following call should resume existing write but, on slow test systems, the old // write may have already been finished 
ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1")); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertFilesCreated({"testdir/part0/chunk-0.arrow", "testdir/part0/chunk-1.arrow", "testdir/part1/chunk-0.arrow", "testdir/part2/chunk-0.arrow"}); } @@ -379,10 +400,11 @@ TEST_F(DatasetWriterTestFixture, NoExistingDirectory) { write_options_.filesystem = filesystem_; write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions; write_options_.base_dir = "testdir/subdir"; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/subdir/chunk-0.arrow", 0, 100}}); } @@ -396,10 +418,11 @@ TEST_F(DatasetWriterTestFixture, DeleteExistingData) { filesystem_ = std::dynamic_pointer_cast(fs); write_options_.filesystem = filesystem_; write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100}}); AssertNotFiles({"testdir/chunk-5.arrow", "testdir/blah.txt", "testdir/subdir/foo.txt"}); } @@ -414,10 +437,11 @@ TEST_F(DatasetWriterTestFixture, PartitionedDeleteExistingData) { filesystem_ = std::dynamic_pointer_cast(fs); write_options_.filesystem = filesystem_; write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "part0"); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/part0/chunk-0.arrow", 0, 100}}); AssertNotFiles({"testdir/part0/foo.arrow"}); AssertEmptyFiles({"testdir/part1/bar.arrow"}); @@ -433,10 +457,11 @@ TEST_F(DatasetWriterTestFixture, LeaveExistingData) { filesystem_ = std::dynamic_pointer_cast(fs); write_options_.filesystem = filesystem_; write_options_.existing_data_behavior = ExistingDataBehavior::kOverwriteOrIgnore; - EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, + DatasetWriter::Make(write_options_, scheduler_.get())); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); - ASSERT_FINISHES_OK(dataset_writer->Finish()); + EndWriterChecked(dataset_writer.get()); AssertCreatedData({{"testdir/chunk-0.arrow", 0, 100}}); AssertEmptyFiles({"testdir/chunk-5.arrow", "testdir/blah.txt"}); } @@ -450,7 +475,7 @@ TEST_F(DatasetWriterTestFixture, ErrOnExistingData) { fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")})); filesystem_ = std::dynamic_pointer_cast(fs); write_options_.filesystem = filesystem_; - 
ASSERT_RAISES(Invalid, DatasetWriter::Make(write_options_)); + ASSERT_RAISES(Invalid, DatasetWriter::Make(write_options_, scheduler_.get())); AssertEmptyFiles( {"testdir/chunk-0.arrow", "testdir/chunk-5.arrow", "testdir/blah.txt"}); @@ -463,7 +488,7 @@ TEST_F(DatasetWriterTestFixture, ErrOnExistingData) { filesystem_ = std::dynamic_pointer_cast(fs2); write_options_.filesystem = filesystem_; write_options_.base_dir = "testdir"; - ASSERT_RAISES(Invalid, DatasetWriter::Make(write_options_)); + ASSERT_RAISES(Invalid, DatasetWriter::Make(write_options_, scheduler_.get())); AssertEmptyFiles({"testdir/part-0.arrow"}); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index b3f161e92d53c..ff5d1e43eb3d9 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -340,6 +340,8 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { schema_ = schema; } backpressure_control_ = backpressure_control; + scheduler_throttle_ = util::AsyncTaskScheduler::MakeThrottle(1); + scheduler_ = util::AsyncTaskScheduler::Make(scheduler_throttle_.get()); return Status::OK(); } @@ -350,8 +352,14 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { } Future<> Finish() override { - RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); })); - return task_group_.End(); + scheduler_->AddSimpleTask([this]() -> Result> { + ARROW_RETURN_NOT_OK(dataset_writer_->Finish()); + // Finish is actually synchronous but we add it to the scheduler because we want to + // make sure it happens after all the write calls. + return Future<>::MakeFinished(); + }); + scheduler_->End(); + return scheduler_->OnFinished(); } private: @@ -361,7 +369,7 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { batch, guarantee, write_options_, [this](std::shared_ptr next_batch, const PartitionPathFormat& destination) { - return task_group_.AddTask([this, next_batch, destination] { + scheduler_->AddSimpleTask([this, next_batch, destination] { Future<> has_room = dataset_writer_->WriteRecordBatch( next_batch, destination.directory, destination.filename); if (!has_room.is_finished()) { @@ -374,13 +382,15 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { } return has_room; }); + return Status::OK(); }); } std::shared_ptr custom_metadata_; std::unique_ptr dataset_writer_; FileSystemDatasetWriteOptions write_options_; - util::SerializedAsyncTaskGroup task_group_; + std::unique_ptr scheduler_; + std::unique_ptr scheduler_throttle_; std::shared_ptr schema_ = nullptr; compute::BackpressureControl* backpressure_control_; }; @@ -440,8 +450,8 @@ Result MakeWriteNode(compute::ExecPlan* plan, return Status::Invalid("Must provide partitioning"); } - ARROW_ASSIGN_OR_RAISE(auto dataset_writer, - internal::DatasetWriter::Make(write_options)); + ARROW_ASSIGN_OR_RAISE(auto dataset_writer, internal::DatasetWriter::Make( + write_options, plan->async_scheduler())); std::shared_ptr consumer = std::make_shared( @@ -465,7 +475,17 @@ class TeeNode : public compute::MapNode { FileSystemDatasetWriteOptions write_options, bool async_mode) : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), dataset_writer_(std::move(dataset_writer)), - write_options_(std::move(write_options)) {} + write_options_(std::move(write_options)) { + std::unique_ptr serial_throttle = + util::AsyncTaskScheduler::MakeThrottle(1); + struct DestroyThrottle { + Status operator()() { return Status::OK(); } + 
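    // This finish callback exists only to own the throttle: the throttle is kept
    // alive until the sub-scheduler, and every task submitted to it, has finished.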
std::unique_ptr owned_throttle; + }; + util::AsyncTaskScheduler::Throttle* serial_throttle_view = serial_throttle.get(); + serial_scheduler_ = plan_->async_scheduler()->MakeSubScheduler( + DestroyThrottle{std::move(serial_throttle)}, serial_throttle_view); + } static Result Make(compute::ExecPlan* plan, std::vector inputs, @@ -477,8 +497,9 @@ class TeeNode : public compute::MapNode { const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options; const std::shared_ptr schema = inputs[0]->output_schema(); - ARROW_ASSIGN_OR_RAISE(auto dataset_writer, - internal::DatasetWriter::Make(write_options)); + ARROW_ASSIGN_OR_RAISE( + auto dataset_writer, + internal::DatasetWriter::Make(write_options, plan->async_scheduler())); return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), std::move(dataset_writer), std::move(write_options), @@ -488,13 +509,17 @@ class TeeNode : public compute::MapNode { const char* kind_name() const override { return "TeeNode"; } void Finish(Status finish_st) override { - dataset_writer_->Finish().AddCallback([this, finish_st](const Status& dw_status) { - // Need to wait for the task group to complete regardless of dw_status - task_group_.End().AddCallback( - [this, dw_status, finish_st](const Status& tg_status) { - finished_.MarkFinished(dw_status & finish_st & tg_status); - }); - }); + if (!finish_st.ok()) { + MapNode::Finish(std::move(finish_st)); + return; + } + Status writer_finish_st = dataset_writer_->Finish(); + if (!writer_finish_st.ok()) { + MapNode::Finish(std::move(writer_finish_st)); + return; + } + serial_scheduler_->End(); + MapNode::Finish(Status::OK()); } Result DoTee(const compute::ExecBatch& batch) { @@ -509,7 +534,7 @@ class TeeNode : public compute::MapNode { return WriteBatch(batch, guarantee, write_options_, [this](std::shared_ptr next_batch, const PartitionPathFormat& destination) { - return task_group_.AddTask([this, next_batch, destination] { + serial_scheduler_->AddSimpleTask([this, next_batch, destination] { util::tracing::Span span; Future<> has_room = dataset_writer_->WriteRecordBatch( next_batch, destination.directory, destination.filename); @@ -519,6 +544,7 @@ class TeeNode : public compute::MapNode { } return has_room; }); + return Status::OK(); }); } @@ -551,7 +577,10 @@ class TeeNode : public compute::MapNode { private: std::unique_ptr dataset_writer_; FileSystemDatasetWriteOptions write_options_; - util::SerializedAsyncTaskGroup task_group_; + // We use a serial scheduler to submit tasks to the dataset writer. The dataset writer + // only returns an unfinished future when it needs backpressure. 
Using a serial + // scheduler here ensures we pause while we wait for backpressure to clear + util::AsyncTaskScheduler* serial_scheduler_; int32_t backpressure_counter_ = 0; }; diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index c5ab367befe27..a4d867088000f 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -1031,9 +1031,11 @@ class GatingTask::Impl : public std::enable_shared_from_this { } Status Unlock() { - std::lock_guard lk(mx_); - unlocked_ = true; - unlocked_cv_.notify_all(); + { + std::lock_guard lk(mx_); + unlocked_ = true; + unlocked_cv_.notify_all(); + } unlocked_future_.MarkFinished(); return status_; } diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index d8ae2e1923043..7e8c5513aab21 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -19,186 +19,340 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" + +#include +#include +#include +#include namespace arrow { -namespace util { -AsyncDestroyable::AsyncDestroyable() : on_closed_(Future<>::Make()) {} +namespace util { -#ifndef NDEBUG -AsyncDestroyable::~AsyncDestroyable() { - DCHECK(constructed_correctly_) << "An instance of AsyncDestroyable must be created by " - "MakeSharedAsync or MakeUniqueAsync"; -} -#else -AsyncDestroyable::~AsyncDestroyable() = default; -#endif - -void AsyncDestroyable::Destroy() { - DoDestroy().AddCallback([this](const Status& st) { - on_closed_.MarkFinished(st); - delete this; - }); -} +class ThrottleImpl : public AsyncTaskScheduler::Throttle { + public: + explicit ThrottleImpl(int max_concurrent_cost) : available_cost_(max_concurrent_cost) {} -Status AsyncTaskGroup::AddTask(std::function>()> task) { - auto guard = mutex_.Lock(); - if (finished_adding_) { - return Status::Cancelled("Ignoring task added after the task group has been ended"); - } - if (!err_.ok()) { - return err_; - } - Result> maybe_task_fut = task(); - if (!maybe_task_fut.ok()) { - err_ = maybe_task_fut.status(); - return err_; + util::optional> TryAcquire(int amt) override { + std::lock_guard lk(mutex_); + if (backoff_.is_valid()) { + return backoff_; + } + if (amt <= available_cost_) { + available_cost_ -= amt; + return nullopt; + } + backoff_ = Future<>::Make(); + return backoff_; } - return AddTaskUnlocked(*maybe_task_fut, std::move(guard)); -} -Result AsyncTaskGroup::AddTaskIfNotEnded(std::function>()> task) { - auto guard = mutex_.Lock(); - if (finished_adding_) { - return false; - } - if (!err_.ok()) { - return err_; - } - Result> maybe_task_fut = task(); - if (!maybe_task_fut.ok()) { - err_ = maybe_task_fut.status(); - return err_; + void Release(int amt) override { + Future<> backoff_to_fulfill; + { + std::lock_guard lk(mutex_); + available_cost_ += amt; + if (backoff_.is_valid()) { + backoff_to_fulfill = std::move(backoff_); + } + } + if (backoff_to_fulfill.is_valid()) { + backoff_to_fulfill.MarkFinished(); + } } - ARROW_RETURN_NOT_OK(AddTaskUnlocked(*maybe_task_fut, std::move(guard))); - return true; -} -Status AsyncTaskGroup::AddTaskUnlocked(const Future<>& task_fut, - util::Mutex::Guard guard) { - // If the task is already finished there is nothing to track so lets save - // some work and return early - if (task_fut.is_finished()) { - err_ &= task_fut.status(); - return err_; - } - running_tasks_++; - guard.Unlock(); - task_fut.AddCallback([this](const Status& st) { - auto guard = mutex_.Lock(); - err_ &= st; - if 
(--running_tasks_ == 0 && finished_adding_) { - guard.Unlock(); - all_tasks_done_.MarkFinished(err_); - } - }); - return Status::OK(); + private: + std::mutex mutex_; + int available_cost_; + Future<> backoff_; +}; + +std::unique_ptr AsyncTaskScheduler::MakeThrottle( + int max_concurrent_cost) { + return ::arrow::internal::make_unique(max_concurrent_cost); } -Status AsyncTaskGroup::AddTask(const Future<>& task_fut) { - auto guard = mutex_.Lock(); - if (finished_adding_) { - return Status::Cancelled("Ignoring task added after the task group has been ended"); - } - if (!err_.ok()) { - return err_; +namespace { + +// Very basic FIFO queue +class FifoQueue : public AsyncTaskScheduler::Queue { + using Task = AsyncTaskScheduler::Task; + void Push(std::unique_ptr task) override { tasks_.push_back(std::move(task)); } + + std::unique_ptr Pop() override { + std::unique_ptr task = std::move(tasks_.front()); + tasks_.pop_front(); + return task; } - return AddTaskUnlocked(task_fut, std::move(guard)); -} -Result AsyncTaskGroup::AddTaskIfNotEnded(const Future<>& task_fut) { - auto guard = mutex_.Lock(); - if (finished_adding_) { - return false; + const Task& Peek() override { return *tasks_.front(); } + + bool Empty() override { return tasks_.empty(); } + + void Purge() override { tasks_.clear(); } + + private: + std::list> tasks_; +}; + +class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { + public: + using Task = AsyncTaskScheduler::Task; + using Queue = AsyncTaskScheduler::Queue; + + AsyncTaskSchedulerImpl(AsyncTaskSchedulerImpl* parent, std::unique_ptr queue, + Throttle* throttle, FnOnce finish_callback) + : AsyncTaskScheduler(), + queue_(std::move(queue)), + throttle_(throttle), + finish_callback_(std::move(finish_callback)) { + if (parent == nullptr) { + owned_global_abort_ = ::arrow::internal::make_unique>(0); + global_abort_ = owned_global_abort_.get(); + } else { + global_abort_ = parent->global_abort_; + } + if (throttle != nullptr && !queue_) { + queue_ = ::arrow::internal::make_unique(); + } } - if (!err_.ok()) { - return err_; + + ~AsyncTaskSchedulerImpl() { + { + std::unique_lock lk(mutex_); + if (state_ == State::kRunning) { + AbortUnlocked( + Status::UnknownError("AsyncTaskScheduler abandoned before completion"), + std::move(lk)); + } + } + finished_.Wait(); } - ARROW_RETURN_NOT_OK(AddTaskUnlocked(task_fut, std::move(guard))); - return true; -} -Future<> AsyncTaskGroup::End() { - auto guard = mutex_.Lock(); - finished_adding_ = true; - if (running_tasks_ == 0) { - all_tasks_done_.MarkFinished(err_); - return all_tasks_done_; + bool AddTask(std::unique_ptr task) override { + std::unique_lock lk(mutex_); + // When a scheduler has been ended that usually signals to the caller that the + // scheduler is free to be deleted (and any associated resources). In this case the + // task likely has dangling pointers/references and would be unsafe to execute. + DCHECK_NE(state_, State::kEnded) + << "Attempt to add a task to a scheduler after it had ended."; + if (state_ == State::kAborted) { + return false; + } + if (global_abort_->load()) { + AbortUnlocked(Status::Cancelled("Another scheduler aborted"), std::move(lk)); + return false; + } + if (throttle_) { + // If the queue isn't empty then don't even try and acquire the throttle + // We can safely assume it is either blocked or in the middle of trying to + // alert a queued task. 
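+      // (If TryAcquire later returns a backoff future, the cost was NOT acquired;
+      // the task waits in the queue and is resubmitted by ContinueTasksUnlocked
+      // once that future completes.)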
+ if (!queue_->Empty()) { + queue_->Push(std::move(task)); + return true; + } + util::optional> maybe_backoff = throttle_->TryAcquire(task->cost()); + if (maybe_backoff) { + queue_->Push(std::move(task)); + lk.unlock(); + maybe_backoff->AddCallback([this](const Status&) { + std::unique_lock lk2(mutex_); + ContinueTasksUnlocked(std::move(lk2)); + }); + } else { + SubmitTaskUnlocked(std::move(task), std::move(lk)); + } + } else { + SubmitTaskUnlocked(std::move(task), std::move(lk)); + } + return true; } - return all_tasks_done_; -} -Future<> AsyncTaskGroup::OnFinished() const { return all_tasks_done_; } + AsyncTaskScheduler* MakeSubScheduler(FnOnce finish_callback, + Throttle* throttle, + std::unique_ptr queue) override { + std::unique_ptr owned_child = + ::arrow::internal::make_unique( + this, std::move(queue), throttle, std::move(finish_callback)); + AsyncTaskScheduler* child = owned_child.get(); + std::list>::iterator child_itr; + { + std::lock_guard lk(mutex_); + running_tasks_++; + sub_schedulers_.push_back(std::move(owned_child)); + child_itr = --sub_schedulers_.end(); + } -SerializedAsyncTaskGroup::SerializedAsyncTaskGroup() : on_finished_(Future<>::Make()) {} + struct Finalizer { + void operator()(const Status& st) { + std::unique_lock lk(self->mutex_); + FnOnce finish_callback; + if (!st.ok()) { + self->running_tasks_--; + self->AbortUnlocked(st, std::move(lk)); + return; + } else { + // We only eagerly erase the sub-scheduler on a successful completion. This is + // because, if the sub-scheduler aborted, then the caller of MakeSubScheduler + // might still be planning to call End + finish_callback = std::move((*child_itr)->finish_callback_); + self->sub_schedulers_.erase(child_itr); + } + lk.unlock(); + Status finish_st = std::move(finish_callback)(); + lk.lock(); + self->running_tasks_--; + if (!finish_st.ok()) { + self->AbortUnlocked(finish_st, std::move(lk)); + return; + } + if (self->IsFullyFinished()) { + lk.unlock(); + self->finished_.MarkFinished(self->maybe_error_); + } + } -Status SerializedAsyncTaskGroup::AddTask(std::function>()> task) { - util::Mutex::Guard guard = mutex_.Lock(); - ARROW_RETURN_NOT_OK(err_); - if (ended_) { - return Status::Cancelled("Ignoring task added after the task group has been ended"); - } - tasks_.push(std::move(task)); - if (!processing_.is_valid()) { - ConsumeAsMuchAsPossibleUnlocked(std::move(guard)); - } - return err_; -} + AsyncTaskSchedulerImpl* self; + std::list>::iterator child_itr; + }; -Future<> SerializedAsyncTaskGroup::EndUnlocked(util::Mutex::Guard&& guard) { - ended_ = true; - if (!processing_.is_valid()) { - guard.Unlock(); - on_finished_.MarkFinished(err_); + child->OnFinished().AddCallback(Finalizer{this, child_itr}); + return child; } - return on_finished_; -} -Future<> SerializedAsyncTaskGroup::End() { return EndUnlocked(mutex_.Lock()); } + void End() override { + std::unique_lock lk(mutex_); + if (state_ == State::kAborted) { + return; + } + state_ = State::kEnded; + if (running_tasks_ == 0 && (!queue_ || queue_->Empty())) { + lk.unlock(); + finished_.MarkFinished(std::move(maybe_error_)); + } + } -Future<> SerializedAsyncTaskGroup::Abort(Status err) { - util::Mutex::Guard guard = mutex_.Lock(); - err_ = std::move(err); - tasks_ = std::queue>()>>(); - return EndUnlocked(std::move(guard)); -} + Future<> OnFinished() const override { return finished_; } -void SerializedAsyncTaskGroup::ConsumeAsMuchAsPossibleUnlocked( - util::Mutex::Guard&& guard) { - while (err_.ok() && !tasks_.empty() && TryDrainUnlocked()) { + private: + void 
ContinueTasksUnlocked(std::unique_lock&& lk) { + while (!queue_->Empty()) { + int next_cost = queue_->Peek().cost(); + util::optional> maybe_backoff = throttle_->TryAcquire(next_cost); + if (maybe_backoff) { + lk.unlock(); + if (!maybe_backoff->TryAddCallback([this] { + return [this](const Status&) { + std::unique_lock lk2(mutex_); + ContinueTasksUnlocked(std::move(lk2)); + }; + })) { + lk.lock(); + continue; + } + return; + } else { + std::unique_ptr next_task = queue_->Pop(); + SubmitTaskUnlocked(std::move(next_task), std::move(lk)); + lk.lock(); + } + } } - if (ended_ && (!err_.ok() || tasks_.empty()) && !processing_.is_valid()) { - guard.Unlock(); - on_finished_.MarkFinished(err_); + + bool IsFullyFinished() { + return state_ != State::kRunning && (!queue_ || queue_->Empty()) && + running_tasks_ == 0; } -} -bool SerializedAsyncTaskGroup::TryDrainUnlocked() { - if (processing_.is_valid()) { - return false; + void DoSubmitTask(std::unique_ptr task) { + int cost = task->cost(); + Result> submit_result = (*task)(this); + if (!submit_result.ok()) { + global_abort_->store(true); + std::unique_lock lk(mutex_); + running_tasks_--; + AbortUnlocked(submit_result.status(), std::move(lk)); + return; + } + submit_result->AddCallback([this, cost](const Status& st) { + std::unique_lock lk(mutex_); + if (!st.ok()) { + running_tasks_--; + AbortUnlocked(st, std::move(lk)); + return; + } + if (global_abort_->load()) { + running_tasks_--; + AbortUnlocked(Status::Cancelled("Another scheduler aborted"), std::move(lk)); + return; + } + // It's perhaps a little odd to release the throttle here instead of at the end of + // this method. However, once we decrement running_tasks_ and release the lock we + // are eligible for deletion and throttle_ would become invalid. + lk.unlock(); + if (throttle_) { + throttle_->Release(cost); + } + lk.lock(); + running_tasks_--; + if (IsFullyFinished()) { + lk.unlock(); + finished_.MarkFinished(maybe_error_); + } + }); } - std::function>()> next_task = std::move(tasks_.front()); - tasks_.pop(); - Result> maybe_next_fut = next_task(); - if (!maybe_next_fut.ok()) { - err_ &= maybe_next_fut.status(); - return true; + + void AbortUnlocked(const Status& st, std::unique_lock&& lk) { + if (state_ == State::kRunning) { + maybe_error_ = st; + state_ = State::kAborted; + if (queue_) { + queue_->Purge(); + } + } else if (state_ == State::kEnded) { + if (maybe_error_.ok()) { + maybe_error_ = st; + } + } + if (running_tasks_ == 0) { + lk.unlock(); + finished_.MarkFinished(std::move(maybe_error_)); + } } - Future<> next_fut = maybe_next_fut.MoveValueUnsafe(); - if (!next_fut.TryAddCallback([this] { - return [this](const Status& st) { - util::Mutex::Guard guard = mutex_.Lock(); - processing_ = Future<>(); - err_ &= st; - ConsumeAsMuchAsPossibleUnlocked(std::move(guard)); - }; - })) { - // Didn't add callback, future already finished - err_ &= next_fut.status(); - return true; + + void SubmitTaskUnlocked(std::unique_ptr task, std::unique_lock&& lk) { + running_tasks_++; + lk.unlock(); + DoSubmitTask(std::move(task)); } - processing_ = std::move(next_fut); - return false; + + enum State { kRunning, kAborted, kEnded }; + + std::unique_ptr queue_; + Throttle* throttle_; + FnOnce finish_callback_; + + Future<> finished_ = Future<>::Make(); + int running_tasks_ = 0; + // Starts as running, then transitions to either aborted or ended + State state_ = State::kRunning; + // Starts as ok but may transition to an error if aborted. Will be the first + // error that caused the abort. 
If multiple errors occur, only the first is captured. + Status maybe_error_; + std::mutex mutex_; + + std::list> sub_schedulers_; + + std::unique_ptr> owned_global_abort_ = nullptr; + std::atomic* global_abort_; +}; + +} // namespace + +std::unique_ptr AsyncTaskScheduler::Make( + Throttle* throttle, std::unique_ptr queue) { + return ::arrow::internal::make_unique( + nullptr, std::move(queue), throttle, FnOnce()); } } // namespace util diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index b3ff682996619..653654668fd97 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -17,128 +17,188 @@ #pragma once -#include - #include "arrow/result.h" #include "arrow/status.h" +#include "arrow/util/functional.h" #include "arrow/util/future.h" +#include "arrow/util/make_unique.h" #include "arrow/util/mutex.h" namespace arrow { -namespace util { -/// Custom deleter for AsyncDestroyable objects -template -struct DestroyingDeleter { - void operator()(T* p) { - if (p) { - p->Destroy(); - } - } -}; +using internal::FnOnce; -/// An object which should be asynchronously closed before it is destroyed +namespace util { + +/// A utility which keeps tracks of, and schedules, asynchronous tasks /// -/// Classes can extend this to ensure that the close method is called and completed -/// before the instance is deleted. This provides smart_ptr / delete semantics for -/// objects with an asynchronous destructor. +/// An asynchronous task has a synchronous component and an asynchronous component. +/// The synchronous component typically schedules some kind of work on an external +/// resource (e.g. the I/O thread pool or some kind of kernel-based asynchronous +/// resource like io_uring). The asynchronous part represents the work +/// done on that external resource. Executing the synchronous part will be referred +/// to as "submitting the task" since this usually includes submitting the asynchronous +/// portion to the external thread pool. /// -/// Classes which extend this must be constructed using MakeSharedAsync or MakeUniqueAsync -class ARROW_EXPORT AsyncDestroyable { - public: - AsyncDestroyable(); - virtual ~AsyncDestroyable(); - - /// A future which will complete when the AsyncDestroyable has finished and is ready - /// to be deleted. - /// - /// This can be used to ensure all work done by this object has been completed before - /// proceeding. - Future<> on_closed() { return on_closed_; } - - protected: - /// Subclasses should override this and perform any cleanup. Once the future returned - /// by this method finishes then this object is eligible for destruction and any - /// reference to `this` will be invalid - virtual Future<> DoDestroy() = 0; - - private: - void Destroy(); - - Future<> on_closed_; -#ifndef NDEBUG - bool constructed_correctly_ = false; -#endif - - template - friend struct DestroyingDeleter; - template - friend std::shared_ptr MakeSharedAsync(Args&&... args); - template - friend std::unique_ptr> MakeUniqueAsync(Args&&... args); -}; - -template -std::shared_ptr MakeSharedAsync(Args&&... args) { - static_assert(std::is_base_of::value, - "Nursery::MakeSharedCloseable only works with AsyncDestroyable types"); - std::shared_ptr ptr(new T(std::forward(args)...), DestroyingDeleter()); -#ifndef NDEBUG - ptr->constructed_correctly_ = true; -#endif - return ptr; -} - -template -std::unique_ptr> MakeUniqueAsync(Args&&... 
args) { - static_assert(std::is_base_of::value, - "Nursery::MakeUniqueCloseable only works with AsyncDestroyable types"); - std::unique_ptr> ptr(new T(std::forward(args)...), - DestroyingDeleter()); -#ifndef NDEBUG - ptr->constructed_correctly_ = true; -#endif - return ptr; -} - -/// A utility which keeps track of a collection of asynchronous tasks +/// By default the scheduler will submit the task (execute the synchronous part) as +/// soon as it is added, assuming the underlying thread pool hasn't terminated or the +/// scheduler hasn't aborted. In this mode the scheduler is simply acting as +/// a task group, keeping track of the ongoing work. /// /// This can be used to provide structured concurrency for asynchronous development. /// A task group created at a high level can be distributed amongst low level components /// which register work to be completed. The high level job can then wait for all work /// to be completed before cleaning up. -class ARROW_EXPORT AsyncTaskGroup { +/// +/// A task scheduler must eventually be ended when all tasks have been added. Once the +/// scheduler has been ended it is an error to add further tasks. Note, it is not an +/// error to add additional tasks after a scheduler has aborted (though these tasks +/// will be ignored and never submitted). The scheduler has a futuer which will complete +/// once the scheduler has been ended AND all remaining tasks have finished executing. +/// Ending a scheduler will NOT cause the scheduler to flush existing tasks. +/// +/// Task failure (either the synchronous portion or the asynchronous portion) will cause +/// the scheduler to enter an aborted state. The first such failure will be reported in +/// the final task future. +/// +/// The scheduler can also be manually aborted. A cancellation status will be reported as +/// the final task future. +/// +/// It is also possible to limit the number of concurrent tasks the scheduler will +/// execute. This is done by setting a task limit. The task limit initially assumes all +/// tasks are equal but a custom cost can be supplied when scheduling a task (e.g. based +/// on the total I/O cost of the task, or the expected RAM utilization of the task) +/// +/// When the total number of running tasks is limited then scheduler priority may also +/// become a consideration. By default the scheduler runs with a FIFO queue but a custom +/// task queue can be provided. One could, for example, use a priority queue to control +/// the order in which tasks are executed. +/// +/// It is common to have multiple stages of execution. For example, when scanning, we +/// first inspect each fragment (the inspect stage) to figure out the row groups and then +/// we scan row groups (the scan stage) to read in the data. This sort of multi-stage +/// execution should be represented as two seperate task groups. The first task group can +/// then have a custom finish callback which ends the second task group. +class ARROW_EXPORT AsyncTaskScheduler { public: - /// Add a task to be tracked by this task group - /// - /// If a previous task has failed then adding a task will fail - /// - /// If WaitForTasksToFinish has been called and the returned future has been marked - /// completed then adding a task will fail. - Status AddTask(std::function>()> task); - /// Same as AddTask but doesn't add the task if End() has been called. 
- /// - /// \return true if the task was started, false if the group had already ended - Result AddTaskIfNotEnded(std::function>()> task); - /// Add a task that has already been started - Status AddTask(const Future<>& task); - /// \brief Attempt to add a task that has already been started to this group's tracking - /// - /// The return value must be paid attention to. If the return value is false then the - /// task could not be added because the group had already ended and so the caller must - /// track the external task some other way. - Result AddTaskIfNotEnded(const Future<>& task); - /// Signal that top level tasks are done being added + /// Destructor for AsyncTaskScheduler + /// + /// If a scheduler is not in the ended state when it is destroyed then it + /// will enter an aborted state. + /// + /// The destructor will block until all submitted tasks have finished. + virtual ~AsyncTaskScheduler() = default; + /// An interface for a task + /// + /// Users may want to override this, for example, to add priority + /// information for use by a queue. + class Task { + public: + virtual ~Task() = default; + /// Submit the task + /// + /// This will be called by the scheduler at most once when there + /// is space to run the task. This is expected to be a fairly quick + /// function that simply submits the actual task work to an external + /// resource (e.g. I/O thread pool). + /// + /// If this call fails then the scheduler will enter an aborted state. + virtual Result> operator()(AsyncTaskScheduler* scheduler) = 0; + /// The cost of the task + /// + /// The scheduler limits the total number of concurrent tasks based + /// on cost. A custom cost may be used, for example, if you would like + /// to limit the number of tasks based on the total expected RAM usage of + /// the tasks (this is done in the scanner) + virtual int cost() const { return 1; } + }; + + /// An interface for a task queue + /// + /// A queue's methods will not be called concurrently + class Queue { + public: + virtual ~Queue() = default; + /// Push a task to the queue + virtual void Push(std::unique_ptr task) = 0; + /// Pop the next task from the queue + virtual std::unique_ptr Pop() = 0; + /// Peek the next task in the queue + virtual const Task& Peek() = 0; + /// Check if the queue is empty + virtual bool Empty() = 0; + /// Purge the queue of all items + virtual void Purge() = 0; + }; + + class Throttle { + public: + virtual ~Throttle() = default; + /// Acquire amt permits + /// + /// If nullopt is returned then the permits were immediately + /// acquired and the caller can proceed. If a future is returned then the caller + /// should wait for the future to complete first. When the returned future completes + /// the permits have NOT been acquired and the caller must call Acquire again + virtual util::optional> TryAcquire(int amt) = 0; + /// Release amt permits + /// + /// This will possibly complete waiting futures and should probably not be + /// called while holding locks. + virtual void Release(int amt) = 0; + }; + /// Create a throttle + /// + /// This throttle is used to limit how many tasks can run at once. The + /// user should keep the throttle alive for the lifetime of the scheduler. + /// The same throttle can be used in multiple schedulers. + static std::unique_ptr MakeThrottle(int max_concurrent_cost); + + /// Add a task to the scheduler + /// + /// If the scheduler is in an aborted state this call will return false and the task + /// will never be run. 
This is harmless and does not need to be guarded against. + /// + /// If the scheduler is in an ended state then this call will cause an abort. This + /// represents a logic error in the program and should be avoidable. + /// + /// If there are no limits on the number of concurrent tasks then the submit function + /// will be run immediately. + /// + /// Otherwise, if there is a limit to the number of concurrent tasks, then this task + /// will be inserted into the scheduler's queue and submitted when there is space. + /// + /// The return value for this call can usually be ignored. There is little harm in + /// attempting to add tasks to an aborted scheduler. It is only included for callers + /// that want to avoid future task generation. + /// + /// \return true if the task was submitted or queued, false if the task was ignored + virtual bool AddTask(std::unique_ptr task) = 0; + + template + struct SimpleTask : public Task { + explicit SimpleTask(Callable callable) : callable(std::move(callable)) {} + Result> operator()(AsyncTaskScheduler* scheduler) override { + return callable(); + } + Callable callable; + }; + + template + bool AddSimpleTask(Callable callable) { + return AddTask( + ::arrow::internal::make_unique>(std::move(callable))); + } + /// Signal that tasks are done being added /// - /// It is allowed for tasks to be added after this call provided the future has not yet - /// completed. This should be safe as long as the tasks being added are added as part - /// of a task that is tracked. As soon as the count of running tasks reaches 0 this - /// future will be marked complete. + /// If the scheduler is in an aborted state then this call will have no effect. /// - /// Any attempt to add a task after the returned future has completed will fail. + /// Otherwise, this will transition the scheduler into the ended state. Once all + /// remaining tasks have finished the OnFinished future will be marked completed. /// - /// The returned future that will finish when all running tasks have finished. - Future<> End(); + /// Once the scheduler is in an ended state then adding tasks is invalid and any + /// attempt to do so will cause an abort. + virtual void End() = 0; /// A future that will be finished after End is called and all tasks have completed /// /// This is the same future that is returned by End() but calling this method does @@ -147,70 +207,42 @@ class ARROW_EXPORT AsyncTaskGroup { /// /// This is a utility method for workflows where the finish future needs to be /// referenced before all top level tasks have been queued. - Future<> OnFinished() const; + virtual Future<> OnFinished() const = 0; - private: - Status AddTaskUnlocked(const Future<>& task, util::Mutex::Guard guard); - - bool finished_adding_ = false; - int running_tasks_ = 0; - Status err_; - Future<> all_tasks_done_ = Future<>::Make(); - util::Mutex mutex_; -}; - -/// A task group which serializes asynchronous tasks in a push-based workflow -/// -/// Tasks will be executed in the order they are added -/// -/// This will buffer results in an unlimited fashion so it should be combined -/// with some kind of backpressure -class ARROW_EXPORT SerializedAsyncTaskGroup { - public: - SerializedAsyncTaskGroup(); - /// Push an item into the serializer and (eventually) into the consumer + /// Create a sub-scheduler for tracking a subset of tasks /// - /// The item will not be delivered to the consumer until all previous items have been - /// consumed. + /// The parent scheduler will manage the lifetime of the sub-scheduler. 
It will + /// be destroyed once it is finished. /// - /// If the consumer returns an error then this serializer will go into an error state - /// and all subsequent pushes will fail with that error. Pushes that have been queued - /// but not delivered will be silently dropped. + /// Often some state needs to be associated with a subset of tasks. + /// For example, when scanning a dataset we need to keep a file reader + /// alive for all of the read tasks for each file. A sub-scheduler can be used to do + /// this. /// - /// \return True if the item was pushed immediately to the consumer, false if it was - /// queued - Status AddTask(std::function>()> task); - - /// Signal that all top level tasks have been added + /// The parent scheduler may be ended before all of its sub-schedulers + /// are ended. /// - /// The returned future that will finish when all tasks have been consumed. - Future<> End(); - - /// Abort a task group + /// If either the parent scheduler or the sub-scheduler encounter an error + /// then they will both enter an aborted state (this is a shared state). + /// Finish callbacks will not be run when the scheduler is aborted. /// - /// Tasks that have not been started will be discarded + /// The parent scheduler will not complete until the sub-scheduler's + /// tasks (and finish callback) have all executed. /// - /// The returned future will finish when all running tasks have finished. - Future<> Abort(Status err); + /// A sub-scheduler can share the same throttle as its parent but it + /// can also have its own unique throttle. + virtual AsyncTaskScheduler* MakeSubScheduler( + FnOnce finish_callback, Throttle* throttle = NULLPTR, + std::unique_ptr queue = NULLPTR) = 0; - /// A future that finishes when all queued items have been delivered. + /// Construct a scheduler /// - /// This will return the same future returned by End but will not signal - /// that all tasks have been finished. End must be called at some point in order for - /// this future to finish. - Future<> OnFinished() const { return on_finished_; } - - private: - void ConsumeAsMuchAsPossibleUnlocked(util::Mutex::Guard&& guard); - Future<> EndUnlocked(util::Mutex::Guard&& guard); - bool TryDrainUnlocked(); - - Future<> on_finished_; - std::queue>()>> tasks_; - util::Mutex mutex_; - bool ended_ = false; - Status err_; - Future<> processing_; + /// \param throttle A throttle to control how many tasks will be submitted at one time. + /// The default (nullptr) will always submit tasks when they are added. + /// \param queue A queue to control task order. Only used if throttle != nullptr. + /// The default (nullptr) will use a FIFO queue if there is a throttle. 
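+  ///
+  /// A minimal usage sketch (illustrative only; `num_files` and `SpawnReadAsync`, a
+  /// caller-supplied function that starts the asynchronous work and returns a
+  /// Future<>, are assumed placeholders and not part of this API):
+  ///
+  ///     std::unique_ptr<AsyncTaskScheduler::Throttle> throttle =
+  ///         AsyncTaskScheduler::MakeThrottle(4);
+  ///     std::unique_ptr<AsyncTaskScheduler> scheduler =
+  ///         AsyncTaskScheduler::Make(throttle.get());
+  ///     for (int i = 0; i < num_files; i++) {
+  ///       scheduler->AddSimpleTask([i] { return SpawnReadAsync(i); });
+  ///     }
+  ///     scheduler->End();
+  ///     Future<> all_done = scheduler->OnFinished();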
+ static std::unique_ptr Make(Throttle* throttle = NULLPTR, + std::unique_ptr queue = NULLPTR); }; } // namespace util diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index 3ad2bc15f587e..25a3ca77cea1f 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -17,281 +17,456 @@ #include "arrow/util/async_util.h" +#include +#include +#include +#include +#include #include +#include #include #include "arrow/result.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/future.h" +#include "arrow/util/make_unique.h" namespace arrow { namespace util { -class GatingDestroyable : public AsyncDestroyable { - public: - GatingDestroyable(Future<> close_future, bool* destroyed) - : close_future_(std::move(close_future)), destroyed_(destroyed) {} - ~GatingDestroyable() override { *destroyed_ = true; } - - protected: - Future<> DoDestroy() override { return close_future_; } - - private: - Future<> close_future_; - bool* destroyed_; -}; +TEST(AsyncTaskScheduler, ShouldScheduleConcurrentTasks) { + constexpr int kMaxConcurrentTasks = 2; + constexpr int kTotalNumTasks = kMaxConcurrentTasks + 1; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(kMaxConcurrentTasks); + // A basic test to make sure we schedule the right number of concurrent tasks + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + Future<> futures[kTotalNumTasks]; + bool submitted[kTotalNumTasks]; + for (int i = 0; i < kTotalNumTasks; i++) { + futures[i] = Future<>::Make(); + submitted[i] = false; + task_group->AddSimpleTask([&, i] { + submitted[i] = true; + return futures[i]; + }); + } + task_group->End(); + AssertNotFinished(task_group->OnFinished()); + for (int i = 0; i < kTotalNumTasks; i++) { + if (i < kMaxConcurrentTasks) { + ASSERT_TRUE(submitted[i]); + } else { + ASSERT_FALSE(submitted[i]); + } + } -template -void TestAsyncDestroyable(Factory factory) { - Future<> gate = Future<>::Make(); - bool destroyed = false; - bool on_closed = false; - { - auto obj = factory(gate, &destroyed); - obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; }); - ASSERT_FALSE(destroyed); + for (int j = 0; j < kTotalNumTasks; j++) { + futures[j].MarkFinished(); + if (j + kMaxConcurrentTasks < kTotalNumTasks) { + ASSERT_TRUE(submitted[j + kMaxConcurrentTasks]); + } } - ASSERT_FALSE(destroyed); - ASSERT_FALSE(on_closed); - gate.MarkFinished(); - ASSERT_TRUE(destroyed); - ASSERT_TRUE(on_closed); + ASSERT_FINISHES_OK(task_group->OnFinished()); } -TEST(AsyncDestroyable, MakeShared) { - TestAsyncDestroyable([](Future<> gate, bool* destroyed) { - return MakeSharedAsync(gate, destroyed); +TEST(AsyncTaskScheduler, Abandoned) { + // The goal here is to ensure that an abandoned scheduler aborts. + // It should block until all submitted tasks finish. It should not + // submit any pending tasks. 
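+  // Abandonment here means the scheduler is destroyed without End() ever being called.
+  // The destructor is invoked on a helper thread below because it blocks until the
+  // already submitted first task completes; the finished future is then expected to
+  // resolve to an error rather than OK.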
+ bool submitted_task_finished = false; + bool pending_task_submitted = false; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(1); + Future<> finished_fut; + Future<> first_task = Future<>::Make(); + AsyncTaskScheduler* weak_scheduler; + std::thread delete_scheduler_thread; + { + std::unique_ptr scheduler = + AsyncTaskScheduler::Make(throttle.get()); + weak_scheduler = scheduler.get(); + finished_fut = scheduler->OnFinished(); + // This task will start and should finish + scheduler->AddSimpleTask([&, first_task] { + return first_task.Then([&] { submitted_task_finished = true; }); + }); + // This task will never be submitted + scheduler->AddSimpleTask([&] { + pending_task_submitted = true; + return Future<>::MakeFinished(); + }); + // We don't want to finish the first task until after the scheduler has been abandoned + // and entered an aborted state. However, deleting the scheduler blocks until the + // first task is finished. So we trigger the delete on a separate thread. + struct DeleteSchedulerTask { + void operator()() { scheduler.reset(); } + std::unique_ptr scheduler; + }; + delete_scheduler_thread = std::thread(DeleteSchedulerTask{std::move(scheduler)}); + } + // Here we are waiting for the scheduler to enter aborted state. Once aborted the + // scheduler will no longer accept new tasks and will return false. + BusyWait(10, [&] { + SleepABit(); + return !weak_scheduler->AddSimpleTask([] { return Future<>::MakeFinished(); }); }); + // Now that the scheduler deletion is in progress we should be able to finish the + // first task and be confident the second task should not be submitted. + first_task.MarkFinished(); + ASSERT_FINISHES_AND_RAISES(UnknownError, finished_fut); + delete_scheduler_thread.join(); + ASSERT_TRUE(submitted_task_finished); + ASSERT_FALSE(pending_task_submitted); } -// The next four tests are corner cases but can sometimes occur when using these types -// in standard containers on certain versions of the compiler/cpplib. Basically we -// want to make sure our deleter is ok with null pointers. 
-TEST(AsyncDestroyable, DefaultUnique) { - std::unique_ptr> default_ptr; - default_ptr.reset(); -} - -TEST(AsyncDestroyable, NullUnique) { - std::unique_ptr> null_ptr( - nullptr); - null_ptr.reset(); -} - -TEST(AsyncDestroyable, NullShared) { - std::shared_ptr null_ptr(nullptr, - DestroyingDeleter()); - null_ptr.reset(); +TEST(AsyncTaskScheduler, TaskFailsAfterEnd) { + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + Future<> task = Future<>::Make(); + scheduler->AddSimpleTask([task] { return task; }); + scheduler->End(); + AssertNotFinished(scheduler->OnFinished()); + task.MarkFinished(Status::Invalid("XYZ")); + ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished()); } -TEST(AsyncDestroyable, NullUniqueToShared) { - std::unique_ptr> null_ptr( - nullptr); - std::shared_ptr null_shared = std::move(null_ptr); - null_shared.reset(); -} - -TEST(AsyncDestroyable, MakeUnique) { - TestAsyncDestroyable([](Future<> gate, bool* destroyed) { - return MakeUniqueAsync(gate, destroyed); +TEST(AsyncTaskScheduler, SubSchedulerFinishCallback) { + bool finish_callback_ran = false; + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + AsyncTaskScheduler* sub_scheduler = scheduler->MakeSubScheduler([&] { + finish_callback_ran = true; + return Status::OK(); }); + ASSERT_FALSE(finish_callback_ran); + sub_scheduler->End(); + ASSERT_TRUE(finish_callback_ran); + scheduler->End(); + ASSERT_FINISHES_OK(scheduler->OnFinished()); } -template -class TypedTestAsyncTaskGroup : public ::testing::Test {}; - -using AsyncTaskGroupTypes = ::testing::Types; - -TYPED_TEST_SUITE(TypedTestAsyncTaskGroup, AsyncTaskGroupTypes); - -TYPED_TEST(TypedTestAsyncTaskGroup, Basic) { - TypeParam task_group; - Future<> fut1 = Future<>::Make(); - Future<> fut2 = Future<>::Make(); - ASSERT_OK(task_group.AddTask([fut1]() { return fut1; })); - ASSERT_OK(task_group.AddTask([fut2]() { return fut2; })); - Future<> all_done = task_group.End(); - AssertNotFinished(all_done); - fut1.MarkFinished(); - AssertNotFinished(all_done); - fut2.MarkFinished(); - ASSERT_FINISHES_OK(all_done); -} - -TYPED_TEST(TypedTestAsyncTaskGroup, NoTasks) { - TypeParam task_group; - ASSERT_FINISHES_OK(task_group.End()); -} - -TYPED_TEST(TypedTestAsyncTaskGroup, OnFinishedDoesNotEnd) { - TypeParam task_group; - Future<> on_finished = task_group.OnFinished(); - AssertNotFinished(on_finished); - ASSERT_FINISHES_OK(task_group.End()); - ASSERT_FINISHES_OK(on_finished); +TEST(AsyncTaskScheduler, SubSchedulerFinishAbort) { + bool finish_callback_ran = false; + std::unique_ptr scheduler = AsyncTaskScheduler::Make(); + AsyncTaskScheduler* sub_scheduler = scheduler->MakeSubScheduler([&] { + finish_callback_ran = true; + return Status::Invalid("XYZ"); + }); + ASSERT_FALSE(finish_callback_ran); + sub_scheduler->End(); + ASSERT_TRUE(finish_callback_ran); + scheduler->End(); + ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished()); } -TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterDone) { - TypeParam task_group; - ASSERT_FINISHES_OK(task_group.End()); - ASSERT_RAISES(Cancelled, task_group.AddTask([] { return Future<>::Make(); })); +FnOnce EmptyFinishCallback() { + return [] { return Status::OK(); }; } -TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterEndButBeforeFinish) { - TypeParam task_group; - Future<> task_one = Future<>::Make(); - ASSERT_OK(task_group.AddTask([task_one] { return task_one; })); - Future<> finish_fut = task_group.End(); - AssertNotFinished(finish_fut); - ASSERT_RAISES(Cancelled, task_group.AddTask([] { return Future<>::Make(); })); - 
AssertNotFinished(finish_fut); - task_one.MarkFinished(); - AssertFinished(finish_fut); - ASSERT_FINISHES_OK(finish_fut); +TEST(AsyncTaskScheduler, SubSchedulerNoticesParentAbort) { + std::unique_ptr parent = AsyncTaskScheduler::Make(); + std::unique_ptr child_throttle = + AsyncTaskScheduler::MakeThrottle(1); + AsyncTaskScheduler* child = + parent->MakeSubScheduler(EmptyFinishCallback(), child_throttle.get()); + + Future<> task = Future<>::Make(); + bool was_submitted = false; + ASSERT_TRUE(child->AddSimpleTask([task] { return task; })); + ASSERT_TRUE(child->AddSimpleTask([&was_submitted] { + was_submitted = true; + return Future<>::MakeFinished(); + })); + ASSERT_TRUE(parent->AddSimpleTask([] { return Status::Invalid("XYZ"); })); + ASSERT_FALSE(child->AddSimpleTask([task] { return task; })); + task.MarkFinished(); + child->End(); + parent->End(); + ASSERT_FINISHES_AND_RAISES(Invalid, parent->OnFinished()); } -TYPED_TEST(TypedTestAsyncTaskGroup, Error) { - TypeParam task_group; - Future<> failed_task = Future<>::MakeFinished(Status::Invalid("XYZ")); - ASSERT_RAISES(Invalid, task_group.AddTask([failed_task] { return failed_task; })); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End()); +TEST(AsyncTaskScheduler, SubSchedulerNoTasks) { + // An unended sub-scheduler should keep the parent scheduler unfinished even if there + // there are no tasks. + std::unique_ptr parent = AsyncTaskScheduler::Make(); + AsyncTaskScheduler* child = parent->MakeSubScheduler(EmptyFinishCallback()); + parent->End(); + AssertNotFinished(parent->OnFinished()); + child->End(); + ASSERT_FINISHES_OK(parent->OnFinished()); } -TYPED_TEST(TypedTestAsyncTaskGroup, ErrorWhileNotEmpty) { - TypeParam task_group; - Future<> pending_task = Future<>::Make(); - Future<> will_fail_task = Future<>::Make(); - Future<> after_fail_task = Future<>::Make(); - ASSERT_OK(task_group.AddTask([pending_task] { return pending_task; })); - ASSERT_OK(task_group.AddTask([will_fail_task] { return will_fail_task; })); - ASSERT_OK(task_group.AddTask([after_fail_task] { return after_fail_task; })); - Future<> end = task_group.End(); - AssertNotFinished(end); - pending_task.MarkFinished(); - will_fail_task.MarkFinished(Status::Invalid("XYZ")); - after_fail_task.MarkFinished(); - ASSERT_FINISHES_AND_RAISES(Invalid, end); -} +class CustomThrottle : public AsyncTaskScheduler::Throttle { + public: + virtual util::optional> TryAcquire(int amt) { + if (gate_.is_finished()) { + return nullopt; + } else { + return gate_; + } + } + virtual void Release(int amt) {} + void Unlock() { gate_.MarkFinished(); } -TYPED_TEST(TypedTestAsyncTaskGroup, TaskFactoryFails) { - TypeParam task_group; - ASSERT_RAISES(Invalid, task_group.AddTask([] { return Status::Invalid("XYZ"); })); - ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); })); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End()); -} + private: + Future<> gate_ = Future<>::Make(); +}; -TYPED_TEST(TypedTestAsyncTaskGroup, AddAfterFailed) { - TypeParam task_group; - ASSERT_RAISES(Invalid, task_group.AddTask([] { - return Future<>::MakeFinished(Status::Invalid("XYZ")); +TEST(AsyncTaskScheduler, EndWaitsForAddedButNotSubmittedTasks) { + /// If a scheduler ends then unsubmitted tasks should still be executed + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(1); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + Future<> slow_task = Future<>::Make(); + bool was_run = false; + ASSERT_TRUE(task_group->AddSimpleTask([slow_task] { return slow_task; })); + 
ASSERT_TRUE(task_group->AddSimpleTask([&was_run] { + was_run = true; + return Future<>::MakeFinished(); })); - ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); })); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End()); -} - -TYPED_TEST(TypedTestAsyncTaskGroup, Stress) { - constexpr int NTASKS = 100; - TypeParam task_group; - std::vector threads; - for (int i = 0; i < NTASKS; i++) { - ASSERT_OK(task_group.AddTask([&threads] { - Future<> fut = Future<>::Make(); - threads.emplace_back([fut]() mutable { fut.MarkFinished(); }); - return fut; - })); - } - ASSERT_FINISHES_OK(task_group.End()); - for (auto& thread : threads) { - thread.join(); - } + ASSERT_FALSE(was_run); + task_group->End(); + slow_task.MarkFinished(); + ASSERT_FINISHES_OK(task_group->OnFinished()); + ASSERT_TRUE(was_run); + + /// Same test but block task by custom throttle + auto custom_throttle = ::arrow::internal::make_unique(); + task_group = AsyncTaskScheduler::Make(custom_throttle.get()); + was_run = false; + ASSERT_TRUE(task_group->AddSimpleTask([&was_run] { + was_run = true; + return Future<>::MakeFinished(); + })); + ASSERT_FALSE(was_run); + task_group->End(); + custom_throttle->Unlock(); + ASSERT_FINISHES_OK(task_group->OnFinished()); + ASSERT_TRUE(was_run); } -TEST(StandardAsyncTaskGroup, TaskFinishesAfterError) { - AsyncTaskGroup task_group; +TEST(AsyncTaskScheduler, TaskFinishesAfterError) { + /// If a task fails it shouldn't impact previously submitted tasks + std::unique_ptr task_group = AsyncTaskScheduler::Make(); Future<> fut1 = Future<>::Make(); - ASSERT_OK(task_group.AddTask([fut1] { return fut1; })); - ASSERT_RAISES(Invalid, task_group.AddTask([] { - return Future<>::MakeFinished(Status::Invalid("XYZ")); - })); - Future<> finished_fut = task_group.End(); + ASSERT_TRUE(task_group->AddSimpleTask([fut1] { return fut1; })); + ASSERT_TRUE(task_group->AddSimpleTask( + [] { return Future<>::MakeFinished(Status::Invalid("XYZ")); })); + task_group->End(); + Future<> finished_fut = task_group->OnFinished(); AssertNotFinished(finished_fut); fut1.MarkFinished(); ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); } -TEST(StandardAsyncTaskGroup, FailAfterAdd) { - AsyncTaskGroup task_group; +TEST(AsyncTaskScheduler, FailAfterAdd) { + /// If a task fails it shouldn't impact tasks that have been submitted + /// even if they were submitted later + std::unique_ptr task_group = AsyncTaskScheduler::Make(); Future<> will_fail = Future<>::Make(); - ASSERT_OK(task_group.AddTask([will_fail] { return will_fail; })); + ASSERT_TRUE(task_group->AddSimpleTask([will_fail] { return will_fail; })); Future<> added_later_and_passes = Future<>::Make(); - ASSERT_OK( - task_group.AddTask([added_later_and_passes] { return added_later_and_passes; })); + ASSERT_TRUE(task_group->AddSimpleTask( + [added_later_and_passes] { return added_later_and_passes; })); will_fail.MarkFinished(Status::Invalid("XYZ")); - ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); })); - Future<> finished_fut = task_group.End(); + ASSERT_FALSE(task_group->AddSimpleTask([] { return Future<>::Make(); })); + task_group->End(); + Future<> finished_fut = task_group->OnFinished(); AssertNotFinished(finished_fut); added_later_and_passes.MarkFinished(); AssertFinished(finished_fut); ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); } -// The serialized task group can never really get into a "fail after add" scenario -// because there is no parallelism. 
So the behavior is a little unique in these scenarios - -TEST(SerializedAsyncTaskGroup, TaskFinishesAfterError) { - SerializedAsyncTaskGroup task_group; - Future<> fut1 = Future<>::Make(); - ASSERT_OK(task_group.AddTask([fut1] { return fut1; })); - ASSERT_OK( - task_group.AddTask([] { return Future<>::MakeFinished(Status::Invalid("XYZ")); })); - Future<> finished_fut = task_group.End(); - AssertNotFinished(finished_fut); - fut1.MarkFinished(); - ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); -} - -TEST(SerializedAsyncTaskGroup, FailAfterAdd) { - SerializedAsyncTaskGroup task_group; +TEST(AsyncTaskScheduler, PurgeUnsubmitted) { + /// If a task fails then unsubmitted tasks should not be executed + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(1); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); Future<> will_fail = Future<>::Make(); - ASSERT_OK(task_group.AddTask([will_fail] { return will_fail; })); - Future<> added_later_and_passes = Future<>::Make(); - bool added_later_and_passes_created = false; - ASSERT_OK(task_group.AddTask([added_later_and_passes, &added_later_and_passes_created] { - added_later_and_passes_created = true; - return added_later_and_passes; + ASSERT_TRUE(task_group->AddSimpleTask([will_fail] { return will_fail; })); + bool was_submitted = false; + ASSERT_TRUE(task_group->AddSimpleTask([&was_submitted] { + was_submitted = false; + return Future<>::MakeFinished(); })); will_fail.MarkFinished(Status::Invalid("XYZ")); - ASSERT_RAISES(Invalid, task_group.AddTask([] { return Future<>::Make(); })); - ASSERT_FINISHES_AND_RAISES(Invalid, task_group.End()); - ASSERT_FALSE(added_later_and_passes_created); + task_group->End(); + Future<> finished_fut = task_group->OnFinished(); + ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); + ASSERT_FALSE(was_submitted); +} + +TEST(AsyncTaskScheduler, FifoStress) { + // Regresses an issue where adding a task, when the throttle was + // just cleared, could lead to the added task being run immediately, + // even though there were queued tasks. 
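+    // With a throttle of one, the first (sleeping) task holds the only slot, so the
+    // second and third tasks must be queued. The third task checks that the second ran
+    // before it, which would fail if a newly added task could jump ahead of the queue
+    // when the throttle frees up.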
+ constexpr int kNumIters = 100; + for (int i = 0; i < kNumIters; i++) { + std::atomic middle_task_run{false}; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(1); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + task_group->AddSimpleTask([] { return SleepABitAsync(); }); + task_group->AddSimpleTask([&] { + middle_task_run = true; + return Future<>::MakeFinished(); + }); + SleepABit(); + task_group->AddSimpleTask([&] { + EXPECT_TRUE(middle_task_run); + return Future<>::MakeFinished(); + }); + } +} + +TEST(AsyncTaskScheduler, MaxConcurrentTasksStress) { + constexpr int kNumIters = 100; + constexpr int kNumTasks = 32; + constexpr int kNumConcurrentTasks = 8; + for (int i = 0; i < kNumIters; i++) { + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(kNumConcurrentTasks); + std::unique_ptr task_group = + AsyncTaskScheduler::Make(throttle.get()); + std::atomic num_tasks_running{0}; + for (int task_idx = 0; task_idx < kNumTasks; task_idx++) { + task_group->AddSimpleTask([&num_tasks_running, kNumConcurrentTasks] { + if (num_tasks_running.fetch_add(1) > kNumConcurrentTasks) { + ADD_FAILURE() << "More than " << kNumConcurrentTasks + << " tasks were allowed to run concurrently"; + } + return SleepABitAsync().Then( + [&num_tasks_running] { num_tasks_running.fetch_sub(1); }); + }); + } + task_group->End(); + ASSERT_FINISHES_OK(task_group->OnFinished()); + } } -TEST(SerializedAsyncTaskGroup, Abort) { - SerializedAsyncTaskGroup task_group; - struct Task { - bool started = false; - Future<> finished = Future<>::Make(); - }; - auto task_factory = [](Task& task) -> std::function()> { - return [&task] { - task.started = true; - return task.finished; +TEST(AsyncTaskScheduler, ScanningStress) { + // Simulates the scanner's use of the scheduler + // The top level scheduler scans over fragments and + // for each fragment a sub-scheduler is created that scans + // that fragment. 
The sub-schedulers all share a common throttle + /// If a task fails then unsubmitted tasks should not be executed + constexpr int kNumIters = 16; + constexpr int kNumFragments = 16; + constexpr int kBatchesPerFragment = 8; + constexpr int kNumConcurrentTasks = 4; + constexpr int kExpectedBatchesScanned = kNumFragments * kBatchesPerFragment; + + for (int i = 0; i < kNumIters; i++) { + std::unique_ptr batch_limit = + AsyncTaskScheduler::MakeThrottle(kNumConcurrentTasks); + std::unique_ptr listing_scheduler = AsyncTaskScheduler::Make(); + std::atomic batches_scanned{0}; + std::atomic fragments_scanned{0}; + auto scan_batch = [&] { batches_scanned++; }; + auto submit_scan = [&]() { return SleepABitAsync().Then(scan_batch); }; + auto list_fragment = [&]() { + AsyncTaskScheduler* batch_scheduler = + listing_scheduler->MakeSubScheduler(EmptyFinishCallback(), batch_limit.get()); + for (int i = 0; i < kBatchesPerFragment; i++) { + ASSERT_TRUE(batch_scheduler->AddSimpleTask(submit_scan)); + } + batch_scheduler->End(); + if (++fragments_scanned == kNumFragments) { + listing_scheduler->End(); + } }; - }; - Task one, two; - ASSERT_OK(task_group.AddTask(task_factory(one))); - ASSERT_OK(task_group.AddTask(task_factory(two))); - Future<> group_done = task_group.OnFinished(); - AssertNotFinished(group_done); - ASSERT_TRUE(one.started); - ASSERT_FALSE(two.started); - Future<> abort_done = task_group.Abort(Status::Invalid("XYZ")); - AssertNotFinished(abort_done); - one.finished.MarkFinished(); - ASSERT_FINISHES_AND_RAISES(Invalid, group_done); - ASSERT_FINISHES_AND_RAISES(Invalid, abort_done); - ASSERT_FALSE(two.started); + auto submit_list_fragment = [&]() { return SleepABitAsync().Then(list_fragment); }; + for (int frag_idx = 0; frag_idx < kNumFragments; frag_idx++) { + ASSERT_TRUE(listing_scheduler->AddSimpleTask(submit_list_fragment)); + } + ASSERT_FINISHES_OK(listing_scheduler->OnFinished()); + ASSERT_EQ(kExpectedBatchesScanned, batches_scanned.load()); + } +} + +class TaskWithPriority : public AsyncTaskScheduler::Task { + public: + TaskWithPriority(std::function>()> task, int priority) + : task(std::move(task)), priority(priority) {} + Result> operator()(AsyncTaskScheduler* scheduler) override { return task(); } + + std::function>()> task; + int priority; +}; + +struct TaskWithPriorityCompare { + bool operator()(TaskWithPriority* left, TaskWithPriority* right) { + return left->priority < right->priority; + } +}; + +// A priority queue that prefers tasks with higher priority +class PriorityQueue : public AsyncTaskScheduler::Queue { + public: + using Task = AsyncTaskScheduler::Task; + void Push(std::unique_ptr task) { + queue_.push(static_cast(task.release())); + } + std::unique_ptr Pop() { + TaskWithPriority* top = queue_.top(); + queue_.pop(); + return std::unique_ptr(top); + } + const Task& Peek() { return *queue_.top(); } + bool Empty() { return queue_.empty(); } + void Purge() { + while (!queue_.empty()) { + queue_.pop(); + } + } + + private: + std::priority_queue, + TaskWithPriorityCompare> + queue_; +}; + +TEST(AsyncTaskScheduler, Priority) { + constexpr int kNumTasks = 32; + constexpr int kNumConcurrentTasks = 8; + std::unique_ptr throttle = + AsyncTaskScheduler::MakeThrottle(kNumConcurrentTasks); + std::unique_ptr task_group = AsyncTaskScheduler::Make( + throttle.get(), ::arrow::internal::make_unique()); + + std::shared_ptr gate = GatingTask::Make(); + int submit_order[kNumTasks]; + std::atomic order_index{0}; + + for (int task_idx = 0; task_idx < kNumTasks; task_idx++) { + int priority = 
task_idx; + std::function>()> task_exec = [&, priority]() -> Result> { + submit_order[order_index++] = priority; + return gate->AsyncTask(); + }; + auto task = ::arrow::internal::make_unique(task_exec, priority); + task_group->AddTask(std::move(task)); + } + task_group->End(); + AssertNotFinished(task_group->OnFinished()); + + ASSERT_OK(gate->WaitForRunning(kNumConcurrentTasks)); + ASSERT_OK(gate->Unlock()); + + for (int i = 0; i < kNumConcurrentTasks; i++) { + // The first tasks will be submitted immediately since the queue is empty + ASSERT_EQ(submit_order[i], i); + } + // After that the remaining tasks will run in LIFO order because of the priority + for (int i = kNumConcurrentTasks; i < kNumTasks; i++) { + ASSERT_EQ(submit_order[i], kNumTasks - i - 1 + kNumConcurrentTasks); + } } } // namespace util
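
// A minimal sketch of a task with a non-default cost, assuming hypothetical helpers
// EstimateBatchCost and ReadBatchAsync; combined with a throttle made via
// AsyncTaskScheduler::MakeThrottle(max_cost_in_flight), the scheduler would stop
// submitting such tasks once the summed cost of the running ones reaches the limit:
//
//   class CostedReadTask : public AsyncTaskScheduler::Task {
//    public:
//     explicit CostedReadTask(int batch_index) : batch_index_(batch_index) {}
//     Result<Future<>> operator()(AsyncTaskScheduler* scheduler) override {
//       return ReadBatchAsync(batch_index_);
//     }
//     int cost() const override { return EstimateBatchCost(batch_index_); }
//
//    private:
//     int batch_index_;
//   };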