ENH: impure tasks? #6378

Open
martindurant opened this issue May 19, 2022 · 5 comments

@martindurant
Member

When creating a DAG, e.g., with delayed, it is possible to declare a task as "impure", meaning that a new unique key is generated even when the input arguments are identical.
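For readers unfamiliar with the distinction, here is a toy model of pure versus impure key generation. It is a sketch of the idea only, not Dask's implementation; `make_key` is a hypothetical helper (in Dask itself the switch is the `pure=` argument of `dask.delayed`).

```python
import hashlib
import uuid


def make_key(func_name, args, pure):
    """Return a task key for calling ``func_name`` with ``args`` (toy model)."""
    if pure:
        # Deterministic: identical calls collapse onto the same key,
        # so the graph can deduplicate them.
        digest = hashlib.sha1(repr((func_name, args)).encode()).hexdigest()
        return f"{func_name}-{digest[:16]}"
    # Impure: every call gets a fresh random key, so nothing is deduplicated.
    return f"{func_name}-{uuid.uuid4().hex}"


same_a = make_key("inc", (1,), pure=True)
same_b = make_key("inc", (1,), pure=True)
fresh_a = make_key("inc", (1,), pure=False)
fresh_b = make_key("inc", (1,), pure=False)
```

Here `same_a` and `same_b` are equal, while `fresh_a` and `fresh_b` differ even though the inputs are identical.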

I am wondering if there is any appetite for a similar concept on the scheduler, where a task key is annotated as side-effect-only, with no useful return value. Use cases might include IO on some external storage, where we want to ensure that an operation happened, but if the worker that executed it later goes down, there is no need to repeat it. In other words, where the scheduler state for the task would normally go to in-memory, it could instead become just "completed" (or released), and any task that depends on it could run without having to fetch any results. A set of CSV write tasks with a finalize task depending on all of them would be a good example (and the barrier doesn't actually need to execute anything in this case; it's only a meta-task of dependencies).
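A minimal sketch of the proposed behaviour, using a toy scheduler model rather than the real `distributed` state machine. The `side_effect_only` flag, the "completed" state name, and the helper functions are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    key: str
    side_effect_only: bool = False  # hypothetical annotation
    state: str = "waiting"
    who_has: set = field(default_factory=set)


def finish(task, worker):
    """Record that ``worker`` finished running ``task``."""
    if task.side_effect_only:
        # Nothing to store, so there is no replica to keep track of.
        task.state = "completed"
    else:
        task.state = "memory"
        task.who_has.add(worker)


def remove_worker(tasks, worker):
    """Return the keys that must be recomputed after ``worker`` is lost."""
    to_rerun = []
    for task in tasks:
        task.who_has.discard(worker)
        # Only tasks whose last in-memory replica vanished need a rerun.
        if task.state == "memory" and not task.who_has:
            task.state = "waiting"
            to_rerun.append(task.key)
    return to_rerun


writes = [Task(f"write-csv-{i}", side_effect_only=True) for i in range(3)]
load = Task("load-data")
for t in writes + [load]:
    finish(t, "worker-a")

lost = remove_worker(writes + [load], "worker-a")
```

After `worker-a` disappears, only `load-data` is scheduled again; the CSV writes stay "completed", so a finalize task depending on them could still run.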

This pattern would weakly move towards tasks that are executed exactly once, where the side-effect is mutation of some resource. It would not guard against a task being run simultaneously on two workers (the opposite of speculative execution). A strict guarantee of that kind is probably not feasible without a lot of work.

Feel free to say that considering this is unnecessary complexity. I am thinking of it in terms of shared mutable memory between processes on a single node - but the big IO case is also interesting.

@fjetter
Member

fjetter commented May 19, 2022

I think for IO-intensive workloads this can be a big factor in improving resilience, or rather in improving recovery time in failure scenarios. However, I'm wondering how this would be implemented and what the implications for our state machine are. Technically, these tasks would not be allowed to return any result. Would they be allowed to have dependents?

I'm less excited about the execute-exactly-once guarantee since there are many edge cases that could ruin your day.

@martindurant
Member Author

> Technically, these tasks would not be allowed to return any result. Would they be allowed to have dependents?

Yes, I think so, but they would clearly be special: either they have dep results that don't show up in the argument list at call time, or we fake a bunch of Nones.

> I'm less excited about the execute-exactly-once guarantee since there are many edge cases that could ruin your day.

Agreed, but I thought I should mention it in case there are exciting use cases and/or simple ideas for implementation.

@gjoseph92
Collaborator

Just noting that this is kind of something we thought a bit about with P2P shuffling (which also uses a barrier task for similar reasons).

Needing exactly-once guarantees in a distributed system is often an antipattern: https://jolynch.github.io/posts/distsys_shibboleths/#negative-shibboleths. It's the sort of thing that could feel like a very useful abstraction to a user, yes. But users may be better off thinking about how to make things idempotent instead.
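As one illustration of what "idempotent" could mean for the CSV-write case, here is a sketch that writes to a temporary file and then atomically swaps it into place, so running the task twice leaves the same file behind. `write_partition` is a hypothetical helper, and it assumes the target is a local filesystem where `os.replace` is atomic.

```python
import os
import tempfile


def write_partition(path, rows):
    """Write ``rows`` to ``path`` so that re-running leaves the same file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first, then swap it into place in one step.
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            for row in rows:
                f.write(",".join(map(str, row)) + "\n")
        os.replace(tmp, path)  # overwrite the target, never append to it
    except BaseException:
        os.unlink(tmp)  # do not leave partial output behind
        raise
```

Because the task overwrites rather than appends, a rerun after a worker failure (or a duplicate run on two workers) produces the same result as a single run.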

> when the scheduler state for the task would normally go to in-memory, it can now be just "completed" (or released) and any task that depends on it can be allowed to run without having to fetch any results

This basically sounds like just an optimization: a way of marking a task as having no output. You save a little bit of time by having the downstream tasks be able to run as soon as they're assigned to a worker (the worker skips fetching dependencies, since it knows they're all just None). You also get to put the task in a new, terminal state like complete, which is equivalent to memory in every way except that no worker holds the data, so if the worker that ran the task crashes later, you know you don't need to rerun the task. You still don't have the exactly-once guarantee though, so your code needs to be idempotent. You just get some time savings by a) not fetching the None results and b) never re-running the "impure" task due to worker departure.

This seems pretty doable, though I wonder how much value it adds. Thinking of some more use cases here might help. I think we could do all of this by just having tasks return a special distributed.NoResult value.

> if the worker that executed it goes down, there is no need to repeat it

What if the worker goes down while it's executing the task? Do we rerun it or not?

I wonder though if your use-cases could also be accomplished by properly supporting tasks-in-tasks #5671, and building more robust synchronization mechanisms #2362. https://dask.discourse.group/t/cross-graph-dependencies-and-starting-criteria/441/7 is maybe related/relevant, in terms of wanting to schedule graphs of impure tasks.

@martindurant
Member Author

Thanks for the discussion, everyone.

It occurs to me that, rather than having "no result" tasks, it might be even easier to have "doesn't need input results" tasks, which would apply to the barrier tasks we talked about. That could be a simple task annotation and require no big code changes.
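Roughly what I have in mind, as a toy worker-side sketch. The annotation name `needs_input_results` and the `gather_inputs` helper are hypothetical; nothing like this exists in `distributed` today.

```python
def gather_inputs(task, fetch):
    """Collect dependency values for ``task``.

    ``task`` is a dict with "deps" (list of keys) and "annotations".
    ``fetch`` is a callable that retrieves one result from a peer worker.
    """
    if task["annotations"].get("needs_input_results", True) is False:
        # Barrier-style task: only completion of the deps matters,
        # so pass placeholders instead of transferring any data.
        return [None] * len(task["deps"])
    return [fetch(key) for key in task["deps"]]


fetched = []


def fake_fetch(key):
    """Stand-in for a network transfer; records which keys were requested."""
    fetched.append(key)
    return f"data-for-{key}"


barrier = {
    "deps": ["write-0", "write-1"],
    "annotations": {"needs_input_results": False},  # hypothetical annotation
}
regular = {"deps": ["write-0"], "annotations": {}}

barrier_args = gather_inputs(barrier, fake_fetch)
regular_args = gather_inputs(regular, fake_fetch)
```

The barrier receives `[None, None]` without any fetch, while the regular task still pulls its one dependency.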

@jakirkham
Member

Another approach would be to consider Haskell's Monad concept in the context of Dask. Namely, add another input (and output) to each function, which is the Monad. Perhaps this could be an object Distributed can create? This can then be threaded (think sewing needle) through various operations.
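A very loose Python sketch of the threading idea, closer to explicit token passing than to a real Monad. `EffectToken` and the task functions are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EffectToken:
    """Opaque value whose only job is to carry ordering between tasks."""
    step: int = 0


log = []


def write_chunk(token, name):
    """Side-effecting task: consumes a token and returns the next one."""
    log.append(f"write {name}")
    return EffectToken(token.step + 1)


def finalize(token):
    """Runs only once it has been handed the token from the last write."""
    log.append("finalize")
    return EffectToken(token.step + 1)


t0 = EffectToken()
t1 = write_chunk(t0, "a")
t2 = write_chunk(t1, "b")
t3 = finalize(t2)
```

Each call depends on the token returned by the previous one, so the ordering of side effects is expressed as ordinary data dependencies, and the token itself is tiny to move around.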
