diff --git a/circus/arbiter.py b/circus/arbiter.py index 1a8e88bba..cae8dbe58 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -10,12 +10,14 @@ from circus.controller import Controller from circus.exc import AlreadyExist -from circus.flapping import Flapping from circus import logger from circus.watcher import Watcher from circus.util import debuglog, _setproctitle from circus.config import get_config +# will be imported by plugin registration later XXX +from circus.plugins.flapping import Flapping + class Arbiter(object): """Class used to control a list of watchers. diff --git a/circus/plugins/__init__.py b/circus/plugins/__init__.py new file mode 100644 index 000000000..e6637144d --- /dev/null +++ b/circus/plugins/__init__.py @@ -0,0 +1,94 @@ +""" Base class to create Circus subscribers plugins. +""" +import errno +from threading import Thread +import uuid + +import zmq +from zmq.eventloop import ioloop, zmqstream +from zmq.utils.jsonapi import jsonmod as json + +from circus import logger +from circus.client import make_message, cast_message +from circus.util import debuglog + + +class CircusPlugin(Thread): + + name = '' + + def __init__(self, context, endpoint, pubsub_endpoint, check_delay): + super(CircusPlugin, self).__init__() + self.daemon = True + self.context = context + self.pubsub_endpoint = pubsub_endpoint + self.endpoint = endpoint + self.check_delay = check_delay + self.loop = ioloop.IOLoop() + self._id = uuid.uuid4().hex # XXX os.getpid()+thread id is enough... + + @debuglog + def initialize(self): + self.client = self.context.socket(zmq.DEALER) + self.client.setsockopt(zmq.IDENTITY, self._id) + self.client.connect(self.endpoint) + self.client.linger = 0 + self.sub_socket = self.context.socket(zmq.SUB) + self.sub_socket.setsockopt(zmq.SUBSCRIBE, b'watcher.') + self.sub_socket.connect(self.pubsub_endpoint) + self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop) + self.substream.on_recv(self.handle_recv) + + @debuglog + def run(self): + self.handle_init() + self.initialize() + logger.debug('Flapping entering loop mode.') + while True: + try: + self.loop.start() + except zmq.ZMQError as e: + logger.debug(str(e)) + + if e.errno == errno.EINTR: + continue + elif e.errno == zmq.ETERM: + break + else: + logger.debug("got an unexpected error %s (%s)", str(e), + e.errno) + raise + else: + break + self.client.close() + self.sub_socket.close() + + @debuglog + def stop(self): + try: + self.handle_stop() + finally: + self.loop.stop() + self.join() + + def call(self, command, **props): + msg = make_message(command, **props) + self.client.send(json.dumps(msg)) + msg = self.client.recv() + return json.loads(msg) + + def cast(self, command, **props): + msg = cast_message(command, **props) + self.client.send(json.dumps(msg)) + + # + # methods to override. + # + def handle_recv(self, data): + raise NotImplementedError() + + def handle_stop(self): + pass + + def handle_init(self): + pass diff --git a/circus/flapping.py b/circus/plugins/flapping.py similarity index 56% rename from circus/flapping.py rename to circus/plugins/flapping.py index ccff5d215..9cbd2ac05 100644 --- a/circus/flapping.py +++ b/circus/plugins/flapping.py @@ -1,74 +1,26 @@ -import errno -from threading import Thread, Timer +from threading import Timer import time -import uuid - -import zmq -from zmq.eventloop import ioloop, zmqstream -from zmq.utils.jsonapi import jsonmod as json from circus import logger -from circus.client import make_message, cast_message -from circus.util import debuglog +from circus.plugins import CircusPlugin -class Flapping(Thread): +class Flapping(CircusPlugin): + """ Plugin that controls the flapping and acts upon. + """ + name = 'flapping' def __init__(self, context, endpoint, pubsub_endpoint, check_delay): - super(Flapping, self).__init__() - self.daemon = True - self.context = context - self.pubsub_endpoint = pubsub_endpoint - self.endpoint = endpoint - self.check_delay = check_delay - self.loop = ioloop.IOLoop() - self._id = uuid.uuid4().hex + super(Flapping, self).__init__(context, endpoint, pubsub_endpoint, + check_delay) self.timelines = {} self.timers = {} self.configs = {} self.tries = {} - @debuglog - def initialize(self): - self.client = self.context.socket(zmq.DEALER) - self.client.setsockopt(zmq.IDENTITY, self._id) - self.client.connect(self.endpoint) - self.client.linger = 0 - self.sub_socket = self.context.socket(zmq.SUB) - self.sub_socket.setsockopt(zmq.SUBSCRIBE, b'watcher.') - self.sub_socket.connect(self.pubsub_endpoint) - self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop) - self.substream.on_recv(self.handle_recv) - - @debuglog - def run(self): - self.initialize() - logger.debug('Flapping entering loop mode.') - while True: - try: - self.loop.start() - except zmq.ZMQError as e: - logger.debug(str(e)) - - if e.errno == errno.EINTR: - continue - elif e.errno == zmq.ETERM: - break - else: - logger.debug("got an unexpected error %s (%s)", str(e), - e.errno) - raise - else: - break - self.client.close() - self.sub_socket.close() - - @debuglog - def stop(self): + def handle_stop(self): for _, timer in self.timers.items(): timer.cancel() - self.loop.stop() - self.join() def handle_recv(self, data): topic, msg = data @@ -82,16 +34,6 @@ def handle_recv(self, data): elif topic_parts[2] == "updated": self.update_conf(topic_parts[1]) - def call(self, command, **props): - msg = make_message(command, **props) - self.client.send(json.dumps(msg)) - msg = self.client.recv() - return json.loads(msg) - - def cast(self, command, **props): - msg = cast_message(command, **props) - self.client.send(json.dumps(msg)) - def update_conf(self, watcher_name): msg = self.call("options", name=watcher_name) conf = self.configs.get(watcher_name, {}) diff --git a/docs/source/coverage.rst b/docs/source/coverage.rst index a451fe52c..6ed0ce717 100644 --- a/docs/source/coverage.rst +++ b/docs/source/coverage.rst @@ -8,7 +8,7 @@ Code coverage Name Stmts Miss Cover Missing ------------------------------------------------------------ circus/__init__ 30 19 37% 1-13, 83-95, 101 - circus/arbiter 180 31 83% 65-71, 75-90, 148-152, 187-188, 193, 204-207, 219, 223-228, 247, 263, 293 + circus/arbiter 180 31 83% 65-71, 75-90, 148-152, 187-188, 193, 206-209, 219, 223-228, 247, 263, 293 circus/client 52 11 79% 34-35, 39-40, 45-49, 60-61, 73 circus/commands/addwatcher 24 14 42% 1-66, 73, 78 circus/commands/base 72 55 24% 1-11, 19, 26, 35-79, 82, 86-97, 103-106 @@ -32,7 +32,7 @@ Code coverage circus/commands/util 54 42 22% 1-38, 43, 47, 52, 55-56, 59-60, 64 circus/config 127 60 53% 44-47, 59, 62-65, 76-100, 119-131, 147, 149, 152, 155, 158, 160, 165-194 circus/controller 116 15 87% 75, 85-86, 94-96, 104, 116-119, 122, 145, 151, 156-157 - circus/flapping 113 21 81% 50-60, 103-107, 118, 135, 141-148 + circus/flapping 113 21 81% 50-60, 103-107, 119, 136, 142-149 circus/process 119 39 67% 3-9, 92, 97, 100-120, 133, 147, 185-186, 190, 196, 202, 208-211, 216-221, 234-235, 239 circus/py3compat 47 44 6% 1-38, 43-67 circus/sighandler 36 16 56% 34-44, 47, 50, 53, 56, 59 @@ -44,9 +44,9 @@ Code coverage circus/stream/base 64 11 83% 22, 39, 51, 58-59, 64-65, 74-77 circus/stream/sthread 19 0 100% circus/util 205 119 42% 1-55, 59-62, 68-70, 76, 90-97, 103-111, 116-117, 133-134, 144-145, 149-150, 154-157, 161-162, 168, 173, 182, 191, 204, 212, 224, 232, 234, 238-244, 250-255, 260-313 - circus/watcher 305 69 77% 166, 176, 223, 249, 253, 274, 278, 281-282, 287, 314, 330, 358-359, 362-363, 371, 399-401, 411-413, 419-424, 430-431, 441-442, 478, 498-501, 508, 511, 514-516, 538, 554, 556-557, 559-560, 562-563, 565, 567-568, 572-586 + circus/watcher 308 71 77% 166, 176, 223, 249, 253, 274, 278, 281-282, 287, 314, 330, 358-359, 362-363, 371, 389-391, 404-406, 416-418, 424-429, 435-436, 446-447, 483, 503-506, 513, 516, 519-521, 543, 559, 561-562, 564-565, 567-568, 570, 572-573, 577-591 circus/web/__init__ 0 0 100% ------------------------------------------------------------ - TOTAL 2252 1094 51% + TOTAL 2255 1096 51%