Skip to content

Commit

Permalink
GH-35838: [C++] Add backpressure test for asof join node (#35874)
Browse files Browse the repository at this point in the history
### What changes are included in this PR?

Passing the correct nodes to the backpressure controller, along with better parameter naming/doc. Also added reusable gate-classes (`Gate`, `GatedNodeOptions` and `GatedNode`) that enable holding all input batches until a gate is released, in order to support more robust backpressure testing in this PR.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: #35838

Lead-authored-by: Yaron Gvili <rtpsw@hotmail.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: rtpsw <rtpsw@hotmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
  • Loading branch information
rtpsw and westonpace committed Jun 21, 2023
1 parent 65d603a commit 7323952
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 33 deletions.
13 changes: 7 additions & 6 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,9 @@ class KeyHasher {
4 * kMiniBatchLength * sizeof(uint32_t));
}

void Invalidate() {
batch_ = NULLPTR; // invalidate cached hashes for batch - required when it changes
}
// invalidate cached hashes for batch - required when it changes
// only this method can be called concurrently with HashesFor
void Invalidate() { batch_ = NULLPTR; }

// compute and cache a hash for each row of the given batch
const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
Expand Down Expand Up @@ -668,18 +668,19 @@ class InputState {

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
constexpr size_t low_threshold = 4, high_threshold = 8;
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(node, output, backpressure_counter);
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
ARROW_ASSIGN_OR_RAISE(auto handler,
BackpressureHandler::Make(low_threshold, high_threshold,
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, output, std::move(handler), schema,
key_hasher, asof_node, std::move(handler), schema,
time_col_index, key_col_index);
}

Expand Down
172 changes: 145 additions & 27 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <string_view>
#include "arrow/acero/exec_plan.h"
#include "arrow/testing/future_util.h"
#ifndef NDEBUG
#include <sstream>
#endif
Expand All @@ -31,6 +34,8 @@
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
#include "arrow/acero/map_node.h"
#include "arrow/acero/query_context.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
Expand Down Expand Up @@ -1360,9 +1365,66 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
})

struct BackpressureCounters {
std::atomic<int32_t> pause_count = 0;
std::atomic<int32_t> resume_count = 0;
};

struct BackpressureCountingNodeOptions : public ExecNodeOptions {
BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}

BackpressureCounters* counters;
};

struct BackpressureCountingNode : public MapNode {
static constexpr const char* kKindName = "BackpressureCountingNode";
static constexpr const char* kFactoryName = "backpressure_count";

static void Register() {
auto exec_reg = default_exec_factory_registry();
if (!exec_reg->GetFactory(kFactoryName).ok()) {
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
}
}

BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
const BackpressureCountingNodeOptions& options)
: MapNode(plan, inputs, output_schema), counters(options.counters) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
return plan->EmplaceNode<BackpressureCountingNode>(
plan, inputs, inputs[0]->output_schema(), bp_options);
}

const char* kind_name() const override { return kKindName; }
Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }

void PauseProducing(ExecNode* output, int32_t counter) override {
++counters->pause_count;
inputs()[0]->PauseProducing(this, counter);
}
void ResumeProducing(ExecNode* output, int32_t counter) override {
++counters->resume_count;
inputs()[0]->ResumeProducing(this, counter);
}

BackpressureCounters* counters;
};

AsyncGenerator<std::optional<ExecBatch>> GetGen(
AsyncGenerator<std::optional<ExecBatch>> gen) {
return gen;
}
AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
return bws.gen(false, false);
}

template <typename BatchesMaker>
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
double fast_delay, double slow_delay, bool noisy = false) {
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
Expand All @@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

Declaration l_src = {
"source", SourceNodeOptions(
l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
Declaration r0_src = {
"source", SourceNodeOptions(
r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
Declaration r1_src = {
"source", SourceNodeOptions(
r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
BackpressureCountingNode::Register();
RegisterTestNodes(); // for GatedNode

Declaration asofjoin = {
"asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
struct BackpressureSourceConfig {
std::string name_prefix;
bool is_gated;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;

std::string name() const {
return name_prefix + ";" + (is_gated ? "gated" : "ungated");
}
};

auto gate_ptr = Gate::Make();
auto& gate = *gate_ptr;
GatedNodeOptions gate_options(gate_ptr.get());

// Two ungated and one gated
std::vector<BackpressureSourceConfig> source_configs = {
{"0", false, l_schema, l_batches},
{"1", true, r0_schema, r0_batches},
{"2", false, r1_schema, r1_batches},
};

std::vector<BackpressureCounters> bp_counters(source_configs.size());
std::vector<Declaration> src_decls;
std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];

src_decls.emplace_back("source",
SourceNodeOptions(config.schema, GetGen(config.batches)));
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
std::vector<Declaration::Input> bp_in = {src_decls.back()};
Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
std::move(options)};
if (config.is_gated) {
bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
}
bp_decls.push_back(bp_decl);
}

Declaration asofjoin = {"asofjoin", bp_decls,
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};

ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
internal::ThreadPool::Make(1));
ExecContext exec_ctx(default_memory_pool(), tpool.get());
Future<BatchesWithCommonSchema> batches_fut =
DeclarationToExecBatchesAsync(asofjoin, exec_ctx);

auto has_bp_been_applied = [&] {
// One of the inputs is gated. The other two will eventually be paused by the asof
// join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (source_configs[i].is_gated) {
if (counters.pause_count > 0) return false;
} else {
if (counters.pause_count != 1) return false;
}
}
return true;
};

BusyWait(10.0, has_bp_been_applied);
ASSERT_TRUE(has_bp_been_applied());

ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
DeclarationToReader(asofjoin, /*use_threads=*/false));
gate.ReleaseAllBatches();
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);

int64_t total_length = 0;
for (;;) {
ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
if (!batch) {
break;
// One of the inputs is gated. The other two will eventually be resumed by the asof
// join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (!source_configs[i].is_gated) {
ASSERT_GE(counters.resume_count, 0);
}
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
}

TEST(AsofJoinTest, BackpressureWithBatches) {
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
/*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false);
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
}

template <typename BatchesMaker>
Expand Down Expand Up @@ -1473,10 +1592,9 @@ T GetEnvValue(const std::string& var, T default_value) {
} // namespace

TEST(AsofJoinTest, BackpressureWithBatchesGen) {
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size,
/*fast_delay=*/0.001, /*slow_delay=*/0.01);
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
}

} // namespace acero
Expand Down
Loading

0 comments on commit 7323952

Please sign in to comment.