Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from tornado import gen
from tornado.gen import TimeoutError
from tornado.locks import Event, Condition
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.ioloop import IOLoop
from tornado.queues import Queue

from .batched import BatchedSend
Expand All @@ -52,7 +52,7 @@
from .worker import dumps_task, get_client, get_worker, secede
from .utils import (All, sync, funcname, ignoring, queue_to_iterator,
tokey, log_errors, str_graph, key_split, format_bytes, asciitable,
thread_state, no_default)
thread_state, no_default, PeriodicCallback)
from .versions import get_versions


Expand Down Expand Up @@ -634,7 +634,7 @@ def start(self, **kwargs):
while not self.loop._running:
sleep(0.001)

pc = PeriodicCallback(lambda: None, 1000, io_loop=self.loop)
pc = PeriodicCallback(lambda: None, 1000)
self.loop.add_callback(pc.start)
_set_global_client(self)
self.status = 'connecting'
Expand Down
10 changes: 5 additions & 5 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from toolz import assoc

from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.ioloop import IOLoop
from tornado.locks import Event

from .comm import (connect, listen, CommClosedError,
Expand All @@ -22,7 +22,8 @@
from .config import config
from .metrics import time
from .system_monitor import SystemMonitor
from .utils import get_traceback, truncate_exception, ignoring, shutting_down
from .utils import (get_traceback, truncate_exception, ignoring, shutting_down,
PeriodicCallback)
from . import protocol


Expand Down Expand Up @@ -114,13 +115,12 @@ def __init__(self, handlers, connection_limit=512, deserialize=True,

self.periodic_callbacks = dict()

pc = PeriodicCallback(self.monitor.update, 500, io_loop=self.io_loop)
pc = PeriodicCallback(self.monitor.update, 500)
self.io_loop.add_callback(pc.start)
self.periodic_callbacks['monitor'] = pc

self._last_tick = time()
pc = PeriodicCallback(self._measure_tick, config.get('tick-time', 20),
io_loop=self.io_loop)
pc = PeriodicCallback(self._measure_tick, config.get('tick-time', 20))
self.io_loop.add_callback(pc.start)
self.periodic_callbacks['tick'] = pc

Expand Down
11 changes: 6 additions & 5 deletions distributed/counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from collections import defaultdict

from tornado.ioloop import PeriodicCallback, IOLoop
from tornado.ioloop import IOLoop

from .utils import PeriodicCallback


try:
from crick import TDigest
Expand All @@ -15,8 +18,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)):
self.components = [TDigest() for i in self.intervals]

self.loop = loop or IOLoop.current()
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000,
io_loop=self.loop)
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000)
self.loop.add_callback(self._pc.start)

def add(self, item):
Expand Down Expand Up @@ -44,8 +46,7 @@ def __init__(self, loop=None, intervals=(5, 60, 3600)):
self.components = [defaultdict(lambda: 0) for i in self.intervals]

self.loop = loop or IOLoop.current()
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000,
io_loop=self.loop)
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000)
self.loop.add_callback(self._pc.start)

def add(self, item):
Expand Down
7 changes: 3 additions & 4 deletions distributed/deploy/adaptive.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import print_function, division, absolute_import

import logging
from ..utils import log_errors

from tornado import gen
from tornado.ioloop import PeriodicCallback

from ..utils import log_errors, PeriodicCallback

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,8 +52,7 @@ def __init__(self, scheduler, cluster, interval=1000, startup_cost=1,
self.cluster = cluster
self.startup_cost = startup_cost
self.scale_factor = scale_factor
self._adapt_callback = PeriodicCallback(self._adapt, interval,
self.scheduler.loop)
self._adapt_callback = PeriodicCallback(self._adapt, interval)
self.scheduler.loop.add_callback(self._adapt_callback.start)
self._adapting = False

Expand Down
6 changes: 3 additions & 3 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import threading

from tornado import gen
from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback
from tornado.ioloop import IOLoop, TimeoutError
from tornado.locks import Event

from .comm import get_address_host, get_local_address_for, unparse_host_port
Expand All @@ -20,7 +20,7 @@
from .process import AsyncProcess
from .security import Security
from .utils import (get_ip, mp_context, silence_logging, json_load_robust,
ignoring)
ignoring, PeriodicCallback)
from .worker import _ncores, run, TOTAL_MEMORY


