Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35838: [C++] Add backpressure test for asof join node #35874

Merged
merged 20 commits into from
Jun 21, 2023

Conversation

rtpsw
Copy link
Contributor

@rtpsw rtpsw commented Jun 1, 2023

What changes are included in this PR?

Passing the correct nodes to the backpressure controller, along with better parameter naming/doc. Also added reusable gate-classes (Gate, GatedNodeOptions and GatedNode) that enable holding all input batches until a gate is released, in order to support more robust backpressure testing in this PR.

Are these changes tested?

Yes.

Are there any user-facing changes?

No.

This PR contains a "Critical Fix".

@icexelloss
Copy link
Contributor

@westonpace Are you OK with me merging this to unblock internal issue? @rtpsw Is working on adding tests now but it might take a while.

@@ -668,13 +668,13 @@ class InputState {

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,
Copy link
Contributor

@icexelloss icexelloss Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still looking confusing:

BackpressureController takes ExecNode* node, ExecNode* output and this one nows takes AsofJoinNode* node, ExecNode* input which is inconsistent

Can we make this consistent between the two?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also confusing that

    return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
                                        key_hasher, node, std::move(handler), schema,
                                        time_col_index, key_col_index);

On line 681 passes the asof join node to the input state instead of the input node, why is it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtpsw I think it is easier to revert this line as to unblock the internal first:
https://github.com/apache/arrow/pull/34392/files#diff-5493b6ae7ea2a4d5cfb581034c076e9c4be7608382168de6d1301ef67b6c01eeR1410

Then work on cleaning up changes introduced in GH-36391. The code is quite confusing now.

Copy link
Contributor Author

@rtpsw rtpsw Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still looking confusing:

BackpressureController takes ExecNode* node, ExecNode* output and this one nows takes AsofJoinNode* node, ExecNode* input which is inconsistent

Can we make this consistent between the two?

This is actually intended to make things clearer. The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode passed to Make is an input of the as-of-join node while PauseProducing (and similarly ResumeProducing) sees the as-of-join node as an output of the ExecNode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is easier to revert this line

This reversion alone doesn't compile because inputs[i] is not of type AsofJoinNode*.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably easier for me to do it instead of back and forth, opened:

#35878

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with figure out the cleaner way to do this in follow up to GH-36391. But for now I think it's easier to just revert the change.

Copy link
Contributor

@icexelloss icexelloss Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode passed to Make is an input of the as-of-join node while PauseProducing (and similarly ResumeProducing) sees the as-of-join node as an output of the ExecNode.

We should probably fix the variable naming in the follow PR to GH-36391 how to call these things then. But for now let's just revert to what was before

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use more specific names like asof_input and asof_node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Jun 1, 2023
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
constexpr size_t low_threshold = 4, high_threshold = 8;
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(node, output, backpressure_counter);
std::make_unique<BackpressureController>(input, node, backpressure_counter);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's been some confusion, can you make parameter names explicit?

Suggested change
std::make_unique<BackpressureController>(input, node, backpressure_counter);
std::make_unique<BackpressureController>(/*xxx=*/ input, /*yyy=*/ node, backpressure_counter);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@github-actions github-actions bot added awaiting change review Awaiting change review awaiting changes Awaiting changes and removed awaiting changes Awaiting changes awaiting change review Awaiting change review labels Jun 1, 2023
@rtpsw
Copy link
Contributor Author

rtpsw commented Jun 1, 2023

@icexelloss, I added test cases. This PR can now be reviewed as a (non-quick) resolution.

@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);

