Performance issues with dask.scatter #3333

Open
rkube opened this issue Dec 19, 2019 · 5 comments
Labels: needs info (Needs further information from the user), performance

Comments

@rkube commented Dec 19, 2019

Hi,
I use dask.distributed to perform a large number of analysis routines on the same data. After pre-processing, I scatter the data to the workers, but this takes a long time. With a 28.5 MB data packet the following code takes about 7.5 s:

>>> tic_sc = timeit.default_timer()
>>> fft_future = dask_client.scatter(fft_data, broadcast=True, direct=True)
>>> toc_sc = timeit.default_timer()
>>> logging.info(f"Scatter took {(toc_sc - tic_sc):6.4f}s")
Scatter took 7.4773s

This is with 32 workers on 2 nodes, connected via InfiniBand.
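
A quick way to narrow this down (a hedged sketch, not from the original report; it assumes the same dask_client and fft_data as above) is to time each scatter variant separately, since broadcast and direct move the data along different paths:

import timeit

# Time each scatter variant; hash=False prevents deduplication of identical
# data, so every call actually transfers bytes to the workers.
for kwargs in ({}, {"direct": True}, {"broadcast": True}, {"broadcast": True, "direct": True}):
    tic = timeit.default_timer()
    fut = dask_client.scatter(fft_data, hash=False, **kwargs)
    toc = timeit.default_timer()
    print(f"scatter(**{kwargs}) took {toc - tic:6.4f}s")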

@quasiben (Member)

@rkube thanks for the issue. Unfortunately, there is not a whole lot to go on here. Would it be possible to generate a reproducible example?

@rkube (Author) commented Dec 20, 2019

Right, here is the simplest example I could come up with:

from distributed import Client
import numpy as np 
import logging
import threading
import queue
import timeit


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


def consume(Q, dask_client):
    # Pull (index, array) items off the queue and scatter each one to all workers.
    while True:
        (i, data) = Q.get()
        if i == -1:  # sentinel item: stop consuming
            Q.task_done()
            break

        tic_sc = timeit.default_timer()
        future = dask_client.scatter(data, broadcast=True, direct=True)
        toc_sc = timeit.default_timer()
        logging.info(f"Scatter took {(toc_sc - tic_sc):6.4f}s")

        Q.task_done()


def main():
    dq = queue.Queue()
    data = np.zeros([192, 512, 38], dtype=np.complex128)
    dask_client = Client(scheduler_file="/scratch/gpfs/rkube/dask_work/scheduler.json")
    worker = threading.Thread(target=consume, args=(dq, dask_client))
    worker.start()

    for i in range(5):
        data = data + np.random.uniform(0.0, 1.0, data.shape)
        dq.put((i, data))

    dq.put((-1, None))  # signal the consumer to stop

    worker.join()
    dq.join()


if __name__ == "__main__":
    main()

And here is the output on a cluster running with 64 workers on 2 nodes:
$ python processor_dask_mockup.py
11:55:32,23 INFO: Scatter took 17.3621s
11:55:43,372 INFO: Scatter took 11.3483s
11:55:56,62 INFO: Scatter took 12.6898s
11:56:04,863 INFO: Scatter took 8.8010s
11:56:13,95 INFO: Scatter took 8.2318s

@mrocklin (Member)

Running this locally, it takes much less time for me.

from dask.distributed import Client
client = Client()
import numpy as np
data = np.zeros([192, 512, 38], dtype=np.complex128)
%time client.scatter(data, broadcast=True, direct=True)
CPU times: user 120 ms, sys: 8.34 ms, total: 129 ms
Wall time: 654 ms

So I suspect that it has something to do with your setup. Maybe your client isn't well connected to your workers? Maybe something else? Unfortunately as an organization we're not set up to do this level of support for free.
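
If it is the client-to-worker connectivity, something like the following (a minimal sketch, assuming the connected client from the snippets above) shows where the scheduler and each worker actually live from the client's point of view:

import socket

# Scheduler address and worker count as seen by the client.
info = client.scheduler_info()
print("scheduler:", info["address"])
print("n_workers:", len(info["workers"]))

# Hostname of each worker, keyed by worker address, to confirm which node
# each worker is running on relative to the client.
print(client.run(socket.gethostname))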

@rkube (Author) commented Dec 30, 2019

Right, I checked it on a single node now. In this configuration the scatter doesn't take much time:

$ python processor_dask_mockup.py 
distributed.scheduler - INFO - Receive client connection: Client-4a49417a-2b0e-11ea-940c-0894ef80904b
distributed.core - INFO - Starting established connection
09:11:36,968 INFO: Scatter took 0.4835s
09:11:37,338 INFO: Scatter took 0.3693s
09:11:37,755 INFO: Scatter took 0.4166s
09:11:38,163 INFO: Scatter took 0.4073s
09:11:38,575 INFO: Scatter took 0.4112s

This is with 64 Dask workers running on the same node as the scheduler. In the case reported above, the scheduler was running on one of the two nodes, which were connected with gigabit Ethernet.
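
For rough context (an editorial back-of-envelope estimate, not a measurement from this thread): the mockup array is about 60 MB, so if a few dozen copies have to cross a ~1 Gb/s link during the broadcast, multi-second scatter times are plausible on bandwidth alone.

# Order-of-magnitude estimate only; how many copies actually cross the wire
# depends on how scatter performs the broadcast.
nbytes = 192 * 512 * 38 * 16      # complex128 is 16 bytes/element, ~59.8 MB
copies_over_wire = 32             # assumption: roughly the workers on the remote node
gige_bytes_per_s = 125e6          # ~1 Gb/s
print(nbytes * copies_over_wire / gige_bytes_per_s)  # ~15 s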

@mrocklin (Member)

You'll probably have to do some profiling to figure out what is going on. I recommend the performance_report function described at the bottom of this documentation section: https://docs.dask.org/en/latest/diagnostics-distributed.html#capture-diagnostics
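
A minimal sketch of that suggestion (assuming a distributed version recent enough to ship performance_report, and reusing the array from the mockup above):

import numpy as np
from distributed import Client, performance_report

client = Client(scheduler_file="/scratch/gpfs/rkube/dask_work/scheduler.json")
data = np.zeros([192, 512, 38], dtype=np.complex128)

# Everything inside the context manager is captured in an HTML report that
# breaks down task, transfer, and scheduler activity.
with performance_report(filename="scatter-report.html"):
    future = client.scatter(data, broadcast=True, direct=True)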

@GenevieveBuckley added the needs info and performance labels on Oct 22, 2021