
ARROW-17509: [C++] Simplify async scheduler by removing the need to call End #14524

Conversation

@westonpace (Member) commented Oct 26, 2022:

The call to End was a foot-gun: failing to call End on a scheduler would lead to deadlock. This led to numerous situations where one would accidentally fail to call End because of an error or other exceptional behavior.

Now the call to End is no longer required. The scheduler has also been simplified by breaking it into three different concepts. The root scheduler runs tasks until there are no tasks remaining and then automatically ends.

A throttle can wrap a scheduler to limit concurrent task execution, deferring tasks to a queue when there is no capacity for new tasks.

A task group tracks a group of tasks and runs a finish callback when all the tasks in that group have finished.
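To make the three concepts concrete, here is a toy, single-threaded sketch. The class and method names (`TaskGroup`, `Throttle`, `TaskStarted`, and so on) are illustrative only, not Arrow's actual `AsyncTaskScheduler` API, and real schedulers run tasks asynchronously with proper synchronization:

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Toy sketch only: illustrative names, not Arrow's actual API.

// A task group counts outstanding tasks and fires a finish callback
// when the last task in the group completes.
class TaskGroup {
 public:
  explicit TaskGroup(std::function<void()> on_finished)
      : on_finished_(std::move(on_finished)) {}
  void TaskStarted() { ++outstanding_; }
  void TaskFinished() {
    if (--outstanding_ == 0) on_finished_();
  }

 private:
  int outstanding_ = 0;
  std::function<void()> on_finished_;
};

// A throttle limits how many tasks run at once, deferring the rest to a
// queue until capacity frees up.
class Throttle {
 public:
  explicit Throttle(int capacity) : capacity_(capacity) {}
  void AddTask(std::function<void()> task) {
    if (running_ >= capacity_) {
      queue_.push(std::move(task));  // no capacity: defer to the queue
      return;
    }
    Run(std::move(task));
  }

 private:
  void Run(std::function<void()> task) {
    ++running_;
    task();  // a real scheduler would run this asynchronously
    --running_;
    if (!queue_.empty() && running_ < capacity_) {
      auto next = std::move(queue_.front());
      queue_.pop();
      Run(std::move(next));  // drain deferred tasks as capacity frees up
    }
  }
  int capacity_;
  int running_ = 0;
  std::queue<std::function<void()>> queue_;
};
```

With a capacity-1 throttle, a task added from inside a running task is deferred and only runs after the outer task returns, which is the queueing behavior described above.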

@github-actions commented:

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace (Member, Author) commented:

CC @save-buffer, would appreciate a look.

@save-buffer (Contributor) left a comment:

Overall seems pretty reasonable. A different interface that could be simpler is to simply change the meaning of End to mean "End when empty but allow adding more tasks" instead of "End when empty but disallow adding more tasks".

});
}
#endif
// TODO(weston) The entire concept of ExecNode::finished() will hopefully go
Contributor commented:

yep my simplification PR gets rid of those finished futures.

// part of StartProducing) then the plan may be finished before we return from this
// call.
Future<> scheduler_finished =
util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {
Contributor commented:

Could we move this giant lambda into its own function? The extra layer of indentation is pretty confusing.

}
-  std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+  std::unique_ptr<AsyncTaskScheduler::Holder> MakeSubScheduler(
FnOnce<Status(AsyncTaskScheduler*)> initial_task,
Contributor commented:

Could we guarantee that this initial_task gets run synchronously? I get why it's needed (the scheduler will end itself immediately after creation without it), but I would like the control flow for scheduling the initial tasks to be simple. We could make this function return a Result<unique_ptr> in case a node has a problem scheduling its initial tasks.

@westonpace (Member, Author) replied:

Yes. initial_task is always run synchronously as part of the call to MakeSubScheduler (so it should be safe, e.g. to use [&]). Furthermore, when you add a task to a scheduler that does not have a throttle, the task submitter portion will always run synchronously as part of the call to AddTask. I'll update the docs to make this clearer.
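To illustrate why `[&]` captures are safe, here is a hypothetical sketch (a made-up `ToyScheduler`, not Arrow's real class or signatures): the factory invokes the initial task before returning, so locals in the caller's frame are still alive while it runs.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical sketch, not Arrow's real implementation: the factory runs the
// initial task synchronously, before Make() returns, so `[&]` captures of
// locals in the caller's frame are safe.
class ToyScheduler {
 public:
  static std::unique_ptr<ToyScheduler> Make(
      const std::function<void(ToyScheduler*)>& initial_task) {
    auto scheduler = std::make_unique<ToyScheduler>();
    initial_task(scheduler.get());  // runs before Make() returns
    return scheduler;
  }

  // With no throttle attached, task submission also happens synchronously
  // inside AddTask, mirroring the behavior described above.
  void AddTask(const std::function<void()>& task) {
    ++tasks_submitted_;
    task();
  }

  int tasks_submitted() const { return tasks_submitted_; }

 private:
  int tasks_submitted_ = 0;
};
```

Because the initial task finishes before `Make` returns, a caller can capture a stack variable by reference in the lambda, add tasks that write to it, and read the result immediately after `Make` returns.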

@westonpace (Member, Author) commented:

> A different interface that could be simpler is to simply change the meaning of End to mean "End when empty but allow adding more tasks" instead of "End when empty but disallow adding more tasks".

You are now allowed to add tasks after a scheduler is "ended". The top-level scheduler basically starts "ended" (after the initial task runs). If you don't use or need sub-schedulers then we have what you describe.

However, there are some cases where we don't want a sub-scheduler to end even though it has run out of tasks. For example, when writing to a file, we want to close the file when all write tasks have finished. However, we don't know, at the start, how many write tasks there will be, and we don't have all of them up front (sometimes InputReceived will add another write task and sometimes it won't, depending on partitioning).

That being said, we still want to end the file writer, no matter what, when the plan itself is out of tasks. At that point there is no possible way more write tasks could be added, so the fact that the sub-scheduler hasn't been ended is probably a bug or an error condition we don't explicitly handle, and we should just abort what we were doing and close the file.
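As a toy sketch of this pattern (hypothetical names, not the real dataset writer or `AsyncTaskScheduler` code): a sub-scheduler tied to a file can be ended early by its owner, but the plan force-ends every remaining sub-scheduler when it runs out of tasks, so a missed early End cannot cause a hang.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical sketch of the peer sub-scheduler pattern described above;
// names are illustrative only.
class SubScheduler {
 public:
  explicit SubScheduler(std::function<void()> on_end)
      : on_end_(std::move(on_end)) {}

  // End is idempotent: an early end (e.g. after max_rows rows were written)
  // and the forced end at plan shutdown can both call it safely.
  void End() {
    if (!ended_) {
      ended_ = true;
      on_end_();  // e.g. close the file
    }
  }
  bool ended() const { return ended_; }

 private:
  bool ended_ = false;
  std::function<void()> on_end_;
};

class PlanScheduler {
 public:
  // A deque keeps references stable as more sub-schedulers are created.
  SubScheduler* MakeSubScheduler(std::function<void()> on_end) {
    children_.emplace_back(std::move(on_end));
    return &children_.back();
  }

  // Once the plan is out of tasks, no more write tasks can possibly arrive,
  // so any sub-scheduler that was not ended early is ended now.
  void Finish() {
    for (SubScheduler& child : children_) child.End();
  }

 private:
  std::deque<SubScheduler> children_;
};
```

The idempotent End is the key design point: the happy path (end early when the file is full) and the safety net (end everything at plan shutdown) can coexist without double-closing a file.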

@westonpace (Member, Author) commented Nov 1, 2022:

  • Reminder to myself to update the docs. They still say you don't need to dispose of the holder, but you do.
  • Create a follow-up PR to add deadlock detection

…ire a call to End

This call to end was a foot-gun because failing to call End on a scheduler would lead to deadlock.  This
led to numerous situations where one would accidentally fail to call End because of some kind of error
or exceptional behavior.

Now the call to end is no longer required.  Schedulers are semantically divided (there aren't actually
three different classes, just three different supported use cases) into three different types:

 * Destroy-when-done schedulers destroy themselves whenever they have finished running all of their tasks.
   These schedulers require an initial task to bootstrap the scheduler and then that task (and its subtasks)
   can add more sub-tasks.  This is the simplest and most basic kind of scheduler and the top-level scheduler
   must always be this kind of scheduler.
 * Peer schedulers will be ended when their parent scheduler runs out of tasks.  These are useful for sinks
   which get tasks on a push-driven basis and don't have all their tasks up front.  However, when the plan
   is done, we know for certain no more tasks will arrive.
 * Eager peer schedulers are the same as peer schedulers but they might be eagerly ended before their parent
   scheduler is finished.  The file queue in the dataset writer is a good example.  It receives tasks in an
   ad-hoc fashion so it is a peer scheduler.  However, once we have written max_rows rows to a file we can
   end it early (no need to wait for the plan to finish).  However, if we forget to end it early (or fail
   to do so in some error scenario) it will still end when the plan ends.
…tly. It now uses the plan's scheduler as a parent.
…pts at detecting deadlock but had little luck and have run out of energy to try further.
…heduler into three classes (scheduler, throttle, task group). Moved backpressure management into the dataset writer itself.
@westonpace westonpace force-pushed the feature/ARROW-17509--async-task-scheduler-without-end branch from ca179e2 to 884d9e0 Compare November 4, 2022 23:52
…ks. Created the MakeThrottledTaskGroup utility function which properly creates a task group around a throttle around a scheduler.
@westonpace (Member, Author) commented:

I reworked everything again so the above comments aren't really valid. I've reworked the PR description to reflect the new behavior.

@lidavidm (Member) left a comment:

Thanks, this is much easier to follow.

(Four review threads on cpp/src/arrow/util/async_util.h and cpp/src/arrow/util/async_util.cc were marked outdated and resolved.)
@westonpace westonpace merged commit 3da803d into apache:master Nov 9, 2022
@ursabot commented Nov 10, 2022:

Benchmark runs are scheduled for baseline = a590b00 and contender = 3da803d. 3da803d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.58% ⬆️0.17%] test-mac-arm
[Finished ⬇️0.27% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️1.06% ⬆️0.28%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 3da803db ec2-t3-xlarge-us-east-2
[Finished] 3da803db test-mac-arm
[Finished] 3da803db ursa-i9-9960x
[Finished] 3da803db ursa-thinkcentre-m75q
[Finished] a590b00b ec2-t3-xlarge-us-east-2
[Failed] a590b00b test-mac-arm
[Finished] a590b00b ursa-i9-9960x
[Finished] a590b00b ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