ASSERT_GT(pause_count, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we validate pause resume counter for all sources? i.e the slow table should not have been paused, but the fast tables should?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we validate pause resume counter for all sources?

I added validation ...

i.e the slow table should not have been paused, but the fast tables should?

... but this is not the actual behavior - instead only one fast source gets paused and resumed - so this is what the recent commit validates. @westonpace, is this behavior expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have crossed - the previous commit already has one pause and one resume counter for each source. The issue is that sometimes (due to non-deterministic timing of operations) no pause/resume is requested on one or both of the fast sources. Because of this, in the recent commit I am attempting this test logic: there is a pause request on a fast source if and only if there is a resume one too.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Jun 2, 2023
Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have a little bit of concern that this test may become flaky on CI machines which have notoriously weird timing. However, we can give it a few runs and see how it goes. If it does become a problem I'd suggest we invent some kind of "gated source" so we can do something like (very rough pseudocode)...

GatedSourceNode gated_left = GatedSourceNode(l_batches);
GatedSourceNode gated_right = GatedSourceNode(r_batches);
gated_right.Ungate();

// Start plan

WaitFor([&] { right_bp_options->pause_count > 0; });
ASSERT_EQ(right_bp_options->resume_count, 0);
ASSERT_EQ(left_bp_options->pause_count, 0);
ASSERT_EQ(left_bp_options->resume_count, 0);

gated_left->Ungate();
AssertPlanFinishes();

I'm off today and Monday but I can sketch up a possible GatedSourceNode implementation when I get back if needed.

@rtpsw
Copy link
Contributor Author

rtpsw commented Jun 2, 2023

I do have a little bit of concern that this test may become flaky on CI machines

Indeed it is flaky - this and this and this CI jobs have failed on the new check. I'll make a quick attempt to fix this.

I agree relying on timing is not good, though that can be said about the pre-PR backpressure test code too, and this PR won't be worse in this respect.

@github-actions github-actions bot added awaiting change review Awaiting change review awaiting changes Awaiting changes and removed awaiting changes Awaiting changes awaiting change review Awaiting change review labels Jun 2, 2023
@icexelloss
Copy link
Contributor

Thinking about loud here:

What we want to test is that if the through put of asof join node is slower than the source, then we would pause the source. Two potential ways that I think we can reliably do this:
(1) Add some sort of "debug options" to manipulate the behavior of asof join to make it run slower. (i.e. Sleep a few seconds before actually starting the work in the processing thread)
(2) Add a downstream node to asof join that processes data slowly (similar to a slow data sink), i.e., process one batch per second. This way, the backpressure would be pushed from the slow sink to asof join then to the data sources.

I think I prefer (2) a bit more because this affects represents a real life case of slow sink.

@westonpace I am not sure if the idea of GatedSourceNode is similar or different, but happy to hear

ASSERT_GT(counters_by_is_fast[true].resume_count, 0);
// runs on some slow machines may not see any pause/resume, but if at least one pause is
// seen then at least one resume must also be seen
ASSERT_EQ(counters_by_is_fast[false].pause_count > 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we really need to check the slow sources, because it is entirely possible that it is not paused at all in the expected behavior (since it is slower than asof join?)

Copy link
Contributor Author

@rtpsw rtpsw Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do. My understanding is that the intention of the pre-PR test code was to drive the as-of-join slower using a slower input to it, which should lead to backpressure from the as-of-join node toward a faster source. The additional intention of the post-PR test code is to check that indeed this backpressure happens, and this is observed at the test nodes inserted after each source.

@rtpsw
Copy link
Contributor Author

rtpsw commented Jun 2, 2023

Thinking about loud here:

What we want to test is that if the through put of asof join node is slower than the source, then we would pause the source. Two potential ways that I think we can reliably do this: (1) Add some sort of "debug options" to manipulate the behavior of asof join to make it run slower. (i.e. Sleep a few seconds before actually starting the work in the processing thread) (2) Add a downstream node to asof join that processes data slowly (similar to a slow data sink), i.e., process one batch per second. This way, the backpressure would be pushed from the slow sink to asof join then to the data sources.

I think I prefer (2) a bit more because this affects represents a real life case of slow sink.

@westonpace I am not sure if the idea of GatedSourceNode is similar or different, but happy to hear

While I'm not sure exactly what Weston has in mind, my understanding is that the GatedSourceNode's goal is to avoid flakiness due to non-deterministic timing. IMO, both (1) and (2) above could be flaky due to non-deterministic timing.

Between (1) and (2) I also wouldn't prefer (1) because the debug-options would change the behavior of the as-of-join node being tested, and I prefer to change the code driving it instead.

Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtpsw A few comments:
(1) Please separate out threading related changes to asof join - those should be in separate PR (outside of original scope)
(2) Please update the original iGH ssue to better reflect the change in this PR
(3) other inline comments

Gate* gate;
};

struct GatedNode : public ExecNode, public TracedNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add documentation for this class to describe the purpose of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@github-actions github-actions bot added awaiting change review Awaiting change review awaiting changes Awaiting changes and removed awaiting changes Awaiting changes awaiting change review Awaiting change review labels Jun 15, 2023
@rtpsw
Copy link
Contributor Author

rtpsw commented Jun 15, 2023

(2) Please update the original iGH ssue to better reflect the change in this PR

It's authored by Weston - I don't have permission to edit. I edited the description of this PR instead,

@rtpsw
Copy link
Contributor Author

rtpsw commented Jun 15, 2023

This CI job failure suggests that the tightening condition (in this commit) isn't always true. We should decide whether to revert the tightened condition or to look for the cause for its violation.

@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Jun 16, 2023
@icexelloss
Copy link
Contributor

(2) Please update the original iGH ssue to better reflect the change in this PR

It's authored by Weston - I don't have permission to edit. I edited the description of this PR instead,

Ok we can leave it as is then.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Jun 16, 2023
@rtpsw rtpsw requested a review from icexelloss June 17, 2023 10:36
Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@icexelloss icexelloss merged commit 7323952 into apache:main Jun 21, 2023
36 of 39 checks passed
@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting changes Awaiting changes labels Jun 21, 2023
@rtpsw rtpsw deleted the GH-35838 branch June 22, 2023 09:15
@conbench-apache-arrow
Copy link

Conbench analyzed the 6 benchmark runs on commit 73239526.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++] Backpressure broken in asof join node
4 participants