Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
from .queues import Queue
from .scheduler import Scheduler
from .threadpoolexecutor import rejoin
from .utils import sync
from .utils import sync, TimeoutError
from .variable import Variable
from .worker import Worker, get_worker, get_client, secede, Reschedule
from .worker_client import local_client, worker_client

from tornado.gen import TimeoutError

from ._version import get_versions

versions = get_versions()
Expand Down
4 changes: 2 additions & 2 deletions distributed/cfexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tornado import gen

from .metrics import time
from .utils import sync
from .utils import sync, TimeoutError


@gen.coroutine
Expand Down Expand Up @@ -135,7 +135,7 @@ def result_iterator():
if timeout is not None:
try:
yield future.result(end_time - time())
except gen.TimeoutError:
except TimeoutError:
raise cf.TimeoutError
else:
yield future.result()
Expand Down
6 changes: 3 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
except ImportError:
single_key = first
from tornado import gen
from tornado.gen import TimeoutError
from tornado.locks import Event, Condition, Semaphore
from tornado.ioloop import IOLoop
from tornado.queues import Queue
Expand Down Expand Up @@ -85,6 +84,7 @@
Any,
has_keyword,
format_dashboard_link,
TimeoutError,
)
from . import versions as version_module

Expand Down Expand Up @@ -1265,7 +1265,7 @@ async def _close(self, fast=False):

# Give the scheduler 'stream-closed' message 100ms to come through
# This makes the shutdown slightly smoother and quieter
with ignoring(AttributeError, CancelledError, asyncio.TimeoutError):
with ignoring(AttributeError, CancelledError, TimeoutError):
await asyncio.wait_for(
asyncio.shield(self._handle_scheduler_coroutine), 0.1
)
Expand Down Expand Up @@ -1957,7 +1957,7 @@ async def _scatter(
if nthreads is not None:
await asyncio.sleep(0.1)
if time() > start + timeout:
raise gen.TimeoutError("No valid workers found")
raise TimeoutError("No valid workers found")
nthreads = await self.scheduler.ncores(workers=workers)
if not nthreads:
raise ValueError("No valid workers")
Expand Down
4 changes: 2 additions & 2 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import dask

from ..metrics import time
from ..utils import parse_timedelta, ignoring
from ..utils import parse_timedelta, ignoring, TimeoutError
from . import registry
from .addressing import parse_address

Expand Down Expand Up @@ -209,7 +209,7 @@ def _raise(error):
future = connector.connect(
loc, deserialize=deserialize, **(connection_args or {})
)
with ignoring(asyncio.TimeoutError):
with ignoring(TimeoutError):
comm = await asyncio.wait_for(
future, timeout=min(deadline - time(), 1)
)
Expand Down
5 changes: 3 additions & 2 deletions distributed/deploy/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import weakref

import dask
from tornado import gen
from tornado.locks import Event
from tornado import gen

from .adaptive import Adaptive
from .cluster import Cluster
Expand All @@ -19,6 +19,7 @@
parse_bytes,
parse_timedelta,
import_term,
TimeoutError,
)
from ..scheduler import Scheduler
from ..security import Security
Expand Down Expand Up @@ -602,6 +603,6 @@ async def run_spec(spec: dict, *args):
@atexit.register
def close_clusters():
for cluster in list(SpecCluster._instances):
with ignoring(gen.TimeoutError):
with ignoring((gen.TimeoutError, TimeoutError)):
if cluster.status != "closed":
cluster.close(timeout=10)
4 changes: 2 additions & 2 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
tls_only_security,
)
from distributed.utils_test import loop # noqa: F401
from distributed.utils import sync
from distributed.utils import sync, TimeoutError

from distributed.deploy.utils_test import ClusterTest

Expand Down Expand Up @@ -523,7 +523,7 @@ def test_memory_nanny(loop, n_workers):


