GH-35838: [C++] Add backpressure test for asof join node #35874

Merged (20 commits) on Jun 21, 2023
15 changes: 8 additions & 7 deletions cpp/src/arrow/acero/asof_join_node.cc
@@ -495,9 +495,9 @@ class KeyHasher {
4 * kMiniBatchLength * sizeof(uint32_t));
}

void Invalidate() {
batch_ = NULLPTR; // invalidate cached hashes for batch - required when it changes
}
// invalidate cached hashes for batch - required when it changes
// only this method can be called concurrently with HashesFor
void Invalidate() { batch_ = NULLPTR; }

// compute and cache a hash for each row of the given batch
const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
@@ -529,7 +529,7 @@ class KeyHasher
size_t index_;
std::vector<col_index_t> indices_;
std::vector<KeyColumnMetadata> metadata_;
const RecordBatch* batch_;
std::atomic<const RecordBatch*> batch_;
Contributor: Above it says "the key hasher is not thread-safe"; if so, why do we need this member to be atomic?

Contributor: Are we using the key hasher from multiple threads?

Contributor (author): It is queried from one thread but invalidated from another. We discussed offline that this can be simplified so that the key hasher would only be used from one thread, but this is (currently?) out of scope for this PR.

Contributor: I see. In that case, can you update the doc to explain how this class should be used in multi-threaded execution? Currently the thread-safety doc is incorrect: the class is used by multiple threads while the docstring says it is not thread-safe.

Contributor (author): I ended up just fixing it to be single-threaded.

Contributor (@icexelloss, Jun 13, 2023):

> I ended up just fixing it to be single-threaded.

Nice! Can you point me to where the fix is?

Contributor (author): Sorry, I only pushed it in the most recent commit.

Contributor: I am somewhat confused. If this is single-threaded, then we don't need it to be atomic, right?

Contributor: @rtpsw Do we still need this change? If so, why?

std::vector<HashType> hashes_;
LightContext ctx_;
std::vector<KeyColumnArray> column_arrays_;
@@ -668,18 +668,19 @@ class InputState

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,
(icexelloss marked this conversation as resolved.)
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
constexpr size_t low_threshold = 4, high_threshold = 8;
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(node, output, backpressure_counter);
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
ARROW_ASSIGN_OR_RAISE(auto handler,
BackpressureHandler::Make(low_threshold, high_threshold,
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, output, std::move(handler), schema,
key_hasher, asof_node, std::move(handler), schema,
time_col_index, key_col_index);
}

172 changes: 145 additions & 27 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -19,9 +19,12 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <string_view>
#include "arrow/acero/exec_plan.h"
#include "arrow/testing/future_util.h"
#ifndef NDEBUG
#include <sstream>
#endif
@@ -31,6 +34,8 @@
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
#include "arrow/acero/map_node.h"
#include "arrow/acero/query_context.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
@@ -1360,9 +1365,66 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
})

struct BackpressureCounters {
std::atomic<int32_t> pause_count = 0;
std::atomic<int32_t> resume_count = 0;
};

struct BackpressureCountingNodeOptions : public ExecNodeOptions {
BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}

BackpressureCounters* counters;
};

struct BackpressureCountingNode : public MapNode {
static constexpr const char* kKindName = "BackpressureCountingNode";
static constexpr const char* kFactoryName = "backpressure_count";

static void Register() {
auto exec_reg = default_exec_factory_registry();
if (!exec_reg->GetFactory(kFactoryName).ok()) {
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
}
}

BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
const BackpressureCountingNodeOptions& options)
: MapNode(plan, inputs, output_schema), counters(options.counters) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
return plan->EmplaceNode<BackpressureCountingNode>(
plan, inputs, inputs[0]->output_schema(), bp_options);
}

const char* kind_name() const override { return kKindName; }
Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }

void PauseProducing(ExecNode* output, int32_t counter) override {
++counters->pause_count;
inputs()[0]->PauseProducing(this, counter);
}
void ResumeProducing(ExecNode* output, int32_t counter) override {
++counters->resume_count;
inputs()[0]->ResumeProducing(this, counter);
}

BackpressureCounters* counters;
};

AsyncGenerator<std::optional<ExecBatch>> GetGen(
AsyncGenerator<std::optional<ExecBatch>> gen) {
return gen;
}
AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
return bws.gen(false, false);
}

template <typename BatchesMaker>
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
double fast_delay, double slow_delay, bool noisy = false) {
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

Declaration l_src = {
"source", SourceNodeOptions(
l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
Declaration r0_src = {
"source", SourceNodeOptions(
r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
Declaration r1_src = {
"source", SourceNodeOptions(
r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
BackpressureCountingNode::Register();
RegisterTestNodes(); // for GatedNode

Declaration asofjoin = {
"asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
struct BackpressureSourceConfig {
std::string name_prefix;
bool is_gated;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;

std::string name() const {
return name_prefix + ";" + (is_gated ? "gated" : "ungated");
}
};

auto gate_ptr = Gate::Make();
auto& gate = *gate_ptr;
GatedNodeOptions gate_options(gate_ptr.get());

// Two ungated and one gated
std::vector<BackpressureSourceConfig> source_configs = {
{"0", false, l_schema, l_batches},
{"1", true, r0_schema, r0_batches},
{"2", false, r1_schema, r1_batches},
};

std::vector<BackpressureCounters> bp_counters(source_configs.size());
std::vector<Declaration> src_decls;
std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];

src_decls.emplace_back("source",
SourceNodeOptions(config.schema, GetGen(config.batches)));
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
std::vector<Declaration::Input> bp_in = {src_decls.back()};
Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
std::move(options)};
if (config.is_gated) {
bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
}
bp_decls.push_back(bp_decl);
}

Declaration asofjoin = {"asofjoin", bp_decls,
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};

ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
internal::ThreadPool::Make(1));
ExecContext exec_ctx(default_memory_pool(), tpool.get());
Future<BatchesWithCommonSchema> batches_fut =
DeclarationToExecBatchesAsync(asofjoin, exec_ctx);

auto has_bp_been_applied = [&] {
// One of the inputs is gated. The other two will eventually be paused by the asof
// join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (source_configs[i].is_gated) {
if (counters.pause_count > 0) return false;
(icexelloss marked this conversation as resolved.)
} else {
if (counters.pause_count != 1) return false;
}
}
return true;
};

BusyWait(10.0, has_bp_been_applied);
ASSERT_TRUE(has_bp_been_applied());

ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
DeclarationToReader(asofjoin, /*use_threads=*/false));
gate.ReleaseAllBatches();
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);

int64_t total_length = 0;
for (;;) {
ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
if (!batch) {
break;
// One of the inputs is gated. The other two will eventually be resumed by the asof
// join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (!source_configs[i].is_gated) {
ASSERT_GE(counters.resume_count, 0);
}
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
}

TEST(AsofJoinTest, BackpressureWithBatches) {
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
/*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false);
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
}

template <typename BatchesMaker>
@@ -1473,10 +1592,9 @@ T GetEnvValue(const std::string& var, T default_value) {
} // namespace

TEST(AsofJoinTest, BackpressureWithBatchesGen) {
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size,
/*fast_delay=*/0.001, /*slow_delay=*/0.01);
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
}

} // namespace acero