
Unnecessary deep copy causes memory flare on network comms #5107

Closed
crusaderky opened this issue Jul 22, 2021 · 5 comments · Fixed by #5208

@crusaderky
Collaborator

distributed git tip, Linux x64

import distributed
import numpy

c = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit="2.8 GiB")
f = c.submit(numpy.random.random, 2 ** 27)  # 1 GiB
c.replicate(f, 4)
# Alternative to replicate, identical effect
# futures = [c.submit(lambda x: None, f, pure=False, workers=[w]) for w in c.has_what()]

Expected behaviour

Thanks to pickle protocol 5 out-of-band buffers, the peak RAM usage on each worker should be 1 GiB.
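
(For reference, a minimal sketch of the pickle protocol-5 out-of-band mechanism this expectation relies on; the in-process round trip and sizes are purely illustrative.)

import pickle
import numpy

arr = numpy.random.random(2 ** 27)  # ~1 GiB of float64

# Protocol 5 hands large buffers out via buffer_callback instead of embedding
# (and copying) them into the pickle stream.
buffers = []
header = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)

# The receiver can rebuild the array around those same buffers, so in principle
# no second 1 GiB copy is ever required.
arr2 = pickle.loads(header, buffers=buffers)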

Actual behaviour

On the dashboard I can see the RAM of every worker that receives the computed future over the network briefly flare up to 2 GiB before settling at 1 GiB.
On stderr I read:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 2.07 GiB -- Worker memory limit: 2.80 GiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 2.07 GiB -- Worker memory limit: 2.80 GiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 2.07 GiB -- Worker memory limit: 2.80 GiB

If I reduce the memory_limit to 2 GiB, the workers get killed off.

The sender worker is unaffected by the flaring.

I tested on Python 3.8 and 3.9 with the tcp://, ws:// and ucx:// protocols; all are equally affected.

@crusaderky
Collaborator Author

The flare is caused by this line:

merged_frames.append(bytearray().join(frames[offset : offset + n]))
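
For illustration, a minimal sketch of why that join causes the flare (frame count and sizes are hypothetical): bytearray().join allocates a brand-new contiguous buffer, so for a moment both the received frames and the merged copy are alive.

frames = [bytes(64 * 2 ** 20) for _ in range(16)]  # 16 x 64 MiB frames already received (~1 GiB)
merged = bytearray().join(frames)                  # second contiguous ~1 GiB allocation
# Peak memory here is ~2 GiB until `frames` is released.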

@crusaderky
Collaborator Author

This implies that all numpy buffers larger than a single frame are deep-copied upon arrival, which is wasteful. It should be possible to reassemble them directly from the network card's buffer into their final location, although that may require some low-level work.
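
A rough sketch of that idea, assuming the total payload size is known from the message header before the body arrives (the socket handling and names below are simplified and purely illustrative, not the actual comms code):

import socket

def recv_into_preallocated(sock: socket.socket, nbytes: int) -> bytearray:
    # Allocate the final resting place once, then receive every chunk straight
    # into it, so there are no per-frame buffers and no merging join.
    buf = bytearray(nbytes)
    view = memoryview(buf)
    received = 0
    while received < nbytes:
        n = sock.recv_into(view[received:])
        if n == 0:
            raise ConnectionError("connection closed before the full message arrived")
        received += n
    return buf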

@mrocklin
Member

mrocklin commented Jul 22, 2021 via email

@gjoseph92
Collaborator

Worth noting that the default frame size is (I believe) 64 MiB, which is a pretty small chunk size, so it's probably reasonable to think this is happening often:

It should be possible to reassemble them directly from the network card's buffer into their final location, although that may require some low-level work.

To add more detail here, since initially I thought there might be a simpler approach (there isn't): at first I thought, "instead, could we just make a non-contiguous bytearray-like object, so at least we get rid of the copy here?" But that would just move the problem around, because in the end the NumPy array needs to be contiguous. As long as we've loaded all the 64 MiB pieces of the array into non-contiguous memory before we've figured out where they ultimately need to go, and it then turns out they need to become contiguous, using 2x the memory of the final array is unavoidable.
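
To make that concrete, a small sketch (chunk count and sizes are hypothetical) of the 2x peak once contiguity is required:

import numpy

# 16 x 64 MiB pieces (~1 GiB total) standing in for frames that already landed
# in separate, non-contiguous buffers
chunks = [numpy.random.random(2 ** 23) for _ in range(16)]

# Making them contiguous allocates a second ~1 GiB, so peak memory is ~2 GiB
# until `chunks` is released -- the same flare seen on the receiving workers.
contiguous = numpy.concatenate(chunks)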

I'd argue that this sort of problem is part of why Arrow exists.

@jakirkham
Member

This implies that all numpy buffers larger than a single frame are deep-copied upon arrival, which is wasteful. It should be possible to reassemble them directly from the network card's buffer into their final location, although that may require some low-level work.

Yeah, actually we already read them into a single buffer after PR #4506. There may still be some copying due to how Tornado buffers communication, but it should be limited. That said, it sounds like the serialization logic results in extra copies, as Gabe discovered in PR #5112.

Generally we have been aware that there are cases where extra copies happen, but the history is complicated (for example, sometimes people have expected writable buffers when bytes were used, etc.). Cutting down on the number of copies is something I know Mads has been working on, though he's on PTO at the moment (so I would prefer not to bug him about this 😉). I should add that this may involve a move away from msgpack to msgspec, which is something we've been thinking about for a while.
