Skip to content

Commit

Permalink
Merge pull request #1129 from unkcpz/gsoc/new-tornado
Browse files Browse the repository at this point in the history
circus with new tornado>5.0.2
  • Loading branch information
k4nar committed Jul 9, 2020
2 parents ec8e079 + 518714a commit 2bedab0
Show file tree
Hide file tree
Showing 33 changed files with 86 additions and 46 deletions.
2 changes: 1 addition & 1 deletion circus/arbiter.py
Expand Up @@ -242,7 +242,7 @@ def running(self):
def _init_context(self, context):
self.context = context or zmq.Context.instance()
if self.loop is None:
self.loop = ioloop.IOLoop.instance()
self.loop = ioloop.IOLoop.current()
self.ctrl = Controller(self.endpoint, self.multicast_endpoint,
self.context, self.loop, self, self.check_delay,
self.endpoint_owner)
Expand Down
15 changes: 12 additions & 3 deletions circus/client.py
Expand Up @@ -5,6 +5,7 @@
import zmq.utils.jsonapi as json
from zmq.eventloop.zmqstream import ZMQStream
import tornado
from tornado import concurrent

from circus.exc import CallError
from circus.util import DEFAULT_ENDPOINT_DEALER, get_connection, to_bytes
Expand Down Expand Up @@ -35,7 +36,7 @@ def __init__(self, context=None, endpoint=DEFAULT_ENDPOINT_DEALER,
get_connection(self.socket, endpoint, ssh_server, ssh_keyfile)
self._timeout = timeout
self.timeout = timeout * 1000
self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.instance())
self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.current())

def _init_context(self, context):
self.context = context or zmq.Context.instance()
Expand Down Expand Up @@ -65,12 +66,20 @@ def call(self, cmd):
raise CallError(str(e))

try:
yield tornado.gen.Task(self.stream.send, cmd)
future = concurrent.Future()

def cb(msg, status):
future.set_result(msg)
self.stream.send(cmd, callback=cb)
yield future
except zmq.ZMQError as e:
raise CallError(str(e))

while True:
messages = yield tornado.gen.Task(self.stream.on_recv)
future = concurrent.Future()
self.stream.on_recv(future.set_result)
messages = yield future

for message in messages:
try:
res = json.loads(message)
Expand Down
2 changes: 1 addition & 1 deletion circus/controller.py
Expand Up @@ -109,7 +109,7 @@ def start(self):
# so with no period callback to manage_watchers
# is probably "unit tests only"
self.caller = ioloop.PeriodicCallback(self.manage_watchers,
self.check_delay, self.loop)
self.check_delay)
self.caller.start()
self.started = True

Expand Down
2 changes: 1 addition & 1 deletion circus/green/arbiter.py
Expand Up @@ -8,6 +8,6 @@
class Arbiter(_Arbiter):
def _init_context(self, context):
self.context = context or Context.instance()
self.loop = ioloop.IOLoop.instance()
self.loop = ioloop.IOLoop.current()
self.ctrl = Controller(self.endpoint, self.multicast_endpoint,
self.context, self.loop, self, self.check_delay)
3 changes: 2 additions & 1 deletion circus/green/controller.py
Expand Up @@ -14,7 +14,8 @@ def _init_stream(self):
self.stream.on_recv(self.handle_message)

def start(self):
self.loop.make_current()
self.initialize()
self.caller = ioloop.PeriodicCallback(self.arbiter.manage_watchers,
self.check_delay, self.loop)
self.check_delay)
self.caller.start()
1 change: 1 addition & 0 deletions circus/plugins/__init__.py
Expand Up @@ -58,6 +58,7 @@ def initialize(self):

@debuglog
def start(self):
self.loop.make_current()
if not self.active:
raise ValueError('Will not start an inactive plugin')
self.handle_init()
Expand Down
3 changes: 1 addition & 2 deletions circus/plugins/command_reloader.py
Expand Up @@ -48,8 +48,7 @@ def look_after(self):

def handle_init(self):
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000,
self.loop)
self.loop_rate * 1000)
self.period.start()

def handle_stop(self):
Expand Down
2 changes: 1 addition & 1 deletion circus/plugins/statsd.py
Expand Up @@ -81,7 +81,7 @@ def __init__(self, *args, **config):

def handle_init(self):
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000, self.loop)
self.loop_rate * 1000)
self.period.start()