Expand Down Expand Up @@ -97,7 +97,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None,
connection_args=self.connection_args,
**kwargs)

pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop)
pc = PeriodicCallback(self.memory_monitor, 100)
self.periodic_callbacks['memory'] = pc

self._listen_address = listen_address
Expand Down
7 changes: 2 additions & 5 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import os
from time import time

from tornado.ioloop import PeriodicCallback

from .config import config
from .core import CommClosedError
from .diagnostics.plugin import SchedulerPlugin
from .utils import key_split, log_errors
from .utils import key_split, log_errors, PeriodicCallback

try:
from cytoolz import topk
Expand Down Expand Up @@ -43,8 +41,7 @@ def __init__(self, scheduler):
self.add_worker(worker=worker)

pc = PeriodicCallback(callback=self.balance,
callback_time=100,
io_loop=self.scheduler.loop)
callback_time=100)
self._pc = pc
self.scheduler.loop.add_callback(pc.start)
self.scheduler.plugins.append(self)
Expand Down
20 changes: 17 additions & 3 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import functools
import json
import logging
import math
import multiprocessing
import operator
import os
Expand All @@ -15,15 +16,15 @@
import socket
from time import sleep
from importlib import import_module

import six
import sys
import tblib.pickling_support
import tempfile
import threading
import warnings
import gc

import six
import tblib.pickling_support

from .compatibility import cache_from_source, getargspec, invalidate_caches, reload

try:
Expand All @@ -33,7 +34,9 @@

from dask import istask
from toolz import memoize, valmap
import tornado
from tornado import gen
from tornado.ioloop import IOLoop

from .compatibility import Queue, PY3, PY2, get_thread_identity, unicode
from .config import config
Expand Down Expand Up @@ -949,6 +952,17 @@ def nbytes(frame, _bytes_like=(bytes, bytearray)):
return frame.nbytes


def PeriodicCallback(callback, callback_time, io_loop=None):
"""
Wrapper around tornado.IOLoop.PeriodicCallback, for compatibility
with removal of the `io_loop` parameter in Tornado 5.0.
"""
if tornado.version_info >= (5,):
return tornado.ioloop.PeriodicCallback(callback, callback_time)
else:
return tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ever actually provide an io_loop keyword? If not then can we remove this function entirely?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes, we do, in a lot of places :-)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd to me that we are allowed to ignore those imports in Tornado >= 5.0 but need them in <= 5.0.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because we fixed PeriodicCallback so that the IO loop is looked up when its start method is called. Before 5.0 the IO loop is set in the constructor, and we call it from another thread.



@contextmanager
def time_warn(duration, text):
start = time()
Expand Down
9 changes: 0 additions & 9 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,10 @@ def background_read():

def run_scheduler(q, nputs, **kwargs):
from distributed import Scheduler
from tornado.ioloop import PeriodicCallback

# On Python 2.7 and Unix, fork() is used to spawn child processes,
# so avoid inheriting the parent's IO loop.
with pristine_loop() as loop:
PeriodicCallback(lambda: None, 500).start()

scheduler = Scheduler(validate=True, **kwargs)
done = scheduler.start('127.0.0.1')

Expand All @@ -284,12 +281,9 @@ def run_scheduler(q, nputs, **kwargs):

def run_worker(q, scheduler_q, **kwargs):
from distributed import Worker
from tornado.ioloop import PeriodicCallback

with log_errors():
with pristine_loop() as loop:
PeriodicCallback(lambda: None, 500).start()

