Skip to content

Commit

Permalink
Moved the flapping class in circus/plugins.
Browse files Browse the repository at this point in the history
We have a CircusPlugin base class now that gives us the basic
wiring to write plugins: hook on the pub/sub of events,
and provide APIs to call commands against Circus.

The next step is to provide a way to activate and configure
plugins from the ini files and the API.

Then we can provide other plugins like a statsd emitter.
  • Loading branch information
tarekziade committed May 30, 2012
1 parent deb28ed commit e0664e7
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 72 deletions.
4 changes: 3 additions & 1 deletion circus/arbiter.py
Expand Up @@ -10,12 +10,14 @@

from circus.controller import Controller
from circus.exc import AlreadyExist
from circus.flapping import Flapping
from circus import logger
from circus.watcher import Watcher
from circus.util import debuglog, _setproctitle
from circus.config import get_config

# will be imported by plugin registration later XXX
from circus.plugins.flapping import Flapping


class Arbiter(object):
"""Class used to control a list of watchers.
Expand Down
94 changes: 94 additions & 0 deletions circus/plugins/__init__.py
@@ -0,0 +1,94 @@
""" Base class to create Circus subscribers plugins.
"""
import errno
from threading import Thread
import uuid

import zmq
from zmq.eventloop import ioloop, zmqstream
from zmq.utils.jsonapi import jsonmod as json

from circus import logger
from circus.client import make_message, cast_message
from circus.util import debuglog


class CircusPlugin(Thread):

name = ''

def __init__(self, context, endpoint, pubsub_endpoint, check_delay):
super(CircusPlugin, self).__init__()
self.daemon = True
self.context = context
self.pubsub_endpoint = pubsub_endpoint
self.endpoint = endpoint
self.check_delay = check_delay
self.loop = ioloop.IOLoop()
self._id = uuid.uuid4().hex # XXX os.getpid()+thread id is enough...

@debuglog
def initialize(self):
self.client = self.context.socket(zmq.DEALER)
self.client.setsockopt(zmq.IDENTITY, self._id)
self.client.connect(self.endpoint)
self.client.linger = 0
self.sub_socket = self.context.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, b'watcher.')
self.sub_socket.connect(self.pubsub_endpoint)
self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop)
self.substream.on_recv(self.handle_recv)

@debuglog
def run(self):
self.handle_init()
self.initialize()
logger.debug('Flapping entering loop mode.')
while True:
try:
self.loop.start()
except zmq.ZMQError as e:
logger.debug(str(e))

if e.errno == errno.EINTR:
continue
elif e.errno == zmq.ETERM:
break
else:
logger.debug("got an unexpected error %s (%s)", str(e),
e.errno)
raise
else:
break
self.client.close()
self.sub_socket.close()

@debuglog
def stop(self):
try:
self.handle_stop()
finally:
self.loop.stop()
self.join()

def call(self, command, **props):
msg = make_message(command, **props)
self.client.send(json.dumps(msg))
msg = self.client.recv()
return json.loads(msg)

def cast(self, command, **props):
msg = cast_message(command, **props)
self.client.send(json.dumps(msg))

#
# methods to override.
#
def handle_recv(self, data):
raise NotImplementedError()

def handle_stop(self):
pass

def handle_init(self):
pass
76 changes: 9 additions & 67 deletions circus/flapping.py → circus/plugins/flapping.py
@@ -1,74 +1,26 @@
import errno
from threading import Thread, Timer
from threading import Timer
import time
import uuid

import zmq
from zmq.eventloop import ioloop, zmqstream
from zmq.utils.jsonapi import jsonmod as json

from circus import logger
from circus.client import make_message, cast_message
from circus.util import debuglog
from circus.plugins import CircusPlugin


class Flapping(Thread):
class Flapping(CircusPlugin):
""" Plugin that controls the flapping and acts upon.
"""
name = 'flapping'

def __init__(self, context, endpoint, pubsub_endpoint, check_delay):
super(Flapping, self).__init__()
self.daemon = True
self.context = context
self.pubsub_endpoint = pubsub_endpoint
self.endpoint = endpoint
self.check_delay = check_delay
self.loop = ioloop.IOLoop()
self._id = uuid.uuid4().hex
super(Flapping, self).__init__(context, endpoint, pubsub_endpoint,
check_delay)
self.timelines = {}
self.timers = {}
self.configs = {}
self.tries = {}

@debuglog
def initialize(self):
self.client = self.context.socket(zmq.DEALER)
self.client.setsockopt(zmq.IDENTITY, self._id)
self.client.connect(self.endpoint)
self.client.linger = 0
self.sub_socket = self.context.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, b'watcher.')
self.sub_socket.connect(self.pubsub_endpoint)
self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop)
self.substream.on_recv(self.handle_recv)

@debuglog
def run(self):
self.initialize()
logger.debug('Flapping entering loop mode.')
while True:
try:
self.loop.start()
except zmq.ZMQError as e:
logger.debug(str(e))

if e.errno == errno.EINTR:
continue
elif e.errno == zmq.ETERM:
break
else:
logger.debug("got an unexpected error %s (%s)", str(e),
e.errno)
raise
else:
break
self.client.close()
self.sub_socket.close()

@debuglog
def stop(self):
def handle_stop(self):
for _, timer in self.timers.items():
timer.cancel()
self.loop.stop()
self.join()

def handle_recv(self, data):
topic, msg = data
Expand All @@ -82,16 +34,6 @@ def handle_recv(self, data):
elif topic_parts[2] == "updated":
self.update_conf(topic_parts[1])

def call(self, command, **props):
msg = make_message(command, **props)
self.client.send(json.dumps(msg))
msg = self.client.recv()
return json.loads(msg)

def cast(self, command, **props):
msg = cast_message(command, **props)
self.client.send(json.dumps(msg))

def update_conf(self, watcher_name):
msg = self.call("options", name=watcher_name)
conf = self.configs.get(watcher_name, {})
Expand Down
8 changes: 4 additions & 4 deletions docs/source/coverage.rst
Expand Up @@ -8,7 +8,7 @@ Code coverage
Name Stmts Miss Cover Missing
------------------------------------------------------------
circus/__init__ 30 19 37% 1-13, 83-95, 101
circus/arbiter 180 31 83% 65-71, 75-90, 148-152, 187-188, 193, 204-207, 219, 223-228, 247, 263, 293
circus/arbiter 180 31 83% 65-71, 75-90, 148-152, 187-188, 193, 206-209, 219, 223-228, 247, 263, 293
circus/client 52 11 79% 34-35, 39-40, 45-49, 60-61, 73
circus/commands/addwatcher 24 14 42% 1-66, 73, 78
circus/commands/base 72 55 24% 1-11, 19, 26, 35-79, 82, 86-97, 103-106
Expand All @@ -32,7 +32,7 @@ Code coverage
circus/commands/util 54 42 22% 1-38, 43, 47, 52, 55-56, 59-60, 64
circus/config 127 60 53% 44-47, 59, 62-65, 76-100, 119-131, 147, 149, 152, 155, 158, 160, 165-194
circus/controller 116 15 87% 75, 85-86, 94-96, 104, 116-119, 122, 145, 151, 156-157
circus/flapping 113 21 81% 50-60, 103-107, 118, 135, 141-148
circus/flapping 113 21 81% 50-60, 103-107, 119, 136, 142-149
circus/process 119 39 67% 3-9, 92, 97, 100-120, 133, 147, 185-186, 190, 196, 202, 208-211, 216-221, 234-235, 239
circus/py3compat 47 44 6% 1-38, 43-67
circus/sighandler 36 16 56% 34-44, 47, 50, 53, 56, 59
Expand All @@ -44,9 +44,9 @@ Code coverage
circus/stream/base 64 11 83% 22, 39, 51, 58-59, 64-65, 74-77
circus/stream/sthread 19 0 100%
circus/util 205 119 42% 1-55, 59-62, 68-70, 76, 90-97, 103-111, 116-117, 133-134, 144-145, 149-150, 154-157, 161-162, 168, 173, 182, 191, 204, 212, 224, 232, 234, 238-244, 250-255, 260-313
circus/watcher 305 69 77% 166, 176, 223, 249, 253, 274, 278, 281-282, 287, 314, 330, 358-359, 362-363, 371, 399-401, 411-413, 419-424, 430-431, 441-442, 478, 498-501, 508, 511, 514-516, 538, 554, 556-557, 559-560, 562-563, 565, 567-568, 572-586
circus/watcher 308 71 77% 166, 176, 223, 249, 253, 274, 278, 281-282, 287, 314, 330, 358-359, 362-363, 371, 389-391, 404-406, 416-418, 424-429, 435-436, 446-447, 483, 503-506, 513, 516, 519-521, 543, 559, 561-562, 564-565, 567-568, 570, 572-573, 577-591
circus/web/__init__ 0 0 100%
------------------------------------------------------------
TOTAL 2252 1094 51%
TOTAL 2255 1096 51%


0 comments on commit e0664e7

Please sign in to comment.