
Non-blocking operations on scheduler #7233

Open · YJHMITWEB opened this issue Oct 29, 2022 · 7 comments

YJHMITWEB commented Oct 29, 2022

Hi, this is just a general discussion about the scheduler's behavior. I noticed that in Dask-MPI's initialize:

async def run_scheduler():
    async with Scheduler(
        interface=interface,
        protocol=protocol,
        dashboard=dashboard,
        dashboard_address=dashboard_address,
    ) as scheduler:
        comm.bcast(scheduler.address, root=0)
        comm.Barrier()
        await scheduler.finished()

The scheduler starts with only one thread, yet it has to maintain communication with all the workers and the client. I'm very curious whether, once there is an await call, the scheduler simply blocks there, and only after that await completes can it move on to other communications. If so, this seems like a huge communication overhead.

For example, in distributed.comm.ucx,

@log_errors
async def write(
    self,
    msg: dict,
    serializers: Collection[str] | None = None,
    on_error: str = "message",
) -> int:
    if self.closed():
        raise CommClosedError("Endpoint is closed -- unable to send message")
    try:
        if serializers is None:
            serializers = ("cuda", "dask", "pickle", "error")
        # msg can also be a list of dicts when sending batched messages
        logging.info("send msg={}".format(msg))
        frames = await to_frames(
            msg,
            serializers=serializers,
            on_error=on_error,
            allow_offload=self.allow_offload,
        )
        nframes = len(frames)
        cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
        sizes = tuple(nbytes(f) for f in frames)
        cuda_send_frames, send_frames = zip(
            *(
                (is_cuda, each_frame)
                for is_cuda, each_frame in zip(cuda_frames, frames)
                if nbytes(each_frame) > 0
            )
        )

        # Send metadata

        # Send close flag and number of frames (_Bool, int64)
        await self.ep.send(struct.pack("?Q", False, nframes))
        # Send which frames are CUDA (bool) and
        # how large each frame is (uint64)
        await self.ep.send(
            struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes)
        )

        # Send frames

        # It is necessary to first synchronize the default stream before we
        # start sending. We synchronize the default stream because UCX is not
        # stream-ordered, and syncing the default stream will wait for other
        # non-blocking CUDA streams. Note this is only sufficient if the memory
        # being sent is not currently in use on non-blocking CUDA streams.
        if any(cuda_send_frames):
            synchronize_stream(0)

        for each_frame in send_frames:
            await self.ep.send(each_frame)
        return sum(sizes)
    except ucp.exceptions.UCXBaseException:
        self.abort()
        raise CommClosedError("While writing, the connection was closed")

There are several await self.ep.send calls. If, for example, this is a send from the scheduler to worker dask/dask-mpi#1, then even though all the other workers can perform computation in parallel, they still have to wait sequentially for their communication with the scheduler. In cases where communication is heavier than computation, the overhead will be significant.

I'm wondering if there is any way to perform non-blocking send/recv by giving the scheduler more threads.

kmpaul commented Oct 29, 2022

This seems to be a question more for the Distributed community. Dask-MPI is just the tool for launching a Dask cluster.

kmpaul commented Oct 30, 2022

Also, all communication between the scheduler and workers is asynchronous. So, nothing is blocking.

Is there a particular use case you are having problems with?
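
To illustrate this point, here is a toy sketch (not Distributed's actual code) of how awaited sends interleave on a single-threaded asyncio event loop; fake_send and its 0.1 s sleep are hypothetical stand-ins for a send whose latency is spent waiting on the network.

import asyncio
import time

async def fake_send(worker_id: int) -> None:
    # While this coroutine is suspended on the (simulated) network wait,
    # the event loop is free to run the other sends.
    await asyncio.sleep(0.1)
    print(f"sent to worker {worker_id}")

async def main() -> None:
    start = time.perf_counter()
    # Ten sends awaited concurrently finish in ~0.1 s, not ~1 s,
    # even though everything runs on a single thread.
    await asyncio.gather(*(fake_send(i) for i in range(10)))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")

asyncio.run(main())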

YJHMITWEB (Author) commented

Oh thanks, I get it. But in the case where all the communication happens at the same time, even with async functions, since the scheduler has only one thread, it is still going to be sequential.

jacobtomlinson (Member) commented

@pentschev may have some thoughts

pentschev (Member) commented

On a Dask worker, communication occurs on a different thread than compute. On the scheduler, you're right that communication occurs on the same thread as the rest of the work, and that may in fact block at times; I don't think there's currently a mechanism to offload communications to one or more separate threads. However, under normal circumstances the messages transferred between the scheduler and workers are small, so that blocking time may not be substantial.

If you have the time, it would probably be an interesting experiment to offload communication to one or more threads on the scheduler and check whether there are any performance gains. As noted by @kmpaul, this discussion is anyway better suited for https://github.com/dask/distributed/issues, so it may be worth raising the question there to see what other people involved in Distributed think, or whether someone has already experimented with this.
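
For reference, a rough sketch of what such an experiment could look like using plain asyncio's run_in_executor; serialize and write_offloaded here are hypothetical stand-ins for illustration, not Distributed APIs.

import asyncio
import pickle
from concurrent.futures import ThreadPoolExecutor

# Pool dedicated to communication-related work; the size is arbitrary.
comm_pool = ThreadPoolExecutor(max_workers=4)

def serialize(msg: dict) -> bytes:
    # Hypothetical stand-in for frame serialization (e.g. to_frames):
    # CPU-bound work that would otherwise run on the event loop thread.
    return pickle.dumps(msg)

async def write_offloaded(msg: dict) -> bytes:
    loop = asyncio.get_running_loop()
    # run_in_executor moves serialize() onto a pool thread; the event
    # loop keeps servicing other coroutines while it runs.
    frames = await loop.run_in_executor(comm_pool, serialize, msg)
    # ...the actual ep.send() of `frames` would follow here...
    return frames

asyncio.run(write_offloaded({"op": "compute", "key": "x"}))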

jacobtomlinson transferred this issue from dask/dask-mpi Nov 1, 2022
jacobtomlinson (Member) commented

Agreed, transferring this issue to distributed.

pentschev (Member) commented Nov 1, 2022

Also, one important fact I should have mentioned is how async really works in UCX-Py. There are two modes: blocking (the default) and non-blocking (aka polling).

In blocking mode, UCX registers a file descriptor that UCX-Py keeps watching. Once there is an event on that file descriptor, the UCX-Py progress routine is awakened, and only then does UCX-Py truly block to complete communication; otherwise it allows other tasks in the event loop to proceed.

Non-blocking mode is much more straightforward: it tries to progress the UCX worker continuously. If there's any work to be done, it blocks until that completes; otherwise UCX-Py yields to the event loop and tries again once the event loop completes that iteration and arrives back at the progress routine.
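
A toy asyncio analogy of those two modes (not UCX-Py's actual implementation; assumes a Unix selector-based event loop): blocking mode waits on a file-descriptor event before progressing, while polling mode repeatedly attempts progress and yields between attempts.

import asyncio
import socket

async def blocking_style() -> None:
    # Blocking-mode analogue: register a file descriptor with the loop
    # and make progress only when an event arrives on it.
    rsock, wsock = socket.socketpair()
    loop = asyncio.get_running_loop()
    woken = loop.create_future()

    def on_event() -> None:
        # Awakened by the fd event; only now do we "progress" the work.
        woken.set_result(rsock.recv(1024))

    loop.add_reader(rsock.fileno(), on_event)
    wsock.send(b"wakeup")  # simulate an incoming event
    print("blocking style got:", await woken)
    loop.remove_reader(rsock.fileno())
    rsock.close()
    wsock.close()

async def polling_style() -> None:
    # Polling-mode analogue: repeatedly attempt to make progress,
    # yielding to the event loop between attempts.
    work_left = 3
    while work_left:
        work_left -= 1  # simulate progressing the UCX worker
        await asyncio.sleep(0)  # yield so other tasks can run

async def main() -> None:
    await blocking_style()
    await polling_style()

asyncio.run(main())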