def test_death_timeout_raises(loop):
with pytest.raises(asyncio.TimeoutError):
with pytest.raises(TimeoutError):
with LocalCluster(
scheduler_port=0,
silence_logs=False,
Expand Down
4 changes: 2 additions & 2 deletions distributed/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tornado.locks

from .client import _get_global_client
from .utils import log_errors
from .utils import log_errors, TimeoutError
from .worker import get_worker

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,7 +47,7 @@ async def acquire(self, stream=None, name=None, id=None, timeout=None):
future = asyncio.wait_for(future, timeout)
try:
await future
except asyncio.TimeoutError:
except TimeoutError:
result = False
break
else:
Expand Down
18 changes: 7 additions & 11 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

import dask
from dask.system import CPU_COUNT
from tornado import gen
from tornado.ioloop import IOLoop, TimeoutError
from tornado.ioloop import IOLoop
from tornado.locks import Event
from tornado import gen

from .comm import get_address_host, unparse_host_port
from .comm.addressing import address_from_user_args
Expand All @@ -31,6 +31,7 @@
PeriodicCallback,
parse_timedelta,
ignoring,
TimeoutError,
)
from .worker import run, parse_memory_limit, Worker

Expand Down Expand Up @@ -213,12 +214,7 @@ async def _unregister(self, timeout=10):
if worker_address is None:
return

