Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky distributed/tests/test_client.py::test_reconnect #6896

Closed
fjetter opened this issue Aug 17, 2022 · 2 comments · Fixed by #6978
Closed

Flaky distributed/tests/test_client.py::test_reconnect #6896

fjetter opened this issue Aug 17, 2022 · 2 comments · Fixed by #6978
Labels
flaky test Intermittent failures on CI.

Comments

@fjetter
Copy link
Member

fjetter commented Aug 17, 2022

This test is very reliably flaky now on OSX py3.8

#6847 might be responsible for this. Part of the conversation on this PR was already about this potentially being very slow on OSX

cc @graingert

This graph shows the history since the above PR was merged. First run on this plot is the merge commit of the PR.

image

Example https://github.com/dask/distributed/runs/7872474950?check_suite_focus=true

@fjetter fjetter added the flaky test Intermittent failures on CI. label Aug 17, 2022
@fjetter
Copy link
Member Author

fjetter commented Aug 17, 2022

It's only happening on py3.8 but never failed for py3.10.

If it turns out to be a python version specific problem, we can probably skip this python version for this test

@gjoseph92
Copy link
Collaborator

gjoseph92 commented Aug 18, 2022

@graingert do you have any idea about this?

________________________________ test_reconnect ________________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
distributed/utils_test.py:728: in test_func
    return _run_and_close_tornado(async_fn_outer, func, *args, **kwargs)
distributed/utils_test.py:376: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py:616: in run_until_complete
    return future.result()
distributed/utils_test.py:373: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:716: in async_fn_outer
    return await asyncio.wait_for(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Task finished name='Task-20873' coro=<test_reconnect() done, defined at /Users/runner/work/distributed/distributed/distributed/tests/test_client.py:3612> result=None>
timeout = 30

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
>               raise exceptions.TimeoutError()
E               asyncio.exceptions.TimeoutError

../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:501: TimeoutError
----------------------------- Captured stderr call -----------------------------
2022-08-18 01:02:52,240 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'lost-data', 'key': 'inc-03d935909bba38f9a49655e867cbf56a'}]
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5195, in send_all
    c.send(*msgs)
  File "/Users/runner/work/distributed/distributed/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://127.0.0.1:52627 remote=tcp://127.0.0.1:52631> already closed.
2022-08-18 01:02:52,249 - distributed.scheduler - ERROR - Cannot schedule a new coroutine function as the group is already closed.
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
2022-08-18 01:02:52,258 - distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
2022-08-18 01:02:52,543 - distributed.scheduler - CRITICAL - Closed comm <BatchedSend: closed> while trying to write [{'op': 'lost-data', 'key': 'inc-03d935909bba38f9a49655e867cbf56a'}]
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5195, in send_all
    c.send(*msgs)
  File "/Users/runner/work/distributed/distributed/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Scheduler->Client local=tcp://127.0.0.1:52627 remote=tcp://127.0.0.1:52634> already closed.
2022-08-18 01:02:52,551 - distributed.scheduler - ERROR - Cannot schedule a new coroutine function as the group is already closed.
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
2022-08-18 01:02:52,560 - distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
------------------------------ Captured log call -------------------------------
2022-08-18 01:28:22,312 - distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
2022-08-18 01:28:23,587 - distributed.utils_perf - WARNING - full garbage collections took 23% CPU time recently (threshold: 10%)
2022-08-18 01:28:24,925 - distributed.utils_perf - WARNING - full garbage collections took 23% CPU time recently (threshold: 10%)
ERROR    asyncio.events:utils.py:825 Cannot schedule a new coroutine function as the group is already closed.
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-20882' coro=<Server._handle_comm() done, defined at /Users/runner/work/distributed/distributed/distributed/core.py:676> exception=AsyncTaskGroupClosedError('Cannot schedule a new coroutine function as the group is already closed.')>
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
ERROR    asyncio.events:utils.py:825 Cannot schedule a new coroutine function as the group is already closed.
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-20948' coro=<Server._handle_comm() done, defined at /Users/runner/work/distributed/distributed/distributed/core.py:676> exception=AsyncTaskGroupClosedError('Cannot schedule a new coroutine function as the group is already closed.')>
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5063, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 822, in handle_stream
    msgs = await comm.read()
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 770, in _handle_comm
    result = await result
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 3908, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 5067, in handle_worker
    await self.remove_worker(
  File "/Users/runner/work/distributed/distributed/distributed/utils.py", line 799, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 4503, in remove_worker
    self._ongoing_background_tasks.call_later(
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 231, in call_later
    self.call_soon(_delayed(afunc, delay), *args, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 190, in call_soon
    raise AsyncTaskGroupClosedError(
distributed.core.AsyncTaskGroupClosedError: Cannot schedule a new coroutine function as the group is already closed.

https://github.com/dask/distributed/runs/7890158694?check_suite_focus=true#step:11:1283

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
flaky test Intermittent failures on CI.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants