
Active Memory Manager framework + discard excess replicas #5111

Merged: 5 commits merged into dask:main on Sep 7, 2021

Conversation

@crusaderky (Collaborator) commented Jul 23, 2021

Find all in-memory tasks that have been replicated on more than max(1, number of waiters) workers and discard the excess copies from the most memory-saturated workers.

This is intended as an early technology demo; see the critical out-of-scope items below.
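
For illustration only, here is a rough sketch of the heuristic described above (not the code in this PR; the helper name is hypothetical, while who_has, waiters, and memory.optimistic are existing scheduler attributes):

def excess_replicas(tasks):
    # For each in-memory task, suggest dropping the copies beyond
    # max(1, number of waiters), starting from the workers using the most memory.
    for ts in tasks:  # iterable of scheduler TaskState objects
        ndrop = len(ts.who_has) - max(1, len(ts.waiters))
        if ndrop <= 0:
            continue
        # Most memory-saturated holders of the key first
        holders = sorted(ts.who_has, key=lambda ws: ws.memory.optimistic, reverse=True)
        yield ts, holders[:ndrop]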

In scope

  • Active Memory Manager framework
  • Heuristics to pick which key replicas to destroy
  • Temporary implementation of the actual deletion. It is not safe to run it while a computation is in progress, as it might accidentally delete a key just as the computation tries to read it. Users should enable it at their own risk and accept that they may need to restart their workload from time to time.
  • Unit tests

Out of scope, left for future PRs

  • High-level documentation. This is left for later, when the whole system is more mature and ready for its debut.
  • Use bulk comms and make it robust to run while a computation is in progress (blocked by Worker state machine refactor #5046)
  • Prevent destruction of deliberate replicas that were requested through replicate() or scatter(..., broadcast=True)
  • Refactor replicate() and rebalance() to use the new framework
  • Graceful worker shutdown
  • Speed up the ReduceReplicas key scan algorithm, which is currently O(n) in the total number of keys on the cluster, meaning it's not something you want to run frequently when there are several tens of thousands of keys around.

Demo

Available in a different branch: https://github.com/crusaderky/distributed/tree/amm
This also showcases the policies for worker shutdown, replicate(), and rebalance().

https://gist.github.com/crusaderky/6d5302d0a80ec6200b078fec2dd218ca

CC @mrocklin @jrbourbeau @fjetter

@mrocklin (Member)

Thank you for putting this together @crusaderky. I'm excited both for the results, and for how simple this is.

I mentioned this in side channels but I'll bring it up again here. I'm curious what would happen in your notebook if we were more aggressive with removing replicas. Perhaps len waiters divided by two, for example. Naively I would expect memory use to drop and run time to increase, but by how much in each direction I don't know.

More generally, it seems to me that an interesting next step here might be to think of a next policy to implement, and see how that would affect complexity. If you agree, I'm curious: do we have enough logic on the worker side yet to copy data between workers? If so then we might try retiring a single worker (if we need to wait for Florian's work then please ignore). I think that having two policies would raise some interesting challenges and questions to work through (although this time with a concrete implementation to guide conversation).

Just a thought, please ignore if you think there are better next steps.

@crusaderky (Collaborator Author)

@mrocklin I uploaded an incomplete prototype of the framework. Some of the guts are missing, but it should be enough to give you a good feel for how the finished product will look. Let me know if you have feedback.

def run(self):
    # TODO this is O(n) to the total number of in-memory tasks on the cluster; it
    # could be made faster by automatically attaching it to a key when it goes
    # above one replica and detaching it when it goes below.
Collaborator Author:

Putting a hook on additions to TaskState.who_has would be a very simple design, and it would also mean we can merge ReplicateReduce and ReplicateIncrease into a single plugin.

for ts in unique:
    yield "replicate", ts, None
if not unique:
    self.manager.plugins.remove(self)
Collaborator Author:

Missing all the logic that will prevent more tasks from landing on a worker in the "shutting down" stage.

Member:

I think that we probably want this to be separate anyway. I think that workers should probably have a closing state. cc @fjetter

Member:

So I think that you should probably ignore this for now.

@crusaderky (Collaborator Author)

Word of warning: none of the new module has been tested so far.

@mrocklin (Member) left a comment

Some quick thoughts while in between meetings. In general I like that policies are broken apart. I'll be curious to see how this behaves :)


ts: TaskState

def __init__(self, ts: TaskState):
    self.ts = ts
Member:

I would probably design this plugin to be around all of the tasks, rather than one per task. Although I guess this opens a question on the scheduler side of whether we want to store replicas on the TaskState or in some separate data structure. If on the TaskState (which is our common practice), then it's hard to find the relevant tasks that need to be replicated.

Member:

I'll also say though that replicate is infrequently used, and so we probably shouldn't care too much about this policy, and should instead focus on reducing replicas and removing workers. Personally I'd be very happy to merge in a PR that didn't support replicate at all.

Member:

Especially if neglecting this functionality let us move towards a solution faster.

Comment on lines 84 to 89
plugin_gen = plugin.run()
while True:
    try:
        cmd, ts, candidates = next(plugin_gen)
    except StopIteration:
        break  # next plugin
Member:

Suggested change, replacing

plugin_gen = plugin.run()
while True:
    try:
        cmd, ts, candidates = next(plugin_gen)
    except StopIteration:
        break  # next plugin

with

for cmd, ts, candidates in plugin.run():

Is this a valid substitution?

Member:

Oh, I see below that you're also sending actions to this generator, which would explain the while loop. However, it looks like that information is differently shaped. Maybe it should be a separate iterator?

Collaborator Author:

However, it looks like that information is differently shaped. Maybe it should be a separate iterator?

I don't think I understand what you mean; could you articulate?

Member:

Below I see that you call plugin_gen.send(ts). That would explain why you're calling next on this thing rather than using a for loop. However, my brief look at the type of ts showed that it didn't fit the cmd, ts, candidates output expected here. I only looked at this briefly though, and so could easily be wrong.

@crusaderky (Collaborator Author), Jul 28, 2021:

It's plugin_gen.send(ws); the type of ws is Optional[WorkerState].

The signature of plugin_gen is

Generator[
    tuple[str, TaskState, Optional[set[WorkerState]]],
    Optional[WorkerState],
    None,
]
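
For illustration, a minimal policy following that protocol could look roughly like this (a sketch against the signature above, not this PR's code; the "drop" command string and the overall structure are assumptions):

def run(self):
    # Suggest dropping one replica of every task that has 2+ copies and let
    # the manager pick the worker: the chosen WorkerState (or None, if the
    # suggestion was rejected) comes back through plugin_gen.send(ws).
    for ts in self.manager.scheduler.tasks.values():
        if len(ts.who_has) < 2:
            continue
        ws = yield "drop", ts, None  # candidates=None -> manager chooses
        if ws is None:
            continue  # no suitable worker; suggestion discarded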

class ActiveMemoryManager(SchedulerPlugin):
    scheduler: SchedulerState
    plugins: "set[ActiveMemoryManagerPlugin]"
    interval: float
Member:

In the future I expect that different policies will want to operate on different timelines. For example policies that are more expensive to run might run much less frequently.

Collaborator Author:

This sounds like a desirable feature; however, is it OK to leave it for the future?

Member:

Sounds good to me


def register_plugin(self, plugin: "ActiveMemoryManagerPlugin") -> None:
    plugin.manager = self
    self.plugins.add(plugin)
Member:

I would expect plugins might also finish in the future (like retiring workers).

Collaborator Author:

If by "finish" you mean unregister, I put in two examples of a plugin that detaches itself from the AMM when there's nothing left to do: ReplicateIncrease and WorkerShutdown.

@mrocklin (Member)

If I were to recommend a path forward, it would be to run a system that is as simple as possible and runs just a couple of policies. I'll suggest removing replicas and retiring workers. Personally I don't know enough about what we'll run into to be able to say much about a design. I think that running two policies concurrently will probably be educational though.

@crusaderky (Collaborator Author) commented Jul 28, 2021

If I were to recommend a path forward, it would be to run a system that is as simple as possible and runs just a couple of policies. I'll suggest removing replicas and retiring workers.

As mentioned above, the final version of this PR will ship exclusively with RemoveReplicas. The other policies are there temporarily for illustrative purposes and will be properly introduced at a later stage.

@crusaderky changed the title from "[WIP] Discard excess replicas" to "[WIP] Active Memory Manager framework + discard excess replicas" on Jul 28, 2021
@crusaderky (Collaborator Author)

All code is done. The framework should be in its final form now. I've moved it from a SchedulerPlugin to a scheduler extension.
The POC from last week has been refactored to use the new framework; here's the new link: https://gist.github.com/crusaderky/6d5302d0a80ec6200b078fec2dd218ca

Only unit tests and high-level documentation are still missing.

if start is None:
    start = dask.config.get("distributed.scheduler.active-memory-manager.start")
if start:
    self.start()
Member:

Are there situations where we would want the extension but not have it running? If this is speculatively valuable then I recommend that we leave it off for now until we need it.

Collaborator Author:

It's because of how scheduler extensions work. From a user's perspective, I think it's a lot more user-friendly to have the extension always loaded through DEFAULT_EXTENSIONS and available either through client.scheduler.amm_start() / client.scheduler.amm_run_once() or through a config switch, instead of having to load it with

def load_amm(dask_scheduler):
    from distributed.active_memory_manager import ActiveMemoryManagerExtension
    ActiveMemoryManagerExtension(dask_scheduler)

client.run_on_scheduler(load_amm)

An alternative to the start key would be to move DEFAULT_EXTENSIONS to the dask config, but that would carry the risk of users falling behind: since you can't change a list in the dask config, only override the whole list, if a user changed it in their custom dask config and later on we added a default extension, the user wouldn't get it.
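
For example, enabling it through the config switch could look like this (a hedged sketch; it assumes a local cluster, so the scheduler reads the same dask config as the client, and uses only the start key shown in the snippet above):

import dask
import distributed

# The extension is always loaded via DEFAULT_EXTENSIONS; this key only controls
# whether it starts its periodic callback automatically.
with dask.config.set({"distributed.scheduler.active-memory-manager.start": True}):
    client = distributed.Client(n_workers=2)  # scheduler runs in this process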

return None
# id(ws) is there just to prevent WorkerState objects from being compared in the
# unlikely event that they have exactly the same amount of bytes allocated.
_, _, ws = min((self.workers_memory[ws], id(ws), ws) for ws in candidates)
Member:

Thoughts on making a couple of heaps of workers ordered by memory usage (both min- and max-ordered)?

We could do this once at the beginning of a cycle and then use that collection to help order our actions. This might be a way of taking some of the same tricks that you built up in the replicate work, but applying them more generally across all policies. I suspect that by doing this once per cycle over workers and using these data structures consistently that we might be able to reduce the overhead for lots of different operations.

Collaborator Author:

No, because you need to pick the min/max worker from a shortlist, which can be arbitrary. For example, a user may want to replicate a GPU-related key exclusively to GPU-enabled workers.
With heaps, you'd need to cycle through the heap until you hit an element of the shortlist - which risks not only negating the performance benefit but making the runtime substantially worse in some use cases.

Also, this operation is O(n), but in the case of dropping, n is the number of workers that hold a replica - meaning 2 or 3 in most cases. In the case of a replica increase it can be most workers on the cluster, but since we need to do this full scan only for the keys that need to be replicated, I expect it to be fast enough not to matter.

# goes above one replica and detaching it when it drops below two.
for ts in self.manager.scheduler.tasks.values():
    if len(ts.who_has) < 2:
        continue
Member:

To avoid lots of work here I wonder if we might ...

  1. Iterate through workers that are saturated first, and then through their tasks
  2. Stop iterating once we get to workers that have enough space on them
  3. Short circuit this entire process if there hasn't been a transition since last time (there is a transition_counter on the scheduler). This should help us avoid activity in an idle cluster.

@crusaderky (Collaborator Author), Jul 29, 2021:

  1. Iterate through workers that are saturated first, and then through their tasks
  2. Stop iterating once we get to workers that have enough space on them

It is possible. Or, better in my opinion, track the tasks with 2+ replicas and iterate only on them; stop tracking them when they go back down to 1 replica. This will require a (straightforward) hook on changes to TaskState.who_has.
Either way I'd prefer leaving this to a following PR.
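
A hedged sketch of that tracking idea (hypothetical names; not part of this PR):

class ReplicaTracker:
    # Keep a set of tasks with 2+ replicas so a policy can scan only those
    # instead of every task on the scheduler. The two hooks would be invoked
    # wherever TaskState.who_has is mutated.
    def __init__(self):
        self.multi_replica = set()  # set of TaskState

    def replica_added(self, ts):
        if len(ts.who_has) >= 2:
            self.multi_replica.add(ts)

    def replica_removed(self, ts):
        if len(ts.who_has) < 2:
            self.multi_replica.discard(ts)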

Short circuit this entire process if there hasn't been a transition since last time

There is no transition when you increase or decrease the number of replicas, unless you change from 0 to 1 or from 1 to 0. We could change that by introducing a degenerate transition from "memory" to "memory" whenever you add or remove a replica. This would however break all downstream SchedulerPlugins that rely on the transition kwargs, and in general I suspect it may be painful to implement - ping @fjetter for an opinion.
Regardless of the complexity of the change, it seems uninteresting to me to reduce the CPU load on an idle cluster - when there is no CPU load to begin with.

Member:

I think the (memory, memory) transition would cause quite some awkwardness and potential headache. Introducing another counter would be likely much simpler.

I do second the requirement for us to keep activity low when idle. We've been receiving some sparse user reports about this already (in this case workers are usually more annoying, still...), but I am fine with iterating on this in another PR.

@mrocklin (Member)

OK, so what are good next steps here? Some thoughts from me (but I haven't thought deeply about this yet).

  1. Test this against the worker state machine rewrite. Verify that the "three line code fix to make things stable" works as we expect in a busy cluster
  2. Move on from removing replicas to retiring workers. Verify that that works in a busy cluster.
  3. Make removing replicas fast by adding replica state to the scheduler

I think that if we get those three working smoothly then this is a big improvement. Personally I would feel better having two use cases active just to build some confidence in (or alter/modify/tune) the design.

@crusaderky thoughts? Are there other steps that you think would be better short term?

@crusaderky (Collaborator Author)

Retiring workers is a somewhat onerous task; IMHO we should aim for what gives us a complete, generally usable feature first - meaning (1) and (3).

Refactoring rebalance() and replicate() so that they work in the new framework, while a computation is running, may also be lower-hanging fruit than worker retirement.

@mrocklin (Member)

Refactoring rebalance() and replicate() so that they work in the new framework, while a computation is running, may also be lower-hanging fruit than worker retirement.

FWIW I don't think that rebalance and replicate are important enough to prioritize. I think that reducing replicas and retire_workers are both 10x (if not 100x) more important than either of those two methods. I wouldn't want us to spend non-trivial time making them work.

I'm fine with a prioritization of 1,3,2 if you think it's the right path. I like 1,2,3 because to me there is more uncertainty in 2/retire_workers. I wouldn't want us to invest a lot of time into this system and then learn that it isn't the right design for retiring workers and then have to scrap this for something else.

@mrocklin (Member)

@crusaderky any thoughts on my most recent comment?

@crusaderky (Collaborator Author)

@mrocklin I'm happy to do 1,3,2 and leave rebalance and replicate for later. I don't see anything that can go wrong with worker retirement atm.

@mrocklin (Member)

I don't see anything that can go wrong with worker retirement atm.

Neither do I, but that usually doesn't stop things from going wrong for me :)

@mrocklin (Member)

Neither do I, but that usually doesn't stop things from going wrong for me :)

In general I'd prefer that we validate things a bit before committing this to main. I think that 1 and 2 are both useful for validation. I recommend that we do them both before merging. Happy to chat offline if preferred.

@quasiben (Member)

In today's sync we were briefly chatting about what will happen to rebalance. I wanted to raise a concern that some folks depend on the ability to rebalance memory/keys/partitions, where some other framework then operates on perfectly or nearly balanced data. Often this is staged with some barrier: wait(xyz.persist()). I'm aware of this happening inside of cuML and cuGraph, but I think this also occurs in xgboost as well. Is that correct @trivialfis?

cc @VibhuJawa @ayushdg for awareness

@mrocklin (Member)

So, near-term the current rebalance operation will still be around. Long term I'd like to see it go away (or at least the implementation). My guess is that the client.rebalance call would just increase the importance of rebalancing as a policy, and then would wait until things reached a steady state, and then return. So this would be more similar to other passive waiting functions like Client.wait_for_workers.

@trivialfis

think this also occurs in xgboost as well. Is that correct @trivialfis

XGBoost persists and waits on input data, yes. We don't explicitly call rebalance though. I think the feature is beneficial to many MPI-based distributed algorithms where workers have to work in lockstep.

async def _gather_on_worker(
    self, worker_address: str, who_has: "dict[Hashable, list[str]]"
async def gather_on_worker(
    self, worker_address: str, who_has: "dict[str, list[str]]"
Collaborator Author:

This will be reverted when we merge #5046.

@crusaderky (Collaborator Author) commented Aug 27, 2021

All code and unit tests are final.
High-level documentation is still missing; it is delayed until the system is more mature.

@crusaderky changed the title from "[WIP] Active Memory Manager framework + discard excess replicas" to "Active Memory Manager framework + discard excess replicas" on Aug 27, 2021
@crusaderky marked this pull request as ready for review on August 27, 2021 15:27
@mrocklin (Member)

@gjoseph92 could I ask you to review this PR sometime today?

@gjoseph92 (Collaborator)

Yup, that's the plan after the demo.

@mrocklin (Member) commented Aug 31, 2021 via email

else:
    candidates &= ts.who_has
candidates -= pending_drop
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
Collaborator:

When is it possible for a task that's waiting on this one to also be processing? I believe it, I'm just not familiar with this case.

Collaborator Author:

Yes, a dependent task is removed from waiters when it's finished processing:

>>> import time
>>> import distributed
>>> client = distributed.Client(n_workers=1)
>>> s = client.cluster.scheduler
>>> def f(x):
...     time.sleep(600)
>>> f1 = client.submit(lambda: 1, key="f1")
>>> f2 = client.submit(f, f1, key="f2")
>>> time.sleep(0.2)
>>> s.tasks["f1"].waiters
{<TaskState 'f2' processing>}
>>> s.tasks["f2"].processing_on
<WorkerState 'tcp://10.35.0.6:45913', name: 0, memory: 1, processing: 1>

Collaborator:

Interesting. And we aren't willing to release f1 until f2 has completed, even though in theory we could release it as soon as f2 has started (since it's already been passed the necessary data), at the cost of losing resilience to failures in f2?

Collaborator Author:

In the best case, if f2 is still running it's also holding a reference to the data of f1, so your RAM benefit from releasing f1 early is nil.
In the worst case, you get a race condition, as f2 may not have had the time to fetch its inputs yet (didn't check).
In all cases, you lose the ability to immediately rerun f2 on the same node.

continue

desired_replicas = 1 # TODO have a marker on TaskState
ndrop = len(ts.who_has) - max(desired_replicas, len(ts.waiters))
Collaborator:

Is ndrop expected to become negative (if there are more waiters than workers holding the key)?

Relatedly, what if all those waiters are going to run on the same worker? Then len(ts.waiters) would over-count. Would we want something like len({tss.processing_on for tss in ts.waiters if tss.processing_on})?

Collaborator Author:

Yes, ndrop can totally become negative. list(range(-123)) -> []. Added clarification.
Good catch on the multiple tasks running on the same worker - I revised it now, see if you like it.
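
A hedged sketch of what the revised count could look like, following the suggestion above (illustrative only; not necessarily the exact code that landed):

def nreplicas_to_drop(ts, desired_replicas=1):
    # Count distinct workers among the waiters, so that several waiters
    # scheduled on the same worker are not double-counted; waiters that have
    # not been assigned yet (processing_on is None) are counted individually,
    # erring on the side of keeping a replica for them.
    nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})
    # A result <= 0 means "keep every replica"; range() of a negative number
    # is simply empty.
    return len(ts.who_has) - max(desired_replicas, nwaiters)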

await c.scheduler.amm_stop()
# AMM is not running anymore
await c.replicate(x, 2)
await asyncio.sleep(0.1)
Collaborator:

Nit: could this flakily pass when it should be failing? I feel like even if AMM was still running, on a slow CI node maybe it could take more than 0.1sec for the replicas to be dropped. But I can't think of a non-timeout alternative to write this.

Collaborator Author:

Yes, the idea is that if there's a deterministic failure then it will start failing extremely frequently. Increasing to 0.2 for peace of mind.

@gen_cluster(client=True, config=demo_config("drop", start=True))
async def test_auto_start(c, s, a, b):
    futures = await c.scatter({"x": 123}, broadcast=True)
    await asyncio.sleep(0.3)
Collaborator:

Similar; could this 0.3sec timeout become flaky on CI?

Collaborator Author:

Increased to 0.5s.

@crusaderky (Collaborator Author)

@gjoseph92 all comments have been addressed

@gjoseph92 (Collaborator) left a comment

Nice work @crusaderky. I like the design of the framework in general. Certainly there will be ways to optimize it, but I'm excited to see this merged so we can start trying it out, improving things, and adding new policies.


self.pending = defaultdict(lambda: (set(), set()))
self.workers_memory = {
    w: w.memory.optimistic for w in self.scheduler.workers.values()
Collaborator:

Over in #4925 (comment) we discovered that iterating over the .values() of a SortedDict was surprisingly slow. Maybe this is an over-optimization, but you could consider replacing scheduler.workers.values() with dict.values(scheduler.workers) (like I did here) throughout this PR, which might make these lines an order of magnitude faster.

Member:

I'll push back on micro optimizing the worker state machine. It's not a bottleneck in performance but is often a stability concern. I think that we should optimize pretty hard for readability.

@crusaderky (Collaborator Author)

This is ready for merge, unless there are further concerns?

@gjoseph92 (Collaborator)

No concerns on my end!

@crusaderky (Collaborator Author)

@mrocklin @jrbourbeau gentle ping

@mrocklin (Member) commented Sep 7, 2021

@jrbourbeau checking in. My understanding is that the next step here is for you to give this a glance and then hit the green button.

@fjetter (Member) left a comment

I have a few comments. Nothing blocking, mostly for me to better understand things.

Overall this is really solid and clean work! Will give @jrbourbeau another chance to review, but from my side this is ready to merge.

candidates -= pending_repl
if not candidates:
    return None
return min(candidates, key=self.workers_memory.get)
Member:

That's quite elegant 👍


if len(ts.who_has) < 2:
    continue

desired_replicas = 1  # TODO have a marker on TaskState
Member:

Already looking forward to this. Would it make sense for us to open a GH issue for this already?

Collaborator Author:

During my latest chat with @mrocklin, he said he's uninterested in revamping the replicate() functionality and he'd rather deprecate it entirely for the time being and potentially reintroduce it on top of AMM at a later date.

Member:

Agreed, this is definitely something we should only build after AMM is done, and it is of relatively low priority. The use case I have in mind is to allow task annotations to replicate expensive but small intermediate results to increase resilience. That would allow "checkpointing" of computations.

A long time ago I opened an issue sketching out some ideas around this. Just dropping the ref for now: #3184

ws = None
while True:
    try:
        cmd, ts, candidates = policy_gen.send(ws)
Member:

I have to admit that I did not even know about this feature, and I don't see it being used anywhere. Do you already have an application in mind, or even an example where this is used? Merely out of curiosity; I don't mind keeping it in this PR.

Collaborator Author:

Member:

Great, thanks.

For other readers who were not aware of this feature, it is described in PEP 342

Thanks

@jrbourbeau (Member) left a comment

Thanks @crusaderky! This is in

@jrbourbeau merged commit 0b7510e into dask:main on Sep 7, 2021
@mrocklin (Member) commented Sep 7, 2021 via email

@jakirkham (Member)

cc @madsbk (for awareness)

@crusaderky deleted the unwanted_replicas branch on September 8, 2021 13:28