allowed_errors = (
gen.TimeoutError,
CommClosedError,
EnvironmentError,
RPCClosed,
)
allowed_errors = (TimeoutError, CommClosedError, EnvironmentError, RPCClosed)
with ignoring(allowed_errors):
await asyncio.wait_for(
self.scheduler.unregister(address=self.worker_address), timeout
Expand Down Expand Up @@ -317,7 +313,7 @@ async def instantiate(self, comm=None):
result = await asyncio.wait_for(
self.process.start(), self.death_timeout
)
except gen.TimeoutError:
except TimeoutError:
await self.close(timeout=self.death_timeout)
logger.error(
"Timed out connecting Nanny '%s' to scheduler '%s'",
Expand All @@ -340,7 +336,7 @@ async def _():

try:
await asyncio.wait_for(_(), timeout)
except asyncio.TimeoutError:
except TimeoutError:
logger.error("Restart timed out, returning before finished")
return "timed out"
else:
Expand Down Expand Up @@ -729,7 +725,7 @@ async def run():

try:
loop.run_sync(run)
except TimeoutError:
except (TimeoutError, gen.TimeoutError):
# Loop was stopped before wait_until_closed() returned, ignore
pass
except KeyboardInterrupt:
Expand Down
4 changes: 2 additions & 2 deletions distributed/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .core import Server, ConnectionPool
from .versions import get_versions
from .utils import DequeHandler
from .utils import DequeHandler, TimeoutError


class Node(object):
Expand Down Expand Up @@ -173,7 +173,7 @@ async def wait_for(future, timeout=None):
await asyncio.wait_for(future, timeout=timeout)
except Exception:
await self.close(timeout=1)
raise asyncio.TimeoutError(
raise TimeoutError(
"{} failed to start in {} seconds".format(
type(self).__name__, timeout
)
Expand Down
4 changes: 2 additions & 2 deletions distributed/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import asyncio
import dask

from .utils import mp_context
from .utils import mp_context, TimeoutError

from tornado import gen
from tornado.concurrent import Future
Expand Down Expand Up @@ -283,7 +283,7 @@ def join(self, timeout=None):
else:
try:
yield asyncio.wait_for(self._exit_future, timeout)
except gen.TimeoutError:
except TimeoutError:
pass

def close(self):
Expand Down
9 changes: 6 additions & 3 deletions distributed/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from tornado import gen

from .core import CommClosedError
from .utils import sync
from .utils import sync, TimeoutError
from .protocol.serialize import to_serialize

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -400,10 +400,13 @@ async def _get(self, timeout=None):
if timeout is not None:
timeout2 = timeout - (datetime.datetime.now() - start)
if timeout2.total_seconds() < 0:
raise gen.TimeoutError()
raise TimeoutError()
else:
timeout2 = None
await self.condition.wait(timeout=timeout2)
try:
await self.condition.wait(timeout=timeout2)
except gen.TimeoutError:
raise TimeoutError("Timed out waiting on Sub")

return self.buffer.popleft()

Expand Down
22 changes: 16 additions & 6 deletions distributed/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

import tornado.queues
from tornado.locks import Event
from tornado import gen

from .client import Future, _get_global_client, Client
from .utils import tokey, sync, thread_state
from .utils import tokey, sync, thread_state, TimeoutError
from .worker import get_client

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,7 +79,10 @@ async def put(
record = {"type": "msgpack", "value": data}
if timeout is not None:
timeout = datetime.timedelta(seconds=timeout)
await self.queues[name].put(record, timeout=timeout)
try:
await self.queues[name].put(record, timeout=timeout)
except gen.TimeoutError:
raise TimeoutError("Timed out waiting for Queue")

def future_release(self, name=None, key=None, client=None):
self.future_refcount[name, key] -= 1
Expand Down Expand Up @@ -124,7 +128,10 @@ def process(record):
else:
if timeout is not None:
timeout = datetime.timedelta(seconds=timeout)
record = await self.queues[name].get(timeout=timeout)
try:
record = await self.queues[name].get(timeout=timeout)
except gen.TimeoutError:
raise TimeoutError("Timed out waiting for Queue")
record = process(record)
return record

Expand Down Expand Up @@ -225,9 +232,12 @@ def qsize(self, **kwargs):
return self.client.sync(self._qsize, **kwargs)

async def _get(self, timeout=None, batch=False):
resp = await self.client.scheduler.queue_get(
timeout=timeout, name=self.name, batch=batch
)
try:
resp = await self.client.scheduler.queue_get(
timeout=timeout, name=self.name, batch=batch
)
except gen.TimeoutError:
raise TimeoutError("Timed out waiting for Queue")

def process(d):
if d["type"] == "Future":
Expand Down
31 changes: 16 additions & 15 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
except ImportError:
from toolz import frequencies, merge, pluck, merge_sorted, first, merge_with
from toolz import valmap, second, compose, groupby
from tornado import gen
from tornado.ioloop import IOLoop

import dask
Expand Down Expand Up @@ -60,6 +59,7 @@
key_split_group,
empty_context,
tmpfile,
TimeoutError,
)
from .utils_comm import scatter_to_workers, gather_from_workers, retry_operation
from .utils_perf import enable_gc_diagnosis, disable_gc_diagnosis
Expand Down Expand Up @@ -2744,7 +2744,7 @@ async def scatter(
while not self.workers:
await asyncio.sleep(0.2)
if time() > start + timeout:
raise gen.TimeoutError("No workers found")
raise TimeoutError("No workers found")

if workers is None:
nthreads = {w: ws.nthreads for w, ws in self.workers.items()}
Expand Down Expand Up @@ -2874,25 +2874,26 @@ async def restart(self, client=None, timeout=3):
if nanny_address is not None
]

resps = All(
[
nanny.restart(
close=True, timeout=timeout * 0.8, executor_wait=False
)
for nanny in nannies
]
)
try:
resps = All(
[
nanny.restart(
close=True, timeout=timeout * 0.8, executor_wait=False
)
for nanny in nannies
]
)
resps = await asyncio.wait_for(resps, timeout)
if not all(resp == "OK" for resp in resps):
logger.error(
"Not all workers responded positively: %s", resps, exc_info=True
)
except gen.TimeoutError:
except TimeoutError:
logger.error(
"Nannies didn't report back restarted within "
"timeout. Continuing with restart process"
)
else:
if not all(resp == "OK" for resp in resps):
logger.error(
"Not all workers responded positively: %s", resps, exc_info=True
)
finally:
await asyncio.gather(*[nanny.close_rpc() for nanny in nannies])

Expand Down
Loading