Automatically restart memory-leaking workers when they reach a critical limit #4221

Open · wants to merge 7 commits into base: main

Conversation

@Hoeze commented Nov 5, 2020

Attempt to fix #4193.

This pull request successfully restarts memory-leaking workers.
However, some workers still keep freezing.

Minimal example to reproduce the memory leak outside of the test suite:

import dask
import dask.distributed
import numpy as np
import time

cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="512M")
client = dask.distributed.Client(cluster)

x = {}
def memory_leaking_fn(data):
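    # every call keeps a ~12 MiB array alive in the module-level dict, so worker memory grows without bound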
    x[data] = np.random.randint(100, size=12 * 1024**2 // 8)
    
    time.sleep(0.1)
    return data

futures = client.map(memory_leaking_fn, range(1000))

for f in futures:
    print(f.result())

@mrocklin (Member) commented Nov 6, 2020

I think that the main question we need to determine is what policy we should use here. Should we always restart? Should we never restart but keep logging? Should we log for a while and, if things don't appear to be getting any better after a short period (maybe five seconds), then restart? I like this last option personally.

@mrocklin (Member) commented Nov 6, 2020

Also, thank you for taking the time to submit this.

@Hoeze (Author) commented Nov 6, 2020

I think that the main question we need to determine is what policy we should use here. Should we always restart? Should we never restart but keep logging? Should we log for a while and, if things don't appear to be getting any better after a short period (maybe five seconds), then restart? I like this last option personally.

IMHO this should happen directly before starting to work on a new task.
A worker that hits this message should restart as soon as possible because the memory leak eats resources that could be used for the task.

However, I wonder if there is an in-place restarting possibility for the worker.
I.e. "start new worker process" -> "transfer items + tasks to new worker" -> "kill old worker"
This would have a number of advantages:

  1. on-disk data can be kept and does not need to be transferred over the network
  2. fast-path transfer via shared memory?
  3. data keeps locality
  4. the scheduler can be relieved

@mrocklin (Member) commented Nov 6, 2020

However, I wonder if there is an in-place restarting possibility for the worker.
I.e. "start new worker process" -> "transfer items + tasks to new worker" -> "kill old worker"
This would have a number of advantages:

Oh yeah, that actually sounds like a great idea.

fast-path transfer via shared memory?

Fortunately in this situation all of the data is already on disk, so we probably don't need to trouble ourselves with this.

In principle I think that we would want to ...

  1. Use the same Nanny to create a new worker process, passing arguments to the data= keyword so that it builds a zict buffer pointing to the same file location (a sketch of this follows below)
  2. Wait until that new worker is up
  3. Shut down this worker process, being careful not to delete the data on disk
  4. Rely on the scheduler to make intelligent choices about moving the tasks-to-be-run on the old worker around. Probably they'll mostly end up on the new worker on this machine, but it's ok if they don't.

This is a non-trivial engineering effort, but might be an interesting project for someone who wants to become more familiar with the worker-nanny-scheduler relationship. Does this work interest you @Hoeze ?
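For illustration, here is a minimal sketch of what step 1 could look like. It is not code from this PR; the make_spill_buffer helper, the spill directory path, and the byte limit are hypothetical, and it only shows one way to build a zict buffer over an existing file location and hand it to a worker via the data= keyword.

import pickle

import zict
from dask.sizeof import sizeof

def make_spill_buffer(spill_dir, target_bytes):
    # slow layer: pickled values stored as files under spill_dir,
    # so a freshly started worker can point at the same directory
    slow = zict.Func(pickle.dumps, pickle.loads, zict.File(spill_dir))
    # fast layer: a plain in-memory dict; once its weighted size exceeds
    # target_bytes, values spill to the slow layer automatically
    return zict.Buffer({}, slow, target_bytes, weight=lambda k, v: sizeof(v))

# hypothetical usage when starting the replacement worker:
# from distributed import Worker
# worker = await Worker(scheduler_address, data=make_spill_buffer("/tmp/dask-spill", 2 * 1024**3))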

@Hoeze (Author) commented Nov 6, 2020

This is a non-trivial engineering effort, but might be an interesting project for someone who wants to become more familiar with the worker-nanny-scheduler relationship. Does this work interest you @Hoeze ?

I'd like to solve this problem, and I think it would save a lot of people (including me) a lot of time if this restarting worked bomb-proof.
However, I also have two projects to finish soon.

I'm happy to invest another working day on this issue, but I think it would be better if someone with deeper knowledge could jump in here.

@quasiben (Member) commented Nov 6, 2020

I don't fully understand what is happening here, but the function and the leaky dictionary are stored in cache_loads on the worker. As an experiment, I rewrote the function slightly and pulled the serialized function out of the worker:

import random
import time

x = {}
def memory_leaking_fn(data):
    x[data] = random.randint(10, 100)
    x[data + str(random.randint(10, 10000))] = 'foo'
    print(x)
    print(locals())

    time.sleep(0.1)
    return data

In a separate process, load the serialized function:

In [1]: import pickle

In [2]: ser_func = b"\x80\x04\x95\x93\x02\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x08KCCHt\x00\xa0\x01d\x01d\x02\xa1\x02t\x02|\x00<\x
   ...: 00d\x03t\x02|\x00t\x03t\x00\xa0\x01d\x01d\x04\xa1\x02\x83\x01\x17\x00<\x00t\x04t\x02\x83\x01\x01\x00t\x04t\x05\x83\x00\x83\x01\x01\x00t\x06\xa0\x07d\x05\xa1\x01\x01\x00|\x00S\x00\x94(NK\nKd\x8c\x03foo\x94M\x10'G?\xb9\x99\x99\x99\x99\x99\x9at\x94(\x8c\x06random\x94
   ...: \x8c\x07randint\x94\x8c\x01x\x94\x8c\x03str\x94\x8c\x05print\x94\x8c\x06locals\x94\x8c\x04time\x94\x8c\x05sleep\x94t\x94\x8c\x04data\x94\x85\x94\x8c\x07test.py\x94\x8c\x11memory_leaking_fn\x94K\nC\x0c\x00\x02\x10\x01\x18\x01\x08\x01\n\x02\n\x01\x94))t\x94R\x94}\x9
   ...: 4(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94\x8c\x07test.py\x94uNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h#}\x94}\x94(h\x1eh\x18\x8c\x0c__qualname__\x94h\x18\x8c\x0f__annot
   ...: ations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1f\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94(h\x12h\x00\x8c\tsubimport\x94\x93\x94\x8c\x04time\x94\x85
   ...: \x94R\x94h\x0ch6h\x0c\x85\x94R\x94h\x0e}\x94uu\x86\x94\x86R0."

In [3]: func = pickle.loads(ser_func)

In [4]: func(str(1))
{'1': 15, '19359': 'foo'}
{'data': '1'}
Out[4]: '1'

In [5]: func(str(2))
{'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}
{'data': '2'}
Out[5]: '2'

In [6]: func.__globals__['x']
Out[6]: {'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}

In [7]: globals()['x']
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-7-9edb74b452e4> in <module>
----> 1 globals()['x']

KeyError: 'x'

This is moving outside my area of knowledge, but it looks like the global namespace of the deserialized function brings its own copy of the dictionary; this is probably one of the ways cloudpickle/pickle can bring in dependencies when serializing a function g which depends locally on a function f.

@mrocklin (Member) commented:

@quasiben and I spoke about this offline. I think that he is trying to avoid the situation where cloudpickle is handling a function that leaks data in a closed-over variable. I think that this is not a good path to go down. I think that we need to assume that user code can leak in ways that we will never be able to resolve, and that we will need to occasionally restart in these situations.

@@ -2678,6 +2678,7 @@ def check_pause(memory):
if self.memory_limit is not None
else "None",
)
await self.close_gracefully(restart=True)
A project member commented on this line:

I'm trying to anticipate cases where restarting might not be welcome. I can think of a couple:

  1. There is some slow cleanup process that needs a couple of seconds to clear memory out (I haven't seen this personally). Maybe it makes sense to wait a few times before restarting? This might not be worth the complexity, though.
  2. The worker is currently working on something, and we should give it a moment to try to finish it. This could be determined by checking the self.executing set of currently executing tasks. If this is empty then sure, restarting seems ok. If not, then we might want to wait for a bit, but not forever.
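A rough sketch of that idea, purely illustrative: maybe_restart and the five-second grace period are hypothetical, while self.executing and close_gracefully(restart=True) are the worker attribute and the call referenced above and in this PR's diff.

import asyncio
import time

async def maybe_restart(self, grace=5.0):
    # give currently executing tasks a short window to finish
    deadline = time.monotonic() + grace
    while self.executing and time.monotonic() < deadline:
        await asyncio.sleep(0.1)
    # then restart anyway, so a leaking worker cannot stall forever
    await self.close_gracefully(restart=True)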

@mrocklin (Member) commented:

I'm happy to invest another working day on this issue, but I think it would be better if someone with deeper knowledge could jump in here.

I agree that that would be good. It may not happen quickly though. There is a lot of work happening right now, and most maintainers are unfortunately quite busy these days.

@gioxc88 commented Dec 11, 2020

I would like to help, but I lack the knowledge.
If anyone can point me in the right direction, I am willing to spend my time fixing this.

What's the best way to install this repo using pip?
The last time I tried pip install git+https://github.com/Hoeze/distributed/project.git@2.30.x, but I noticed that the changes are in the master branch, and I have dask 2.30, which is incompatible with the code in master (some imports fail).

I manually made the change in my site-packages folder, since it's just one line, and it does the trick.

The workers restart when they reach the memory limit BUT the job fails with this traceback:

    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>)

If this PR gets completed, it would be a huge improvement for using dask with long-running jobs.

@Hoeze (Author) commented Dec 11, 2020

Hi @gioxc88, installing dask-distributed requires (due to a reason I don't know) dask from master.

I updated my pull request to current master and fixed the memory leak test.
You should now be able to just pip install it from my current master branch.

@gioxc88 commented Dec 11, 2020

Hi @gioxc88, installing dask-distributed requires (due to a reason I don't know) dask from master.

I updated my pull request to current master and fixed the memory leak test.
You should now be able to just pip install it from my current master branch.

Thanks for the answer, I'll try again tomorrow. Are there any new developments on the actual code (aside from the tests)?

Many thanks

@Hoeze (Author) commented Dec 11, 2020

Not from my side. I do have the same issues (#4193 (comment)).

This patch only fixes one very specific problem, memory-leaking worker processes, by trying to restart them gracefully.
However, when a worker gets killed, e.g. by running out of memory, there is currently no way to recover the results stored on that worker. They need to be recalculated.

You can try increasing allowed-failures (e.g. to 50) in your ~/.config/dask/distributed.yaml; this helps a lot with recovering from worker failures.
Also, enabling lifetime restarts makes a huge difference (I set a 60-minute lifetime with a 10-minute stagger).
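For reference, a sketch of those settings via dask.config.set, using the values from this comment; the same keys can equally go into ~/.config/dask/distributed.yaml, and they need to be set before the cluster is created so that workers pick them up.

import dask

dask.config.set({
    # tolerate more worker deaths before a task is marked as failed
    "distributed.scheduler.allowed-failures": 50,
    # restart each worker after an hour, staggered so they don't all cycle at once
    "distributed.worker.lifetime.duration": "60 minutes",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
})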


IMHO, what should be done to solve these issues:

  • Directly pass every finished task to the nanny and let it handle the data transfers
  • Allow in-place restarts of the worker

This way, we would not have to care about worker failures any more: results would have a known size and reside in a safe location.

@nsmith- (Contributor) commented Mar 11, 2021

This looks like it would solve a perennial issue for me. Is there any reason not to take this less-than-perfect solution for now and work on the nanny-restarts-worker solution in the longer term? If it is just a matter of implementing a delay and a check for currently executing tasks, as suggested in the review comment, I can do that.

@crusaderky (Collaborator) commented:

I expect that this PR would kill off long-running tasks that temporarily allocate a lot of RAM. I'm in the process of designing a more holistic and robust solution.

@nsmith- (Contributor) commented Mar 11, 2021

If memory usage is high enough that we hit the pause fraction watermark and there are no keys to spill to disk, then I think what we could do is:

  • Stop new task executions
  • Wait for existing tasks to drain out
  • Run gc a few times

I think that if memory usage remained high after that sequence, the worker should kill itself. The situation cannot reasonably be expected to improve if tasks are not cleaning up after themselves and gc cannot do it either.
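A rough sketch of that sequence, with hypothetical names: drain_gc_then_restart, the paused flag, and the 95% threshold are illustrative assumptions, while worker.executing, worker.memory_limit, and close_gracefully(restart=True) are the pieces referenced in this thread and in the PR's diff.

import asyncio
import gc

import psutil

async def drain_gc_then_restart(worker, threshold=0.95):
    # stop picking up new work (stand-in for the worker's real pause mechanism)
    worker.paused = True
    # wait for in-flight tasks to drain out
    while worker.executing:
        await asyncio.sleep(0.5)
    # run gc a few times to give leaked objects a chance to be collected
    for _ in range(3):
        gc.collect()
    # if process memory is still near the limit, the worker restarts itself
    if psutil.Process().memory_info().rss > threshold * worker.memory_limit:
        await worker.close_gracefully(restart=True)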

@Hoeze requested a review from fjetter as a code owner on January 23, 2024.
Successfully merging this pull request may close these issues: "WARNING - Memory use is high but worker has no data to store to disk".