Replace gen.sleep with asyncio.sleep #3208

Merged
merged 3 commits, Nov 11, 2019
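This PR mechanically replaces Tornado's `gen.sleep` with the standard library's `asyncio.sleep` throughout `distributed`. Both return awaitables, so native `async def` coroutines behave identically; a couple of TLS test timeouts are also loosened from 0.5 s to 2 s. A minimal sketch of the migrated pattern, modeled on the scheduler-file polling loop in `distributed/client.py` below (the `poll_file` helper is illustrative, not part of the PR; assumes Python 3.7+):

```python
import asyncio
import os


async def poll_file(path, interval=0.01, timeout=5.0):
    """Wait until *path* exists, yielding to the event loop between checks."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not os.path.exists(path):
        if loop.time() > deadline:
            raise TimeoutError(f"{path} never appeared")
        # Before this PR this line would have been: await gen.sleep(interval)
        await asyncio.sleep(interval)
    return path
```

Because `asyncio.sleep` is a drop-in replacement inside native coroutines, the diff below is almost entirely one-line swaps.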
16 changes: 8 additions & 8 deletions distributed/client.py
@@ -942,15 +942,15 @@ async def _start(self, timeout=no_default, **kwargs):
address = self.cluster.scheduler_address
elif self.scheduler_file is not None:
while not os.path.exists(self.scheduler_file):
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)
for i in range(10):
try:
with open(self.scheduler_file) as f:
cfg = json.load(f)
address = cfg["address"]
break
except (ValueError, KeyError): # JSON file not yet flushed
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)
elif self._start_arg is None:
from .deploy import LocalCluster

@@ -976,7 +976,7 @@ async def _start(self, timeout=no_default, **kwargs):
while not self.cluster.workers or len(self.cluster.scheduler.workers) < len(
self.cluster.workers
):
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)

address = self.cluster.scheduler_address

@@ -1017,7 +1017,7 @@ async def _reconnect(self):
break
except EnvironmentError:
# Wait a bit before retrying
-await gen.sleep(0.1)
+await asyncio.sleep(0.1)
timeout = deadline - self.loop.time()
else:
logger.error(
@@ -1092,7 +1092,7 @@ async def _update_scheduler_info(self):
async def _wait_for_workers(self, n_workers=0):
info = await self.scheduler.identity()
while n_workers and len(info["workers"]) < n_workers:
-await gen.sleep(0.1)
+await asyncio.sleep(0.1)
info = await self.scheduler.identity()

def wait_for_workers(self, n_workers=0):
@@ -1946,7 +1946,7 @@ async def _scatter(
start = time()
while not nthreads:
if nthreads is not None:
-await gen.sleep(0.1)
+await asyncio.sleep(0.1)
if time() > start + timeout:
raise gen.TimeoutError("No valid workers found")
nthreads = await self.scheduler.ncores(workers=workers)
@@ -2280,7 +2280,7 @@ def run_on_scheduler(self, function, *args, **kwargs):
>>> async def print_state(dask_scheduler): # doctest: +SKIP
... while True:
... print(dask_scheduler.status)
-... await gen.sleep(1)
+... await asyncio.sleep(1)

>>> c.run(print_state, wait=False) # doctest: +SKIP

@@ -2370,7 +2370,7 @@ def run(self, function, *args, **kwargs):
>>> async def print_state(dask_worker): # doctest: +SKIP
... while True:
... print(dask_worker.status)
-... await gen.sleep(1)
+... await asyncio.sleep(1)

>>> c.run(print_state, wait=False) # doctest: +SKIP
"""
3 changes: 2 additions & 1 deletion distributed/comm/core.py
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod, abstractproperty
+import asyncio
from datetime import timedelta
import logging
import weakref
@@ -224,7 +225,7 @@ def _raise(error):
except EnvironmentError as e:
error = str(e)
if time() < deadline:
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)
logger.debug("sleeping on connect")
else:
_raise(error)
4 changes: 2 additions & 2 deletions distributed/comm/tests/test_comms.py
@@ -683,7 +683,7 @@ def handle_comm(comm):

# Sanity check
comm = yield connect(
-listener.contact_address, timeout=0.5, connection_args={"ssl_context": cli_ctx}
+listener.contact_address, timeout=2, connection_args={"ssl_context": cli_ctx}
)
yield comm.close()

@@ -696,7 +696,7 @@ def handle_comm(comm):
with pytest.raises(EnvironmentError) as excinfo:
yield connect(
listener.contact_address,
-timeout=0.5,
+timeout=2,
connection_args={"ssl_context": cli_ctx},
)
# The wrong error is reported on Python 2, see https://github.com/tornadoweb/tornado/pull/2028
6 changes: 3 additions & 3 deletions distributed/core.py
@@ -464,7 +464,7 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]):
handler(**merge(extra, msg))
else:
logger.error("odd message %s", msg)
-await gen.sleep(0)
+await asyncio.sleep(0)

for func in every_cycle:
func()
@@ -492,15 +492,15 @@ def close(self):
if not self._comms:
break
else:
-yield gen.sleep(0.05)
+yield asyncio.sleep(0.05)
yield [comm.close() for comm in self._comms] # then forcefully close
for cb in self._ongoing_coroutines:
cb.cancel()
for i in range(10):
if all(cb.cancelled() for cb in self._ongoing_coroutines):
break
else:
-yield gen.sleep(0.01)
+yield asyncio.sleep(0.01)

self._event_finished.set()

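Note that `close` above is a yield-based (`gen.coroutine`-style) function, yet it now yields `asyncio.sleep(...)`. This works because Tornado 5+ runs on the asyncio event loop and `gen.coroutine` routes yielded objects through `gen.convert_yielded`, which accepts native awaitables. A hedged sketch of that interaction (the `drain` function is illustrative; assumes Tornado 5+):

```python
import asyncio

from tornado import gen
from tornado.ioloop import IOLoop


@gen.coroutine
def drain():
    # Tornado passes yielded objects through gen.convert_yielded,
    # so a native awaitable such as asyncio.sleep() is accepted here.
    for _ in range(3):
        yield asyncio.sleep(0.01)
    return "drained"


print(IOLoop.current().run_sync(drain))  # -> drained
```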
6 changes: 3 additions & 3 deletions distributed/diagnostics/progress.py
@@ -1,9 +1,9 @@
+import asyncio
from collections import defaultdict
import logging
from timeit import default_timer

from toolz import groupby, valmap
-from tornado import gen

from .plugin import SchedulerPlugin
from ..utils import key_split, key_split_group, log_errors, tokey
@@ -76,7 +76,7 @@ async def setup(self):
keys = self.keys

while not keys.issubset(self.scheduler.tasks):
-await gen.sleep(0.05)
+await asyncio.sleep(0.05)

tasks = [self.scheduler.tasks[k] for k in keys]

@@ -164,7 +164,7 @@ async def setup(self):
keys = self.keys

while not keys.issubset(self.scheduler.tasks):
-await gen.sleep(0.05)
+await asyncio.sleep(0.05)

tasks = [self.scheduler.tasks[k] for k in keys]

5 changes: 3 additions & 2 deletions distributed/nanny.py
@@ -1,3 +1,4 @@
+import asyncio
from datetime import timedelta
import logging
from multiprocessing.queues import Empty
@@ -595,7 +596,7 @@ async def kill(self, timeout=2, executor_wait=True):
self.child_stop_q.close()

while process.is_alive() and loop.time() < deadline:
-await gen.sleep(0.05)
+await asyncio.sleep(0.05)

if process.is_alive():
logger.warning(
@@ -614,7 +615,7 @@ async def _wait_until_connected(self, uid):
try:
msg = self.init_result_q.get_nowait()
except Empty:
-await gen.sleep(delay)
+await asyncio.sleep(delay)
continue

if msg["uid"] != uid: # ensure that we didn't cross queues
8 changes: 4 additions & 4 deletions distributed/scheduler.py
@@ -1271,7 +1271,7 @@ async def close(self, comm=None, fast=False, close_workers=False):
self.worker_send(worker, {"op": "close"})
for i in range(20): # wait a second for send signals to clear
if self.workers:
-await gen.sleep(0.05)
+await asyncio.sleep(0.05)
else:
break

@@ -2494,7 +2494,7 @@ async def scatter(
"""
start = time()
while not self.workers:
-await gen.sleep(0.2)
+await asyncio.sleep(0.2)
if time() > start + timeout:
raise gen.TimeoutError("No workers found")

@@ -2649,7 +2649,7 @@ async def restart(self, client=None, timeout=3):
self.log_event([client, "all"], {"action": "restart", "client": client})
start = time()
while time() < start + 10 and len(self.workers) < n_workers:
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)

self.report({"op": "restart"})

@@ -3292,7 +3292,7 @@ async def feed(
else:
response = function(self, state)
await comm.write(response)
-await gen.sleep(interval)
+await asyncio.sleep(interval)
except (EnvironmentError, CommClosedError):
pass
finally:
10 changes: 5 additions & 5 deletions distributed/utils_test.py
@@ -394,7 +394,7 @@ def apply(func, *args, **kwargs):


async def geninc(x, delay=0.02):
-await gen.sleep(delay)
+await asyncio.sleep(delay)
return x + 1


@@ -410,7 +410,7 @@ def compile_snippet(code, dedent=True):
compile_snippet(
"""
async def asyncinc(x, delay=0.02):
-await gen.sleep(delay)
+await asyncio.sleep(delay)
return x + 1
"""
)
@@ -813,7 +813,7 @@ async def start_cluster(
while len(s.workers) < len(nthreads) or any(
comm.comm is None for comm in s.stream_comms.values()
):
-await gen.sleep(0.01)
+await asyncio.sleep(0.01)
if time() - start > 5:
await asyncio.gather(*[w.close(timeout=1) for w in workers])
await s.close(fast=True)
@@ -939,7 +939,7 @@ async def coro():
if all(c.closed() for c in Comm._instances):
break
else:
-await gen.sleep(0.05)
+await asyncio.sleep(0.05)
else:
L = [c for c in Comm._instances if not c.closed()]
Comm._instances.clear()
@@ -1063,7 +1063,7 @@ def wait_for(predicate, timeout, fail_func=None, period=0.001):
async def async_wait_for(predicate, timeout, fail_func=None, period=0.001):
deadline = time() + timeout
while not predicate():
-await gen.sleep(period)
+await asyncio.sleep(period)
if time() > deadline:
if fail_func is not None:
fail_func()
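`async_wait_for` above is the test suite's generic polling primitive. A self-contained variant with a usage driver (the `TimeoutError` fallback and the `main` function are illustrative additions, not taken from the file):

```python
import asyncio
from time import time


async def async_wait_for(predicate, timeout, fail_func=None, period=0.001):
    # Same shape as the helper in distributed/utils_test.py above.
    deadline = time() + timeout
    while not predicate():
        await asyncio.sleep(period)
        if time() > deadline:
            if fail_func is not None:
                fail_func()
            raise TimeoutError(f"condition not met within {timeout} s")


async def main():
    results = []
    asyncio.get_running_loop().call_later(0.05, results.append, "ready")
    await async_wait_for(lambda: results, timeout=1)
    print(results)  # ['ready']


asyncio.run(main())
```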
6 changes: 3 additions & 3 deletions distributed/worker.py
@@ -829,7 +829,7 @@ async def _register_with_scheduler(self):
break
except EnvironmentError:
logger.info("Waiting to connect to: %26s", self.scheduler.address)
-await gen.sleep(0.1)
+await asyncio.sleep(0.1)
except gen.TimeoutError:
logger.info("Timed out when connecting to scheduler")
if response["status"] != "OK":
@@ -1997,7 +1997,7 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None):
else:
# Exponential backoff to avoid hammering scheduler/worker
self.repetitively_busy += 1
-await gen.sleep(0.100 * 1.5 ** self.repetitively_busy)
+await asyncio.sleep(0.100 * 1.5 ** self.repetitively_busy)

# See if anyone new has the data
await self.query_who_has(dep)
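The backoff above multiplies a 100 ms base delay by 1.5 for every consecutive busy response, so retries spread out geometrically (150 ms, 225 ms, ~338 ms, ...). A standalone sketch of the same retry shape (the `fetch_with_backoff` helper and its `fetch` contract are illustrative, not part of the PR):

```python
import asyncio


async def fetch_with_backoff(fetch, max_retries=8, base=0.100, factor=1.5):
    """Retry *fetch* until it returns data, backing off exponentially."""
    busy_count = 0
    while True:
        result = await fetch()
        if result is not None:
            return result
        busy_count += 1  # mirrors self.repetitively_busy above
        if busy_count > max_retries:
            raise TimeoutError("peer stayed busy too long")
        # delay = base * factor ** busy_count: 0.15 s, 0.225 s, 0.3375 s, ...
        await asyncio.sleep(base * factor ** busy_count)
```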
@@ -2586,7 +2586,7 @@ async def memory_monitor(self):
del k, v
total += weight
count += 1
-await gen.sleep(0)
+await asyncio.sleep(0)
memory = proc.memory_info().rss
if total > need and memory > target:
# Issue a GC to ensure that the evicted data is actually
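Two call sites in this PR (`handle_stream` in distributed/core.py and `memory_monitor` here) sleep for zero seconds. `await asyncio.sleep(0)` does not pause: it yields control so other tasks on the event loop can run between iterations of a tight loop, matching what `gen.sleep(0)` did before. A small demonstration (illustrative, not from the PR):

```python
import asyncio


async def busy_loop(n):
    for _ in range(n):
        # ... one unit of synchronous work ...
        await asyncio.sleep(0)  # yield to the event loop; no actual delay


async def heartbeat():
    for _ in range(3):
        print("heartbeat ran between work items")
        await asyncio.sleep(0)


async def main():
    # Without the sleep(0) in busy_loop, heartbeat would be starved
    # until the whole loop had finished.
    await asyncio.gather(busy_loop(1000), heartbeat())


asyncio.run(main())
```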