Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35838: [C++] Add backpressure test for asof join node #35874

Merged
merged 20 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class KeyHasher {
size_t index_;
std::vector<col_index_t> indices_;
std::vector<KeyColumnMetadata> metadata_;
const RecordBatch* batch_;
std::atomic<const RecordBatch*> batch_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above it is saying "the key hasher is not thread-safe", if so, why do we care if this is atomic here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we using the key hasher in multiple thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is queried from one thread but invalidated from another. We discussed offline that this can be simplified so that the key hasher would only be used from one thread, but this is (currently?) out of scope for this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see in that case you can update the doc to explain how this class should be used in multi-threaded execution? Currently looks like the doc about thread safety is not correct, i.e., it is used by multiple thread while the docstring says it's not thread safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up just fixing it to be single-threaded.

Copy link
Contributor

@icexelloss icexelloss Jun 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up just fixing it to be single-threaded.

Nice - can you point me where the fix is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I only pushed it in the recent commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am somewhat confused. If this is single-threaded, then we don't need this to be atomic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtpsw Do we still need to change this. If so, why?

std::vector<HashType> hashes_;
LightContext ctx_;
std::vector<KeyColumnArray> column_arrays_;
Expand Down Expand Up @@ -668,18 +668,19 @@ class InputState {

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,
icexelloss marked this conversation as resolved.
Show resolved Hide resolved
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
constexpr size_t low_threshold = 4, high_threshold = 8;
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(node, output, backpressure_counter);
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
ARROW_ASSIGN_OR_RAISE(auto handler,
BackpressureHandler::Make(low_threshold, high_threshold,
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, output, std::move(handler), schema,
key_hasher, asof_node, std::move(handler), schema,
time_col_index, key_col_index);
}

Expand Down
171 changes: 158 additions & 13 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
#include "arrow/acero/map_node.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
Expand Down Expand Up @@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
})

struct BackpressureCounters {
std::atomic<int32_t> pause_count = 0;
std::atomic<int32_t> resume_count = 0;
};

struct BackpressureCountingNodeOptions : public ExecNodeOptions {
BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}

BackpressureCounters* counters;
};

struct BackpressureCountingNode : public MapNode {
static constexpr const char* kKindName = "BackpressureCountingNode";
static constexpr const char* kFactoryName = "backpressure_count";

static void Register() {
auto exec_reg = default_exec_factory_registry();
if (!exec_reg->GetFactory(kFactoryName).ok()) {
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
}
}

BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
const BackpressureCountingNodeOptions& options)
: MapNode(plan, inputs, output_schema), counters(options.counters) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
return plan->EmplaceNode<BackpressureCountingNode>(
plan, inputs, inputs[0]->output_schema(), bp_options);
}

const char* kind_name() const override { return kKindName; }
Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }

void PauseProducing(ExecNode* output, int32_t counter) override {
++counters->pause_count;
inputs()[0]->PauseProducing(this, counter);
}
void ResumeProducing(ExecNode* output, int32_t counter) override {
++counters->resume_count;
inputs()[0]->ResumeProducing(this, counter);
}

BackpressureCounters* counters;
};

struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
BackpressureDelayingNodeOptions(double delay_seconds, std::function<bool()> gate)
: delay_seconds(delay_seconds), gate(gate) {}

double delay_seconds;
std::function<bool()> gate;
};

struct BackpressureDelayingNode : public MapNode {
static constexpr auto kKindName = "BackpressureDelayingNode";
static constexpr const char* kFactoryName = "backpressure_delay";

static void Register() {
auto exec_reg = default_exec_factory_registry();
if (!exec_reg->GetFactory(kFactoryName).ok()) {
ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make));
}
}

BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
const BackpressureDelayingNodeOptions& options)
: MapNode(plan, inputs, output_schema),
gate(options.gate),
delay_seconds(options.delay_seconds) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options);
return plan->EmplaceNode<BackpressureDelayingNode>(
plan, inputs, inputs[0]->output_schema(), bp_options);
}

const char* kind_name() const override { return kKindName; }
Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
while (!gate()) {
SleepFor(delay_seconds);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use a condition variable instead of a loop here. This entire test could be written without any sleeps in that case. However, this isn't too bad. You could also get rid of delay_seconds and just use SleepABit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with SleepABit.

}
return batch;
}

std::function<bool()> gate;
double delay_seconds;
};

template <typename BatchesMaker>
void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
double fast_delay, double slow_delay, bool noisy = false) {
Expand All @@ -1381,21 +1478,62 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

Declaration l_src = {
"source", SourceNodeOptions(
l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
Declaration r0_src = {
"source", SourceNodeOptions(
r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
Declaration r1_src = {
"source", SourceNodeOptions(
r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
BackpressureCountingNode::Register();
BackpressureDelayingNode::Register();

struct BackpressureSourceConfig {
std::string name_prefix;
bool is_fast;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;

std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
};

// must have at least one fast and one slow
std::vector<BackpressureSourceConfig> source_configs = {
{"0", true, l_schema, l_batches},
{"1", false, r0_schema, r0_batches},
{"2", true, r1_schema, r1_batches},
};

std::vector<BackpressureCounters> bp_counters(source_configs.size());
std::vector<Declaration> src_decls;
std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];
src_decls.emplace_back(
"source", SourceNodeOptions(
config.schema,
MakeDelayedGen(config.batches, config.name(),
config.is_fast ? fast_delay : slow_delay, noisy)));
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
std::vector<Declaration::Input> bp_in = {src_decls.back()};
Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
std::move(options)};
bp_decls.push_back(bp_decl);
}

Declaration asofjoin = {
"asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
"asofjoin", bp_decls,
GetRepeatedOptions(source_configs.size(), "time", {"key"}, 1000)};

BackpressureDelayingNodeOptions delay_options(slow_delay * 5, [&bp_counters]() {
for (const auto& counters : bp_counters) {
if (counters.pause_count > 0 || counters.resume_count > 0) {
return true;
}
}
return false;
});
Declaration delaying = {
BackpressureDelayingNode::kFactoryName, {asofjoin}, delay_options};

ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
DeclarationToReader(asofjoin, /*use_threads=*/false));
DeclarationToReader(delaying, /*use_threads=*/false));

int64_t total_length = 0;
for (;;) {
Expand All @@ -1406,10 +1544,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);

size_t total_count = 0;
for (const auto& counters : bp_counters) {
total_count += counters.pause_count;
total_count += counters.resume_count;
}
ASSERT_GT(total_count, 0);
}

TEST(AsofJoinTest, BackpressureWithBatches) {
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1,
/*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false);
}

Expand Down Expand Up @@ -1473,7 +1618,7 @@ T GetEnvValue(const std::string& var, T default_value) {
} // namespace

TEST(AsofJoinTest, BackpressureWithBatchesGen) {
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size,
/*fast_delay=*/0.001, /*slow_delay=*/0.01);
Expand Down
Loading