diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index f8dee5aac8815..d3c988e18e3a0 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -495,9 +495,9 @@ class KeyHasher { 4 * kMiniBatchLength * sizeof(uint32_t)); } - void Invalidate() { - batch_ = NULLPTR; // invalidate cached hashes for batch - required when it changes - } + // invalidate cached hashes for batch - required when it changes + // only this method can be called concurrently with HashesFor + void Invalidate() { batch_ = NULLPTR; } // compute and cache a hash for each row of the given batch const std::vector& HashesFor(const RecordBatch* batch) { @@ -668,18 +668,19 @@ class InputState { static Result> Make( size_t index, TolType tolerance, bool must_hash, bool may_rehash, - KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output, + KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node, std::atomic& backpressure_counter, const std::shared_ptr& schema, const col_index_t time_col_index, const std::vector& key_col_index) { constexpr size_t low_threshold = 4, high_threshold = 8; std::unique_ptr backpressure_control = - std::make_unique(node, output, backpressure_counter); + std::make_unique( + /*node=*/asof_input, /*output=*/asof_node, backpressure_counter); ARROW_ASSIGN_OR_RAISE(auto handler, BackpressureHandler::Make(low_threshold, high_threshold, std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, - key_hasher, output, std::move(handler), schema, + key_hasher, asof_node, std::move(handler), schema, time_col_index, key_col_index); } diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index b113eb86e561e..c62d0c0b85f70 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -19,9 +19,12 @@ #include #include +#include #include #include #include +#include "arrow/acero/exec_plan.h" +#include "arrow/testing/future_util.h" #ifndef NDEBUG #include #endif @@ -31,6 +34,8 @@ #ifndef NDEBUG #include "arrow/acero/options_internal.h" #endif +#include "arrow/acero/map_node.h" +#include "arrow/acero/query_context.h" #include "arrow/acero/test_nodes.h" #include "arrow/acero/test_util_internal.h" #include "arrow/acero/util.h" @@ -1360,9 +1365,66 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, { schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())})); }) +struct BackpressureCounters { + std::atomic pause_count = 0; + std::atomic resume_count = 0; +}; + +struct BackpressureCountingNodeOptions : public ExecNodeOptions { + BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {} + + BackpressureCounters* counters; +}; + +struct BackpressureCountingNode : public MapNode { + static constexpr const char* kKindName = "BackpressureCountingNode"; + static constexpr const char* kFactoryName = "backpressure_count"; + + static void Register() { + auto exec_reg = default_exec_factory_registry(); + if (!exec_reg->GetFactory(kFactoryName).ok()) { + ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make)); + } + } + + BackpressureCountingNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + const BackpressureCountingNodeOptions& options) + : MapNode(plan, inputs, output_schema), counters(options.counters) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); + auto bp_options = static_cast(options); + return plan->EmplaceNode( + plan, inputs, inputs[0]->output_schema(), bp_options); + } + + const char* kind_name() const override { return kKindName; } + Result ProcessBatch(ExecBatch batch) override { return batch; } + + void PauseProducing(ExecNode* output, int32_t counter) override { + ++counters->pause_count; + inputs()[0]->PauseProducing(this, counter); + } + void ResumeProducing(ExecNode* output, int32_t counter) override { + ++counters->resume_count; + inputs()[0]->ResumeProducing(this, counter); + } + + BackpressureCounters* counters; +}; + +AsyncGenerator> GetGen( + AsyncGenerator> gen) { + return gen; +} +AsyncGenerator> GetGen(BatchesWithSchema bws) { + return bws.gen(false, false); +} + template -void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, - double fast_delay, double slow_delay, bool noisy = false) { +void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { auto l_schema = schema({field("time", int32()), field("key", int32()), field("l_value", int32())}); auto r0_schema = @@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1)); ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); - Declaration l_src = { - "source", SourceNodeOptions( - l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))}; - Declaration r0_src = { - "source", SourceNodeOptions( - r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))}; - Declaration r1_src = { - "source", SourceNodeOptions( - r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))}; + BackpressureCountingNode::Register(); + RegisterTestNodes(); // for GatedNode - Declaration asofjoin = { - "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)}; + struct BackpressureSourceConfig { + std::string name_prefix; + bool is_gated; + std::shared_ptr schema; + decltype(l_batches) batches; + + std::string name() const { + return name_prefix + ";" + (is_gated ? "gated" : "ungated"); + } + }; + + auto gate_ptr = Gate::Make(); + auto& gate = *gate_ptr; + GatedNodeOptions gate_options(gate_ptr.get()); + + // Two ungated and one gated + std::vector source_configs = { + {"0", false, l_schema, l_batches}, + {"1", true, r0_schema, r0_batches}, + {"2", false, r1_schema, r1_batches}, + }; + + std::vector bp_counters(source_configs.size()); + std::vector src_decls; + std::vector> bp_options; + std::vector bp_decls; + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& config = source_configs[i]; + + src_decls.emplace_back("source", + SourceNodeOptions(config.schema, GetGen(config.batches))); + bp_options.push_back( + std::make_shared(&bp_counters[i])); + std::shared_ptr options = bp_options.back(); + std::vector bp_in = {src_decls.back()}; + Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in, + std::move(options)}; + if (config.is_gated) { + bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options}; + } + bp_decls.push_back(bp_decl); + } + + Declaration asofjoin = {"asofjoin", bp_decls, + GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)}; + + ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, + internal::ThreadPool::Make(1)); + ExecContext exec_ctx(default_memory_pool(), tpool.get()); + Future batches_fut = + DeclarationToExecBatchesAsync(asofjoin, exec_ctx); + + auto has_bp_been_applied = [&] { + // One of the inputs is gated. The other two will eventually be paused by the asof + // join node + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& counters = bp_counters[i]; + if (source_configs[i].is_gated) { + if (counters.pause_count > 0) return false; + } else { + if (counters.pause_count != 1) return false; + } + } + return true; + }; + + BusyWait(10.0, has_bp_been_applied); + ASSERT_TRUE(has_bp_been_applied()); - ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_reader, - DeclarationToReader(asofjoin, /*use_threads=*/false)); + gate.ReleaseAllBatches(); + ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); - int64_t total_length = 0; - for (;;) { - ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next()); - if (!batch) { - break; + // One of the inputs is gated. The other two will eventually be resumed by the asof + // join node + for (size_t i = 0; i < source_configs.size(); i++) { + const auto& counters = bp_counters[i]; + if (!source_configs[i].is_gated) { + ASSERT_GE(counters.resume_count, 0); } - total_length += batch->num_rows(); } - ASSERT_EQ(static_cast(num_batches * batch_size), total_length); } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1, - /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false); + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); } template @@ -1473,10 +1592,9 @@ T GetEnvValue(const std::string& var, T default_value) { } // namespace TEST(AsofJoinTest, BackpressureWithBatchesGen) { - int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10); + int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); - return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size, - /*fast_delay=*/0.001, /*slow_delay=*/0.01); + return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size); } } // namespace acero diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc index ff95f72e6e628..e109afbe1bffb 100644 --- a/cpp/src/arrow/acero/test_nodes.cc +++ b/cpp/src/arrow/acero/test_nodes.cc @@ -31,6 +31,7 @@ #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -200,12 +201,168 @@ class JitterNode : public ExecNode { } // namespace +class GateImpl { + public: + void ReleaseAllBatches() { + std::lock_guard lg(mutex_); + num_allowed_batches_ = -1; + NotifyAll(); + } + + void ReleaseOneBatch() { + std::lock_guard lg(mutex_); + DCHECK_GE(num_allowed_batches_, 0) + << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()"; + num_allowed_batches_++; + NotifyAll(); + } + + Future<> WaitForNextReleasedBatch() { + std::lock_guard lg(mutex_); + if (current_waiter_.is_valid()) { + return current_waiter_; + } + Future<> fut; + if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) { + num_released_batches_++; + return Future<>::MakeFinished(); + } + + current_waiter_ = Future<>::Make(); + return current_waiter_; + } + + private: + void NotifyAll() { + if (current_waiter_.is_valid()) { + Future<> to_unlock = current_waiter_; + current_waiter_ = {}; + to_unlock.MarkFinished(); + } + } + + Future<> current_waiter_; + int num_released_batches_ = 0; + int num_allowed_batches_ = 0; + std::mutex mutex_; +}; + +std::shared_ptr Gate::Make() { return std::make_shared(); } + +Gate::Gate() : impl_(new GateImpl()) {} + +Gate::~Gate() { delete impl_; } + +void Gate::ReleaseAllBatches() { impl_->ReleaseAllBatches(); } + +void Gate::ReleaseOneBatch() { impl_->ReleaseOneBatch(); } + +Future<> Gate::WaitForNextReleasedBatch() { return impl_->WaitForNextReleasedBatch(); } + +namespace { + +struct GatedNode : public ExecNode, public TracedNode { + static constexpr auto kKindName = "BackpressureDelayingNode"; + static constexpr const char* kFactoryName = "backpressure_delay"; + + static void Register() { + auto exec_reg = default_exec_factory_registry(); + if (!exec_reg->GetFactory(kFactoryName).ok()) { + ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make)); + } + } + + GatedNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, const GatedNodeOptions& options) + : ExecNode(plan, inputs, {"input"}, output_schema), + TracedNode(this), + gate_(options.gate) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName)); + auto gated_node_opts = static_cast(options); + return plan->EmplaceNode(plan, inputs, inputs[0]->output_schema(), + gated_node_opts); + } + + const char* kind_name() const override { return kKindName; } + + const Ordering& ordering() const override { return inputs_[0]->ordering(); } + Status InputFinished(ExecNode* input, int total_batches) override { + return output_->InputFinished(this, total_batches); + } + Status StartProducing() override { + NoteStartProducing(ToStringExtra()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->PauseProducing(this, counter); + } + + void ResumeProducing(ExecNode* output, int32_t counter) override { + inputs_[0]->ResumeProducing(this, counter); + } + + Status StopProducingImpl() override { return Status::OK(); } + + Status SendBatchesUnlocked(std::unique_lock&& lock) { + while (!queued_batches_.empty()) { + // If we are ready to release the batch, do so immediately. + Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch(); + bool callback_added = maybe_unlocked.TryAddCallback([this] { + return [this](const Status& st) { + DCHECK_OK(st); + plan_->query_context()->ScheduleTask( + [this] { + std::unique_lock lk(mutex_); + return SendBatchesUnlocked(std::move(lk)); + }, + "GatedNode::ResumeAfterNotify"); + }; + }); + if (callback_added) { + break; + } + // Otherwise, the future is already finished which means the gate is unlocked + // and we are allowed to send a batch + ExecBatch next = std::move(queued_batches_.front()); + queued_batches_.pop(); + lock.unlock(); + ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); + lock.lock(); + } + return Status::OK(); + } + + Status InputReceived(ExecNode* input, ExecBatch batch) override { + auto scope = TraceInputReceived(batch); + DCHECK_EQ(input, inputs_[0]); + + // This may be called concurrently by the source and by a restart attempt. Process + // one at a time (this critical section should be pretty small) + std::unique_lock lk(mutex_); + queued_batches_.push(std::move(batch)); + + return SendBatchesUnlocked(std::move(lk)); + } + + Gate* gate_; + std::queue queued_batches_; + std::mutex mutex_; +}; + +} // namespace + void RegisterTestNodes() { static std::once_flag registered; std::call_once(registered, [] { ExecFactoryRegistry* registry = default_exec_factory_registry(); DCHECK_OK( registry->AddFactory(std::string(JitterNodeOptions::kName), JitterNode::Make)); + DCHECK_OK( + registry->AddFactory(std::string(GatedNodeOptions::kName), GatedNode::Make)); }); } diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h index 2d1d630b3b7f6..7e31aa31b34d7 100644 --- a/cpp/src/arrow/acero/test_nodes.h +++ b/cpp/src/arrow/acero/test_nodes.h @@ -53,6 +53,33 @@ struct JitterNodeOptions : public ExecNodeOptions { static constexpr std::string_view kName = "jitter"; }; +class GateImpl; + +class Gate { + public: + static std::shared_ptr Make(); + + Gate(); + virtual ~Gate(); + + void ReleaseAllBatches(); + void ReleaseOneBatch(); + Future<> WaitForNextReleasedBatch(); + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Gate); + + GateImpl* impl_; +}; + +// A node that holds all input batches until a given gate is released +struct GatedNodeOptions : public ExecNodeOptions { + explicit GatedNodeOptions(Gate* gate) : gate(gate) {} + Gate* gate; + + static constexpr std::string_view kName = "gated"; +}; + void RegisterTestNodes(); } // namespace acero