diff --git a/circus/circusd-stats.py b/circus/circusd-stats.py new file mode 100644 index 000000000..835033d6b --- /dev/null +++ b/circus/circusd-stats.py @@ -0,0 +1,109 @@ +import sys +import argparse +import os +import logging +import resource +from collections import defaultdict +import zmq +import json + +from circus import logger +from circus import util +from circus.consumer import CircusConsumer +from circus.commands import get_commands +from circus.client import CircusClient + + +class CircusStats(object): + def __init__(self, endpoint, pubsub_endoint): + self.topic = 'watcher.' + self.ctx = zmq.Context() + self.consumer = CircusConsumer([self.topic], context=self.ctx, + endpoint=pubsub_endoint) + self.client = CircusClient(context=self.ctx, endpoint=endpoint) + self.cmds = get_commands() + self.watchers = defaultdict(list) + self._pids = [] + self.running = False + self.stopped = False + + def _init(self): + self.stopped = False + self._pids = [] + # getting the initial list of watchers/pids + msg = self.cmds['list'].make_message() + res = self.client.call(msg) + for watcher in res['watchers']: + msg = self.cmds['listpids'].make_message(name=watcher) + res = self.client.call(msg) + for pid in res['pids']: + if pid in self._pids: + continue + self._pids.append(pid) + + def start(self): + self._init() + print 'initial list is ' + str(self._pids) + self.running = True + while self.running: + # now hooked into the stream + try: + for topic, msg in self.consumer: + __, name, action = topic.split('.') + msg = json.loads(msg) + if action != 'start' and self.stopped: + self._init() + + if action in ('reap', 'kill'): + # a process was reaped + pid = msg['process_pid'] + if pid in self._pids: + self._pids.remove(pid) + print self._pids + elif action == 'spawn': + pid = msg['process_pid'] + if pid not in self._pids: + self._pids.append(pid) + print self._pids + elif action == 'start': + self._init() + print self._pids + elif action == 'stop': + # nothing to do + self.stopped = True + else: + print action + print msg + + except Exception, e: + print str(e) + + def stop(self): + self.running = False + self.ctx.destroy(0) + +def main(): + parser = argparse.ArgumentParser(description= + 'Runs the stats aggregator for Circus') + + parser.add_argument('--endpoint', + help='The ZeroMQ pub/sub socket to connect to', + default='tcp://127.0.0.1:5555') + + + parser.add_argument('--pubsub', + help='The ZeroMQ pub/sub socket to connect to', + default='tcp://127.0.0.1:5556') + + args = parser.parse_args() + stats = CircusStats(args.endpoint, args.pubsub) + + try: + stats.start() + finally: + stats.stop() + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/circus/consumer.py b/circus/consumer.py index 003aaaeca..61bf25d80 100644 --- a/circus/consumer.py +++ b/circus/consumer.py @@ -5,6 +5,7 @@ class CircusConsumer(object): def __init__(self, topics, context=None, endpoint='tcp://127.0.0.1:5556'): self.topics = topics + self.keep_context = context is not None self.context = context or zmq.Context() self.endpoint = endpoint self.pubsub_socket = self.context.socket(zmq.SUB) @@ -30,6 +31,8 @@ def iter_messages(self): yield topic, message def stop(self): + if self.keep_context: + return try: self.context.destroy(0) except zmq.ZMQError as e: diff --git a/circus/watcher.py b/circus/watcher.py index 6667fa6ec..c506573d9 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -365,6 +365,7 @@ def kill_process(self, process, sig=signal.SIGTERM): self.stderr_redirector.remove_redirection('stderr', process) self.send_msg("kill", {"process_id": process.wid, + "process_pid": process.pid, "time": time.time()}) logger.debug("%s: kill process %s", self.name, process.pid) process.send_signal(sig) diff --git a/examples/dummy_fly.py b/examples/dummy_fly.py index bb48ef019..a5138416c 100755 --- a/examples/dummy_fly.py +++ b/examples/dummy_fly.py @@ -27,9 +27,10 @@ def run(self): print "hello, fly #%s (pid: %s) is alive" % (self.wid, os.getpid()) while self.alive: - a = 10 * 10 * 10 * 10 + #a = 10 * 10 * 10 * 10 - #time.sleep(0.1) + + time.sleep(0.1) if __name__ == "__main__": DummyFly(sys.argv[1]).run()