scheduler_addr = scheduler_q.get()
worker = Worker(scheduler_addr, validate=True, **kwargs)
loop.run_sync(lambda: worker._start(0))
Expand All @@ -306,12 +300,9 @@ def wait_until_closed():

def run_nanny(q, scheduler_q, **kwargs):
from distributed import Nanny
from tornado.ioloop import PeriodicCallback

with log_errors():
with pristine_loop() as loop:
PeriodicCallback(lambda: None, 500).start()

scheduler_addr = scheduler_q.get()
worker = Nanny(scheduler_addr, validate=True, **kwargs)
loop.run_sync(lambda: worker._start(0))
Expand Down
25 changes: 10 additions & 15 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from toolz import pluck
from tornado.gen import Return
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.ioloop import IOLoop
from tornado.locks import Event

from . import profile
Expand All @@ -44,7 +44,7 @@
from .utils import (funcname, get_ip, has_arg, _maybe_complex, log_errors,
ignoring, validate_key, mp_context, import_file,
silence_logging, thread_state, json_load_robust, key_split,
format_bytes, DequeHandler, ThrottledGC)
format_bytes, DequeHandler, ThrottledGC, PeriodicCallback)
from .utils_comm import pack_data, gather_from_workers

_ncores = mp_context.cpu_count()
Expand Down Expand Up @@ -186,15 +186,13 @@ def __init__(self, scheduler_ip=None, scheduler_port=None,
**kwargs)

pc = PeriodicCallback(self.heartbeat,
self.heartbeat_interval,
io_loop=self.loop)
self.heartbeat_interval)
self.periodic_callbacks['heartbeat'] = pc
self._address = contact_address

self._memory_monitoring = False
pc = PeriodicCallback(self.memory_monitor,
self.memory_monitor_interval,
io_loop=self.loop)
self.memory_monitor_interval)
self.periodic_callbacks['memory'] = pc

@property
Expand Down Expand Up @@ -267,8 +265,7 @@ def _register_with_scheduler(self):
pid=os.getpid())
if self.death_timeout:
diff = self.death_timeout - (time() - start)
future = gen.with_timeout(timedelta(seconds=diff), future,
io_loop=self.loop)
future = gen.with_timeout(timedelta(seconds=diff), future)
response = yield future
_end = time()
middle = (_start + _end) / 2
Expand Down Expand Up @@ -385,8 +382,7 @@ def _close(self, report=True, timeout=10, nanny=True):
with ignoring(EnvironmentError, gen.TimeoutError):
if report:
yield gen.with_timeout(timedelta(seconds=timeout),
self.scheduler.unregister(address=self.contact_address),
io_loop=self.loop)
self.scheduler.unregister(address=self.contact_address))
self.scheduler.close_rpc()
if isinstance(self.executor, ThreadPoolExecutor):
self.executor.shutdown(timeout=timeout)
Expand Down Expand Up @@ -442,7 +438,8 @@ def executor_submit(self, key, function, *args, **kwargs):
# logger.info("%s:%d Starts job %d, %s", self.ip, self.port, i, key)
future = self.executor.submit(function, *args, **kwargs)
pc = PeriodicCallback(lambda: logger.debug("future state: %s - %s",
key, future._state), 1000, io_loop=self.loop); pc.start()
key, future._state), 1000)
pc.start()
try:
yield future
finally:
Expand Down Expand Up @@ -1126,13 +1123,11 @@ def __init__(self, *args, **kwargs):
WorkerBase.__init__(self, *args, **kwargs)

pc = PeriodicCallback(self.trigger_profile,
config.get('profile-interval', 10),
io_loop=self.loop)
config.get('profile-interval', 10))
self.periodic_callbacks['profile'] = pc

pc = PeriodicCallback(self.cycle_profile,
profile_cycle_interval,
io_loop=self.loop)
profile_cycle_interval)
pc.start()
self.periodic_callbacks['profile-cycle'] = pc

Expand Down