
Possible memory leak when using LocalCluster #5960

Open
JSKenyon opened this issue Mar 18, 2022 · 33 comments
Labels: bug (Something is broken), memory, needs info (Needs further information from the user)

Comments

@JSKenyon

What happened:
Memory usage of code using da.from_array and compute in a for loop grows over time when using a LocalCluster.

What you expected to happen:
Memory usage should be approximately stable (subject to the GC).

Minimal Complete Verifiable Example:

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster


def f(x):
    return np.zeros(x.shape, dtype=x.dtype)


def wrapper(x):
    if not isinstance(x, da.Array):
        x = da.from_array(x, chunks=(1, -1, -1))

    return da.blockwise(f, ('nx', 'ny', 'nz'),
                        x, ('nx', 'ny', 'nz')).compute()


if __name__=='__main__':

    cluster = LocalCluster(
        processes=True,
        n_workers=4,
        threads_per_worker=1
    )
    client = Client(cluster)

    nx, ny, nz = 4, 512, 512
    
    for i in range(500):
        x = np.random.randn(nx, ny, nz)
        wrapper(x)

Anything else we need to know?:

The output of mprof run --multiprocess --include-children reproducer.py visualised by mprof plot using a LocalCluster:
[plot: localcluster]

The output of mprof run --multiprocess --include-children reproducer.py visualised by mprof plot using the threads scheduler:
[plot: threads]

The output of mprof run --multiprocess --include-children reproducer.py visualised by mprof plot using a LocalCluster but instantiating x as da.random.standard_normal((nx, ny, nz), chunks=(1, -1, -1)):
[plot: nofromarray]

Note that child 0 should be ignored in the LocalCluster plots as it is an artifact of the profiling procedure. Memory usage is in fact still climbing in the last plot, but it is much slower than in the first case (using da.from_array). My current best guess is that either the Client or the Cluster is somehow maintaining a reference to an object which should otherwise have been GCed.

Environment:

  • Dask version: 2022.2.1
  • Python version: 3.8.10
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): pip
@scharlottej13
Contributor

Noting that this discussion from the Dask Discourse seems quite similar: when using a local cluster, they also noticed memory usage on the client continuing to increase over time.

@gjoseph92
Collaborator

  1. I see you're on linux. Can you run this with MALLOC_TRIM_THRESHOLD_=0 mprof run ... and compare?
  2. Can you run the scheduler in a separate process (maybe just via mprof run dask-scheduler ...)? I'd like to isolate whether the memory growth is actually happening on the scheduler, or on the client side, or neither and just happening because they're in the same process.
  3. Could you try disabling all the logs mentioned in "Set scheduler log sizes automatically based on available memory" (#5570) and see if any of them make a difference (ideally narrowing down which ones)?
  4. I've made very similar mprof plots in the past. One thing I'd like to check but haven't is how the total number of live Python objects changes over time. Maybe just len(gc.get_objects()) after each iteration?
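
For reference, a minimal sketch of the object-count check from point 4, assuming it is dropped into the loop of the original reproducer above (it relies on that script's np, nx, ny, nz and wrapper; the gc count is only a coarse signal since it ignores object sizes):

import gc

# Inside the reproducer's __main__ block, replacing the original loop:
for i in range(500):
    x = np.random.randn(nx, ny, nz)
    wrapper(x)
    # Coarse leak indicator on the client side: number of live Python objects.
    print(f"iteration {i}: {len(gc.get_objects())} live objects")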

The fact that memory growth is so different when you switch x to da.random.standard_normal is telling. When x is a NumPy array, the graph will contain the entirety of x's data within it. (Due to nuances of NumPy buffers and zero-copy serialization, the chunks of x within the graph may even point to x's underlying buffer, even in serialized form on the scheduler due to pickle5, since the scheduler and client are in the same process. So if anything keeps a reference to even one of those tasks containing a chunk of x, serialized or not, that may be enough to keep the entire array in memory. This is another reason to try the scheduler in a separate process.)

When x is da.random.standard_normal, the serialized tasks will be much smaller (random seeds are included, but I think they're a bit smaller than the actual array).

In both cases, memory seems to grow with each repeated computation, but when x is a NumPy array it obviously grows much faster.

It makes me think that something is keeping references to (serialized?) tasks alive even after they're complete, either on the scheduler or client.
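
To make that size difference concrete, here is a rough sketch (not from the issue) that compares how much data the two graph constructions embed; it assumes the materialized low-level graph can be pickled directly, which holds for these simple graphs:

import pickle

import numpy as np
import dask.array as da

nx, ny, nz = 4, 512, 512
x = np.random.randn(nx, ny, nz)  # ~8 MiB of float64

# Graph built from an in-memory NumPy array: the array data is embedded
# in the graph itself.
from_array_graph = dict(da.from_array(x, chunks=(1, -1, -1)).__dask_graph__())
print("from_array graph:", len(pickle.dumps(from_array_graph)) / 2**20, "MiB")

# Graph built from da.random: only task definitions and seeds, no array data.
random_graph = dict(
    da.random.standard_normal((nx, ny, nz), chunks=(1, -1, -1)).__dask_graph__()
)
print("da.random graph: ", len(pickle.dumps(random_graph)) / 2**20, "MiB")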

@djhoese

djhoese commented Mar 19, 2022

Using the modified example script from the discourse thread, starting a separate scheduler and one explicit worker, I don't see any meaningful/consistent memory growth in the main process.

Script
import os
import gc

import psutil
from dask.distributed import Client, LocalCluster


def current_memory():
    process = psutil.Process(os.getpid())
    mem = process.memory_info().rss / (1024 * 1024)  # MB
    return mem


def get_patch(index, dt):
    return None


if __name__ == "__main__":
    #cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='5GB')
    #client = Client(cluster)
    client = Client("tcp://192.168.86.42:8786")
    dts = list(range(23 * 11))

    initial_mem = curr_mem = current_memory()
    print(f"Initial memory: {curr_mem}")
    curr_mem = current_memory()
    counter = 0
    num_patches = 210
    for dt in dts:
        prev_mem = curr_mem
        curr_mem = current_memory()
        print(f"| Count: {counter:>03d} | Current memory usage: {current_memory():>0.05f} | Memory delta: {curr_mem - prev_mem:>0.05f} |")
        patch_res = client.map(get_patch, [dt + x for x in range(num_patches)], dt=dt)
        final_res = client.gather(patch_res)
        print(f'number of client references {len(client.futures)}')
        counter += 1
        if counter % 10 == 0:
            gc.collect()
Separate scheduler - 1 worker
$ python debug_bradshaw_memory_usage2.py 
Initial memory: 86.23828125
| Count: 000 | Current memory usage: 86.24609 | Memory delta: 0.00000 |
number of client references 210
| Count: 001 | Current memory usage: 89.19531 | Memory delta: 2.94922 |
number of client references 210
| Count: 002 | Current memory usage: 89.63672 | Memory delta: 0.44141 |
number of client references 210
| Count: 003 | Current memory usage: 90.12500 | Memory delta: 0.48828 |
number of client references 210
| Count: 004 | Current memory usage: 91.39062 | Memory delta: 1.26562 |
number of client references 210
| Count: 005 | Current memory usage: 91.39062 | Memory delta: 0.00000 |
number of client references 210
| Count: 006 | Current memory usage: 91.39062 | Memory delta: 0.00000 |
number of client references 210
| Count: 007 | Current memory usage: 91.39062 | Memory delta: 0.00000 |
number of client references 210
| Count: 008 | Current memory usage: 91.39062 | Memory delta: 0.00000 |
number of client references 210
| Count: 009 | Current memory usage: 91.39062 | Memory delta: 0.00000 |
number of client references 210
| Count: 010 | Current memory usage: 91.51172 | Memory delta: 0.12109 |
number of client references 210
| Count: 011 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 012 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 013 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 014 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 015 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 016 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 017 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 018 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 019 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 020 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 021 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 022 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 023 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 024 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 025 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 026 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 027 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 028 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 029 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
number of client references 210
| Count: 030 | Current memory usage: 91.51172 | Memory delta: 0.00000 |
Script-created LocalCluster
$ python debug_bradshaw_memory_usage2.py 
/home/davidh/miniconda3/envs/satpy_py39/lib/python3.9/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 45945 instead
  warnings.warn(
Initial memory: 113.1328125
| Count: 000 | Current memory usage: 113.13281 | Memory delta: 0.00000 |
number of client references 210
| Count: 001 | Current memory usage: 118.75781 | Memory delta: 5.62500 |
number of client references 210
| Count: 002 | Current memory usage: 120.42969 | Memory delta: 1.67188 |
number of client references 210
| Count: 003 | Current memory usage: 121.39453 | Memory delta: 0.96484 |
number of client references 210
| Count: 004 | Current memory usage: 122.40234 | Memory delta: 1.00781 |
number of client references 210
| Count: 005 | Current memory usage: 122.91406 | Memory delta: 0.51172 |
number of client references 210
| Count: 006 | Current memory usage: 123.40234 | Memory delta: 0.48828 |
number of client references 210
| Count: 007 | Current memory usage: 124.12891 | Memory delta: 0.72656 |
number of client references 210
| Count: 008 | Current memory usage: 124.62500 | Memory delta: 0.49609 |
number of client references 210
| Count: 009 | Current memory usage: 124.62500 | Memory delta: 0.00000 |
number of client references 210
| Count: 010 | Current memory usage: 124.83594 | Memory delta: 0.21094 |
number of client references 210
| Count: 011 | Current memory usage: 125.07812 | Memory delta: 0.24219 |
number of client references 210
| Count: 012 | Current memory usage: 125.57031 | Memory delta: 0.49219 |
number of client references 210
| Count: 013 | Current memory usage: 125.81641 | Memory delta: 0.24609 |
number of client references 210
| Count: 014 | Current memory usage: 126.30078 | Memory delta: 0.48438 |
number of client references 210
| Count: 015 | Current memory usage: 126.79688 | Memory delta: 0.49609 |

I'll try the log options next.

Edit: I should also note that monitoring the scheduler process while running shows that it is growing in memory usage while the workers aren't, and even after the client was closed and the script had ended, the scheduler still kept that memory.

@djhoese

djhoese commented Mar 19, 2022

@gjoseph92 I updated my ~/.config/dask/distributed.yaml with:

distributed:
  scheduler:
    events-log-length: 0
  admin:
    log-length: 0
  diagnostics:
    computations:
      max-history: 0

and used the in-script LocalCluster, but I still saw the same memory growth. Does that YAML look correct? I think I'm done for the night but will try some more things this weekend.

@JSKenyon
Author

JSKenyon commented Mar 22, 2022

Thanks @djhoese for doing that follow-up.

@gjoseph92 MALLOC_TRIM_THRESHOLD_ didn't change the results of mprof.

I did not pursue the logging angle as @djhoese seemed to be looking into it. I would be incredibly surprised if logging could explain the growth in the from_array case.

I also tried printing len(gc.get_objects()) after each iteration. As expected, this was growing in the case where the cluster was instantiated in the reproducer, but was not growing when the cluster was instantiated in a separate ipython session.

I believe that it is the workers that are accumulating the memory. To check, I started the cluster in a separate ipython session and attached to it. The idea was that this would let me poke around after the reproducer had run. I used the Ubuntu system monitor to look at the PIDs associated with the cluster (with 4 workers, as above). I could see that even after the reproducer had run, the worker PIDs were holding on to memory (and the ipython process was holding almost none). Rerunning the reproducer multiple times caused the memory footprints to grow further and killing off a worker via asyncio.run(cluster.workers[0].kill()) caused it to drop. Is this adequate proof that the leak is worker related?

Edit: I also put print(os.getpid(), ": ", len(gc.get_objects())) inside f(x). This showed a growing number of objects between iterations.
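
For concreteness, the instrumented f described in the edit would look roughly like this (a sketch based on the original reproducer):

import gc
import os

import numpy as np


def f(x):
    # Report the worker PID and its number of live Python objects each time
    # a chunk is processed.
    print(os.getpid(), ": ", len(gc.get_objects()))
    return np.zeros(x.shape, dtype=x.dtype)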

@djhoese

djhoese commented Mar 22, 2022

Interesting @JSKenyon. I found the opposite behavior. With my test script I see major memory increases in the scheduler, but only small increases in the workers. Now that I think about it though, I'm not sure if that was proportional to the number of workers, or maybe even a sum of the increases in each worker (leak_size_in_scheduler = num_workers * leak_size_in_workers). I'll see if I have time for some tests today.

@djhoese

djhoese commented Mar 22, 2022

Here are the results of running my script (once with a LocalCluster, once with an external scheduler), the scheduler, and 4 workers with mprof run:

Script (LocalCluster):

[plot: script_localcluster_mprof_plot]

Script (External Scheduler):

[plot: script_external_mprof_plot]

Scheduler:

NOTE: The script using this scheduler was started near the end of the plot.

[plot: scheduler_mprof_plot]

Workers 1 (other 3 are the same):

[plot: worker1_mprof_plot]

@JSKenyon
Author

JSKenyon commented Mar 23, 2022

@djhoese I suspect that our reproducers are probing slightly different behaviours. Using your reproducer, I can also see growth in the scheduler process. However, in my example I am relatively convinced that it is the workers which hold on to the majority of the memory (perhaps you could run it and confirm?).

I have modified your reproducer to the following:

from dask.distributed import Client, LocalCluster
import dask


def f(x):
    return None


if __name__ == "__main__":

    dask.config.set(
        {
            "distributed.scheduler.events-log-length": 0,
            "distributed.admin.log-length": 0,
            "distributed.diagnostics.computations.max-history": 0
        }        
    )

    cluster = LocalCluster(
        n_workers=4,
        threads_per_worker=1,
    )
    client = Client(cluster)

    for dt in range(10000):
        client.gather(client.map(f, [x for x in range(100)]))

This takes a little while to run, but you can use the dashboard to watch the worker memory slowly grow over time. That is not to say that there isn't growth in the scheduler process. Is it possible that something in the scheduler is also causing objects to remain live on the workers? That might explain both types of growth. My original reproducer happens to emphasise the worker behaviour as a result of using from_array whereas @djhoese's reproducer exposes scheduler behaviour by having negligible graph size. I am absolutely convinced that the workers are growing over time in both cases though (in addition to growth in the scheduler process).
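
One way to put numbers on the per-worker growth without the dashboard is to poll the workers' RSS between iterations, roughly like this (a sketch; it assumes psutil is available in the worker environment, and worker_rss_mib is just an illustrative helper):

import psutil


def worker_rss_mib(dask_worker):
    # client.run injects the Worker instance as dask_worker; report the
    # worker process RSS in MiB.
    return psutil.Process().memory_info().rss / 2**20


# Inside the loop above, after each gather:
#     print(client.run(worker_rss_mib))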

Running the reproducer in this comment, I get the following output from mprof run --multiprocess:
[plot: mprof run --multiprocess output for this reproducer]

This is particularly interesting. The black line is the parent/scheduler process in this case (which does not include the memory of children). The coloured lines (excluding blue) are the four worker processes. I think that this clearly demonstrates that both the workers and the scheduler are leaking (or certainly growing) memory.

@djhoese @gjoseph92 thoughts?

Edit: Changed the profiling plot - attaching to the dashboard had muddied the results.

@djhoese

djhoese commented Mar 23, 2022

Yep, same results.

[plot: many_iterations_mprof_plot]

I also ran it with a separate scheduler and single worker process. Watching the dashboard I noticed under the Workers tab that all the leaked memory is considered "unmanaged old":

[screenshot: Workers tab showing the leaked memory as "unmanaged old"]

At least for me the scheduler is blowing up way more:

[plot: many_iterations_sched_mprof_plot]

Worker increases, but not like the scheduler:

[plot: many_iterations_worker1_mprof_plot]

But I also just noticed I'm running 2021.12.0 of dask and distributed. I'll try updating and rerunning this.

Edit: Reran with 2022.3.0 and got similar results.

@djhoese

djhoese commented Mar 23, 2022

So I tried one more thing. I updated your script @JSKenyon to only do 10 iterations, but with a 60 second sleep after each. Here's what resulted:

Separate Scheduler:

[plot: many_iterations4_sched_mprof_plot]

Worker:

[plot: many_iterations4_worker1_mprof_plot]

At the end of the script I counted to 60 and then killed the worker. I then counted to 30 before killing the scheduler. The jump in memory just after the 600s mark on the scheduler plot seems to coincide with the worker being killed (Ctrl+C). I also note that memory seems to increase slowly over time for both the scheduler and the worker, even when they aren't doing anything. Maybe there is some heartbeat-style logging somewhere? The memory definitely increases each time a set of tasks is given, but there are still small increases over time when idle.

@JSKenyon
Author

JSKenyon commented Mar 24, 2022

Interesting results @djhoese. I can reproduce on my end. I will try fiddling with config to see if I can change the rate of growth. That might give us a clue. I am still most concerned by the original reproducer though. This slow growth over time is a problem, but I really want to know if it has the same root cause as the explosive growth when using from_array. The slow growth could still be attributed to logs (and I am going to see if I can make some progress on that front), but the explosive growth has something to do with the serialised arrays not being GCed - i.e. the graph (or task etc.) persisting after computation is complete.

@graingert
Member

but the explosive growth has something to do with the serialised arrays not being GCed - i.e. the graph (or task etc.) persisting after computation is complete.

@JSKenyon can you see what happens with objgraph? e.g. what do

objgraph.show_chain(objgraph.find_backref_chain(graph, objgraph.is_proper_module))
objgraph.show_chain(objgraph.find_backref_chain(task, objgraph.is_proper_module))

show?
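
A rough way to act on this on the workers, assuming objgraph is installed there (show_ndarray_backrefs is an illustrative helper, not part of dask):

import objgraph


def show_ndarray_backrefs(dask_worker):
    # Run via client.run after a few compute calls to see what keeps
    # deserialized chunks alive on the worker.
    arrays = objgraph.by_type('ndarray')
    if arrays:
        # Write the reference chain from a module down to one array as a .dot file.
        objgraph.show_chain(
            objgraph.find_backref_chain(arrays[-1], objgraph.is_proper_module),
            filename=f"backrefs-{dask_worker.name}.dot",
        )
    return len(arrays)


# client.run(show_ndarray_backrefs)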

@JSKenyon
Author

@graingert I should emphasise that at this point my hypothesis is just that - I have been struggling to test it. It may be that the cause is something else (I shouldn't have made such a definite statement).

Thanks for those commands though - I will give it a go!

@fjetter
Member

fjetter commented Mar 24, 2022

We have a bunch of internal logs that are not externally visible for configuration, e.g. the transfer logs.

Depending on how dense your graph is (i.e. how many dependencies are fetched), the total size may vary:

self.incoming_transfer_log = deque(maxlen=100000)
self.incoming_count = 0
self.outgoing_transfer_log = deque(maxlen=100000)

In [1]: from distributed.utils import time

In [2]: from dask.sizeof import sizeof

In [3]: def estimate_size_of_transfer_log(nkeys=10):
   ...:     # use time() to proxy any float
   ...:     transfer_log = {
   ...:         "start": time(),
   ...:         "stop": time(),
   ...:         "middle": time(),
   ...:         "duration": time(),
   ...:         "who": "tcp://134.0.0.1",
   ...:         # this "keys" entry is unbounded and scales with the number
   ...:         # of dependencies of a fetch
   ...:         "keys": {f"key{ix}": time() for ix in range(nkeys)},
   ...:         "total": time(),
   ...:         "compressed": time(),
   ...:         "bandwidth": time(),
   ...:     }
   ...:     return sizeof(transfer_log) * 100000

# 10 keys per get_data request, on average. Result in MiB.
In [4]: estimate_size_of_transfer_log(nkeys=10) / 1024**2
Out[4]: 240.42129516601562

# 32 is a common branching factor, i.e. it's not unreasonable to have many
# tasks with this many dependencies, although it should happen only rarely.
In [17]: estimate_size_of_transfer_log(nkeys=32) / 1024**2
Out[17]: 515.1748657226562

Then there is the transition log on the worker side (see here); that will be another 50-100MB. This is currently not controlled by distributed.scheduler.transition-log-length, but it should be.


I would expect there to be a few hundred MB per worker. What I can see in #5960 (comment) aligns with my expectations.

If it's "just" the logging and nothing else, we can easily add in more config options or adjust current default values.

This is not ideal, of course, but it should saturate eventually. All of our logs are bounded but there is some variation in it.

There are ideas around, like #5570, to give users better control over this, but this is still in the ideation stage.

@fjetter
Member

fjetter commented Mar 24, 2022

Things like #5989 will help on scheduler side. There are also a bunch of things on the scheduler that accumulate data but the intention is to only log small things such that it doesn't affect anybody.

Just in case this comes across the wrong way, this is somewhat expected but not intended behaviour. We have to fix this.

@fjetter
Member

fjetter commented Mar 24, 2022

To get a measurement on the workers, one can use a script like the following (use at your own risk):

from collections import deque

from dask.sizeof import sizeof


def get_sizeof(dask_worker):
    res = {}
    for key in dask_worker.__dict__:
        try:
            val = dask_worker.__dict__[key]
            # There may be other types not properly sized by dask.sizeof
            if isinstance(val, deque):
                val = list(val)
            _size = sizeof(val) / 1024 ** 2
            # only return stuff that has at least an MB
            if _size > 1:
                res[key] = _size
        except Exception:
            pass
    return res

client.run(get_sizeof)

@JSKenyon
Author

@fjetter Thanks for the input! I will definitely take a look at those logs. Do you think the behaviour in the original report (#5960 (comment)) could be explained by the logging?

@fjetter
Member

fjetter commented Mar 25, 2022

One thing I forgot, the bokeh backend also logs stuff to build the timeline, e.g. task stream. This all should be relatively small but if there are many small components, this obviously adds up to something. This only pops up on the scheduler.


will definitely take a look at those logs. Do you think the behaviour in the original report (#5960 (comment)) could be explained by the logging?

Hard to say, without more measurements. It's not impossible.

  1. LocalCluster is using more memory than the threading scheduler. That makes sense, since the threaded scheduler just uses a ThreadPool and does not run any sophisticated logic behind the scenes (in particular, no logging, etc.).
  2. The graph you are scheduling is trivial, so the transfer log should probably be empty. The culprit is likely something else.

What is concerning in your original report is that you are accumulating ~4GB of additional data over time with just four workers. That's 1GB per worker and they do not appear to be saturated. Your plots are a bit hard to read since one cannot distinguish the child processes properly. I assume all workers are following the same line? If so, the logging hypothesis is a strong candidate.

@JSKenyon
Author

Indeed, all four children in the first plot are on top of one another. I should have stressed that. What I find strange is that it is the from_array that exposes this extreme growth. Is the serialised array stored somewhere in the logs? If that were the case, I would agree with you that the logging is likely to blame.

@fjetter
Member

fjetter commented Mar 25, 2022

If that were the case, I would agree with you that the logging is likely to blame.

Not that I am aware of. Logging and storing task results would have caused much more severe regressions.

@JSKenyon
Author

JSKenyon commented Mar 28, 2022

I have been attempting to track this down but I am still having limited success. Here is a modified version of my original reproducer:

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster


def f(x):
    return np.empty_like(x)

def wrapper(x, n_elem_per_chunk):
    if not isinstance(x, da.Array):
        x = da.from_array(x, chunks=(n_elem_per_chunk))

    return da.blockwise(f, ('x'), x, ('x'))


if __name__=='__main__':

    cluster = LocalCluster(
        processes=True,
        n_workers=1,
        threads_per_worker=1
    )
    client = Client(cluster)

    n_elem_per_chunk = 13107200  # 100MiB of float64 data.
    n_chunk = 1

    for i in range(100):
        x = np.random.randn(n_elem_per_chunk*n_chunk)
        result = wrapper(x, n_elem_per_chunk)
        result.compute()

I have deliberately set this up so that each chunk is exactly 100MiB in size. This is because mprof plots in MiB and I want to make something clear. Note that running this reproducer requires ~10GB of RAM.

The following is a plot generated using mprof run --multiprocess reproducer.py:
[plot: mprof run --multiprocess output, single worker]
In the above plot, the black line is the scheduler process and the red line is the single worker. The blue line can be ignored. Note that the peak memory usage is a little over 10000MiB which happens to be 100 times (the number of times we run the loop) larger than the array x (100MiB by choice). This seems to be pretty conclusive evidence that the arrays (or more likely the versions which end up in the graph) are overstaying their welcome.

I am really struggling to track this down further. I have tried both pympler and objgraph. Neither made it clear to me what is going wrong, although I am not particularly proficient with either.

Edit: I should note that I did try purging the worker and scheduler logs between iterations but it made no difference. I should also stress that @djhoese's case may be entirely different.
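
For reference, purging the worker-side logs between iterations can be done roughly like this (a sketch; these attribute names are distributed internals mentioned earlier in the thread and may differ between versions):

def clear_worker_logs(dask_worker):
    # The bounded internal logs discussed above are deques; empty them in place.
    dask_worker.log.clear()
    dask_worker.incoming_transfer_log.clear()
    dask_worker.outgoing_transfer_log.clear()


# Called between iterations of the reproducer:
#     client.run(clear_worker_logs)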

@JSKenyon
Author

I have finally made a little progress - this is actually a relatively recent regression. Below is a plot produced with the 2022.1.1 versions of dask and distributed, where the black line is the scheduler process, the red line is the single worker and the blue line can be ignored:
[plot: dask/distributed 2022.1.1]
The above is consistent with what we would expect. Contrast it with the same plot generated using the 2022.2.0 and later versions of dask and distributed:
[plot: dask/distributed 2022.2.0 and later]
This is hopefully a thread that can be pulled to find the cause.

@JSKenyon
Author

I think #5653 is the culprit. The following example demonstrates that worker.pending_data_per_worker grows with every compute call.

import numpy as np
import dask.array as da
import dask
from dask.distributed import Client, LocalCluster
from pympler import asizeof


def f(x):
    return np.empty_like(x)


def wrapper(x, n_elem_per_chunk):
    if not isinstance(x, da.Array):
        x = da.from_array(x, chunks=(n_elem_per_chunk))

    return da.blockwise(f, ('x'), x, ('x'))


def check_growth(dask_worker):

    for uth in dask_worker.pending_data_per_worker.values():
        uth_size = asizeof.asizeof(uth) / 1024 ** 2
        print(f"Size of UniqueTaskHeap: {uth_size:.2f} MiB.")


if __name__ == '__main__':

    cluster = LocalCluster(
        processes=True,
        n_workers=1,
        threads_per_worker=1
    )
    client = Client(cluster)

    n_elem_per_chunk = 13107200  # 100MiB of float64.
    n_chunk = 1

    for i in range(10):
        x = np.random.randn(n_elem_per_chunk*n_chunk)

        result = wrapper(x, n_elem_per_chunk)

        dask.compute(result)

        client.run(check_growth)

@fjetter
Member

fjetter commented Mar 28, 2022

If pending_data_per_worker grows to become this large, I would expect the actual problem to be TaskState.run_spec, which is the only thing on our TaskState that should actually have any relevant size. This is basically the serialized function and arguments you are submitting to the cluster. This can be particularly large if you (accidentally) pickle by value rather than by reference.

However, I am a bit confused about seeing pending_data_per_worker to be populated in the first place since I expected your graph to be embarrassingly parallel, isn't it?

@JSKenyon
Author

JSKenyon commented Mar 28, 2022

In the last example I have reduced it to a single chunk per compute call. I do not dispute that there will be a 100MiB array serialised into the graph - that is absolutely expected when using from_array. The problem is that the state of pending_data_per_worker is persisting between compute calls, which means you end up with more and more of these serialised arrays in memory. Given that compute is a blocking (to the best of my knowledge) operation, I am a little confused why it is retaining the state of previous calls. I am basically an amateur here, so forgive me if I am a little slow. As to whether or not it is surprising that pending_data_per_worker is populated, I cannot say. I do not believe I am doing anything particularly surprising above and it is definitely a regression after 2022.01.1.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Mar 29, 2022
Why does pending data have so much in it?? xref dask#5960 (comment)
@JSKenyon
Author

JSKenyon commented Mar 29, 2022

@fjetter I think I have isolated the growth to a single function. Specifically, update_who_has:

def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
    try:
        for dep, workers in who_has.items():
            if not workers:
                continue

            if dep in self.tasks:
                dep_ts = self.tasks[dep]
                if self.address in workers and self.tasks[dep].state != "memory":
                    logger.debug(
                        "Scheduler claims worker %s holds data for task %s which is not true.",
                        self.name,
                        dep,
                    )
                    # Do not mutate the input dict. That's rude
                    workers = set(workers) - {self.address}
                dep_ts.who_has.update(workers)

                for worker in workers:
                    self.has_what[worker].add(dep)
                    self.pending_data_per_worker[worker].push(dep_ts)

When invoked from handle_compute_task, this causes new entries to be added to self.pending_data_per_worker. These entries are never (to the best of my knowledge) removed. This is at odds with the behaviour in 2022.01.1, at which point pending_data_per_worker would not have been updated for my example. Specifically, the following (this is the code from 2022.01.1):
if dep_ts.state in FETCH_INTENDED:
    dep_ts.who_has.update(workers)

    if dep_ts.state == "missing":
        recommendations[dep_ts] = "fetch"

    for worker in workers:
        self.has_what[worker].add(dep)
        self.pending_data_per_worker[worker].append(dep_ts.key)

would not have been triggered. I cannot offer a solution - I am not familiar enough with the worker internals. That said, hopefully this is enough information to make a fix easy.

Edit: Changed link to point to this repo rather than a fork.

@fjetter
Member

fjetter commented Mar 29, 2022

Thanks for tracking this down @JSKenyon ! This indeed doesn't look right. I remember iterating on this piece of code a few times. It was very frequently connected to all sorts of deadlocks. I'll try to squeeze out a bit of time to look into a fix.

It should likely be something similar to the if dep_ts.state in FETCH_INTENDED: guard, but defining the proper FETCH_INTENDED set will likely be a challenge. The deadlocks connected to this should have a test, so it should be "safe" to modify.

@djhoese

djhoese commented Mar 31, 2022

Awesome job @JSKenyon! I was on vacation for a week and just got back. Looks like you got a lot of work done. I agree that what you and I are looking at are probably different things, and mine may be linked to some amount of logging somewhere. However, the issues you're finding here are likely (I hope) the causes of the original problem I was running into when helping someone I'm working with. They were seeing memory blow-ups, and when I trimmed down their script I kept seeing memory increasing, so I just kept trimming it down to the near-nothing script I mentioned above. I will double-check the versions used in various places and see where your reproducer aligns with my real-world case.

@zklaus

zklaus commented Apr 11, 2022

Any news on this? Could there be a kind of workaround in the meantime?

@JSKenyon
Author

I am not sure @zklaus. The "simple" short term solution is to go back to using the 2022.01.1 versions of dask and distributed. I do not think that I could implement a proper fix on my own right now - the surface area is too large and I am too unfamiliar with the innermost workings of the scheduler. It does seem that this issue may be affecting others so hopefully a proper fix is coming.

@zklaus

zklaus commented Apr 11, 2022

Thanks, @JSKenyon. I didn't realize that going back to 2022.01.1 is a temporary fix. Would be great to see this resolved more permanently.

@fjetter
Member

fjetter commented May 25, 2022

There are currently two PRs (one will be merged) re-working some of the update_who_has flaws, see

These PRs should address the ever-increasing who_has/has_what sets. There might still be an issue with pending_data_per_worker / data_needed_per_worker, but this data structure has changed significantly since this issue was filed, and it is possible that this is no longer a problem.

I suggest rerunning the reproducers in here once either of the above PRs is merged.

@fjetter
Member

fjetter commented Jun 1, 2022

I ran the reproducer posted in #5960 (comment) again after merging #6342

The initial increase is understandable given that we're logging transitions, etc. by default, but it does not grow out of bounds and every process stays at about ~200MB.
I cannot explain the big drop at ~120s, but I suspect it is something like GC, or my OS did something funny.

[plot: gh5960_post_update_who_has]

There are probably still a few sources out there that contribute to some increase in memory but it looks like the worst is over.
Another interesting change in our pipeline that should help here is

I'd love for somebody else to confirm whether the issue has improved.
