Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40224: [C++] Fix: improve the backpressure handling in the dataset writer #40722

Merged
merged 1 commit into from
Apr 4, 2024

Conversation

westonpace
Copy link
Member

@westonpace westonpace commented Mar 21, 2024

Rationale for this change

The dataset writer would fire the resume callback as soon as the underlying dataset writer's queues freed up, even if there were pending tasks. Backpressure is not applied immediately and so a few tasks will always trickle in. If backpressure is pausing and then resuming frequently this can lead to a buildup of pending tasks and uncontrolled memory growth.

What changes are included in this PR?

The resume callback is not called until all pending write tasks have completed.

Are these changes tested?

There is quite an extensive set of tests for the dataset writer already and they continue to pass. I ran them on repeat, with and without stress, and did not see any issues.

However, the underlying problem (dataset writer can have uncontrolled memory growth) is still not tested as it is quite difficult to test. I was able to run the setup described in the issue to reproduce the issue. With this fix the repartitioning task completes for me.

Are there any user-facing changes?

No

@westonpace westonpace added the Critical Fix Bugfixes for security vulnerabilities, crashes, or invalid data. label Mar 21, 2024
@westonpace westonpace added this to the 16.0.0 milestone Mar 21, 2024
Copy link

⚠️ GitHub issue #40224 has been automatically assigned in GitHub to PR creator.

@thisisnic
Copy link
Member

thisisnic commented Mar 25, 2024

Thanks @westonpace! I can confirm that when running this code on the dataset using the query reported in the original issue, everything now works perfectly! 🎉

@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting committer review Awaiting committer review labels Mar 25, 2024
@thisisnic
Copy link
Member

@pitrou or @bkietz - are you happy for me to merge it?

@@ -277,6 +278,8 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
/// Allows task to be submitted again. If there is a max_concurrent_cost limit then
/// it will still apply.
virtual void Resume() = 0;
/// Return the number of tasks queued but not yet submitted
virtual std::size_t QueueSize() = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this better a std::size_t QueueSize() const?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation uses a std::mutex and so I'd have to mark the mutex mutable right? Which would you prefer? "mutable mutex" or "non-const accessor"? I don't have strong preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usally mutex is used with mutable. But this LGTM either, I don't have strong preference too

