
Unmanaged (Old) memory hanging #6232

Open
josuemtzmo opened this issue Apr 28, 2022 · 17 comments
@josuemtzmo

What happened:

I am using Xarray and Dask to analyse large datasets (~100s of GB), reducing their dimensions from f(x,y,z,t) to f(t) by taking averages or sums. Several times I've found that unmanaged (old) memory lingers in the cluster memory until it kills the client.

The MemorySampler output for one of my analyses (a spatial average) is shown in the graph below: the cluster memory never decreases towards its pre-run state. (It looks like a memory leak, but the computation is only dataset.mean().)
[MemorySampler plot: spatial average run]
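
(For reference, a minimal sketch of the kind of reduction and memory sampling described above; the file name, variable name, chunking, and cluster sizes below are hypothetical:)

import xarray as xr
from distributed import Client
from distributed.diagnostics import MemorySampler

client = Client(n_workers=4, threads_per_worker=2, memory_limit="8 GiB")

# Lazily open a large dataset with dask-backed chunks (hypothetical file and chunking).
ds = xr.open_dataset("large_dataset.nc", chunks={"time": 10})

ms = MemorySampler()
with ms.sample("spatial mean"):
    # Reduce f(t, z, y, x) to f(t) by averaging over the spatial dimensions.
    ds["temperature"].mean(dim=("z", "y", "x")).compute()

ms.plot(figsize=(15, 10), grid=True)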

I've tried setting "MALLOC_TRIM_THRESHOLD_" to 16284 and to 0, and the issue persists.
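
(For context, a minimal sketch of one way to set this threshold through the dask config before the workers start; the config key matches the one queried in a snippet later in this thread, and the value is illustrative:)

import dask
from distributed import Client

# Must be set before the workers are created: the nanny copies
# distributed.nanny.environ into each worker process' environment, so
# changing it afterwards has no effect on already-running workers.
dask.config.set({"distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_": 0})

client = Client(n_workers=4, threads_per_worker=2)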

I've also tried manually trimming memory:

import ctypes

def trim_memory() -> int:
    # Ask glibc to release free heap memory back to the OS (Linux/glibc only).
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

However, this usually frees only <1 GB, and the cluster's unmanaged memory remains on the order of tens of GB.

In order to better report this issue, I've attached an example that leaves around 1 GB of unmanaged (old) memory (I'm not sure if this is related to dask/dask#3530):

https://gist.github.com/josuemtzmo/3620e01c9caf88c18109809c52d77180

After lazily loading a test dataset, the process memory is ~159.01 MiB according to psutil.
After running the computation (a mean over space, i.e. f(t,x,y).mean(('x','y'))), the process memory is ~256.83 MiB according to psutil.
This increase in memory is expected; however, the cluster memory remains at 1.27 GiB, with each worker holding around 180 MiB of unmanaged (old) memory.
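
(For anyone reproducing this, a small sketch of how the per-worker process memory can be sampled with psutil through client.run; it assumes psutil is importable on the workers:)

import psutil

def worker_rss_mib() -> float:
    # Resident set size of the calling worker process, in MiB.
    return psutil.Process().memory_info().rss / 2**20

# Returns a {worker_address: rss_in_mib} mapping for every worker.
client.run(worker_rss_mib)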

[Screenshot: per-worker unmanaged (old) memory in the dashboard]

Running the manual memory trim only decreases the cluster memory from 1.26 GiB to 1.23 GiB. If the same code is run several times, the unmanaged memory continues to increase.

What you expected to happen:

  • Release of unmanaged (old) memory.

Environment:

  • Dask version: dask 2022.4.1
  • Python version: Python 3.9.12
  • Operating System: SLES linux
  • Install method (conda, pip, source): conda

Thanks!

@hayesgb

hayesgb commented Apr 28, 2022

Have you tried using the new Active Memory Manager? When I do:

client = Client()
client.amm.start()

with your gist notebook on my local machine, I get the following plot.

I'm using dask and distributed 2022.01.0

FYI: cc @crusaderky

[MemorySampler plot: gist notebook run locally with AMM enabled]
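
(The AMM can also be switched on through the dask config instead of calling client.amm.start() — a hedged sketch, assuming the distributed.scheduler.active-memory-manager.start key available in recent distributed versions:)

import dask
from distributed import Client

# Enable the Active Memory Manager when the scheduler starts,
# rather than starting it later from the client.
dask.config.set({"distributed.scheduler.active-memory-manager.start": True})

client = Client()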

@crusaderky
Collaborator

@josuemtzmo your notebook gist doesn't seem to reproduce the issue. With the same number of workers, threads per worker, and memory limit, it finishes for me in 3.5s instead of your 2m5s (reading from a local SSD). Does the generator code for test.nc in the gist produce the full-sized data, or did you make it smaller before you published it?
Also, in both the gist and in your second plot here the peak memory usage is 1 GiB cluster-wide and the shape doesn't look like the first plot you displayed.

@crusaderky
Collaborator

crusaderky commented Apr 28, 2022

This increase in memory is expected, however the cluster memory remains in 1.27GiB, where each of the workers has around 180MiB of Unmanaged (Old) memory hanging.

This is barely more than just xarray and all its dependencies.

$ python
>>> import xarray, psutil
>>> psutil.Process().memory_info().rss / 2**20
126.58203125

the cluster memory never decreases towards its pre-run state

What magnitude of leak are we talking about? Each worker holds its logs in a fixed-size, fairly long deque, so it's normal to have a bit of increase towards the beginning.
Please try rerunning for ~1 hour nonstop without restarting the workers; you should see it plateau to a fairly reasonable level.
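
(A minimal sketch of such a soak test — rerun the same reduction in a loop for about an hour while sampling memory; ds["temperature"] is a hypothetical stand-in for the real dataset:)

import time
from distributed.diagnostics import MemorySampler

ms = MemorySampler()
with ms.sample("1h soak"):
    # Repeat the same reduction for ~1 hour without restarting the workers;
    # unmanaged memory measured between runs should plateau rather than creep.
    deadline = time.monotonic() + 3600
    while time.monotonic() < deadline:
        ds["temperature"].mean(dim=("x", "y")).compute()
        time.sleep(15)

ms.plot(figsize=(15, 10), grid=True)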

@josuemtzmo
Author

Thanks for the suggestions.

@hayesgb I wasn't aware of the Active Memory Manager. Although it helps a bit by reducing the cluster's memory usage, the issue persists (see figures below).

@crusaderky I ran the notebook both on my local computer (macOS) and on the server referred to in the issue, and the results are the same. Furthermore, I ran only the code provided in #6241 and the issue is still there on both my local and server machines.

Run on SLES Linux (server) using dask 2022.4.1 on Python 3.10.4 (I created a new env from scratch to test this):

Without AMM:
[memory plot]

With AMM:
[memory plot]

On my personal computer (macOS, ARM chip) using dask 2022.4.1 on Python 3.9.12 (new env too):

Without AMM:
[memory plot]

With AMM:
[memory plot]

Furthermore, if I rerun the same code on the same cluster, the unmanaged (old) memory continues to increase... (two runs with a 15-second pause between them):

[memory plot: two consecutive runs]

@crusaderky
Collaborator

crusaderky commented Apr 29, 2022

The substantial growth of unmanaged memory on macOS is a well-known issue: #5840
It's not a leak; it just doesn't shrink after use. Run 10-20 times and you'll see the same memory reused.

On SLES, what your plots are showing me is that, in the middle of a computation, the unmanaged memory is ~10 GiB, or ~1.4 GiB per worker. Considering that it includes the heap of the tasks and the network buffers, this is perfectly normal and healthy. If it's too much for you, you need to reduce your chunk size.

Furthermore, if I rerun the same code in the same cluster, the unmanaged (old) memory continues to increase... (two runs with a 15 second pause in between them)

Run it 10 times and I expect it to plateau.

To recap:

  • HEALTHY: there is unmanaged memory when the cluster is at rest (you need 150+ MB per process just to load the libraries).
  • HEALTHY: there is substantially more unmanaged memory when the cluster is serving a computation. The actual amount is a direct consequence of chunk size and number of threads.
  • HEALTHY: [Linux] the unmanaged memory, measured between computations, grows slightly over time (due to the accumulation of logs), but eventually plateaus to a reasonable amount.
  • SOMEWHAT HEALTHY (misleading and causes complications): [MacOSX / Windows] the total process memory, measured between computations, does not deflate after a computation. The next computation reuses it - e.g. the managed memory grows while the unmanaged memory deflates, and vice versa, but the total process memory doesn't change. Peak process memory plateaus after a few runs at a substantially high value.
  • NOT HEALTHY (but also not observed in any of your plots): Run a computation over and over on the same workers for several hours and you observe a clear creep in memory usage over time, which eventually causes a deadlock as the workers pause.

@crusaderky
Collaborator

P.S. I notice that your SLES notebook lost the env={"MALLOC_TRIM_THRESHOLD_":16284}. That was important to work around #5971.
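
(For completeness, a hedged sketch of how that variable can be passed to the workers when the local cluster is created; extra worker kwargs such as env= should be forwarded to the nanny, which injects them into each worker process' environment. MALLOC_TRIM_THRESHOLD_ only has an effect on Linux/glibc.)

from distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=7,
    threads_per_worker=2,
    memory_limit="2 GiB",
    # Injected into each worker process before it starts (glibc/Linux only).
    env={"MALLOC_TRIM_THRESHOLD_": "16284"},
)
client = Client(cluster)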

@josuemtzmo
Author

After running the .compute() several times, with a delay of 15 seconds between runs, the worker memory raises the warning:

distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information.

Here is the memory usage graph for 10 runs; it effectively plateaus:

[memory plot: 10 consecutive runs]

The drop at the end occurs when the manual trim is performed. I guess I expected the memory to be released more often. Setting "MALLOC_TRIM_THRESHOLD_": 16284 before spinning up the client, as in #5971, doesn't seem to make a big difference, and neither does decreasing the chunk size. In fact, from what I've tested, the smaller the chunks, the larger the memory usage... (I will post the memory usage of smaller chunks later.)

@crusaderky
Collaborator

setting it up before spinning the client as #5971 doesn't seem to make a big difference

To clarify: are you doing this in Linux? On MacOS that variable won't do anything.

@josuemtzmo
Author

To clarify, I'm testing MALLOC_TRIM_THRESHOLD_ on Linux. So far I get some strange results in which the memory just hangs and kills the kernel. I'll keep exploring this...
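
(One quick sanity check, sketched below, is to confirm the variable actually reached the worker processes:)

import os

# Value of MALLOC_TRIM_THRESHOLD_ as seen inside each worker process
# (None means the nanny never injected it).
client.run(lambda: os.environ.get("MALLOC_TRIM_THRESHOLD_"))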

@fmaussion

fmaussion commented May 3, 2022

[EDIT]: false alarm. See pangeo-data/rechunker#100 (comment) for the solution to my problem. Previous message below for reference.

Just to add my use case here. On Linux, after trying all the solutions I could gather here, I have exactly the same problem, with huge memory leaks even in a "fits in memory" dask problem: https://nbviewer.ipython.org/gist/fmaussion/5212e3155256e84e53d033e61085ca30

@dcherian

dcherian commented May 3, 2022

@crusaderky's healthy / not healthy list would be a pretty great addition to the docs!

@josuemtzmo
Author

I've investigated the memory in a bit more detail for different chunk sizes, and what I found seems a bit unexpected. The code I'm using to test this is:

import dask
import os
import dask.array as da
import distributed
import time
from distributed.diagnostics import MemorySampler

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

os.environ["MALLOC_TRIM_THRESHOLD_"] = str(dask.config.get("distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_"))

c = distributed.Client(n_workers=7, threads_per_worker=2, memory_limit="2 GiB")
c.amm.start()

a = da.random.random((10_000, 10_000), chunks=(2_000, 2_000))
b = (a @ a.T).mean()

ms = MemorySampler()
with ms.sample("With AMM"):
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(60)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    c.run(trim_memory)
    time.sleep(60)
ms.plot(figsize=(15, 10), grid=True)

The best performance and memory handling occur when the chunk size is 2_000.
[memory plot: chunks=(2_000, 2_000)]
However, as I decrease the chunk size to 1_500 or 1_000, the cluster memory seems to be flushed less frequently.
[memory plots: chunks=(1_500, 1_500) and chunks=(1_000, 1_000)]
If I decrease the chunk size to 800, the TCP transmissions to the workers fail and the execution hangs.

Is this expected?
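
(For reference, a quick sketch to sanity-check the per-chunk size in bytes for each of these configurations:)

import numpy as np
import dask.array as da

for n in (2_000, 1_500, 1_000, 800):
    a = da.random.random((10_000, 10_000), chunks=(n, n))
    chunk_mib = np.prod(a.chunksize) * a.dtype.itemsize / 2**20
    # e.g. chunks=(2_000, 2_000) -> ~30.5 MiB per float64 chunk
    print(n, a.numblocks, f"{chunk_mib:.1f} MiB per chunk")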

@crusaderky
Collaborator

However as I decrease the chunk size to 1_500 or 1_000, the cluster memory seems to flush less frequently.

1500 * 1500 * 8 = ~17 MiB, which is much larger than the MALLOC_TRIM_THRESHOLD_ of 16 KiB - so, at face value, this is not expected.
However, your graphs don't have a scale, so I don't know how much of a difference you're talking about. (a @ a.T).mean() includes a bunch of reduction tasks, which are small in size. With memory_limit="2 GiB", they matter.

If I decrease the chunk size to 800, the tcp transmissions to the workers fails and the execution hangs.

This definitely must not happen, but it's unrelated to memory management. Could you open a reproducer, with logs and a scheduler dump, in a separate ticket?
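
(If it helps, a hedged sketch of how the logs and cluster state can be captured for the new ticket, assuming a recent distributed version:)

# Collect scheduler and worker logs, plus a full cluster state dump,
# to attach to the reproducer issue.
scheduler_logs = client.get_scheduler_logs()
worker_logs = client.get_worker_logs()
client.dump_cluster_state("hang-reproducer-dump")  # writes a .msgpack.gz file by default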

@n3rV3

n3rV3 commented Feb 1, 2023

We are facing this issue; we tried the snippet shared earlier:

from dask.distributed import get_client
import dask.array as da
from distributed.diagnostics import MemorySampler

client = get_client('10.10.10.1:8786')

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


client.amm.start()
a = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))
b = (a @ a.T).mean()

ms = MemorySampler()
with ms.sample("With AMM"):
    b.compute()
    
client.run(trim_memory)
ms.plot(figsize=(15,10), grid=True)

The memory on the workers increases from around 20 GB to 50 GB and keeps growing steadily.
It grows significantly more if I make the following change:

a = da.random.random((200_000, 200_000), chunks=(20_000, 20_000))

With the above, the memory usage crossed 80 GB and stayed there, even with client.run(trim_memory) and other tricks.

I have tried setting MALLOC_TRIM_THRESHOLD_ to 0 for all dask workers, but that doesn't help either.

This is an issue for us because we run DAGs which do significantly more processing than the above snippet, and the memory utilization of the cluster crosses 250 GB. We have a cluster capacity of around 2 TB (memory).
After some time the memory utilization becomes a problem and the workers get killed between task executions, delaying our pipelines.

I tried the suggestions in this issue and also tried basic jemalloc tuning but nothing has helped so far.

I would like to understand if we can quickly address this.

@crusaderky
Collaborator

crusaderky commented Feb 1, 2023

@n3rV3
  • Does the unmanaged memory stay there when there are no tasks running at all?
  • Does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)?
  • What's your dask version?
  • You say 2 TB cluster capacity; what's the capacity per worker?
  • What kind of cluster does your client connect to? (dask-kubernetes, coiled, in-house...?)
  • On what OS do the workers run?

@n3rV3

n3rV3 commented Feb 3, 2023

Q: Does the unmanaged memory stay there when there are no tasks running at all?
Ans: Yes, the unmanaged memory stays there even with no tasks running at all.

Q: Does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)?
Ans: Futures are not used; I'm not sure about removing persisted collections. But in the snippet I shared earlier I wasn't persisting any collections, and the memory still kept growing with each run.

Q: what's your dask version?
Ans: dask version is: 2022.12.1

Q: what's the capacity per worker?
Ans: 14 instances are running; each instance has 12 workers, and each worker is single-threaded with 14 GB RAM.
The instances themselves have 16 CPU cores and 123 GB RAM. The processes are allocated more memory than is available on each instance, but the overall memory utilization of dask never crosses 60% on any node, which allows us to run the cluster without OOMs (so far).

Q: what kind of cluster does your client connect to? (dask-kubernetes, coiled, in-house...?)
Ans: The cluster is running on EC2, with dask-workers (on many EC2 instances) connecting to the dask-scheduler instance at startup. A systemd unit manages the dask service; the relevant execution line is:

ExecStart=/usr/local/bin/dask-worker --nthreads 1 --nworkers 12 --memory-limit 14000MB --death-timeout 300 --pid-file /usr/local/dask/run/dask-worker.pid tcp://10.10.10.10:8786

Q: on what OS do the workers run?
Ans: Ubuntu 22.04, Linux Kernel should be 5.15.0-1027-aws on all instances.

@crusaderky
Collaborator

Q: Does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)?
Ans: Futures are not used, not sure about removing persisted collections. But, in the snippet I shared earlier, I wasn't persisting any collections. Still the memory kept growing with each run.

The dashboard must show managed memory = 0 in the top-left corner.
Alternatively, you can run client.run(lambda dask_worker: len(dask_worker.data)) to learn how many keys you have in memory.

Once you have no keys in memory: how much memory per node are you talking about, at rest?

If it's significant (>2 GiB), you are likely suffering from a memory leak, potentially something that dask is not responsible for. What libraries are you calling from your tasks?
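
(A hedged sketch that combines both checks per worker — number of keys held in memory and process RSS — assuming psutil is available on the workers:)

import psutil

def worker_status(dask_worker):
    # client.run passes the Worker instance when the function accepts
    # a `dask_worker` argument.
    return {
        "keys_in_memory": len(dask_worker.data),
        "rss_mib": psutil.Process().memory_info().rss / 2**20,
    }

# Returns a dict keyed by worker address.
client.run(worker_status)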
