Scheduler gets stuck in a loop if one or more workers terminate during client.scatter() #5704

@amohar2

Description

In my code, I have a large object that I scatter to all workers with client.scatter(..., broadcast=True).
However, I have seen that the scheduler can get stuck in a loop if one or more workers go down in the middle of the client.scatter() call. The produced logs are as follows:
Communication with worker tcp://192.168.55.103:8002 failed during replication: OSError: Timed out trying to connect to tcp://192.168.55.103:8002 after 60 s
Communication with worker tcp://192.168.55.103:8002 failed during replication: OSError: Timed out trying to connect to tcp://192.168.55.103:8002 after 60 s
Communication with worker tcp://192.168.55.103:8002 failed during replication: OSError: Timed out trying to connect to tcp://192.168.55.103:8002 after 60 s
....

I investigated this, and it seems to come from the scheduler's replicate() call, where data/tasks are copied to all workers in a while loop; the copying itself is done by the scheduler's gather_on_worker() function. However, that function does not raise an error when connecting to a terminated worker fails, so the scheduler keeps retrying after "timeout" seconds.
Here is the snippet from the replicate() function:

while tasks:
    gathers = defaultdict(dict)
    for ts in list(tasks):
        if ts._state == "forgotten":
            # task is no longer needed by any client or dependant task
            tasks.remove(ts)
            continue
        n_missing = n - len(ts._who_has & workers)
        if n_missing <= 0:
            # Already replicated enough
            tasks.remove(ts)
            continue

        count = min(n_missing, branching_factor * len(ts._who_has))
        assert count > 0

        for ws in random.sample(workers - ts._who_has, count):
            gathers[ws._address][ts._key] = [
                wws._address for wws in ts._who_has
            ]

    await asyncio.gather(
        *(
            # Note: this never raises exceptions
            self.gather_on_worker(w, who_has)
            for w, who_has in gathers.items()
        )
    )

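The failure mode can be modelled in isolation: because the inner transfer coroutine swallows connection errors instead of raising, the set of workers holding the data never grows, and the outer while loop never terminates. A minimal asyncio sketch (all names and the round cap are hypothetical, added for illustration; this is not the distributed scheduler code):

```python
import asyncio

async def gather_on_worker(addr, who_has, alive):
    """Mimics the real gather_on_worker: swallows failures, never raises."""
    if addr not in alive:
        # The real scheduler logs "failed during replication"
        # and returns normally instead of raising an exception.
        return False
    who_has.add(addr)
    return True

async def replicate(workers, alive, max_rounds=10):
    """Simplified replicate(): loops until every worker holds the data.

    max_rounds is only here so the sketch terminates; the real loop
    has no such bound and spins until who_has == workers.
    """
    who_has = set()
    rounds = 0
    while who_has != workers and rounds < max_rounds:
        rounds += 1
        missing = workers - who_has
        await asyncio.gather(
            *(gather_on_worker(w, who_has, alive) for w in missing)
        )
    return rounds

workers = {"tcp://a:1", "tcp://b:2", "tcp://c:3"}
alive = {"tcp://a:1", "tcp://b:2"}  # one worker terminated
rounds = asyncio.run(replicate(workers, alive))
print(rounds)  # exhausts max_rounds: the dead worker is retried every pass
```

With one dead worker the loop reaches the artificial cap every time, which matches the endlessly repeating "failed during replication" log lines above.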
Is there an option to limit the number of scheduler retries in the replicate() call?
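I haven't found such an option. As a generic workaround on the caller's side, a bounded-retry helper that raises after N failed attempts looks like this (a hypothetical sketch, not part of the distributed API):

```python
def bounded_retries(operation, max_retries=3):
    """Call `operation` until it succeeds or max_retries is exhausted.

    Unlike the scheduler's internal loop, this surfaces the last
    error instead of retrying forever.
    """
    last_exc = None
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except OSError as exc:  # e.g. "Timed out trying to connect"
            last_exc = exc
    raise RuntimeError(f"gave up after {max_retries} attempts") from last_exc
```

A call that fails twice and then succeeds returns normally on the third attempt; one that keeps timing out raises after max_retries instead of looping indefinitely.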