paused_ = true;
return has_room.Then([this] { ResumeIfNeeded(); });
} else {
ResumeIfNeeded();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is because when has_room is finished when paused_, resume_callback_ is not called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If has_room is finished then we can unpause if there are no other tasks because there is room for another batch.

Copy link
Member

@mapleFU mapleFU Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask a stupid question that why has_room.Then not being called in this scenerio? In ThrottledAsyncTaskSchedulerImpl::ContinueTasks(), wouldn't it trigger the callback?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh because it's paused...I got to understand this. So we don't "resume" enough, which causing new tasks didn't being consumed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a picture. In the old behavior, we resume too much.
Dataset Backpressure

@mapleFU
Copy link
Member

mapleFU commented Apr 1, 2024

LGTM as a fixing here, seems currently we don't understand why this happens?

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting merge Awaiting merge labels Apr 2, 2024
@westonpace
Copy link
Member Author

LGTM as a fixing here, seems currently we don't understand why this happens?

Thanks for the review. I do understand why this was happening.

  • We can't pause immediately because Acero is push-based. It takes time for the pause signal to travel from sink to source. More tasks may be scheduled during this time (and other tasks may be in flight).
  • Previously, we did not finish queued tasks before unpausing. This means we would unpause and let in more queued tasks and then pause again real quick. This leads to the # of queued tasks growing without bound.
  • The fix does not unpause until all queued tasks have run.

@mapleFU
Copy link
Member

mapleFU commented Apr 2, 2024

Previously, we did not finish queued tasks before unpausing. This means we would unpause and let in more queued tasks and then pause again real quick. This leads to the # of queued tasks growing without bound.

I get to understand why "pause" "resume" is important. But I remain a point that don't understand. When throttle and max_concurrent_cost = 1, the max-running-task = 1, wouldn't Release and ContinueTasks() being called after current pending task (size = 1) is finished?

@westonpace
Copy link
Member Author

I get to understand why "pause" "resume" is important. But I remain a point that don't understand. When throttle and max_concurrent_cost = 1, the max-running-task = 1, wouldn't Release and ContinueTasks() being called after current pending task (size = 1) is finished?

A task is "running" even when it is blocked on backpressure. Since max-running-task is 1 then Release/ContinueTasks won't be called until the has_room future has finished. In the meantime, more tasks may have arrived. Since a task was running and max-running-tasks is 1 then those tasks are put in the queue.

@mapleFU
Copy link
Member

mapleFU commented Apr 2, 2024

So, actually this patch making "resume" more strict in dataset writer scenerio

@mapleFU
Copy link
Member

mapleFU commented Apr 2, 2024

Will merge this in friday if no negative comments

@westonpace
Copy link
Member Author

So, actually this patch making "resume" more strict in dataset writer scenerio

Yes, we want to resume less frequently 👍

Will merge this in friday if no negative comments

Thanks

}
}
if (needs_resume) {
paused_ = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be done with the mutex acquired or are all accesses to paused_ done from the same thread?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, test-ubuntu-20.04-cpp-thread-sanitizer passed at least.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All access to paused_ is done from a single "logical thread". write_tasks_ is a scheduler with max capacity of 1 and so the items submitted to it will never run in parallel (though they may be run on different OS threads).

@pitrou
Copy link
Member

pitrou commented Apr 3, 2024

@github-actions crossbow submit -g cpp

This comment was marked as outdated.

@pitrou pitrou force-pushed the fix/dataset-writer-backpressure branch from 0dc3a3d to fdb625b Compare April 3, 2024 08:34
@pitrou
Copy link
Member

pitrou commented Apr 3, 2024

@github-actions crossbow submit -g cpp

@pitrou
Copy link
Member

pitrou commented Apr 3, 2024

I rebased for CI fixes.

@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Apr 3, 2024
Copy link

github-actions bot commented Apr 3, 2024

Revision: fdb625b

Submitted crossbow builds: ursacomputing/crossbow @ actions-af079e8fb6

Task Status
test-alpine-linux-cpp GitHub Actions
test-build-cpp-fuzz GitHub Actions
test-conda-cpp GitHub Actions
test-conda-cpp-valgrind Azure
test-cuda-cpp GitHub Actions
test-debian-12-cpp-amd64 GitHub Actions
test-debian-12-cpp-i386 GitHub Actions
test-fedora-39-cpp GitHub Actions
test-ubuntu-20.04-cpp GitHub Actions
test-ubuntu-20.04-cpp-bundled GitHub Actions
test-ubuntu-20.04-cpp-minimal-with-formats GitHub Actions
test-ubuntu-20.04-cpp-thread-sanitizer GitHub Actions
test-ubuntu-22.04-cpp GitHub Actions
test-ubuntu-22.04-cpp-20 GitHub Actions
test-ubuntu-22.04-cpp-no-threading GitHub Actions
test-ubuntu-24.04-cpp GitHub Actions
test-ubuntu-24.04-cpp-gcc-14 GitHub Actions

@pitrou pitrou changed the title GH-40224: [C++] fix: improve the backpressure handling in the dataset writer GH-40224: [C++] Fix: improve the backpressure handling in the dataset writer Apr 3, 2024
@pitrou
Copy link
Member

pitrou commented Apr 3, 2024

@ursabot please benchmark

@ursabot
Copy link

ursabot commented Apr 3, 2024

Benchmark runs are scheduled for commit fdb625b. Watch https://buildkite.com/apache-arrow and https://conbench.ursa.dev for updates. A comment will be posted here when the runs are complete.

Copy link

Thanks for your patience. Conbench analyzed the 7 benchmarking runs that have been run so far on PR commit fdb625b.

There were 12 benchmark results indicating a performance regression:

The full Conbench report has more details.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Apr 3, 2024
@pitrou pitrou merged commit 640c101 into apache:main Apr 4, 2024
38 of 40 checks passed
@pitrou pitrou removed the awaiting changes Awaiting changes label Apr 4, 2024
Copy link

After merging your PR, Conbench analyzed the 7 benchmarking runs that have been run so far on merge-commit 640c101.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 6 possible false positives for unstable benchmarks that are known to sometimes produce them.

tolleybot pushed a commit to tmct/arrow that referenced this pull request May 2, 2024
…ataset writer (apache#40722)

### Rationale for this change

The dataset writer would fire the resume callback as soon as the underlying dataset writer's queues freed up, even if there were pending tasks.  Backpressure is not applied immediately and so a few tasks will always trickle in.  If backpressure is pausing and then resuming frequently this can lead to a buildup of pending tasks and uncontrolled memory growth.

### What changes are included in this PR?

The resume callback is not called until all pending write tasks have completed.

### Are these changes tested?

There is quite an extensive set of tests for the dataset writer already and they continue to pass.  I ran them on repeat, with and without stress, and did not see any issues.

However, the underlying problem (dataset writer can have uncontrolled memory growth) is still not tested as it is quite difficult to test.  I was able to run the setup described in the issue to reproduce the issue.  With this fix the repartitioning task completes for me.

### Are there any user-facing changes?

No
* GitHub Issue: apache#40224

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
vibhatha pushed a commit to vibhatha/arrow that referenced this pull request May 25, 2024
…ataset writer (apache#40722)

### Rationale for this change

The dataset writer would fire the resume callback as soon as the underlying dataset writer's queues freed up, even if there were pending tasks.  Backpressure is not applied immediately and so a few tasks will always trickle in.  If backpressure is pausing and then resuming frequently this can lead to a buildup of pending tasks and uncontrolled memory growth.

### What changes are included in this PR?

The resume callback is not called until all pending write tasks have completed.

### Are these changes tested?

There is quite an extensive set of tests for the dataset writer already and they continue to pass.  I ran them on repeat, with and without stress, and did not see any issues.

However, the underlying problem (dataset writer can have uncontrolled memory growth) is still not tested as it is quite difficult to test.  I was able to run the setup described in the issue to reproduce the issue.  With this fix the repartitioning task completes for me.

### Are there any user-facing changes?

No
* GitHub Issue: apache#40224

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Component: C++ Critical Fix Bugfixes for security vulnerabilities, crashes, or invalid data.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants