From 3b11de7de45ca325c3745129fae2efc6039b2720 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 14 Jun 2017 13:01:36 +0200 Subject: [PATCH 1/3] Fix PeriodicCallback usage for Tornado 5 The io_loop argument to PeriodicCallback is removed in Tornado 5.0. --- distributed/client.py | 7 +-- distributed/core.py | 9 ++-- distributed/counter.py | 11 ++-- distributed/deploy/adaptive.py | 7 ++- distributed/diagnostics/progress.py | 1 - distributed/stealing.py | 7 +-- distributed/utils.py | 79 +++++++++++++++++++++++++++-- distributed/utils_test.py | 9 ---- distributed/worker.py | 13 ++--- 9 files changed, 102 insertions(+), 41 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 18dfde53831..366c4bc31b6 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -31,7 +31,7 @@ from tornado import gen from tornado.gen import TimeoutError from tornado.locks import Event, Condition -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.queues import Queue from .batched import BatchedSend @@ -47,7 +47,8 @@ from .security import Security from .worker import dumps_task from .utils import (All, sync, funcname, ignoring, queue_to_iterator, - tokey, log_errors, str_graph, key_split, format_bytes) + tokey, log_errors, str_graph, key_split, format_bytes, + PeriodicCallback) from .versions import get_versions @@ -563,7 +564,7 @@ def start(self, asynchronous=None, **kwargs): self._should_close_loop = True while not self.loop._running: sleep(0.001) - pc = PeriodicCallback(lambda: None, 1000, io_loop=self.loop) + pc = PeriodicCallback(lambda: None, 1000) self.loop.add_callback(pc.start) _set_global_client(self) if asynchronous: diff --git a/distributed/core.py b/distributed/core.py index df8a2a01e88..f79e28853ec 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -12,7 +12,7 @@ from toolz import assoc from tornado import gen -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.locks import Event from .comm import (connect, listen, CommClosedError, @@ -20,7 +20,8 @@ unparse_host_port, get_address_host_port) from .metrics import time from .system_monitor import SystemMonitor -from .utils import get_traceback, truncate_exception, ignoring, shutting_down +from .utils import (get_traceback, truncate_exception, ignoring, shutting_down, + PeriodicCallback) from . import protocol @@ -109,11 +110,11 @@ def __init__(self, handlers, connection_limit=512, deserialize=True, self.events = defaultdict(lambda: deque(maxlen=10000)) self.event_counts = defaultdict(lambda: 0) - pc = PeriodicCallback(self.monitor.update, 500, io_loop=self.loop) + pc = PeriodicCallback(self.monitor.update, 500) self.loop.add_callback(pc.start) if self.digests is not None: self._last_tick = time() - self._tick_pc = PeriodicCallback(self._measure_tick, 20, io_loop=self.loop) + self._tick_pc = PeriodicCallback(self._measure_tick, 20) self.loop.add_callback(self._tick_pc.start) self.__stopped = False diff --git a/distributed/counter.py b/distributed/counter.py index 14b3bcab72f..8d76def7189 100644 --- a/distributed/counter.py +++ b/distributed/counter.py @@ -2,7 +2,10 @@ from collections import defaultdict -from tornado.ioloop import PeriodicCallback, IOLoop +from tornado.ioloop import IOLoop + +from .utils import PeriodicCallback + try: from crick import TDigest @@ -15,8 +18,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)): self.components = [TDigest() for i in self.intervals] self.loop = loop or IOLoop.current() - self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000, - io_loop=self.loop) + self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000) self.loop.add_callback(self._pc.start) def add(self, item): @@ -44,8 +46,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)): self.components = [defaultdict(lambda: 0) for i in self.intervals] self.loop = loop or IOLoop.current() - self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000, - io_loop=self.loop) + self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000) self.loop.add_callback(self._pc.start) def add(self, item): diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 5b2ea4a144d..f58235cb826 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -1,10 +1,10 @@ from __future__ import print_function, division, absolute_import import logging -from ..utils import log_errors from tornado import gen -from tornado.ioloop import PeriodicCallback + +from ..utils import log_errors, PeriodicCallback logger = logging.getLogger(__name__) @@ -33,8 +33,7 @@ def __init__(self, scheduler, cluster, interval=1000, startup_cost=1): self.scheduler = scheduler self.cluster = cluster self.startup_cost = startup_cost - self._adapt_callback = PeriodicCallback(self._adapt, interval, - self.scheduler.loop) + self._adapt_callback = PeriodicCallback(self._adapt, interval) self.scheduler.loop.add_callback(self._adapt_callback.start) self._adapting = False diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index d4305187580..987cc720a98 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -9,7 +9,6 @@ import dask from toolz import valmap, groupby, concat -from tornado.ioloop import PeriodicCallback, IOLoop from tornado import gen from .plugin import SchedulerPlugin diff --git a/distributed/stealing.py b/distributed/stealing.py index 79f371d2aad..40c62afebdd 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -6,12 +6,10 @@ import os from time import time -from tornado.ioloop import PeriodicCallback - from .config import config from .core import CommClosedError from .diagnostics.plugin import SchedulerPlugin -from .utils import key_split, log_errors +from .utils import key_split, log_errors, PeriodicCallback try: from cytoolz import topk @@ -43,8 +41,7 @@ def __init__(self, scheduler): self.add_worker(worker=worker) self._pc = PeriodicCallback(callback=self.balance, - callback_time=100, - io_loop=self.scheduler.loop) + callback_time=100) self.scheduler.loop.add_callback(self._pc.start) self.scheduler.plugins.append(self) self.scheduler.extensions['stealing'] = self diff --git a/distributed/utils.py b/distributed/utils.py index 1274e5529bf..d550e5c4e5b 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -4,24 +4,25 @@ from collections import Iterable from contextlib import contextmanager from datetime import timedelta +from importlib import import_module import inspect import functools import logging +import math import multiprocessing import operator import os import re import shutil import socket -from importlib import import_module - -import six import sys -import tblib.pickling_support import tempfile import threading import warnings +import six +import tblib.pickling_support + from .compatibility import cache_from_source, getargspec, invalidate_caches, reload try: @@ -32,6 +33,7 @@ from dask import istask from toolz import memoize, valmap from tornado import gen +from tornado.ioloop import IOLoop from .compatibility import Queue, PY3, PY2, get_thread_identity, unicode from .config import config @@ -845,3 +847,72 @@ def nbytes(frame): return len(frame) else: return frame.nbytes + + +class PeriodicCallback(object): + """ + Modified version of tornado.IOLoop.PeriodicCallback that doesn't + store the "current" IOLoop in its constructor. The "current" IOLoop + is per-thread and we typically instantiate a PeriodicCallback in + another thread than the one running the event loop. + + Schedules the given callback to be called periodically. + + The callback is called every ``callback_time`` milliseconds. + Note that the timeout is given in milliseconds, while most other + time-related functions in Tornado use seconds. + + If the callback runs for longer than ``callback_time`` milliseconds, + subsequent invocations will be skipped to get back on schedule. + + `start` must be called after the `PeriodicCallback` is created. + """ + def __init__(self, callback, callback_time, io_loop=None): + self.callback = callback + if callback_time <= 0: + raise ValueError("Periodic callback must have a positive callback_time") + self.callback_time = callback_time + self._running = False + self._timeout = None + + def start(self): + """Starts the timer.""" + self.io_loop = IOLoop.current(instance=False) + self._running = True + self._next_timeout = self.io_loop.time() + self._schedule_next() + + def stop(self): + """Stops the timer.""" + self._running = False + if self._timeout is not None: + self.io_loop.remove_timeout(self._timeout) + self._timeout = None + + def is_running(self): + """Return True if this `.PeriodicCallback` has been started. + + .. versionadded:: 4.1 + """ + return self._running + + def _run(self): + if not self._running: + return + try: + return self.callback() + except Exception: + self.io_loop.handle_callback_exception(self.callback) + finally: + self._schedule_next() + + def _schedule_next(self): + if self._running: + current_time = self.io_loop.time() + + if self._next_timeout <= current_time: + callback_time_sec = self.callback_time / 1000.0 + self._next_timeout += (math.floor((current_time - self._next_timeout) / + callback_time_sec) + 1) * callback_time_sec + + self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 9dad79e9d52..a34db81d4a4 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -241,13 +241,10 @@ def background_read(): def run_scheduler(q, nputs, **kwargs): from distributed import Scheduler - from tornado.ioloop import IOLoop, PeriodicCallback # On Python 2.7 and Unix, fork() is used to spawn child processes, # so avoid inheriting the parent's IO loop. with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler = Scheduler(validate=True, **kwargs) done = scheduler.start('127.0.0.1') @@ -261,12 +258,9 @@ def run_scheduler(q, nputs, **kwargs): def run_worker(q, scheduler_q, **kwargs): from distributed import Worker - from tornado.ioloop import IOLoop, PeriodicCallback with log_errors(): with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler_addr = scheduler_q.get() worker = Worker(scheduler_addr, validate=True, **kwargs) loop.run_sync(lambda: worker._start(0)) @@ -279,12 +273,9 @@ def run_worker(q, scheduler_q, **kwargs): def run_nanny(q, scheduler_q, **kwargs): from distributed import Nanny - from tornado.ioloop import IOLoop, PeriodicCallback with log_errors(): with pristine_loop() as loop: - PeriodicCallback(lambda: None, 500).start() - scheduler_addr = scheduler_q.get() worker = Nanny(scheduler_addr, validate=True, **kwargs) loop.run_sync(lambda: worker._start(0)) diff --git a/distributed/worker.py b/distributed/worker.py index cf9d110de14..dcac0ee82aa 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -20,7 +20,7 @@ from toolz import pluck from tornado.gen import Return from tornado import gen -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import IOLoop from tornado.locks import Event from .batched import BatchedSend @@ -39,7 +39,7 @@ from .threadpoolexecutor import ThreadPoolExecutor from .utils import (funcname, get_ip, has_arg, _maybe_complex, log_errors, ignoring, validate_key, mp_context, import_file, - silence_logging) + silence_logging, PeriodicCallback) from .utils_comm import pack_data, gather_from_workers _ncores = mp_context.cpu_count() @@ -160,8 +160,7 @@ def __init__(self, scheduler_ip, scheduler_port=None, ncores=None, **kwargs) self.heartbeat_callback = PeriodicCallback(self.heartbeat, - self.heartbeat_interval, - io_loop=self.loop) + self.heartbeat_interval) @property def worker_address(self): @@ -363,8 +362,10 @@ def executor_submit(self, key, function, *args, **kwargs): job_counter[0] += 1 # logger.info("%s:%d Starts job %d, %s", self.ip, self.port, i, key) future = self.executor.submit(function, *args, **kwargs) - pc = PeriodicCallback(lambda: logger.debug("future state: %s - %s", - key, future._state), 1000, io_loop=self.loop); pc.start() + pc = PeriodicCallback( + lambda: logger.debug("future state: %s - %s", key, future._state), + 1000) + pc.start() try: yield future finally: From 4a5b02dffbf7c6b37d6cb6a3330c26a8003352e3 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 23 Oct 2017 15:38:56 +0200 Subject: [PATCH 2/3] Remove more io_loop argument references --- distributed/nanny.py | 6 +++--- distributed/worker.py | 15 +++++---------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 6db831cad82..b62200e4bbe 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -9,7 +9,7 @@ import threading from tornado import gen -from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback +from tornado.ioloop import IOLoop, TimeoutError from tornado.locks import Event from .comm import get_address_host, get_local_address_for, unparse_host_port @@ -20,7 +20,7 @@ from .process import AsyncProcess from .security import Security from .utils import (get_ip, mp_context, silence_logging, json_load_robust, - ignoring) + ignoring, PeriodicCallback) from .worker import _ncores, run, TOTAL_MEMORY @@ -97,7 +97,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, connection_args=self.connection_args, **kwargs) - pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop) + pc = PeriodicCallback(self.memory_monitor, 100) self.periodic_callbacks['memory'] = pc self._listen_address = listen_address diff --git a/distributed/worker.py b/distributed/worker.py index 94e540f89c5..4bb91a39203 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -192,8 +192,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self._memory_monitoring = False pc = PeriodicCallback(self.memory_monitor, - self.memory_monitor_interval, - io_loop=self.loop) + self.memory_monitor_interval) self.periodic_callbacks['memory'] = pc @property @@ -266,8 +265,7 @@ def _register_with_scheduler(self): pid=os.getpid()) if self.death_timeout: diff = self.death_timeout - (time() - start) - future = gen.with_timeout(timedelta(seconds=diff), future, - io_loop=self.loop) + future = gen.with_timeout(timedelta(seconds=diff), future) response = yield future _end = time() middle = (_start + _end) / 2 @@ -384,8 +382,7 @@ def _close(self, report=True, timeout=10, nanny=True): with ignoring(EnvironmentError, gen.TimeoutError): if report: yield gen.with_timeout(timedelta(seconds=timeout), - self.scheduler.unregister(address=self.contact_address), - io_loop=self.loop) + self.scheduler.unregister(address=self.contact_address)) self.scheduler.close_rpc() if isinstance(self.executor, ThreadPoolExecutor): self.executor.shutdown(timeout=timeout) @@ -1125,13 +1122,11 @@ def __init__(self, *args, **kwargs): WorkerBase.__init__(self, *args, **kwargs) pc = PeriodicCallback(self.trigger_profile, - config.get('profile-interval', 10), - io_loop=self.loop) + config.get('profile-interval', 10)) self.periodic_callbacks['profile'] = pc pc = PeriodicCallback(self.cycle_profile, - profile_cycle_interval, - io_loop=self.loop) + profile_cycle_interval) pc.start() self.periodic_callbacks['profile-cycle'] = pc From 4c076ffaa8cdad448d38ebf1832f6143cb5b3bd9 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 30 Oct 2017 18:25:16 +0100 Subject: [PATCH 3/3] Simplify compatibility code after https://github.com/tornadoweb/tornado/pull/2179 was merged --- distributed/utils.py | 73 +++++--------------------------------------- 1 file changed, 8 insertions(+), 65 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 0d8dceb587e..cc9e7ab2180 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -34,6 +34,7 @@ from dask import istask from toolz import memoize, valmap +import tornado from tornado import gen from tornado.ioloop import IOLoop @@ -951,73 +952,15 @@ def nbytes(frame, _bytes_like=(bytes, bytearray)): return frame.nbytes -class PeriodicCallback(object): +def PeriodicCallback(callback, callback_time, io_loop=None): """ - Modified version of tornado.IOLoop.PeriodicCallback that doesn't - store the "current" IOLoop in its constructor. The "current" IOLoop - is per-thread and we typically instantiate a PeriodicCallback in - another thread than the one running the event loop. - - Schedules the given callback to be called periodically. - - The callback is called every ``callback_time`` milliseconds. - Note that the timeout is given in milliseconds, while most other - time-related functions in Tornado use seconds. - - If the callback runs for longer than ``callback_time`` milliseconds, - subsequent invocations will be skipped to get back on schedule. - - `start` must be called after the `PeriodicCallback` is created. + Wrapper around tornado.IOLoop.PeriodicCallback, for compatibility + with removal of the `io_loop` parameter in Tornado 5.0. """ - def __init__(self, callback, callback_time, io_loop=None): - self.callback = callback - if callback_time <= 0: - raise ValueError("Periodic callback must have a positive callback_time") - self.callback_time = callback_time - self._running = False - self._timeout = None - - def start(self): - """Starts the timer.""" - self.io_loop = IOLoop.current(instance=False) - self._running = True - self._next_timeout = self.io_loop.time() - self._schedule_next() - - def stop(self): - """Stops the timer.""" - self._running = False - if self._timeout is not None: - self.io_loop.remove_timeout(self._timeout) - self._timeout = None - - def is_running(self): - """Return True if this `.PeriodicCallback` has been started. - - .. versionadded:: 4.1 - """ - return self._running - - def _run(self): - if not self._running: - return - try: - return self.callback() - except Exception: - self.io_loop.handle_callback_exception(self.callback) - finally: - self._schedule_next() - - def _schedule_next(self): - if self._running: - current_time = self.io_loop.time() - - if self._next_timeout <= current_time: - callback_time_sec = self.callback_time / 1000.0 - self._next_timeout += (math.floor((current_time - self._next_timeout) / - callback_time_sec) + 1) * callback_time_sec - - self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) + if tornado.version_info >= (5,): + return tornado.ioloop.PeriodicCallback(callback, callback_time) + else: + return tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop) @contextmanager