Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask hangs indefinitely on unit test during client.scatter #4907

Open
freddyaboulton opened this issue Jun 11, 2021 · 3 comments
Open

Dask hangs indefinitely on unit test during client.scatter #4907

freddyaboulton opened this issue Jun 11, 2021 · 3 comments

Comments

@freddyaboulton
Copy link

What happened:

We have recently started seeing our CI fail sporadically on a test that hangs forever. The stacktrace points to client.scatter with broadcast=True. This error doesn't happen every time the test is run but enough times to be noticeable. Due to the sporadic nature of the error, it's hard to create a minimal repro.

We have started seeing this recently, so I'm posting here as a hail-mary to see if anyone can link this to a recent change in dask. We have started seeing this since '2021.05.1'

This is the method that is hanging:

    def send_data_to_cluster(self, X, y):
        """Send data to the cluster.

        The implementation uses caching so the data is only sent once. This follows
        dask best practices.

        Args:
            X (pd.DataFrame): input data for modeling
            y (pd.Series): target data for modeling
        Return:
            dask.Future: the modeling data
        """
        data_hash = joblib.hash(X), joblib.hash(y)
        if data_hash in self._data_futures_cache:
            X_future, y_future = self._data_futures_cache[data_hash]
            if not (X_future.cancelled() or y_future.cancelled()):
                return X_future, y_future
        self._data_futures_cache[data_hash] = self.client.scatter(
            [X, y], broadcast=True
        )
        return self._data_futures_cache[data_hash]

This is the stacktrace. Note that pytest is setting the 360 second timeout. Without it, the test would hang forever.

test_python/lib/python3.8/site-packages/distributed/client.py:2185: in scatter
    return self.sync(
test_python/lib/python3.8/site-packages/distributed/client.py:853: in sync
    return sync(
test_python/lib/python3.8/site-packages/distributed/utils.py:351: in sync
    e.wait(10)
/opt/hostedtoolcache/Python/3.8.10/x64/lib/python3.8/threading.py:558: in wait
    signaled = self._cond.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.lock object at 0x7fcf3bf52f00>, 0)>
timeout = 10

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
E                   Failed: Timeout >360.0s

/opt/hostedtoolcache/Python/3.8.10/x64/lib/python3.8/threading.py:306: Failed

The log is spitting this out:

distributed.worker - WARNING - Could not find data: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']} on workers: [] (who_has: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']})
distributed.scheduler - WARNING - Communication failed during replication: {'status': 'missing-data', 'keys': {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ('tcp://127.0.0.1:41061',)}}
distributed.worker - WARNING - Could not find data: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']} on workers: [] (who_has: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']})
distributed.scheduler - WARNING - Communication failed during replication: {'status': 'missing-data', 'keys': {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ('tcp://127.0.0.1:41061',)}}
distributed.worker - WARNING - Could not find data: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']} on workers: [] (who_has: {'Series-32a3ef2ca4739b46a6acc2ac58638b32': ['tcp://127.0.0.1:41061']})

This is a link to the complete stacktrace.

This is a link to the test that's being run.

Any help would be appreciated!

Environment:

  • Dask version: 2021.05.1
  • Python version: 3.8
  • Operating System: Mac OS/Linux
  • Install method (conda, pip, source): pip
@freddyaboulton freddyaboulton changed the title Dask hangs Dask hangs indefinitely on unit test during client.scatter Jun 11, 2021
@abergou
Copy link

abergou commented Jun 11, 2021

You may want to try the main branch. In particular a similar issue was fixed for me by #4784.

@freddyaboulton
Copy link
Author

freddyaboulton commented Jun 11, 2021

@abergou Thanks! What was your issue? Were you getting the same log output?

@abergou
Copy link

abergou commented Jun 11, 2021

@freddyaboulton I was getting cluster hanging during computations. For these I was not using scatter, but clusters would hang indefinitely. I was also seeing some issues in the logs with missing data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants