apacheGH-34059: [C++] Add a fetch node based on a batch index (apache#34060)

This PR introduces the concept of `ExecBatch::index` but does not yet do much with it.  As a proof of concept, this PR adds a fetch node which can be inserted anywhere in the plan (not just at the sink) to satisfy `LIMIT x OFFSET y` (Substrait calls this operation "fetch", so I have adopted that name as well).
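
A minimal sketch of how such a plan might be declared, assuming the node is registered under the name `"fetch"` and takes `(offset, count)` options as added here; the source and project declarations are placeholders:

```cpp
// Sketch: satisfy `LIMIT 10 OFFSET 5` by placing a fetch node mid-plan.
// `source_options` / `project_options` are placeholders; "fetch" and
// FetchNodeOptions are the names added by this PR (assumed, not verified
// against the final API).
namespace compute = arrow::compute;

compute::Declaration plan = compute::Declaration::Sequence({
    {"source", source_options},  // any upstream node
    {"fetch", compute::FetchNodeOptions(/*offset=*/5, /*count=*/10)},
    {"project", project_options},  // fetch need not sit at the sink
});
```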

This PR also introduces two sequencing accumulation queues which will be useful, I hope, for anyone implementing nodes that rely on ordered execution.

This PR unfortunately introduces a new query option: whether or not the sink node should pay the small performance cost required to sequence its output.  While considering how best to add this option I realized we will probably have more query options in the near future regarding "how much RAM to use" (e.g. spillover) and potentially more beyond that.

So I have taken all the options and put them into `arrow::compute::QueryOptions` (this already existed but it was not user facing and I added more things to it).  I added a new DeclarationToXyz overload that accepts QueryOptions.  This has, unfortunately, led to a bit of overload explosion but I think this should be the last new addition to the overload set (and we can deprecate the older overloads at some point).
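For example (a sketch; the exact field name for the new sequencing option is an assumption based on the description above):

```cpp
// Sketch: run a declaration through the consolidated QueryOptions overload,
// opting in to sequenced sink output. `sequence_output` is assumed to be
// the name of the new option described above.
arrow::compute::QueryOptions query_options;
query_options.sequence_output = true;  // pay the small cost to order output
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Table> table,
    arrow::compute::DeclarationToTable(std::move(plan), query_options));
```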

This PR also includes a new `gen::Gen / gen::TestGen` facility for generating test tables for input.  I'd like to eventually use this to simplify some of the existing exec plan tests as well.  I'm willing to split this into a separate PR if that makes sense.
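
A hypothetical sketch of what using the facility might look like (the helper generator names and methods below are illustrative assumptions, not the verified API):

```cpp
// Hypothetical usage of the new test-data generator; Step/Random and the
// ExecBatches method are assumptions made for illustration only.
auto batches = gen::Gen({{"x", gen::Step()},              // 0, 1, 2, ...
                         {"y", gen::Random(float32())}})  // random floats
                   ->ExecBatches(/*rows_per_batch=*/1024, /*num_batches=*/8);
```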
* Closes: apache#34059

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
westonpace authored and Mike Hancock committed Feb 17, 2023
1 parent f5fd3dd commit bf24566
Showing 19 changed files with 1,278 additions and 106 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -393,6 +393,7 @@ if(ARROW_COMPUTE)
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
compute/exec/expression.cc
compute/exec/fetch_node.cc
compute/exec/filter_node.cc
compute/exec/hash_join.cc
compute/exec/hash_join_dict.cc
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/exec.h
@@ -151,6 +151,9 @@ class ARROW_EXPORT SelectionVector {
const int32_t* indices_;
};

/// An index to represent that a batch does not belong to an ordered stream
constexpr int64_t kUnsequencedIndex = -1;

/// \brief A unit of work for kernel execution. It contains a collection of
/// Array and Scalar values and an optional SelectionVector indicating that
/// there is an unmaterialized filter that either must be materialized, or (if
@@ -209,6 +212,12 @@ struct ARROW_EXPORT ExecBatch {
/// whether any values are Scalar.
int64_t length = 0;

/// \brief The index of this batch in an ordered stream of batches
///
/// This index must be strictly increasing, starting at 0, with no gaps, or
/// it can be set to kUnsequencedIndex if there is no meaningful order
int64_t index = kUnsequencedIndex;

/// \brief The sum of bytes in each buffer referenced by the batch
///
/// Note: Scalars are not counted
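As a sketch (not part of the diff), a source that emits batches in a known order would assign the new field like this:

```cpp
// Tag batches with their position in the stream so downstream nodes can
// re-sequence them. Assumes `batches` is already in the desired order.
int64_t next_index = 0;
for (arrow::compute::ExecBatch& batch : batches) {
  batch.index = next_index++;  // strictly increasing from 0, no gaps
}
// Batches in an unordered stream keep index = kUnsequencedIndex.
```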
7 changes: 7 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -29,6 +29,13 @@ add_arrow_compute_test(plan_test
"arrow-compute"
SOURCES
plan_test.cc
test_nodes_test.cc
test_nodes.cc)
add_arrow_compute_test(fetch_node_test
PREFIX
"arrow-compute"
SOURCES
fetch_node_test.cc
test_nodes.cc)
add_arrow_compute_test(hash_join_node_test
PREFIX
112 changes: 112 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -18,6 +18,11 @@
#include "arrow/compute/exec/accumulation_queue.h"

#include <iterator>
#include <mutex>
#include <queue>
#include <vector>

#include "arrow/util/logging.h"

namespace arrow {
namespace util {
@@ -54,5 +59,112 @@ void AccumulationQueue::Clear() {
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

namespace {

struct LowestBatchIndexAtTop {
bool operator()(const ExecBatch& left, const ExecBatch& right) const {
return left.index > right.index;
}
};

class SequencingQueueImpl : public SequencingQueue {
public:
explicit SequencingQueueImpl(Processor* processor) : processor_(processor) {}

Status InsertBatch(ExecBatch batch) override {
std::unique_lock lk(mutex_);
if (batch.index == next_index_) {
return DeliverNextUnlocked(std::move(batch), std::move(lk));
}
queue_.emplace(std::move(batch));
return Status::OK();
}

private:
Status DeliverNextUnlocked(ExecBatch batch, std::unique_lock<std::mutex>&& lk) {
// Should be able to detect and avoid this at plan construction
DCHECK_NE(batch.index, ::arrow::compute::kUnsequencedIndex)
<< "attempt to use a sequencing queue on an unsequenced stream of batches";
std::vector<Task> tasks;
next_index_++;
ARROW_ASSIGN_OR_RAISE(std::optional<Task> this_task,
processor_->Process(std::move(batch)));
while (!queue_.empty() && next_index_ == queue_.top().index) {
ARROW_ASSIGN_OR_RAISE(std::optional<Task> task, processor_->Process(queue_.top()));
if (task) {
tasks.push_back(std::move(*task));
}
queue_.pop();
next_index_++;
}
lk.unlock();
// Schedule tasks for stale items
for (auto& task : tasks) {
processor_->Schedule(std::move(task));
}
// Run the current item immediately
if (this_task) {
ARROW_RETURN_NOT_OK(std::move(*this_task)());
}
return Status::OK();
}

Processor* processor_;

std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
int64_t next_index_ = 0;  // matches the type of ExecBatch::index
std::mutex mutex_;
};

class SerialSequencingQueueImpl : public SerialSequencingQueue {
public:
explicit SerialSequencingQueueImpl(Processor* processor) : processor_(processor) {}

Status InsertBatch(ExecBatch batch) override {
std::unique_lock lk(mutex_);
queue_.push(std::move(batch));
if (queue_.top().index == next_index_ && !is_processing_) {
is_processing_ = true;
return DoProcess(std::move(lk));
}
return Status::OK();
}

private:
Status DoProcess(std::unique_lock<std::mutex>&& lk) {
while (!queue_.empty() && queue_.top().index == next_index_) {
ExecBatch next(queue_.top());
queue_.pop();
next_index_++;
lk.unlock();
// ARROW_RETURN_NOT_OK may return early here. In that case is_processing_ will
// never switch back to false, so no other thread can process, but that should
// be OK since we failed anyway. It is important, however, that we do not hold
// the lock.
ARROW_RETURN_NOT_OK(processor_->Process(std::move(next)));
lk.lock();
}
is_processing_ = false;
return Status::OK();
}

Processor* processor_;

std::mutex mutex_;
std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
int64_t next_index_ = 0;  // matches the type of ExecBatch::index
bool is_processing_ = false;
};

} // namespace

std::unique_ptr<SequencingQueue> SequencingQueue::Make(Processor* processor) {
return std::make_unique<SequencingQueueImpl>(processor);
}

std::unique_ptr<SerialSequencingQueue> SerialSequencingQueue::Make(Processor* processor) {
return std::make_unique<SerialSequencingQueueImpl>(processor);
}

} // namespace util
} // namespace arrow
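
To illustrate the intended use of the `SequencingQueue` added above (not part of the diff; the printing processor is invented for illustration):

```cpp
#include <iostream>
#include <optional>
#include <utility>

#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/util/logging.h"

// Toy Processor: prints batches in index order. Process stays cheap and
// defers the actual work to a task, as the API docs recommend.
class PrintInOrder : public arrow::util::SequencingQueue::Processor {
 public:
  arrow::Result<std::optional<arrow::util::SequencingQueue::Task>> Process(
      arrow::compute::ExecBatch batch) override {
    // Called once per batch, in index order, never reentrantly.
    return std::optional<arrow::util::SequencingQueue::Task>(
        [batch = std::move(batch)] {
          std::cout << batch.ToString() << std::endl;
          return arrow::Status::OK();
        });
  }
  void Schedule(arrow::util::SequencingQueue::Task task) override {
    // A real node would hand this to the plan's scheduler; running inline
    // here keeps the sketch self-contained.
    ARROW_WARN_NOT_OK(std::move(task)(), "task failed");
  }
};

// Usage: batches may be inserted from any thread, in any order.
// PrintInOrder processor;
// auto queue = arrow::util::SequencingQueue::Make(&processor);
// ARROW_RETURN_NOT_OK(queue->InsertBatch(std::move(batch)));
```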
100 changes: 100 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -18,9 +18,12 @@
#pragma once

#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/result.h"

namespace arrow {
namespace util {
@@ -53,5 +56,102 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

/// A queue that sequences incoming batches
///
/// This can be used when a node needs to do some kind of ordered processing on
/// the stream.
///
/// Batches can be inserted in any order. The processor's Process method will
/// be called on the batches, in order, without reentrant calls. For this
/// reason Process should be quick.
///
/// For example, in a top-n node, the Process method should determine how many
/// rows need to be delivered for the given batch, and then return a task to
/// actually deliver those rows.
class SequencingQueue {
public:
using Task = std::function<Status()>;

/// Strategy that describes how to handle items
class Processor {
public:
/// Process the batch, potentially generating a task
///
/// This method will be called on each batch in order. Calls to this method
/// will be serialized and it will not be called reentrantly. This makes it
/// safe to do things that rely on order, but minimal time should be spent here
/// to avoid becoming a bottleneck.
///
/// \return a follow-up task that will be scheduled. The follow-up task(s)
///         are not guaranteed to run in any particular order. If nullopt is
///         returned then nothing will be scheduled.
virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
/// Schedule a task
virtual void Schedule(Task task) = 0;
};

virtual ~SequencingQueue() = default;

/// Insert a batch into the queue
///
/// This will insert the batch into the queue. If this batch is the next batch
/// to deliver then this will trigger one or more calls to the processor's
/// Process method, generating one or more tasks.
///
/// The task generated from this batch will be executed immediately. The
/// remaining tasks will be scheduled via the processor's Schedule method.
///
/// From a data pipeline perspective the sequencing queue is a "sometimes"
/// breaker. If a batch arrives in order then this call will usually execute
/// the downstream pipeline. If the batch arrives early then this call will
/// only queue the data.
virtual Status InsertBatch(ExecBatch batch) = 0;

/// Create a queue
/// \param processor describes how to process the batches, must outlive the queue
static std::unique_ptr<SequencingQueue> Make(Processor* processor);
};

/// A queue that sequences incoming batches
///
/// Unlike SequencingQueue the Process method is not expected to schedule new tasks.
///
/// If a batch arrives and another thread is currently processing then the batch
/// will be queued and control will return. In other words, delivery of batches will
/// not block on the Process method.
///
/// It can be helpful to think of this as if a dedicated thread were running
/// Process as batches arrive.
class SerialSequencingQueue {
public:
/// Strategy that describes how to handle items
class Processor {
public:
/// Process the batch
///
/// This method will be called on each batch in order. Calls to this method
/// will be serialized and it will not be called reentrantly. This makes it
/// safe to do things that rely on order.
///
/// If processing falls behind then data may accumulate in the queue.
///
/// TODO: Could add backpressure if needed but right now all uses of this should
/// be pretty fast and so are unlikely to block.
virtual Status Process(ExecBatch batch) = 0;
};

virtual ~SerialSequencingQueue() = default;

/// Insert a batch into the queue
///
/// This will insert the batch into the queue. If this batch is the next batch
/// to deliver then this may trigger calls to the processor, which will run
/// as part of this call.
virtual Status InsertBatch(ExecBatch batch) = 0;

/// Create a queue
/// \param processor describes how to process the batches, must outlive the queue
static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor);
};

} // namespace util
} // namespace arrow
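
And a corresponding sketch for `SerialSequencingQueue`, where the ordered work happens directly inside Process (again, the counting processor below is invented for illustration):

```cpp
#include <cstdint>

#include "arrow/compute/exec/accumulation_queue.h"

// Toy Processor: counts rows in batch-index order. No task scheduling is
// involved; a batch that arrives early simply waits in the queue.
class CountRowsInOrder : public arrow::util::SerialSequencingQueue::Processor {
 public:
  arrow::Status Process(arrow::compute::ExecBatch batch) override {
    total_rows_ += batch.length;  // safe: calls are serialized and in order
    return arrow::Status::OK();
  }
  int64_t total_rows() const { return total_rows_; }

 private:
  int64_t total_rows_ = 0;
};

// Usage: inserts may arrive from many threads, in any order.
// CountRowsInOrder processor;
// auto queue = arrow::util::SerialSequencingQueue::Make(&processor);
// ARROW_RETURN_NOT_OK(queue->InsertBatch(std::move(batch)));
```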
