diff --git a/distributed/client.py b/distributed/client.py index 751eb748ad6..79237a2f2b8 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -32,7 +32,7 @@ from tornado import gen from tornado.gen import TimeoutError from tornado.locks import Event, Condition -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.queues import Queue from .batched import BatchedSend @@ -52,7 +52,7 @@ from .worker import dumps_task, get_client, get_worker, secede from .utils import (All, sync, funcname, ignoring, queue_to_iterator, tokey, log_errors, str_graph, key_split, format_bytes, asciitable, - thread_state, no_default) + thread_state, no_default, PeriodicCallback) from .versions import get_versions @@ -634,7 +634,7 @@ def start(self, **kwargs): while not self.loop._running: sleep(0.001) - pc = PeriodicCallback(lambda: None, 1000, io_loop=self.loop) + pc = PeriodicCallback(lambda: None, 1000) self.loop.add_callback(pc.start) _set_global_client(self) self.status = 'connecting' diff --git a/distributed/core.py b/distributed/core.py index f13c6b5408c..1404a9cd5a4 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -13,7 +13,7 @@ from toolz import assoc from tornado import gen -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.locks import Event from .comm import (connect, listen, CommClosedError, @@ -22,7 +22,8 @@ from .config import config from .metrics import time from .system_monitor import SystemMonitor -from .utils import get_traceback, truncate_exception, ignoring, shutting_down +from .utils import (get_traceback, truncate_exception, ignoring, shutting_down, + PeriodicCallback) from . import protocol @@ -114,13 +115,12 @@ def __init__(self, handlers, connection_limit=512, deserialize=True, self.periodic_callbacks = dict() - pc = PeriodicCallback(self.monitor.update, 500, io_loop=self.io_loop) + pc = PeriodicCallback(self.monitor.update, 500) self.io_loop.add_callback(pc.start) self.periodic_callbacks['monitor'] = pc self._last_tick = time() - pc = PeriodicCallback(self._measure_tick, config.get('tick-time', 20), - io_loop=self.io_loop) + pc = PeriodicCallback(self._measure_tick, config.get('tick-time', 20)) self.io_loop.add_callback(pc.start) self.periodic_callbacks['tick'] = pc diff --git a/distributed/counter.py b/distributed/counter.py index 14b3bcab72f..8d76def7189 100644 --- a/distributed/counter.py +++ b/distributed/counter.py @@ -2,7 +2,10 @@ from collections import defaultdict -from tornado.ioloop import PeriodicCallback, IOLoop +from tornado.ioloop import IOLoop + +from .utils import PeriodicCallback + try: from crick import TDigest @@ -15,8 +18,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)): self.components = [TDigest() for i in self.intervals] self.loop = loop or IOLoop.current() - self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000, - io_loop=self.loop) + self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000) self.loop.add_callback(self._pc.start) def add(self, item): @@ -44,8 +46,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)): self.components = [defaultdict(lambda: 0) for i in self.intervals] self.loop = loop or IOLoop.current() - self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000, - io_loop=self.loop) + self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000) self.loop.add_callback(self._pc.start) def add(self, item): diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 10645130ee1..e7991e3ae77 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -1,10 +1,10 @@ from __future__ import print_function, division, absolute_import import logging -from ..utils import log_errors from tornado import gen -from tornado.ioloop import PeriodicCallback + +from ..utils import log_errors, PeriodicCallback logger = logging.getLogger(__name__) @@ -52,8 +52,7 @@ def __init__(self, scheduler, cluster, interval=1000, startup_cost=1, self.cluster = cluster self.startup_cost = startup_cost self.scale_factor = scale_factor - self._adapt_callback = PeriodicCallback(self._adapt, interval, - self.scheduler.loop) + self._adapt_callback = PeriodicCallback(self._adapt, interval) self.scheduler.loop.add_callback(self._adapt_callback.start) self._adapting = False diff --git a/distributed/nanny.py b/distributed/nanny.py index 6db831cad82..b62200e4bbe 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -9,7 +9,7 @@ import threading from tornado import gen -from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback +from tornado.ioloop import IOLoop, TimeoutError from tornado.locks import Event from .comm import get_address_host, get_local_address_for, unparse_host_port @@ -20,7 +20,7 @@ from .process import AsyncProcess from .security import Security from .utils import (get_ip, mp_context, silence_logging, json_load_robust, - ignoring) + ignoring, PeriodicCallback) from .worker import _ncores, run, TOTAL_MEMORY @@ -97,7 +97,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, connection_args=self.connection_args, **kwargs) - pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop) + pc = PeriodicCallback(self.memory_monitor, 100) self.periodic_callbacks['memory'] = pc self._listen_address = listen_address diff --git a/distributed/stealing.py b/distributed/stealing.py index da5782bce90..541b935bafa 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -6,12 +6,10 @@ import os from time import time -from tornado.ioloop import PeriodicCallback - from .config import config from .core import CommClosedError from .diagnostics.plugin import SchedulerPlugin -from .utils import key_split, log_errors +from .utils import key_split, log_errors, PeriodicCallback try: from cytoolz import topk @@ -43,8 +41,7 @@ def __init__(self, scheduler): self.add_worker(worker=worker) pc = PeriodicCallback(callback=self.balance, - callback_time=100, - io_loop=self.scheduler.loop) + callback_time=100) self._pc = pc self.scheduler.loop.add_callback(pc.start) self.scheduler.plugins.append(self) diff --git a/distributed/utils.py b/distributed/utils.py index 6047e6da3be..cc9e7ab2180 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -7,6 +7,7 @@ import functools import json import logging +import math import multiprocessing import operator import os @@ -15,15 +16,15 @@ import socket from time import sleep from importlib import import_module - -import six import sys -import tblib.pickling_support import tempfile import threading import warnings import gc +import six +import tblib.pickling_support + from .compatibility import cache_from_source, getargspec, invalidate_caches, reload try: @@ -33,7 +34,9 @@ from dask import istask from toolz import memoize, valmap +import tornado from tornado import gen +from tornado.ioloop import IOLoop from .compatibility import Queue, PY3, PY2, get_thread_identity, unicode from .config import config @@ -949,6 +952,17 @@ def nbytes(frame, _bytes_like=(bytes, bytearray)): return frame.nbytes +def PeriodicCallback(callback, callback_time, io_loop=None): + """ + Wrapper around tornado.IOLoop.PeriodicCallback, for compatibility + with removal of the `io_loop` parameter in Tornado 5.0. + """ + if tornado.version_info >= (5,): + return tornado.ioloop.PeriodicCallback(callback, callback_time) + else: + return tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop) + + @contextmanager def time_warn(duration, text): start = time() diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 7455d08b138..87439bda058 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -264,13 +264,10 @@ def background_read(): def run_scheduler(q, nputs, **kwargs): from distributed import Scheduler - from tornado.ioloop import PeriodicCallback # On Python 2.7 and Unix, fork() is used to spawn child processes, # so avoid inheriting the parent's IO loop. with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler = Scheduler(validate=True, **kwargs) done = scheduler.start('127.0.0.1') @@ -284,12 +281,9 @@ def run_scheduler(q, nputs, **kwargs): def run_worker(q, scheduler_q, **kwargs): from distributed import Worker - from tornado.ioloop import PeriodicCallback with log_errors(): with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler_addr = scheduler_q.get() worker = Worker(scheduler_addr, validate=True, **kwargs) loop.run_sync(lambda: worker._start(0)) @@ -306,12 +300,9 @@ def wait_until_closed(): def run_nanny(q, scheduler_q, **kwargs): from distributed import Nanny - from tornado.ioloop import PeriodicCallback with log_errors(): with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler_addr = scheduler_q.get() worker = Nanny(scheduler_addr, validate=True, **kwargs) loop.run_sync(lambda: worker._start(0)) diff --git a/distributed/worker.py b/distributed/worker.py index 0fbd194c782..f20268ca6ad 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -22,7 +22,7 @@ from toolz import pluck from tornado.gen import Return from tornado import gen -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.locks import Event from . import profile @@ -44,7 +44,7 @@ from .utils import (funcname, get_ip, has_arg, _maybe_complex, log_errors, ignoring, validate_key, mp_context, import_file, silence_logging, thread_state, json_load_robust, key_split, - format_bytes, DequeHandler, ThrottledGC) + format_bytes, DequeHandler, ThrottledGC, PeriodicCallback) from .utils_comm import pack_data, gather_from_workers _ncores = mp_context.cpu_count() @@ -186,15 +186,13 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, **kwargs) pc = PeriodicCallback(self.heartbeat, - self.heartbeat_interval, - io_loop=self.loop) + self.heartbeat_interval) self.periodic_callbacks['heartbeat'] = pc self._address = contact_address self._memory_monitoring = False pc = PeriodicCallback(self.memory_monitor, - self.memory_monitor_interval, - io_loop=self.loop) + self.memory_monitor_interval) self.periodic_callbacks['memory'] = pc @property @@ -267,8 +265,7 @@ def _register_with_scheduler(self): pid=os.getpid()) if self.death_timeout: diff = self.death_timeout - (time() - start) - future = gen.with_timeout(timedelta(seconds=diff), future, - io_loop=self.loop) + future = gen.with_timeout(timedelta(seconds=diff), future) response = yield future _end = time() middle = (_start + _end) / 2 @@ -385,8 +382,7 @@ def _close(self, report=True, timeout=10, nanny=True): with ignoring(EnvironmentError, gen.TimeoutError): if report: yield gen.with_timeout(timedelta(seconds=timeout), - self.scheduler.unregister(address=self.contact_address), - io_loop=self.loop) + self.scheduler.unregister(address=self.contact_address)) self.scheduler.close_rpc() if isinstance(self.executor, ThreadPoolExecutor): self.executor.shutdown(timeout=timeout) @@ -442,7 +438,8 @@ def executor_submit(self, key, function, *args, **kwargs): # logger.info("%s:%d Starts job %d, %s", self.ip, self.port, i, key) future = self.executor.submit(function, *args, **kwargs) pc = PeriodicCallback(lambda: logger.debug("future state: %s - %s", - key, future._state), 1000, io_loop=self.loop); pc.start() + key, future._state), 1000) + pc.start() try: yield future finally: @@ -1126,13 +1123,11 @@ def __init__(self, *args, **kwargs): WorkerBase.__init__(self, *args, **kwargs) pc = PeriodicCallback(self.trigger_profile, - config.get('profile-interval', 10), - io_loop=self.loop) + config.get('profile-interval', 10)) self.periodic_callbacks['profile'] = pc pc = PeriodicCallback(self.cycle_profile, - profile_cycle_interval, - io_loop=self.loop) + profile_cycle_interval) pc.start() self.periodic_callbacks['profile-cycle'] = pc