From b056e07b8aefe73a26239da481d92a8562c41833 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 10 Feb 2023 17:08:43 -0800 Subject: [PATCH] GH-34059: [C++] Add a fetch node based on a batch index (#34060) This PR introduces the concept of `ExecBatch::index` but does not yet do much with it. As a proof of concept, this PR adds a fetch node which can be inserted anywhere in the plan (not just at the sink) to satisfy `LIMIT x OFFSET y` (Substrait calls this operation "fetch", so I have adopted that name as well). This PR also introduces two sequencing accumulation queues which will be useful, I hope, for anyone implementing nodes that rely on ordered execution. This PR unfortunately introduces a new query option: whether or not the sink node should pay the small performance hit required to sequence its output. While considering how best to add this option I realized we will probably have more query options in the near future regarding "how much RAM to use" (e.g. spillover) and potentially more beyond that. So I have taken all the options and put them into `arrow::compute::QueryOptions` (this already existed but it was not user facing; I added more things to it). I added a new DeclarationToXyz overload that accepts QueryOptions. This has, unfortunately, led to a bit of overload explosion, but I think this should be the last new addition to the overload set (and we can deprecate the older overloads at some point). This PR also includes a new `gen::Gen / gen::TestGen` facility for generating test tables for input. I'd like to eventually use this to simplify some of the existing exec plan tests as well. I'm willing to split this into a separate PR if that makes sense. * Closes: #34059 Authored-by: Weston Pace Signed-off-by: Weston Pace --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/exec.h | 9 + cpp/src/arrow/compute/exec/CMakeLists.txt | 7 + .../arrow/compute/exec/accumulation_queue.cc | 112 +++++++++ .../arrow/compute/exec/accumulation_queue.h | 100 ++++++++ cpp/src/arrow/compute/exec/exec_plan.cc | 216 ++++++++++++------ cpp/src/arrow/compute/exec/exec_plan.h | 49 ++++ cpp/src/arrow/compute/exec/fetch_node.cc | 207 +++++++++++++++++ cpp/src/arrow/compute/exec/fetch_node_test.cc | 90 ++++++++ cpp/src/arrow/compute/exec/options.h | 27 ++- cpp/src/arrow/compute/exec/query_context.cc | 5 +- cpp/src/arrow/compute/exec/query_context.h | 16 +- cpp/src/arrow/compute/exec/sink_node.cc | 57 ++++- cpp/src/arrow/compute/exec/source_node.cc | 2 + cpp/src/arrow/compute/exec/test_nodes.cc | 116 ++++++++++ cpp/src/arrow/compute/exec/test_nodes.h | 14 ++ cpp/src/arrow/compute/exec/test_nodes_test.cc | 58 +++++ cpp/src/arrow/testing/generator.cc | 214 +++++++++++++++++ cpp/src/arrow/testing/generator.h | 84 +++++++ 19 files changed, 1278 insertions(+), 106 deletions(-) create mode 100644 cpp/src/arrow/compute/exec/fetch_node.cc create mode 100644 cpp/src/arrow/compute/exec/fetch_node_test.cc create mode 100644 cpp/src/arrow/compute/exec/test_nodes_test.cc
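For illustration only (this snippet is not part of the patch; all names in it come from the changes below): with this PR a LIMIT/OFFSET can be expressed and run via the new QueryOptions overload like so: Declaration plan = Declaration::Sequence( {{"table_source", TableSourceNodeOptions(table)}, {"fetch", FetchNodeOptions(/*offset=*/5, /*count=*/10)}}); QueryOptions query_options; query_options.sequence_output = true; // opt in to ordered delivery ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> result, DeclarationToTable(std::move(plan), query_options));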
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 26ca425718e3c..6382fdfb3a778 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -393,6 +393,7 @@ if(ARROW_COMPUTE) compute/exec/bloom_filter.cc compute/exec/exec_plan.cc compute/exec/expression.cc + compute/exec/fetch_node.cc compute/exec/filter_node.cc compute/exec/hash_join.cc compute/exec/hash_join_dict.cc diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index c3ccdd4f871f9..8128d84a12b15 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -151,6 +151,9 @@ class ARROW_EXPORT SelectionVector { const int32_t* indices_; }; +/// An index to represent that a batch does not belong to an ordered stream +constexpr int64_t kUnsequencedIndex = -1; + /// \brief A unit of work for kernel execution. It contains a collection of /// Array and Scalar values and an optional SelectionVector indicating that /// there is an unmaterialized filter that either must be materialized, or (if @@ -209,6 +212,12 @@ struct ARROW_EXPORT ExecBatch { /// whether any values are Scalar. int64_t length = 0; + /// \brief index of this batch in a sorted stream of batches + /// + /// This index must start at 0 and increase by 1 for each batch, without gaps, or + /// it can be set to kUnsequencedIndex if there is no meaningful order + int64_t index = kUnsequencedIndex; + /// \brief The sum of bytes in each buffer referenced by the batch /// /// Note: Scalars are not counted diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 9e1e8911d27d3..b0ddf0c539f47 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -29,6 +29,13 @@ add_arrow_compute_test(plan_test "arrow-compute" SOURCES plan_test.cc + test_nodes_test.cc + test_nodes.cc) +add_arrow_compute_test(fetch_node_test + PREFIX + "arrow-compute" + SOURCES + fetch_node_test.cc test_nodes.cc) add_arrow_compute_test(hash_join_node_test PREFIX diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc index 192db52942820..1ffc93a086bf6 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.cc +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -18,6 +18,11 @@ #include "arrow/compute/exec/accumulation_queue.h" #include <iterator> +#include <mutex> +#include <optional> +#include <queue> + +#include "arrow/util/logging.h" namespace arrow { namespace util { @@ -54,5 +59,112 @@ void AccumulationQueue::Clear() { } ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } + +namespace { + +struct LowestBatchIndexAtTop { + bool operator()(const ExecBatch& left, const ExecBatch& right) const { + return left.index > right.index; + } +}; + +class SequencingQueueImpl : public SequencingQueue { + public: + explicit SequencingQueueImpl(Processor* processor) : processor_(processor) {} + + Status InsertBatch(ExecBatch batch) override { + std::unique_lock<std::mutex> lk(mutex_); + if (batch.index == next_index_) { + return DeliverNextUnlocked(std::move(batch), std::move(lk)); + } + queue_.emplace(std::move(batch)); + return Status::OK(); + } + + private: + Status DeliverNextUnlocked(ExecBatch batch, std::unique_lock<std::mutex>&& lk) { + // Should be able to detect and avoid this at plan construction + DCHECK_NE(batch.index, ::arrow::compute::kUnsequencedIndex) + << "attempt to use a sequencing queue on an unsequenced stream of batches"; + std::vector<Task> tasks; + next_index_++; + ARROW_ASSIGN_OR_RAISE(std::optional<Task> this_task, + processor_->Process(std::move(batch))); + while (!queue_.empty() && next_index_ == queue_.top().index) { + ARROW_ASSIGN_OR_RAISE(std::optional<Task> task, processor_->Process(queue_.top())); + if (task) { + tasks.push_back(std::move(*task)); + } + queue_.pop(); + next_index_++; + } + lk.unlock(); + // Schedule tasks for stale items + for (auto& task : tasks) { + processor_->Schedule(std::move(task)); + } + // Run the current item immediately + if (this_task) { + ARROW_RETURN_NOT_OK(std::move(*this_task)()); + } + return Status::OK(); + } + + Processor* processor_; +
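// Batches that arrived out of order, waiting for earlier indices; the smallest index sits on top +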
std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_; + int64_t next_index_ = 0; + std::mutex mutex_; +}; + +class SerialSequencingQueueImpl : public SerialSequencingQueue { + public: + explicit SerialSequencingQueueImpl(Processor* processor) : processor_(processor) {} + + Status InsertBatch(ExecBatch batch) override { + std::unique_lock<std::mutex> lk(mutex_); + queue_.push(std::move(batch)); + if (queue_.top().index == next_index_ && !is_processing_) { + is_processing_ = true; + return DoProcess(std::move(lk)); + } + return Status::OK(); + } + + private: + Status DoProcess(std::unique_lock<std::mutex>&& lk) { + while (!queue_.empty() && queue_.top().index == next_index_) { + ExecBatch next(queue_.top()); + queue_.pop(); + next_index_++; + lk.unlock(); + // ARROW_RETURN_NOT_OK may return early here. In that case is_processing_ will + // never switch to false, so no other thread can process, but that should be OK + // since we failed anyway. It is important, however, that we do not hold the lock + // while calling Process. + ARROW_RETURN_NOT_OK(processor_->Process(std::move(next))); + lk.lock(); + } + is_processing_ = false; + return Status::OK(); + } + + Processor* processor_; + + std::mutex mutex_; + std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_; + int64_t next_index_ = 0; + bool is_processing_ = false; +}; + +} // namespace + +std::unique_ptr<SequencingQueue> SequencingQueue::Make(Processor* processor) { + return std::make_unique<SequencingQueueImpl>(processor); +} + +std::unique_ptr<SerialSequencingQueue> SerialSequencingQueue::Make(Processor* processor) { + return std::make_unique<SerialSequencingQueueImpl>(processor); +} + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h index 4b23e5ffcac54..5d1b5be5a3aea 100644 --- a/cpp/src/arrow/compute/exec/accumulation_queue.h +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -18,9 +18,12 @@ #pragma once #include <cstdint> +#include <functional> +#include <memory> #include <vector> #include "arrow/compute/exec.h" +#include "arrow/result.h" namespace arrow { namespace util { @@ -53,5 +56,102 @@ class AccumulationQueue { std::vector<ExecBatch> batches_; }; + +/// A queue that sequences incoming batches +/// +/// This can be used when a node needs to do some kind of ordered processing on +/// the stream. +/// +/// Batches can be inserted in any order. The processor's Process method will be called on +/// the batches, in order, without reentrant calls. For this reason the Process method +/// should be quick. +/// +/// For example, in a top-n node, the Process method should determine how many +/// rows need to be delivered for the given batch, and then return a task to actually +/// deliver those rows. +class SequencingQueue { + public: + using Task = std::function<Status()>; + + /// Strategy that describes how to handle items + class Processor { + public: + /// Process the batch, potentially generating a task + /// + /// This method will be called on each batch in order. Calls to this method + /// will be serialized and it will not be called reentrantly. This makes it + /// safe to do things that rely on order but minimal time should be spent here + /// to avoid becoming a bottleneck. + /// + /// \return a follow-up task that will be scheduled. The follow-up task(s) + /// are not guaranteed to run in any particular order. If nullopt is + /// returned then nothing will be scheduled. + virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0; + /// Schedule a task + virtual void Schedule(Task task) = 0; + }; + + virtual ~SequencingQueue() = default; + + /// Insert a batch into the queue + /// + /// This will insert the batch into the queue. If this batch was the next batch + /// to deliver then this will trigger one or more calls to the process callback, + /// generating one or more tasks. + /// + /// The task generated from the inserted batch will be executed immediately. Any + /// remaining tasks will be scheduled via the processor's Schedule method. + /// + /// From a data pipeline perspective the sequencing queue is a "sometimes" breaker. If + /// a batch arrives in order then this call will usually execute the downstream pipeline. + /// If the batch arrives early then this call will only queue the data. + virtual Status InsertBatch(ExecBatch batch) = 0; + + /// Create a queue + /// \param processor describes how to process the batches, must outlive the queue + static std::unique_ptr<SequencingQueue> Make(Processor* processor); +}; + +/// A queue that sequences incoming batches +/// +/// Unlike SequencingQueue, the Process method is not expected to schedule new tasks. +/// +/// If a batch arrives and another thread is currently processing then the batch +/// will be queued and control will return. In other words, delivery of batches will +/// not block on the Process method. +/// +/// It can be helpful to think of this as if a dedicated thread were running Process as +/// batches arrive. +class SerialSequencingQueue { + public: + /// Strategy that describes how to handle items + class Processor { + public: + /// Process the batch + /// + /// This method will be called on each batch in order. Calls to this method + /// will be serialized and it will not be called reentrantly. This makes it + /// safe to do things that rely on order. + /// + /// If this falls behind then data may accumulate. + /// + /// TODO: Could add backpressure if needed but right now all uses of this should + /// be pretty fast and so are unlikely to block. + virtual Status Process(ExecBatch batch) = 0; + }; + + virtual ~SerialSequencingQueue() = default; + + /// Insert a batch into the queue + /// + /// This will insert the batch into the queue. If this batch was the next batch + /// to deliver then this may trigger calls to the processor which will be run + /// as part of this call. 
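+ /// + /// A minimal usage sketch (hypothetical processor, not part of this patch): + /// + /// struct PrintProcessor : SerialSequencingQueue::Processor { + /// Status Process(ExecBatch batch) override { + /// // Called once per batch, in index order, never reentrantly + /// std::cout << batch.index << std::endl; + /// return Status::OK(); + /// } + /// }; + /// + /// PrintProcessor processor; + /// std::unique_ptr<SerialSequencingQueue> queue = SerialSequencingQueue::Make(&processor); + /// // InsertBatch may be called from any thread, with batches in any order + /// ARROW_RETURN_NOT_OK(queue->InsertBatch(std::move(batch)));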
+ virtual Status InsertBatch(ExecBatch batch) = 0; + + /// Create a queue + /// \param processor describes how to process the batches, must outlive the queue + static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor); +}; + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index a187d4346fad2..629460b755209 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -607,6 +607,94 @@ Result<std::shared_ptr<Schema>> DeclarationToSchema(const Declaration& declarati return last_node->inputs()[0]->output_schema(); } +namespace { + +Future<std::shared_ptr<Table>> DeclarationToTableImpl( + Declaration declaration, QueryOptions query_options, + ::arrow::internal::Executor* cpu_executor) { + ExecContext exec_ctx(query_options.memory_pool, cpu_executor, + query_options.function_registry); + std::shared_ptr<std::shared_ptr<Table>> output_table = + std::make_shared<std::shared_ptr<Table>>(); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx)); + TableSinkNodeOptions sink_options(output_table.get()); + sink_options.sequence_output = query_options.sequence_output; + Declaration with_sink = + Declaration::Sequence({declaration, {"table_sink", sink_options}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); + ARROW_RETURN_NOT_OK(exec_plan->Validate()); + exec_plan->StartProducing(); + return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; }); +} + +Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesImpl( + Declaration declaration, QueryOptions options, + ::arrow::internal::Executor* cpu_executor) { + return DeclarationToTableImpl(std::move(declaration), options, cpu_executor) + .Then([](const std::shared_ptr<Table>& table) { + return TableBatchReader(table).ToRecordBatches(); + }); +} + +Future<BatchesWithCommonSchema> DeclarationToExecBatchesImpl( + Declaration declaration, QueryOptions options, + ::arrow::internal::Executor* cpu_executor) { + std::shared_ptr<Schema> out_schema; + AsyncGenerator<std::optional<ExecBatch>> sink_gen; + ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx)); + SinkNodeOptions sink_options(&sink_gen, &out_schema); + sink_options.sequence_delivery = options.sequence_output; + Declaration with_sink = Declaration::Sequence({declaration, {"sink", sink_options}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); + ARROW_RETURN_NOT_OK(exec_plan->Validate()); + exec_plan->StartProducing(); + auto collected_fut = CollectAsyncGenerator(sink_gen); + return AllFinished({exec_plan->finished(), Future<>(collected_fut)}) + .Then([collected_fut, exec_plan, + schema = std::move(out_schema)]() -> Result<BatchesWithCommonSchema> { + ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result()); + std::vector<ExecBatch> exec_batches = ::arrow::internal::MapVector( + [](std::optional<ExecBatch> batch) { return batch.value_or(ExecBatch()); }, + std::move(collected)); + return BatchesWithCommonSchema{std::move(exec_batches), schema}; + }); +} + +Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options, + ::arrow::internal::Executor* cpu_executor) { + ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx)); + ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get())); + if (!last_node->is_sink()) { + Declaration null_sink = + Declaration("consuming_sink", {last_node}, + ConsumingSinkNodeOptions(NullSinkNodeConsumer::Make())); 
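+ // No sink was supplied by the caller, so append a null sink that simply discards the data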
ARROW_RETURN_NOT_OK(null_sink.AddToPlan(exec_plan.get())); + } + ARROW_RETURN_NOT_OK(exec_plan->Validate()); + exec_plan->StartProducing(); + // Keep the exec_plan alive until it finishes + return exec_plan->finished().Then([exec_plan]() {}); +} + +QueryOptions QueryOptionsFromCustomExecContext(ExecContext exec_context) { + QueryOptions options; + options.memory_pool = exec_context.memory_pool(); + options.function_registry = exec_context.func_registry(); + return options; +} + +QueryOptions QueryOptionsFromArgs(MemoryPool* memory_pool, + FunctionRegistry* function_registry) { + QueryOptions options; + options.memory_pool = memory_pool; + options.function_registry = function_registry; + return options; +} + +} // namespace + Result DeclarationToString(const Declaration& declaration, FunctionRegistry* function_registry) { // We pass in the default memory pool and the CPU executor but nothing we are doing @@ -623,29 +711,21 @@ Result DeclarationToString(const Declaration& declaration, Future> DeclarationToTableAsync(Declaration declaration, ExecContext exec_context) { - std::shared_ptr> output_table = - std::make_shared>(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, - ExecPlan::Make(exec_context)); - Declaration with_sink = Declaration::Sequence( - {declaration, {"table_sink", TableSinkNodeOptions(output_table.get())}}); - ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); - ARROW_RETURN_NOT_OK(exec_plan->Validate()); - exec_plan->StartProducing(); - return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; }); + return DeclarationToTableImpl(declaration, + QueryOptionsFromCustomExecContext(exec_context), + exec_context.executor()); } Future> DeclarationToTableAsync( Declaration declaration, bool use_threads, MemoryPool* memory_pool, FunctionRegistry* function_registry) { + QueryOptions query_options = QueryOptionsFromArgs(memory_pool, function_registry); if (use_threads) { - ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(), - function_registry); - return DeclarationToTableAsync(std::move(declaration), ctx); + return DeclarationToTableImpl(std::move(declaration), query_options, + ::arrow::internal::GetCpuThreadPool()); } else { ARROW_ASSIGN_OR_RAISE(std::shared_ptr tpool, ThreadPool::Make(1)); - ExecContext ctx(memory_pool, tpool.get(), function_registry); - return DeclarationToTableAsync(std::move(declaration), ctx) + return DeclarationToTableImpl(std::move(declaration), query_options, tpool.get()) .Then([tpool](const std::shared_ptr
& table) { return table; }); } } @@ -656,31 +736,42 @@ Result> DeclarationToTable(Declaration declaration, FunctionRegistry* function_registry) { return ::arrow::internal::RunSynchronously>>( [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) { - ExecContext ctx(memory_pool, executor, function_registry); - return DeclarationToTableAsync(std::move(declaration), ctx); + return DeclarationToTableImpl( + std::move(declaration), QueryOptionsFromArgs(memory_pool, function_registry), + executor); }, use_threads); } +Result> DeclarationToTable(Declaration declaration, + QueryOptions query_options) { + if (query_options.custom_cpu_executor != nullptr) { + return Status::Invalid("Cannot use synchronous methods with a custom CPU executor"); + } + return ::arrow::internal::RunSynchronously>>( + [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) { + return DeclarationToTableImpl(std::move(declaration), query_options, executor); + }, + query_options.use_threads); +} + Future>> DeclarationToBatchesAsync( Declaration declaration, ExecContext exec_context) { - return DeclarationToTableAsync(std::move(declaration), exec_context) - .Then([](const std::shared_ptr
& table) { - return TableBatchReader(table).ToRecordBatches(); - }); + return DeclarationToBatchesImpl(std::move(declaration), + QueryOptionsFromCustomExecContext(exec_context), + exec_context.executor()); } Future>> DeclarationToBatchesAsync( Declaration declaration, bool use_threads, MemoryPool* memory_pool, FunctionRegistry* function_registry) { + QueryOptions query_options = QueryOptionsFromArgs(memory_pool, function_registry); if (use_threads) { - ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(), - function_registry); - return DeclarationToBatchesAsync(std::move(declaration), ctx); + return DeclarationToBatchesImpl(std::move(declaration), query_options, + ::arrow::internal::GetCpuThreadPool()); } else { ARROW_ASSIGN_OR_RAISE(std::shared_ptr tpool, ThreadPool::Make(1)); - ExecContext ctx(memory_pool, tpool.get(), function_registry); - return DeclarationToBatchesAsync(std::move(declaration), ctx) + return DeclarationToBatchesImpl(std::move(declaration), query_options, tpool.get()) .Then([tpool](const std::vector>& batches) { return batches; }); @@ -693,46 +784,31 @@ Result>> DeclarationToBatches( return ::arrow::internal::RunSynchronously< Future>>>( [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) { - ExecContext ctx(memory_pool, executor, function_registry); - return DeclarationToBatchesAsync(std::move(declaration), ctx); + return DeclarationToBatchesImpl( + std::move(declaration), QueryOptionsFromArgs(memory_pool, function_registry), + executor); }, use_threads); } Future DeclarationToExecBatchesAsync(Declaration declaration, ExecContext exec_context) { - std::shared_ptr out_schema; - AsyncGenerator> sink_gen; - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, - ExecPlan::Make(exec_context)); - Declaration with_sink = Declaration::Sequence( - {declaration, {"sink", SinkNodeOptions(&sink_gen, &out_schema)}}); - ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get())); - ARROW_RETURN_NOT_OK(exec_plan->Validate()); - exec_plan->StartProducing(); - auto collected_fut = CollectAsyncGenerator(sink_gen); - return AllFinished({exec_plan->finished(), Future<>(collected_fut)}) - .Then([collected_fut, exec_plan, - schema = std::move(out_schema)]() -> Result { - ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result()); - std::vector exec_batches = ::arrow::internal::MapVector( - [](std::optional batch) { return batch.value_or(ExecBatch()); }, - std::move(collected)); - return BatchesWithCommonSchema{std::move(exec_batches), schema}; - }); + return DeclarationToExecBatchesImpl(std::move(declaration), + QueryOptionsFromCustomExecContext(exec_context), + exec_context.executor()); } Future DeclarationToExecBatchesAsync( Declaration declaration, bool use_threads, MemoryPool* memory_pool, FunctionRegistry* function_registry) { + QueryOptions query_options = QueryOptionsFromArgs(memory_pool, function_registry); if (use_threads) { - ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(), - function_registry); - return DeclarationToExecBatchesAsync(std::move(declaration), ctx); + return DeclarationToExecBatchesImpl(std::move(declaration), query_options, + ::arrow::internal::GetCpuThreadPool()); } else { ARROW_ASSIGN_OR_RAISE(std::shared_ptr tpool, ThreadPool::Make(1)); - ExecContext ctx(memory_pool, tpool.get(), function_registry); - return DeclarationToExecBatchesAsync(std::move(declaration), ctx) + return DeclarationToExecBatchesImpl(std::move(declaration), query_options, + tpool.get()) .Then([tpool](const BatchesWithCommonSchema& 
batches) { return batches; }); } } @@ -743,38 +819,30 @@ Result DeclarationToExecBatches( return ::arrow::internal::RunSynchronously>( [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) { ExecContext ctx(memory_pool, executor, function_registry); - return DeclarationToExecBatchesAsync(std::move(declaration), ctx); + return DeclarationToExecBatchesImpl( + std::move(declaration), QueryOptionsFromArgs(memory_pool, function_registry), + executor); }, use_threads); } Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr exec_plan, - ExecPlan::Make(exec_context)); - ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get())); - if (!last_node->is_sink()) { - Declaration null_sink = - Declaration("consuming_sink", {last_node}, - ConsumingSinkNodeOptions(NullSinkNodeConsumer::Make())); - ARROW_RETURN_NOT_OK(null_sink.AddToPlan(exec_plan.get())); - } - ARROW_RETURN_NOT_OK(exec_plan->Validate()); - exec_plan->StartProducing(); - // Keep the exec_plan alive until it finishes - return exec_plan->finished().Then([exec_plan]() {}); + return DeclarationToStatusImpl(std::move(declaration), + QueryOptionsFromCustomExecContext(exec_context), + exec_context.executor()); } Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads, MemoryPool* memory_pool, FunctionRegistry* function_registry) { + QueryOptions query_options = QueryOptionsFromArgs(memory_pool, function_registry); if (use_threads) { - ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(), - function_registry); - return DeclarationToStatusAsync(std::move(declaration), ctx); + return DeclarationToStatusImpl(std::move(declaration), query_options, + ::arrow::internal::GetCpuThreadPool()); } else { ARROW_ASSIGN_OR_RAISE(std::shared_ptr tpool, ThreadPool::Make(1)); - ExecContext ctx(memory_pool, tpool.get(), function_registry); - return DeclarationToStatusAsync(std::move(declaration), ctx).Then([tpool]() {}); + return DeclarationToStatusImpl(std::move(declaration), query_options, tpool.get()) + .Then([tpool]() {}); } } @@ -783,7 +851,9 @@ Status DeclarationToStatus(Declaration declaration, bool use_threads, return ::arrow::internal::RunSynchronously>( [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) { ExecContext ctx(memory_pool, executor, function_registry); - return DeclarationToStatusAsync(std::move(declaration), ctx); + return DeclarationToStatusImpl( + std::move(declaration), QueryOptionsFromArgs(memory_pool, function_registry), + executor); }, use_threads); } @@ -893,6 +963,7 @@ Result> DeclarationToReader( namespace internal { void RegisterSourceNode(ExecFactoryRegistry*); +void RegisterFetchNode(ExecFactoryRegistry*); void RegisterFilterNode(ExecFactoryRegistry*); void RegisterProjectNode(ExecFactoryRegistry*); void RegisterUnionNode(ExecFactoryRegistry*); @@ -908,6 +979,7 @@ ExecFactoryRegistry* default_exec_factory_registry() { public: DefaultRegistry() { internal::RegisterSourceNode(this); + internal::RegisterFetchNode(this); internal::RegisterFilterNode(this); internal::RegisterProjectNode(this); internal::RegisterUnionNode(this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index dc875ef479550..c2738945c27cb 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -408,6 +408,52 @@ struct ARROW_EXPORT Declaration { std::string label; }; +struct ARROW_EXPORT 
QueryOptions { + /// \brief Should the plan use a legacy batching strategy + /// + /// This is currently in place only to support the Scanner::ToTable + /// method. This method relies on batch indices from the scanner + /// remaining consistent. This is impractical in the ExecPlan which + /// might slice batches as needed (e.g. for a join) + /// + /// However, it still works for simple plans and this is the only way + /// we have at the moment for maintaining implicit order. + bool use_legacy_batching = false; + + /// If the output has a meaningful order then sequence the output of the plan + /// + /// If the output has no meaningful order then this option will be ignored. + bool sequence_output = false; + + /// \brief should the plan use multiple background threads for CPU-intensive work + /// + /// If this is false then all CPU work will be done on the calling thread. I/O tasks + /// will still happen on the I/O executor and may be multi-threaded (but should not use + /// significant CPU resources). + /// + /// Will be ignored if custom_cpu_executor is set + bool use_threads = true; + + /// \brief custom executor to use for CPU-intensive work + /// + /// Must be null or remain valid for the duration of the plan. If this is null then + /// a default thread pool will be chosen whose behavior will be controlled by + /// the `use_threads` option. + ::arrow::internal::Executor* custom_cpu_executor = NULLPTR; + + /// \brief a memory pool to use for allocations + /// + /// Must be null or remain valid for the duration of the plan. If this is null then + /// the arrow::default_memory_pool() will be used. + MemoryPool* memory_pool = NULLPTR; + + /// \brief a function registry to use for the plan + /// + /// Must be null or remain valid for the duration of the plan. If this is null then + /// defaults to arrow::compute::GetFunctionRegistry() + FunctionRegistry* function_registry = NULLPTR; +}; + /// \brief Calculate the output schema of a declaration /// /// This does not actually execute the plan. This operation may fail if the @@ -457,6 +503,9 @@ ARROW_EXPORT Result> DeclarationToTable( MemoryPool* memory_pool = default_memory_pool(), FunctionRegistry* function_registry = NULLPTR); +ARROW_EXPORT Result> DeclarationToTable( + Declaration declaration, QueryOptions query_options); + /// \brief Asynchronous version of \see DeclarationToTable /// /// \param declaration A declaration describing the plan to run diff --git a/cpp/src/arrow/compute/exec/fetch_node.cc b/cpp/src/arrow/compute/exec/fetch_node.cc new file mode 100644 index 0000000000000..d78bdd581b208 --- /dev/null +++ b/cpp/src/arrow/compute/exec/fetch_node.cc @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
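+ +// For illustration only (this comment block is an editor's sketch, not part of the +// original patch; see fetch_node_test.cc below for tested examples). The fetch node +// implements the equivalent of LIMIT/OFFSET and can be placed anywhere in a plan: +// +// // skip the first 100 rows, then emit at most 10 rows, in order +// Declaration plan = Declaration::Sequence( +// {{"table_source", TableSourceNodeOptions(table)}, +// {"fetch", FetchNodeOptions(/*offset=*/100, /*count=*/10)}}); +// QueryOptions query_options; +// query_options.sequence_output = true; +// ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> fetched, +// DeclarationToTable(std::move(plan), query_options));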
+ +#include <sstream> + +#include "arrow/compute/api_vector.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/accumulation_queue.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/map_node.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/query_context.h" +#include "arrow/compute/exec/util.h" +#include "arrow/datum.h" +#include "arrow/result.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace { + +class FetchCounter { + public: + struct Page { + int64_t to_skip; + int64_t to_send; + bool ended; + }; + + FetchCounter(int64_t rows_to_skip, int64_t rows_to_send) + : rows_to_send_(rows_to_send), rows_to_skip_(rows_to_skip) {} + + Page NextPage(const ExecBatch& batch) { + int64_t rows_in_batch_to_skip = 0; + if (rows_to_skip_ > 0) { + rows_in_batch_to_skip = std::min(rows_to_skip_, batch.length); + rows_to_skip_ -= rows_in_batch_to_skip; + } + + int64_t rows_in_batch_to_send = 0; + if (rows_to_send_ > 0) { + rows_in_batch_to_send = + std::min(rows_to_send_, batch.length - rows_in_batch_to_skip); + rows_to_send_ -= rows_in_batch_to_send; + } + return {rows_in_batch_to_skip, rows_in_batch_to_send, rows_to_send_ == 0}; + } + + private: + int64_t rows_to_send_; + int64_t rows_to_skip_; +}; + +class FetchNode : public ExecNode, public TracedNode, util::SequencingQueue::Processor { + public: + FetchNode(ExecPlan* plan, std::vector<ExecNode*> inputs, + std::shared_ptr<Schema> output_schema, int64_t offset, int64_t count) + : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), + TracedNode(this), + offset_(offset), + count_(count), + fetch_counter_(offset, count), + sequencing_queue_(util::SequencingQueue::Make(this)) {} + + static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "FetchNode")); + + const auto& fetch_options = checked_cast<const FetchNodeOptions&>(options); + + int64_t offset = fetch_options.offset; + int64_t count = fetch_options.count; + + if (offset < 0) { + return Status::Invalid("`offset` must be non-negative"); + } + if (count < 0) { + return Status::Invalid("`count` must be non-negative"); + } + + std::shared_ptr<Schema> output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode<FetchNode>(plan, std::move(inputs), std::move(output_schema), + offset, count); + } + + const char* kind_name() const override { return "FetchNode"; } + + Status InputFinished(ExecNode* input, int total_batches) override { + DCHECK_EQ(input, inputs_[0]); + EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}}); + // Normally we will finish in Process because we sent count_ rows.
However, it + // is possible that the input does not contain count_ rows and so we have to end from + // here + if (in_batch_counter_.SetTotal(total_batches)) { + if (!finished_) { + finished_ = true; + ARROW_RETURN_NOT_OK(inputs_[0]->StopProducing()); + ARROW_RETURN_NOT_OK(output_->InputFinished(this, out_batch_count_)); + } + } + return Status::OK(); + } + + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + return sequencing_queue_->InsertBatch(std::move(batch)); + } + + Result<std::optional<util::SequencingQueue::Task>> Process(ExecBatch batch) override { + if (finished_) { + return std::nullopt; + } + FetchCounter::Page page = fetch_counter_.NextPage(batch); + std::optional<util::SequencingQueue::Task> task_or_none; + if (page.to_send > 0) { + int new_index = out_batch_count_++; + task_or_none = [this, to_send = page.to_send, to_skip = page.to_skip, new_index, + batch = std::move(batch)]() mutable { + ExecBatch batch_to_send = std::move(batch); + if (to_skip > 0 || to_send < batch_to_send.length) { + batch_to_send = batch_to_send.Slice(to_skip, to_send); + } + batch_to_send.index = new_index; + return output_->InputReceived(this, std::move(batch_to_send)); + }; + } + // In the in_batch_counter_ case we've run out of data to process (count_ was + // greater than the total # of non-skipped rows). In the page.ended case we've + // just hit our desired output count. + if (in_batch_counter_.Increment() || (page.ended && !finished_)) { + finished_ = true; + ARROW_RETURN_NOT_OK(inputs_[0]->StopProducing()); + ARROW_RETURN_NOT_OK(output_->InputFinished(this, out_batch_count_)); + } + return task_or_none; + } + + void Schedule(util::SequencingQueue::Task task) override { + plan_->query_context()->ScheduleTask(std::move(task), "FetchNode::ProcessBatch"); + } + + protected: + std::string ToStringExtra(int indent = 0) const override { + std::stringstream ss; + ss << "offset=" << offset_ << " count=" << count_; + return ss.str(); + } + + private: + bool finished_ = false; + int64_t offset_; + int64_t count_; + AtomicCounter in_batch_counter_; + int32_t out_batch_count_ = 0; + FetchCounter fetch_counter_; + std::unique_ptr<util::SequencingQueue> sequencing_queue_; +}; + +} // namespace + +namespace internal { + +void RegisterFetchNode(ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory(std::string(FetchNodeOptions::kName), FetchNode::Make)); +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/fetch_node_test.cc b/cpp/src/arrow/compute/exec/fetch_node_test.cc new file mode 100644 index 0000000000000..24671035d6d69 --- /dev/null +++ b/cpp/src/arrow/compute/exec/fetch_node_test.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gmock/gmock-matchers.h> + +#include <memory> + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_nodes.h" +#include "arrow/table.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +static constexpr int kRowsPerBatch = 16; +static constexpr int kNumBatches = 32; + +std::shared_ptr<Table> TestTable() { + return gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches); +} + +void CheckFetch(FetchNodeOptions options) { + constexpr random::SeedType kSeed = 42; + constexpr int kJitterMod = 4; + RegisterTestNodes(); + std::shared_ptr<Table> input = TestTable(); + Declaration plan = + Declaration::Sequence({{"table_source", TableSourceNodeOptions(input)}, + {"jitter", JitterNodeOptions(kSeed, kJitterMod)}, + {"fetch", options}}); + for (bool use_threads : {false, true}) { + QueryOptions query_options; + query_options.sequence_output = true; + query_options.use_threads = use_threads; + ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> actual, + DeclarationToTable(plan, query_options)); + + if (options.offset >= input->num_rows() || options.count == 0) { + // In these cases, Table::Slice would fail or give us a table with 1 chunk while + // the fetch node gives us a table with 0 chunks + ASSERT_EQ(0, actual->num_rows()); + } else { + std::shared_ptr<Table> expected = input->Slice(options.offset, options.count); + AssertTablesEqual(*expected, *actual); + } + } +} + +void CheckFetchInvalid(FetchNodeOptions options, const std::string& message) { + std::shared_ptr<Table> input = TestTable(); + Declaration plan = Declaration::Sequence( + {{"table_source", TableSourceNodeOptions(input)}, {"fetch", options}}); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(message), + DeclarationToStatus(std::move(plan))); +} + +TEST(FetchNode, Basic) { + CheckFetch({0, 20}); + CheckFetch({20, 20}); + CheckFetch({0, 1000}); + CheckFetch({1000, 20}); + CheckFetch({50, 50}); + CheckFetch({0, 0}); +} + +TEST(FetchNode, Invalid) { + CheckFetchInvalid({-1, 10}, "`offset` must be non-negative"); + CheckFetchInvalid({10, -1}, "`count` must be non-negative"); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index d96e31cc5b2fe..bd2bbcb8e64a0 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -175,6 +175,14 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { Expression filter_expression; }; +class ARROW_EXPORT FetchNodeOptions : public ExecNodeOptions { + public: + static constexpr std::string_view kName = "fetch"; + FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {} + int64_t offset; + int64_t count; +}; + /// \brief Make a node which executes expressions on input batches, producing new batches. /// /// Each expression will be evaluated against each batch which is pushed to @@ -244,25 +252,30 @@ struct ARROW_EXPORT BackpressureOptions { /// \brief Add a sink node which forwards to an AsyncGenerator /// -/// Emitted batches will not be ordered. +/// Emitted batches will only be ordered if there is a meaningful ordering +/// and sequence_delivery is set to true. class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions { public: explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, std::shared_ptr<Schema>* schema, BackpressureOptions backpressure = {}, - BackpressureMonitor** backpressure_monitor = NULLPTR) + BackpressureMonitor** backpressure_monitor = NULLPTR, + bool sequence_delivery = false) : generator(generator), schema(schema), backpressure(backpressure), - backpressure_monitor(backpressure_monitor) {} + backpressure_monitor(backpressure_monitor), + sequence_delivery(sequence_delivery) {} explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, BackpressureOptions backpressure = {}, - BackpressureMonitor** backpressure_monitor = NULLPTR) + BackpressureMonitor** backpressure_monitor = NULLPTR, + bool sequence_delivery = false) : generator(generator), schema(NULLPTR), backpressure(std::move(backpressure)), - backpressure_monitor(backpressure_monitor) {} + backpressure_monitor(backpressure_monitor), + sequence_delivery(sequence_delivery) {} /// \brief A pointer to a generator of batches. /// @@ -286,6 +299,8 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions { /// the amount of data currently queued in the sink node. This is an optional utility /// and backpressure can be applied even if this is not used. BackpressureMonitor** backpressure_monitor; + /// \brief If true and there is a meaningful ordering then sequence delivered batches + bool sequence_delivery; }; /// \brief Control used by a SinkNodeConsumer to pause & resume @@ -340,6 +355,7 @@ class ARROW_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions { /// If specified then names must be provided for all fields. Currently, only a flat /// schema is supported (see ARROW-15901). 
std::vector<std::string> names; + bool sequence_output = false; }; /// \brief Make a node which sorts rows passed through it @@ -563,6 +579,7 @@ class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions { : output_table(output_table) {} std::shared_ptr<Table>
* output_table; + bool sequence_output = false; }; /// @} diff --git a/cpp/src/arrow/compute/exec/query_context.cc b/cpp/src/arrow/compute/exec/query_context.cc index 65a1a67418183..384bf3f3ab531 100644 --- a/cpp/src/arrow/compute/exec/query_context.cc +++ b/cpp/src/arrow/compute/exec/query_context.cc @@ -22,7 +22,6 @@ namespace arrow { using internal::CpuInfo; namespace compute { -QueryOptions::QueryOptions() : use_legacy_batching(false) {} QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context) : options_(opts), @@ -64,8 +63,8 @@ void QueryContext::ScheduleTask(std::function fn, std::string_view nam ::arrow::internal::Executor* exec = executor(); // Adds a task which submits fn to the executor and tracks its progress. If we're // already stopping then the task is ignored and fn is not executed. - async_scheduler_->AddSimpleTask([exec, fn]() { return exec->Submit(std::move(fn)); }, - name); + async_scheduler_->AddSimpleTask( + [exec, fn = std::move(fn)]() mutable { return exec->Submit(std::move(fn)); }, name); } void QueryContext::ScheduleTask(std::function fn, std::string_view name) { diff --git a/cpp/src/arrow/compute/exec/query_context.h b/cpp/src/arrow/compute/exec/query_context.h index 10f151b3188ba..453a9a7d7972a 100644 --- a/cpp/src/arrow/compute/exec/query_context.h +++ b/cpp/src/arrow/compute/exec/query_context.h @@ -19,29 +19,17 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/task_util.h" #include "arrow/compute/exec/util.h" #include "arrow/io/interfaces.h" #include "arrow/util/async_util.h" +#include "arrow/util/type_fwd.h" namespace arrow { using io::IOContext; namespace compute { -struct ARROW_EXPORT QueryOptions { - QueryOptions(); - - /// \brief Should the plan use a legacy batching strategy - /// - /// This is currently in place only to support the Scanner::ToTable - /// method. This method relies on batch indices from the scanner - /// remaining consistent. This is impractical in the ExecPlan which - /// might slice batches as needed (e.g. for a join) - /// - /// However, it still works for simple plans and this is the only way - /// we have at the moment for maintaining implicit order. 
- bool use_legacy_batching; -}; class ARROW_EXPORT QueryContext { public: diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 0b179cd6cd803..6ad32c7adc64e 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -23,6 +23,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" +#include "arrow/compute/exec/accumulation_queue.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" @@ -97,12 +98,14 @@ class BackpressureReservoir : public BackpressureMonitor { const uint64_t pause_if_above_; }; -class SinkNode : public ExecNode, public TracedNode { +class SinkNode : public ExecNode, + public TracedNode, + util::SerialSequencingQueue::Processor { public: SinkNode(ExecPlan* plan, std::vector inputs, AsyncGenerator>* generator, std::shared_ptr* schema, BackpressureOptions backpressure, - BackpressureMonitor** backpressure_monitor_out) + BackpressureMonitor** backpressure_monitor_out, bool sequence_delivery) : ExecNode(plan, std::move(inputs), {"collected"}, {}), TracedNode(this), backpressure_queue_(backpressure.resume_if_below, backpressure.pause_if_above), @@ -128,6 +131,9 @@ class SinkNode : public ExecNode, public TracedNode { return batch; }); }; + if (sequence_delivery) { + sequencer_ = util::SerialSequencingQueue::Make(this); + } } ~SinkNode() override { *node_destroyed_ = true; } @@ -140,7 +146,8 @@ class SinkNode : public ExecNode, public TracedNode { RETURN_NOT_OK(ValidateOptions(sink_options)); return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator, sink_options.schema, sink_options.backpressure, - sink_options.backpressure_monitor); + sink_options.backpressure_monitor, + sink_options.sequence_delivery); } const char* kind_name() const override { return "SinkNode"; } @@ -208,8 +215,16 @@ class SinkNode : public ExecNode, public TracedNode { DCHECK_EQ(input, inputs_[0]); RecordBackpressureBytesUsed(batch); - bool did_push = producer_.Push(std::move(batch)); - if (!did_push) return Status::OK(); // producer_ was Closed already + if (sequencer_) { + ARROW_RETURN_NOT_OK(sequencer_->InsertBatch(std::move(batch))); + } else { + ARROW_RETURN_NOT_OK(Process(std::move(batch))); + } + return Status::OK(); + } + + Status Process(ExecBatch batch) override { + producer_.Push(std::move(batch)); if (input_counter_.Increment()) { return Finish(); @@ -255,21 +270,29 @@ class SinkNode : public ExecNode, public TracedNode { PushGenerator> push_gen_; PushGenerator>::Producer producer_; std::shared_ptr node_destroyed_; + std::unique_ptr sequencer_; }; // A sink node that owns consuming the data and will not finish until the consumption // is finished. Use SinkNode if you are transferring the ownership of the data to another // system. Use ConsumingSinkNode if the data is being consumed within the exec plan (i.e. // the exec plan should not complete until the consumption has completed). 
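// For example, a sink that writes batches out to a file as part of the plan fits ConsumingSinkNode, while a sink that hands batches to an AsyncGenerator read by outside code fits SinkNode. 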
-class ConsumingSinkNode : public ExecNode, public BackpressureControl, public TracedNode { +class ConsumingSinkNode : public ExecNode, + public BackpressureControl, + public TracedNode, + util::SerialSequencingQueue::Processor { public: ConsumingSinkNode(ExecPlan* plan, std::vector inputs, std::shared_ptr consumer, - std::vector names) + std::vector names, bool sequence_delivery) : ExecNode(plan, std::move(inputs), {"to_consume"}, {}), TracedNode(this), consumer_(std::move(consumer)), - names_(std::move(names)) {} + names_(std::move(names)) { + if (sequence_delivery) { + sequencer_ = util::SerialSequencingQueue::Make(this); + } + } static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -280,9 +303,9 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl, public Tr return Status::Invalid("A SinkNodeConsumer is required"); } - return plan->EmplaceNode(plan, std::move(inputs), - std::move(sink_options.consumer), - std::move(sink_options.names)); + return plan->EmplaceNode( + plan, std::move(inputs), std::move(sink_options.consumer), + std::move(sink_options.names), sink_options.sequence_output); } const char* kind_name() const override { return "ConsumingSinkNode"; } @@ -334,6 +357,13 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl, public Tr DCHECK_EQ(input, inputs_[0]); + if (sequencer_) { + return sequencer_->InsertBatch(std::move(batch)); + } + return Process(std::move(batch)); + } + + Status Process(ExecBatch batch) override { // This can happen if an error was received and the source hasn't yet stopped. Since // we have already called consumer_->Finish we don't want to call consumer_->Consume if (input_counter_.Completed()) { @@ -344,6 +374,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl, public Tr if (input_counter_.Increment()) { Finish(); } + return Status::OK(); } @@ -364,6 +395,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl, public Tr std::shared_ptr consumer_; std::vector names_; std::atomic backpressure_counter_ = 0; + std::unique_ptr sequencer_; }; static Result MakeTableConsumingSinkNode( compute::ExecPlan* plan, std::vector inputs, @@ -374,6 +406,7 @@ static Result MakeTableConsumingSinkNode( auto tb_consumer = std::make_shared(sink_options.output_table, pool); auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer}; + consuming_sink_node_options.sequence_output = sink_options.sequence_output; return MakeExecNode("consuming_sink", plan, inputs, consuming_sink_node_options); } @@ -384,7 +417,7 @@ struct OrderBySinkNode final : public SinkNode { AsyncGenerator>* generator) : SinkNode(plan, std::move(inputs), generator, /*schema=*/nullptr, /*backpressure=*/{}, - /*backpressure_monitor_out=*/nullptr), + /*backpressure_monitor_out=*/nullptr, /*sequence_delivery=*/false), impl_(std::move(impl)) {} const char* kind_name() const override { return "OrderBySinkNode"; } diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index ffb19d2e1062b..426063a3c18d9 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -274,6 +274,7 @@ struct TableSourceNode : public SourceNode { std::shared_ptr batch; std::vector exec_batches; + int index = 0; while (true) { auto batch_res = reader->Next(); if (batch_res.ok()) { @@ -283,6 +284,7 @@ struct TableSourceNode : public SourceNode { break; } exec_batches.emplace_back(*batch); + 
exec_batches[exec_batches.size() - 1].index = index++; } return exec_batches; } diff --git a/cpp/src/arrow/compute/exec/test_nodes.cc b/cpp/src/arrow/compute/exec/test_nodes.cc index 9352a2290b99f..85e272dbbcb02 100644 --- a/cpp/src/arrow/compute/exec/test_nodes.cc +++ b/cpp/src/arrow/compute/exec/test_nodes.cc @@ -25,8 +25,10 @@ #include "arrow/api.h" #include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/query_context.h" #include "arrow/compute/exec/util.h" #include "arrow/io/interfaces.h" +#include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" @@ -91,5 +93,119 @@ AsyncGenerator> MakeDelayedGen(BatchesWithSchema src, return MakeDelayedGen(MakeVectorIterator(opt_batches), label, delay_sec, noisy); } +namespace { + +class JitterNode : public ExecNode { + public: + struct QueuedBatch { + int adjusted_order; + ExecBatch batch; + }; + + JitterNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, random::SeedType seed, + int max_jitter_modifier) + : ExecNode(plan, std::move(inputs), {"input"}, std::move(output_schema)), + rng_(seed), + jitter_dist_(0, max_jitter_modifier) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "JitterNode")); + const auto& jitter_options = checked_cast(options); + std::shared_ptr output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode(plan, std::move(inputs), + std::move(output_schema), jitter_options.seed, + jitter_options.max_jitter_modifier); + } + + const char* kind_name() const override { return "JitterNode"; } + + Status StartProducing() override { return Status::OK(); } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + std::vector to_deliver; + bool should_finish = false; + { + std::lock_guard lk(mutex_); + int current_count = counter_.count(); + int adjusted_count = current_count + jitter_dist_(rng_); + QueuedBatch queued{adjusted_count, std::move(batch)}; + queue_.push(std::move(queued)); + while (!queue_.empty() && queue_.top().adjusted_order <= current_count) { + to_deliver.push_back(std::move(queue_.top())); + queue_.pop(); + } + if (counter_.Increment()) { + should_finish = true; + } + } + Dispatch(std::move(to_deliver)); + if (should_finish) { + Finish(); + } + return Status::OK(); + } + + Status InputFinished(ExecNode* input, int total_batches) override { + if (counter_.SetTotal(total_batches)) { + Finish(); + } + return output_->InputFinished(this, total_batches); + } + + void Dispatch(std::vector batches) { + for (auto& queued : batches) { + std::function task = [this, batch = std::move(queued.batch)]() mutable { + return output_->InputReceived(this, std::move(batch)); + }; + plan_->query_context()->ScheduleTask(std::move(task), "JitterNode::ProcessBatch"); + } + } + + protected: + Status StopProducingImpl() override { return Status::OK(); } + virtual void Finish() { + std::vector to_deliver; + while (!queue_.empty()) { + to_deliver.push_back(std::move(queue_.top())); + queue_.pop(); + } + Dispatch(std::move(to_deliver)); + } + + private: + struct QueuedBatchCompare { + bool operator()(const QueuedBatch& left, const QueuedBatch& right) const { + return left.adjusted_order > 
right.adjusted_order; + } + }; + + AtomicCounter counter_; + std::mutex mutex_; + std::default_random_engine rng_; + std::uniform_int_distribution jitter_dist_; + std::priority_queue, QueuedBatchCompare> queue_; +}; + +} // namespace + +void RegisterTestNodes() { + static std::once_flag registered; + std::call_once(registered, [] { + ExecFactoryRegistry* registry = default_exec_factory_registry(); + DCHECK_OK( + registry->AddFactory(std::string(JitterNodeOptions::kName), JitterNode::Make)); + }); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/test_nodes.h b/cpp/src/arrow/compute/exec/test_nodes.h index 4adc46c93d342..3417e6dd4336b 100644 --- a/cpp/src/arrow/compute/exec/test_nodes.h +++ b/cpp/src/arrow/compute/exec/test_nodes.h @@ -21,6 +21,7 @@ #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/test_util.h" +#include "arrow/testing/random.h" namespace arrow { namespace compute { @@ -41,5 +42,18 @@ AsyncGenerator> MakeDelayedGen(BatchesWithSchema src, double delay_sec, bool noisy = false); +/// A node that slightly resequences the input at random +struct JitterNodeOptions : public ExecNodeOptions { + random::SeedType seed; + /// The max amount to add to a node's "cost". + int max_jitter_modifier; + + JitterNodeOptions(random::SeedType seed, int max_jitter_modifier) + : seed(seed), max_jitter_modifier(max_jitter_modifier) {} + static constexpr std::string_view kName = "jitter"; +}; + +void RegisterTestNodes(); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/test_nodes_test.cc b/cpp/src/arrow/compute/exec/test_nodes_test.cc new file mode 100644 index 0000000000000..cfc0637b2f7d7 --- /dev/null +++ b/cpp/src/arrow/compute/exec/test_nodes_test.cc @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/test_nodes.h" + +#include + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/table.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +TEST(JitterNode, Basic) { + static constexpr random::SeedType kTestSeed = 42; + static constexpr int kMaxJitterMod = 4; + static constexpr int kNumBatches = 256; + RegisterTestNodes(); + std::shared_ptr
diff --git a/cpp/src/arrow/compute/exec/test_nodes_test.cc b/cpp/src/arrow/compute/exec/test_nodes_test.cc
new file mode 100644
index 0000000000000..cfc0637b2f7d7
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/test_nodes_test.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/test_nodes.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+
+TEST(JitterNode, Basic) {
+  static constexpr random::SeedType kTestSeed = 42;
+  static constexpr int kMaxJitterMod = 4;
+  static constexpr int kNumBatches = 256;
+  RegisterTestNodes();
+  std::shared_ptr<Table> input =
+      gen::Gen({gen::Constant(std::make_shared<Int32Scalar>(0))})
+          ->FailOnError()
+          ->Table(1, kNumBatches);
+  Declaration plan =
+      Declaration::Sequence({{"table_source", TableSourceNodeOptions(input)},
+                             {"jitter", JitterNodeOptions(kTestSeed, kMaxJitterMod)}});
+  ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema batches_and_schema,
+                       DeclarationToExecBatches(std::move(plan)));
+
+  ASSERT_EQ(kNumBatches, static_cast<int>(batches_and_schema.batches.size()));
+  int numOutOfPlace = 0;
+  for (int idx = 0; idx < kNumBatches; idx++) {
+    const ExecBatch& batch = batches_and_schema.batches[idx];
+    int jitter = std::abs(idx - static_cast<int>(batch.index));
+    if (jitter > 0) {
+      numOutOfPlace++;
+    }
+  }
+  ASSERT_GT(numOutOfPlace, 0);
+}
+
+}  // namespace compute
+}  // namespace arrow
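Before the implementation, a quick sketch of the intended usage pattern of the gen:: facility added below: declare one generator per column, convert to the fail-on-error variant, then materialize batches or a table. The column names "x" and "y" and the MakeTestTable helper are illustrative only.

```cpp
#include "arrow/table.h"
#include "arrow/testing/generator.h"

namespace arrow {

// Build a 10-chunk, 1000-row test table: column "x" counts 0,1,2,... and
// column "y" holds random doubles. Unnamed columns would be auto-named
// f0, f1, ...
std::shared_ptr<Table> MakeTestTable() {
  auto data = gen::Gen({{"x", gen::Step()}, {"y", gen::Random(float64())}})
                  ->FailOnError();
  return data->Table(/*rows_per_chunk=*/100, /*num_chunks=*/10);
}

}  // namespace arrow
```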
diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc
index 8e411e5fe05fa..a0d8968bb3202 100644
--- a/cpp/src/arrow/testing/generator.cc
+++ b/cpp/src/arrow/testing/generator.cc
@@ -27,16 +27,25 @@
 #include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
+#include "arrow/compute/exec.h"
+#include "arrow/datum.h"
 #include "arrow/record_batch.h"
 #include "arrow/scalar.h"
+#include "arrow/table.h"
 #include "arrow/testing/builder.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string.h"
 
 namespace arrow {
 
+using internal::checked_pointer_cast;
+
 template <typename ArrowType, typename CType = typename TypeTraits<ArrowType>::CType,
           typename BuilderType = typename TypeTraits<ArrowType>::BuilderType>
 static inline std::shared_ptr<Array> ConstantArray(int64_t size, CType value) {
@@ -188,4 +197,209 @@ Result<std::shared_ptr<Array>> ScalarVectorToArray(const ScalarVector& scalars)
   return out;
 }
 
+namespace gen {
+
+namespace {
+class ConstantGenerator : public ArrayGenerator {
+ public:
+  explicit ConstantGenerator(std::shared_ptr<Scalar> value) : value_(std::move(value)) {}
+
+  Result<std::shared_ptr<Array>> Generate(int64_t num_rows) override {
+    return MakeArrayFromScalar(*value_, num_rows);
+  }
+
+  std::shared_ptr<DataType> type() const override { return value_->type; }
+
+ private:
+  std::shared_ptr<Scalar> value_;
+};
+
+class StepGenerator : public ArrayGenerator {
+ public:
+  StepGenerator(uint32_t start, uint32_t step) : start_(start), step_(step) {}
+
+  Result<std::shared_ptr<Array>> Generate(int64_t num_rows) override {
+    UInt32Builder builder;
+    ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+    uint32_t val = start_;
+    for (int64_t i = 0; i < num_rows; i++) {
+      builder.UnsafeAppend(val);
+      val += step_;
+    }
+    start_ = val;
+    return builder.Finish();
+  }
+
+  std::shared_ptr<DataType> type() const override { return uint32(); }
+
+ private:
+  uint32_t start_;
+  uint32_t step_;
+};
+
+static constexpr random::SeedType kTestSeed = 42;
+
+class RandomGenerator : public ArrayGenerator {
+ public:
+  explicit RandomGenerator(std::shared_ptr<DataType> type) : type_(std::move(type)) {}
+
+  random::RandomArrayGenerator* generator() {
+    static random::RandomArrayGenerator instance(kTestSeed);
+    return &instance;
+  }
+
+  Result<std::shared_ptr<Array>> Generate(int64_t num_rows) override {
+    return generator()->ArrayOf(type_, num_rows);
+  }
+
+  std::shared_ptr<DataType> type() const override { return type_; }
+
+ private:
+  std::shared_ptr<DataType> type_;
+};
+
+class DataGeneratorImpl : public DataGenerator,
+                          public std::enable_shared_from_this<DataGeneratorImpl> {
+ public:
+  explicit DataGeneratorImpl(std::vector<GeneratorField> generators)
+      : generators_(std::move(generators)) {
+    schema_ = DeriveSchemaFromGenerators();
+  }
+
+  Result<std::shared_ptr<::arrow::RecordBatch>> RecordBatch(int64_t num_rows) override {
+    std::vector<std::shared_ptr<Array>> columns;
+    columns.reserve(generators_.size());
+    for (auto& field : generators_) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> arr, field.gen->Generate(num_rows));
+      columns.push_back(std::move(arr));
+    }
+    return ::arrow::RecordBatch::Make(schema_, num_rows, std::move(columns));
+  }
+
+  Result<std::vector<std::shared_ptr<::arrow::RecordBatch>>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) override {
+    std::vector<std::shared_ptr<::arrow::RecordBatch>> batches;
+    batches.reserve(num_batches);
+    for (int i = 0; i < num_batches; i++) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::RecordBatch> batch,
+                            RecordBatch(rows_per_batch));
+      batches.push_back(std::move(batch));
+    }
+    return batches;
+  }
+
+#ifdef ARROW_COMPUTE
+  Result<::arrow::compute::ExecBatch> ExecBatch(int64_t num_rows) override {
+    std::vector<Datum> values;
+    values.reserve(generators_.size());
+    for (auto& field : generators_) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> arr, field.gen->Generate(num_rows));
+      values.push_back(std::move(arr));
+    }
+    return ::arrow::compute::ExecBatch(std::move(values), num_rows);
+  }
+
+  Result<std::vector<::arrow::compute::ExecBatch>> ExecBatches(int64_t rows_per_batch,
+                                                               int num_batches) override {
+    std::vector<::arrow::compute::ExecBatch> batches;
+    for (int i = 0; i < num_batches; i++) {
+      ARROW_ASSIGN_OR_RAISE(::arrow::compute::ExecBatch batch, ExecBatch(rows_per_batch));
+      batches.push_back(std::move(batch));
+    }
+    return batches;
+  }
+#endif
+
+  Result<std::shared_ptr<::arrow::Table>> Table(int64_t rows_per_chunk,
+                                                int num_chunks = 1) override {
+    ARROW_ASSIGN_OR_RAISE(RecordBatchVector batches,
+                          RecordBatches(rows_per_chunk, num_chunks));
+    return ::arrow::Table::FromRecordBatches(batches);
+  }
+
+  std::shared_ptr<::arrow::Schema> Schema() override { return schema_; }
+
+  std::unique_ptr<GTestDataGenerator> FailOnError() override;
+
+ private:
+  std::shared_ptr<::arrow::Schema> DeriveSchemaFromGenerators() {
+    FieldVector fields;
+    for (std::size_t i = 0; i < generators_.size(); i++) {
+      const GeneratorField& gen = generators_[i];
+      std::string name;
+      if (gen.name) {
+        name = *gen.name;
+      } else {
+        name = "f" + internal::ToChars(i);
+      }
+      fields.push_back(field(std::move(name), gen.gen->type()));
+    }
+    return schema(std::move(fields));
+  }
+
+  std::vector<GeneratorField> generators_;
+  std::shared_ptr<::arrow::Schema> schema_;
+};
+
+class GTestDataGeneratorImpl : public GTestDataGenerator {
+ public:
+  explicit GTestDataGeneratorImpl(std::shared_ptr<DataGenerator> target)
+      : target_(std::move(target)) {}
+  std::shared_ptr<::arrow::RecordBatch> RecordBatch(int64_t num_rows) override {
+    EXPECT_OK_AND_ASSIGN(auto batch, target_->RecordBatch(num_rows));
+    return batch;
+  }
+  std::vector<std::shared_ptr<::arrow::RecordBatch>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) override {
+    EXPECT_OK_AND_ASSIGN(auto batches,
+                         target_->RecordBatches(rows_per_batch, num_batches));
+    return batches;
+  }
+#ifdef ARROW_COMPUTE
+  ::arrow::compute::ExecBatch ExecBatch(int64_t num_rows) override {
+    EXPECT_OK_AND_ASSIGN(auto batch, target_->ExecBatch(num_rows));
+    return batch;
+  }
+  std::vector<::arrow::compute::ExecBatch> ExecBatches(int64_t rows_per_batch,
+                                                       int num_batches) override {
+    EXPECT_OK_AND_ASSIGN(auto batches, target_->ExecBatches(rows_per_batch, num_batches));
+    return batches;
+  }
+#endif
+  std::shared_ptr<::arrow::Table> Table(int64_t rows_per_chunk, int num_chunks) override {
+    EXPECT_OK_AND_ASSIGN(auto table, target_->Table(rows_per_chunk, num_chunks));
+    return table;
+  }
+  std::shared_ptr<::arrow::Schema> Schema() override { return target_->Schema(); }
+
+ private:
+  std::shared_ptr<DataGenerator> target_;
+};
+
+// Defined down here to avoid a circular dependency between DataGeneratorImpl and
+// GTestDataGeneratorImpl
+std::unique_ptr<GTestDataGenerator> DataGeneratorImpl::FailOnError() {
+  return std::make_unique<GTestDataGeneratorImpl>(shared_from_this());
+}
+
+}  // namespace
+
+std::shared_ptr<ArrayGenerator> Constant(std::shared_ptr<Scalar> value) {
+  return std::make_shared<ConstantGenerator>(std::move(value));
+}
+
+std::shared_ptr<ArrayGenerator> Step(uint32_t start, uint32_t step) {
+  return std::make_shared<StepGenerator>(start, step);
+}
+
+std::shared_ptr<ArrayGenerator> Random(std::shared_ptr<DataType> type) {
+  return std::make_shared<RandomGenerator>(std::move(type));
+}
+
+std::shared_ptr<DataGenerator> Gen(std::vector<GeneratorField> fields) {
+  return std::make_shared<DataGeneratorImpl>(std::move(fields));
+}
+
+}  // namespace gen
+
 }  // namespace arrow
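One design choice worth noting: StepGenerator stores its counter back into start_ after each call, so a multi-chunk table gets one continuous sequence rather than restarting at every chunk. A small sketch (the DemoStepState helper is illustrative, not part of this patch):

```cpp
#include <iostream>

#include "arrow/array.h"
#include "arrow/testing/generator.h"

namespace arrow {

// Each Generate() call continues from where the previous one stopped,
// because the generator carries its state across calls.
void DemoStepState() {
  std::shared_ptr<gen::ArrayGenerator> step = gen::Step();
  auto first = step->Generate(3).ValueOrDie();   // [0, 1, 2]
  auto second = step->Generate(3).ValueOrDie();  // [3, 4, 5]
  std::cout << first->ToString() << "\n" << second->ToString() << std::endl;
}

}  // namespace arrow
```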
diff --git a/cpp/src/arrow/testing/generator.h b/cpp/src/arrow/testing/generator.h
index 3a70659293ce1..06cbbcfbbda6c 100644
--- a/cpp/src/arrow/testing/generator.h
+++ b/cpp/src/arrow/testing/generator.h
@@ -23,6 +23,8 @@
 #include <vector>
 
 #include "arrow/array/array_base.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/type_fwd.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/visibility.h"
 #include "arrow/type_fwd.h"
@@ -234,4 +236,86 @@ class ARROW_TESTING_EXPORT ConstantArrayGenerator {
 ARROW_TESTING_EXPORT
 Result<std::shared_ptr<Array>> ScalarVectorToArray(const ScalarVector& scalars);
 
+namespace gen {
+
+class ARROW_TESTING_EXPORT ArrayGenerator {
+ public:
+  virtual ~ArrayGenerator() = default;
+  virtual Result<std::shared_ptr<Array>> Generate(int64_t num_rows) = 0;
+  virtual std::shared_ptr<DataType> type() const = 0;
+};
+
+// Same as DataGenerator below, but instead of returning Result an OK status is EXPECT'd
+class ARROW_TESTING_EXPORT GTestDataGenerator {
+ public:
+  virtual ~GTestDataGenerator() = default;
+  virtual std::shared_ptr<::arrow::RecordBatch> RecordBatch(int64_t num_rows) = 0;
+  virtual std::vector<std::shared_ptr<::arrow::RecordBatch>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#ifdef ARROW_COMPUTE
+  virtual ::arrow::compute::ExecBatch ExecBatch(int64_t num_rows) = 0;
+  virtual std::vector<::arrow::compute::ExecBatch> ExecBatches(int64_t rows_per_batch,
+                                                               int num_batches) = 0;
+#endif
+  virtual std::shared_ptr<::arrow::Table> Table(int64_t rows_per_chunk,
+                                                int num_chunks = 1) = 0;
+  virtual std::shared_ptr<::arrow::Schema> Schema() = 0;
+};
+
+class ARROW_TESTING_EXPORT DataGenerator {
+ public:
+  virtual ~DataGenerator() = default;
+  virtual Result<std::shared_ptr<::arrow::RecordBatch>> RecordBatch(int64_t num_rows) = 0;
+  virtual Result<std::vector<std::shared_ptr<::arrow::RecordBatch>>> RecordBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#ifdef ARROW_COMPUTE
+  virtual Result<::arrow::compute::ExecBatch> ExecBatch(int64_t num_rows) = 0;
+  virtual Result<std::vector<::arrow::compute::ExecBatch>> ExecBatches(
+      int64_t rows_per_batch, int num_batches) = 0;
+#endif
+  virtual Result<std::shared_ptr<::arrow::Table>> Table(int64_t rows_per_chunk,
+                                                        int num_chunks = 1) = 0;
+  virtual std::shared_ptr<::arrow::Schema> Schema() = 0;
+  /// @brief Converts this generator to a variant that fails (in a googletest sense)
+  /// if any error is encountered
+  virtual std::unique_ptr<GTestDataGenerator> FailOnError() = 0;
+};
+
+/// @brief A potentially named field
+///
+/// If name is not specified then a name will be generated automatically (e.g. f0, f1)
+struct ARROW_TESTING_EXPORT GeneratorField {
+ public:
+  GeneratorField(std::shared_ptr<ArrayGenerator> gen)  // NOLINT implicit conversion
+      : name(), gen(std::move(gen)) {}
+  GeneratorField(std::string name, std::shared_ptr<ArrayGenerator> gen)
+      : name(std::move(name)), gen(std::move(gen)) {}
+
+  std::optional<std::string> name;
+  std::shared_ptr<ArrayGenerator> gen;
+};
+
+/// Create a table generator with the given fields
+ARROW_TESTING_EXPORT std::shared_ptr<DataGenerator> Gen(
+    std::vector<GeneratorField> column_gens);
+
+/// Make a generator that returns a constant value
+ARROW_TESTING_EXPORT std::shared_ptr<ArrayGenerator> Constant(
+    std::shared_ptr<Scalar> value);
+/// Make a generator that returns an incrementing value
+///
+/// Note: overflow is not prevented; standard unsigned integer overflow applies
+ARROW_TESTING_EXPORT std::shared_ptr<ArrayGenerator> Step(uint32_t start = 0,
+                                                          uint32_t step = 1);
+/// Make a generator that returns a random value
+ARROW_TESTING_EXPORT std::shared_ptr<ArrayGenerator> Random(
+    std::shared_ptr<DataType> type);
+/// TODO(if-needed) could add a repeat-scalars generator, e.g. Repeat({1, 2, 3}) for
+/// 1,2,3,1,2,3,1
+///
+/// TODO(if-needed) could add a repeat-from-json generator, e.g. Repeat(int32(), "[1, 2,
+/// 3]"), same behavior as repeat-scalars
+
+}  // namespace gen
+
 }  // namespace arrow
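To illustrate how the ArrayGenerator interface extends, here is a hypothetical version of the repeat-scalars generator mentioned in the TODO above. It is a sketch only, not part of this patch; it reuses the existing ScalarVectorToArray helper declared in this header.

```cpp
#include "arrow/array.h"
#include "arrow/scalar.h"
#include "arrow/testing/generator.h"

namespace arrow {
namespace gen {

// Hypothetical repeat-scalars generator: cycles through a fixed list of
// scalars (1,2,3,1,2,3,...). Assumes all scalars share one type and the
// list is non-empty.
class RepeatGenerator : public ArrayGenerator {
 public:
  explicit RepeatGenerator(ScalarVector values) : values_(std::move(values)) {}

  Result<std::shared_ptr<Array>> Generate(int64_t num_rows) override {
    ScalarVector expanded;
    expanded.reserve(num_rows);
    for (int64_t i = 0; i < num_rows; i++) {
      expanded.push_back(values_[next_]);
      // Keep the cursor so the next batch resumes mid-cycle, like Step does.
      next_ = (next_ + 1) % values_.size();
    }
    return ScalarVectorToArray(expanded);
  }

  std::shared_ptr<DataType> type() const override { return values_[0]->type; }

 private:
  ScalarVector values_;
  std::size_t next_ = 0;
};

}  // namespace gen
}  // namespace arrow
```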