def handle_stop(self):
Expand Down
3 changes: 1 addition & 2 deletions circus/plugins/watchdog.py
Expand Up @@ -79,8 +79,7 @@ def handle_init(self):
- create the listening UDP socket
"""
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000,
self.loop)
self.loop_rate * 1000)
self.period.start()
self._bind_socket()

Expand Down
5 changes: 2 additions & 3 deletions circus/stats/collector.py
Expand Up @@ -12,7 +12,7 @@ class BaseStatsCollector(ioloop.PeriodicCallback):

def __init__(self, streamer, name, callback_time=1., io_loop=None):
ioloop.PeriodicCallback.__init__(self, self._callback,
callback_time * 1000, io_loop)
callback_time * 1000)
self.streamer = streamer
self.name = name

Expand Down Expand Up @@ -110,8 +110,7 @@ def __init__(self, streamer, name, callback_time=1., io_loop=None):
callback_time, io_loop)
self._rstats = defaultdict(int)
self.sockets = [sock for sock, address, fd in self.streamer.sockets]
self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES,
io_loop=io_loop)
self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES)

def start(self):
self._p.start()
Expand Down
2 changes: 1 addition & 1 deletion circus/stats/streamer.py
Expand Up @@ -27,7 +27,7 @@ def __init__(self, endpoint, pubsub_endoint, stats_endpoint,
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic)
self.sub_socket.connect(self.pubsub_endpoint)
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()
self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop)
self.substream.on_recv(self.handle_recv)
self.client = CircusClient(context=self.ctx, endpoint=endpoint,
Expand Down
2 changes: 1 addition & 1 deletion circus/stream/redirector.py
Expand Up @@ -42,7 +42,7 @@ def __init__(self, stdout_redirect, stderr_redirect, buffer=1024,
self._active = {}
self.redirect = {'stdout': stdout_redirect, 'stderr': stderr_redirect}
self.buffer = buffer
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()

def _start_one(self, fd, stream_name, process, pipe):
if fd not in self._active:
Expand Down
7 changes: 4 additions & 3 deletions circus/tests/support.py
Expand Up @@ -11,11 +11,12 @@
import multiprocessing
import socket
import sysconfig
import concurrent

from unittest import skip, skipIf, TestCase, TestSuite, findTestCases # noqa: F401

from tornado.testing import AsyncTestCase
import mock
from unittest import mock
import tornado

from circus import get_arbiter
Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(self, name):

def get_ioloop():
from tornado import ioloop
return ioloop.IOLoop.instance()
return ioloop.IOLoop.current()


def get_available_port():
Expand Down Expand Up @@ -473,7 +474,7 @@ def stop(self):
pass


class MagicMockFuture(mock.MagicMock, tornado.concurrent.Future):
class MagicMockFuture(mock.MagicMock, concurrent.futures.Future):

def cancel(self):
return False
Expand Down
25 changes: 20 additions & 5 deletions circus/tests/test_arbiter.py
Expand Up @@ -5,7 +5,7 @@
from tempfile import mkstemp
from time import time
import zmq.utils.jsonapi as json
import mock
from unittest import mock
from urllib.parse import urlparse

from circus.arbiter import Arbiter
Expand Down Expand Up @@ -500,6 +500,7 @@ def test_plugins(self):
loop=get_ioloop())

def incr_processes(cli):
# return a coroutine if cli is Async
return cli.send_message('incr', name='test')

# wait for the plugin to be started
Expand All @@ -511,15 +512,15 @@ def incr_processes(cli):
res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 1)

incr_processes(cli)
yield incr_processes(cli)
res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 2)
# wait for the plugin to receive the signal
res = yield async_poll_for(datafile, 'test:spawn')
self.assertTrue(res)
truncate_file(datafile)

incr_processes(cli)
yield incr_processes(cli)
res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 3)

Expand Down Expand Up @@ -595,8 +596,7 @@ def _test_start_watchers_warmup_delay(self):
@tornado.gen.coroutine
def _sleep(duration):
called.append(duration)
loop = get_ioloop()
yield tornado.gen.Task(loop.add_timeout, time() + duration)
yield tornado.gen.sleep(duration)

watcher_mod.tornado_sleep = _sleep

Expand Down Expand Up @@ -633,6 +633,21 @@ def callback(*args):

self.assertEqual(callee.call_count, 1)

def test_start_with_callback_delay(self):
controller = "tcp://127.0.0.1:%d" % get_available_port()
sub = "tcp://127.0.0.1:%d" % get_available_port()
arbiter = Arbiter([], controller, sub, check_delay=1)

callee = mock.MagicMock()

def callback(*args):
callee()
arbiter.stop()

arbiter.start(cb=callback)

self.assertEqual(callee.call_count, 1)

@tornado.testing.gen_test
def test_start_with_callback_and_given_loop(self):
controller = "tcp://127.0.0.1:%d" % get_available_port()
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_circusctl.py
@@ -1,6 +1,6 @@
import subprocess
import shlex
from mock import patch
from unittest.mock import patch
from multiprocessing import Process, Queue

from tornado.testing import gen_test
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_config.py
@@ -1,6 +1,6 @@
import os
import signal
from mock import patch
from unittest.mock import patch

from circus import logger
from circus.arbiter import Arbiter
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_controller.py
Expand Up @@ -4,7 +4,7 @@
from circus import logger
import circus.controller

import mock
from unittest import mock


class TestController(TestCase):
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_plugin_command_reloader.py
@@ -1,4 +1,4 @@
from mock import patch
from unittest.mock import patch

from circus.plugins.command_reloader import CommandReloader
from circus.tests.support import TestCircus, EasyTestSuite
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_plugin_flapping.py
@@ -1,4 +1,4 @@
from mock import patch
from unittest.mock import patch

from circus.tests.support import TestCircus, EasyTestSuite
from circus.plugins.flapping import Flapping
Expand Down
4 changes: 2 additions & 2 deletions circus/tests/test_reloadconfig.py
Expand Up @@ -55,10 +55,10 @@ def _tear_down_arbiter(self, a):
a.sockets.close_all()

def get_new_ioloop(self):
return tornado.ioloop.IOLoop.instance()
return tornado.ioloop.IOLoop.current()

def _load_base_arbiter(self, name='reload_base'):
loop = tornado.ioloop.IOLoop.instance()
loop = tornado.ioloop.IOLoop.current()
a = Arbiter.load_from_config(_CONF[name], loop=loop)
a.evpub_socket = FakeSocket()
# initialize watchers
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_sockets.py
Expand Up @@ -5,7 +5,7 @@
import IN
except ImportError:
pass
import mock
from unittest import mock
import fcntl

from circus.tests.support import TestCase, skipIf, EasyTestSuite, IS_WINDOWS
Expand Down
1 change: 1 addition & 0 deletions circus/tests/test_stats_collector.py
Expand Up @@ -72,6 +72,7 @@ def __init__(this, streamer):
this.daemon = True

def run(self):
self.loop.make_current()
collector = collector_class(
self.streamer, 'sockets', callback_time=0.1,
io_loop=self.loop)
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_stats_publisher.py
@@ -1,4 +1,4 @@
import mock
from unittest import mock

import zmq
import zmq.utils.jsonapi as json
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_stats_streamer.py
@@ -1,7 +1,7 @@
import os
import tempfile

import mock
from unittest import mock

from circus.tests.support import TestCircus, EasyTestSuite
from circus.stats.streamer import StatsStreamer
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_util.py
Expand Up @@ -13,7 +13,7 @@

import psutil
from psutil import Popen
import mock
from unittest import mock

from circus.tests.support import (TestCase, EasyTestSuite, skipIf,
IS_WINDOWS, SLEEP)
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_validate_option.py
@@ -1,5 +1,5 @@
from circus.tests.support import TestCase, EasyTestSuite, IS_WINDOWS
from mock import patch
from unittest.mock import patch

from circus.commands.util import validate_option
from circus.exc import MessageError
Expand Down
4 changes: 2 additions & 2 deletions circus/tests/test_watcher.py
Expand Up @@ -16,7 +16,7 @@
captured_output = None # NOQA

import tornado
import mock
from unittest import mock

from circus import logger
from circus.process import RUNNING, UNEXISTING
Expand Down Expand Up @@ -317,7 +317,7 @@ def __init__(self, stream=False, loop=None, **kw):
self.watcher = None
self.kw = kw
if loop is None:
self.loop = tornado.ioloop.IOLoop.instance()
self.loop = tornado.ioloop.IOLoop.current()
else:
self.loop = loop

Expand Down
5 changes: 2 additions & 3 deletions circus/util.py
Expand Up @@ -26,7 +26,6 @@
fcntl = None
grp = None
pwd = None
from tornado.ioloop import IOLoop
from tornado import gen
from tornado import concurrent

Expand Down Expand Up @@ -1033,7 +1032,7 @@ def wrapper(self, *args, **kwargs):
finally:
if isinstance(resp, concurrent.Future):
cb = functools.partial(_synchronized_cb, arbiter)
resp.add_done_callback(cb)
concurrent.future_add_done_callback(resp, cb)
else:
if arbiter is not None:
arbiter._exclusive_running_command = None
Expand All @@ -1048,7 +1047,7 @@ def tornado_sleep(duration):
To use with a gen.coroutines decorated function
Thanks to http://stackoverflow.com/a/11135204/433050
"""
return gen.Task(IOLoop.instance().add_timeout, time.time() + duration)
return gen.sleep(duration)


class TransformableFuture(concurrent.Future):
Expand Down
2 changes: 1 addition & 1 deletion circus/watcher.py
Expand Up @@ -242,7 +242,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.close_child_stdout = close_child_stdout
self.close_child_stderr = close_child_stderr
self.use_papa = use_papa and papa is not None
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()

if singleton and self.numprocesses not in (0, 1):
raise ValueError("Cannot have %d processes with a singleton "
Expand Down

0 comments on commit 2bedab0

Please sign in to comment.