Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement replicate() using the Active Memory Manager #6578

Open
crusaderky opened this issue Jun 15, 2022 · 9 comments
Open

Reimplement replicate() using the Active Memory Manager #6578

crusaderky opened this issue Jun 15, 2022 · 9 comments
Assignees
Labels

Comments

@crusaderky
Copy link
Collaborator

This issue is very tightly related to #4906.

replicate(), as well as its wrapper scatter(..., broadcast=True), have several issues:

Proposed design

Reimplement replicate() on top of the Active Memory Manager.
The command will just start an AMM policy for the involved keys.
The policy will track the keys and, every two seconds, create new replicas if there aren't enough.
Once the keys cease to exist, or the client calls replicate(n=1) on the same keys, the policy detaches itself from the AMM.

The number of desired replicas will be tracked through a replicate: <n> annotation on the involved keys.

Side effects

  • replicate() becomes non-blocking. It won't wait for replication to complete and won't even wait for keys to become in-memory.
  • You don't need to have all target workers online when you invoke replicate. As a matter of fact, replicate(n=inf) will become the default option; it means that if at any point in the future a new worker joins the cluster, the key will be immediately replicated onto it.

Optional additional feature

Alternatively, the client may also annotate the keys directly when building the graph:

a = ...
with dask.annotate(replicate=2):
    b = f(a)
b.compute(optimize_graph=False)

For this to work, when a new key lands on the scheduler, there must be machinery that parses the annotations, detects the replicate tag, and invokes Scheduler.replicate() under the hood.
This can be neatly implemented by a Scheduler plugin.

Proposed contentious breaking chances

  • Do not offer an option to make replicate blocking again
  • Do not offer an option to specify a subset of workers to replicate to (this would make the annotation much simpler)
  • The replicate() command will fail if the AMM is not enabled.
@crusaderky
Copy link
Collaborator Author

@seydar
Copy link

seydar commented Nov 27, 2023

Is there still an appetite to fix this? I just ran into the race condition of #6713 and was wondering if #replicate() is going to be rewritten.

@crusaderky
Copy link
Collaborator Author

crusaderky commented Nov 28, 2023

@seydar there is some appetite but we're currently lacking visibility on the benefit it would provide to users, so it's not on the roadmap at the moment.
Could you explain what's your use case for replicate, and why it is an important feature for you?

@seydar
Copy link

seydar commented Nov 28, 2023

@crusaderky I was getting the same error as #6713, but I've been able to trace down the error a little more precisely to #8375.

Ultimately, I guess I'd like better support for replication when a worker is restarted in the middle of a task.

@crusaderky
Copy link
Collaborator Author

@seydar sorry I may not have been clear.
Could you explain what's your use case for replicate() to begin with?

@seydar
Copy link

seydar commented Nov 29, 2023

@crusaderky replicate() was mentioned in another bug report (#6713), and so I followed the link there to here.

Dask workers are crashing (mentioned in #8377 and #8375), which is then leading to the same error in #6713.

This comment talks about replicate being the root cause of the worker count unexpectedly dropping while a worker is being restarted, so I figured I would follow up.

Looking more closely at the scheduler.py code that is mentioned is #6713, it looks like maybe a solution that doesn't involve replicate() would be to have the list of workers be updated before that call to see which workers are still alive?

@crusaderky
Copy link
Collaborator Author

@seydar, what I'm asking is: why do you call replicate() to begin with? What would happen if you just removed the call from your client code?

@seydar
Copy link

seydar commented Nov 30, 2023

Sorry, I'm not calling replicate(). I'm only here because I was routed here from #6713.

@crusaderky
Copy link
Collaborator Author

If you're not calling replicate(), you're not affected by #6713 in any way.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants