Flaky test_broken_worker_during_computation #4173

Open
jrbourbeau opened this issue Oct 20, 2020 · 6 comments
Labels
flaky test: Intermittent failures on CI.

Comments

@jrbourbeau
Member

I've noticed distributed/tests/test_failed_workers.py::test_broken_worker_during_computation has been sporadically failing in recent PRs (e.g. https://travis-ci.org/github/dask/distributed/jobs/737168606#L1607).
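
For anyone who wants to try reproducing this locally, a rough stress loop (a hypothetical throwaway script, not anything in the repo) is just to rerun the single test in fresh pytest processes until it fails:

```python
# Hypothetical reproduction helper: rerun the flaky test in fresh pytest
# processes and stop at the first failure. Running each attempt in its own
# process avoids any state leaking between runs.
import subprocess
import sys

TEST = (
    "distributed/tests/test_failed_workers.py"
    "::test_broken_worker_during_computation"
)

for attempt in range(1, 21):
    proc = subprocess.run([sys.executable, "-m", "pytest", "-x", TEST])
    if proc.returncode != 0:
        print(f"Failed on attempt {attempt}")
        break
else:
    print("20 clean runs")
```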

@martindurant
Member

Good news! I can get this to fail locally - occasionally.

@martindurant
Member

Git bisect has the test passing (20 runs each) up to, but not including, dcb46d0 (cc @gforsyth). The funny thing is that the failure only shows up after 13 successful runs at that commit, but it gets steadily worse over the following commits, assuming I got my bisecting order right.
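
For anyone repeating this, a sketch of how the bisect could be automated (the helper script is hypothetical; `git bisect run` treats exit status 0 as "good" and any other status, apart from 125, as "bad"; assumes the pytest-repeat plugin is installed for --count):

```python
# bisect_check.py (hypothetical): invoked as `git bisect run python bisect_check.py`.
# Runs the flaky test 20 times at the current commit and reports good/bad
# to git bisect through the exit status.
import subprocess
import sys

TEST = (
    "distributed/tests/test_failed_workers.py"
    "::test_broken_worker_during_computation"
)

proc = subprocess.run([sys.executable, "-m", "pytest", "-x", "--count=20", TEST])
# Exit 0 marks the commit good; anything else (except 125) marks it bad.
sys.exit(0 if proc.returncode == 0 else 1)
```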

@martindurant
Member

(That commit was the merge of the rather large #4107, which passed in the PR but failed on py36 once merged.)

@gforsyth
Contributor

Thanks for digging that up, @martindurant -- I had run this test locally a bunch of times, but not 20 in a row.

Can you confirm what kind of failure you're seeing when it does fail? Is it an asyncio timeout or something else?

@martindurant
Member

Luckily, it failed on the third run once I had turned tracebacks back on:

(py37) /Users/mdurant/code/distributed ((dcb46d07...)|BISECTING)> pytest --count=20 distributed/tests/test_failed_workers.py::test_broken_worker_during_computation -x
========================================================================================== test session starts ===========================================================================================
platform darwin -- Python 3.7.8, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/mdurant/code/distributed, configfile: setup.cfg
plugins: repeat-0.8.0, flaky-3.7.0, hypothesis-5.26.0, cov-2.10.1, profiling-1.7.0
collected 20 items

distributed/tests/test_failed_workers.py ..F

================================================================================================ FAILURES ================================================================================================
______________________________________________________________________________ test_broken_worker_during_computation[3-20] _______________________________________________________________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == Status.closed)
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)

                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)

                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]

                    try:
                        start = time()
                        while time() < start + 5:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()

                    return result

            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:954:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../conda/envs/py37/lib/python3.7/site-packages/tornado/ioloop.py:532: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:912: in coro
    result = await future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <Task cancelled coro=<test_broken_worker_during_computation() done, defined at /Users/mdurant/code/distributed/distributed/tests/test_failed_workers.py:315>>, timeout = 60

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_event_loop()

        if timeout is None:
            return await fut

        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)

            if fut.done():
                return fut.result()

            fut.cancel()
            raise futures.TimeoutError()

        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)

        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)

        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except futures.CancelledError:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise

            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
>               raise futures.TimeoutError()
E               concurrent.futures._base.TimeoutError

../../conda/envs/py37/lib/python3.7/asyncio/tasks.py:449: TimeoutError
------------------------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:52676
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52677
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52677
distributed.worker - INFO -          dashboard at:            127.0.0.1:52678
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /Users/mdurant/code/distributed/dask-worker-space/worker-_1zsf66b
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52679
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52679
distributed.worker - INFO -          dashboard at:            127.0.0.1:52680
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /Users/mdurant/code/distributed/dask-worker-space/worker-cuz87qov
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:52677', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52677
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:52679', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52679
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-b80cc33a-29b8-11eb-9578-acde48001122
distributed.core - INFO - Starting established connection
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:52685'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52686
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52686
distributed.worker - INFO -          dashboard at:            127.0.0.1:52687
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                    2.86 GB
distributed.worker - INFO -       Local Directory: /Users/mdurant/code/distributed/dask-worker-space/dask-worker-space/worker-322ii8r5
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:52686', name: tcp://127.0.0.1:52686, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52686
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function '_exit'
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:52686', name: tcp://127.0.0.1:52686, memory: 10, processing: 80>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52686
distributed.core - INFO - Lost connection to 'tcp://127.0.0.1:52684': in <closed TCP>: Stream is closed
distributed.nanny - INFO - Worker process 71091 exited with status 1
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52714
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52714
distributed.worker - INFO -          dashboard at:            127.0.0.1:52716
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                    2.86 GB
distributed.worker - INFO -       Local Directory: /Users/mdurant/code/distributed/dask-worker-space/dask-worker-space/worker-nlpop5co
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:52714', name: tcp://127.0.0.1:52714, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52714
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52676
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:52686
Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 322, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x17b03d250>: ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 1993, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 3196, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/Users/mdurant/code/distributed/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/Users/mdurant/code/distributed/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 3173, in _get_data
    comm = await rpc.connect(worker)
  File "/Users/mdurant/code/distributed/distributed/core.py", line 1035, in connect
    **self.connection_args,
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x17b03d250>: ConnectionRefusedError: [Errno 61] Connection refused
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:52686
Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 322, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x17b36e210>: ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 1993, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 3196, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/Users/mdurant/code/distributed/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/Users/mdurant/code/distributed/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/Users/mdurant/code/distributed/distributed/worker.py", line 3173, in _get_data
    comm = await rpc.connect(worker)
  File "/Users/mdurant/code/distributed/distributed/core.py", line 1035, in connect
    **self.connection_args,
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x17b36e210>: ConnectionRefusedError: [Errno 61] Connection refused
distributed.core - ERROR - Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x111a7e550>: ConnectionRefusedError: [Errno 61] Connection refused
Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 322, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x111a7e550>: ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/mdurant/code/distributed/distributed/core.py", line 528, in handle_comm
    result = await result
  File "/Users/mdurant/code/distributed/distributed/scheduler.py", line 3040, in broadcast
    [send_message(address) for address in addresses if address is not None]
  File "/Users/mdurant/code/distributed/distributed/utils.py", line 230, in All
    result = await tasks.next()
  File "/Users/mdurant/code/distributed/distributed/scheduler.py", line 3031, in send_message
    comm = await self.rpc.connect(addr)
  File "/Users/mdurant/code/distributed/distributed/core.py", line 1035, in connect
    **self.connection_args,
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "/Users/mdurant/code/distributed/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: Timed out trying to connect to 'tcp://127.0.0.1:52686' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x111a7e550>: ConnectionRefusedError: [Errno 61] Connection refused
distributed.scheduler - INFO - Remove client Client-b80cc33a-29b8-11eb-9578-acde48001122
distributed.scheduler - INFO - Remove client Client-b80cc33a-29b8-11eb-9578-acde48001122
distributed.scheduler - INFO - Close client connection: Client-b80cc33a-29b8-11eb-9578-acde48001122
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52677
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52679
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:52677', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52677
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:52679', name: 1, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52679
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:52714', name: tcp://127.0.0.1:52714, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52714
distributed.scheduler - INFO - Lost all workers
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52714
distributed.nanny - INFO - Worker closed
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:52685'
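
So, to answer the question above: the outer failure is the gen_cluster wait_for hitting its 60 s test timeout, while the repeated "after 5 s" errors are the surviving workers and the scheduler retrying connections to the killed worker with the default comm connect timeout. One way to probe whether the run is merely slow or genuinely wedged (illustrative values only, not a proposed fix) would be a local copy of the test with both knobs stretched:

```python
# Hedged debugging sketch, not a fix: a local variant of the test with a
# longer comm connect timeout and a longer overall gen_cluster timeout.
# The config key is the real distributed setting; the values are arbitrary.
import dask
from distributed import Nanny
from distributed.utils_test import gen_cluster

dask.config.set({"distributed.comm.timeouts.connect": "30s"})


@gen_cluster(client=True, Worker=Nanny, timeout=180)
async def test_broken_worker_during_computation_slow(c, s, a, b):
    ...  # same body as test_broken_worker_during_computation
```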

@jrbourbeau
Member Author

Thanks for identifying where this started to fail, @martindurant! Also, today I learned pytest has a --count option 👍
