Skip to content

Commit

Permalink
started circusd-stats. collecting and maintaining the list of PIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade committed May 2, 2012
1 parent a3ecc8e commit 653c025
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 2 deletions.
109 changes: 109 additions & 0 deletions circus/circusd-stats.py
@@ -0,0 +1,109 @@
import sys
import argparse
import os
import logging
import resource
from collections import defaultdict
import zmq
import json

from circus import logger
from circus import util
from circus.consumer import CircusConsumer
from circus.commands import get_commands
from circus.client import CircusClient


class CircusStats(object):
def __init__(self, endpoint, pubsub_endoint):
self.topic = 'watcher.'
self.ctx = zmq.Context()
self.consumer = CircusConsumer([self.topic], context=self.ctx,
endpoint=pubsub_endoint)
self.client = CircusClient(context=self.ctx, endpoint=endpoint)
self.cmds = get_commands()
self.watchers = defaultdict(list)
self._pids = []
self.running = False
self.stopped = False

def _init(self):
self.stopped = False
self._pids = []
# getting the initial list of watchers/pids
msg = self.cmds['list'].make_message()
res = self.client.call(msg)
for watcher in res['watchers']:
msg = self.cmds['listpids'].make_message(name=watcher)
res = self.client.call(msg)
for pid in res['pids']:
if pid in self._pids:
continue
self._pids.append(pid)

def start(self):
self._init()
print 'initial list is ' + str(self._pids)
self.running = True
while self.running:
# now hooked into the stream
try:
for topic, msg in self.consumer:
__, name, action = topic.split('.')
msg = json.loads(msg)
if action != 'start' and self.stopped:
self._init()

if action in ('reap', 'kill'):
# a process was reaped
pid = msg['process_pid']
if pid in self._pids:
self._pids.remove(pid)
print self._pids
elif action == 'spawn':
pid = msg['process_pid']
if pid not in self._pids:
self._pids.append(pid)
print self._pids
elif action == 'start':
self._init()
print self._pids
elif action == 'stop':
# nothing to do
self.stopped = True
else:
print action
print msg

except Exception, e:
print str(e)

def stop(self):
self.running = False
self.ctx.destroy(0)

def main():
parser = argparse.ArgumentParser(description=
'Runs the stats aggregator for Circus')

parser.add_argument('--endpoint',
help='The ZeroMQ pub/sub socket to connect to',
default='tcp://127.0.0.1:5555')


parser.add_argument('--pubsub',
help='The ZeroMQ pub/sub socket to connect to',
default='tcp://127.0.0.1:5556')

args = parser.parse_args()
stats = CircusStats(args.endpoint, args.pubsub)

try:
stats.start()
finally:
stats.stop()
sys.exit(0)


if __name__ == '__main__':
main()
3 changes: 3 additions & 0 deletions circus/consumer.py
Expand Up @@ -5,6 +5,7 @@
class CircusConsumer(object):
def __init__(self, topics, context=None, endpoint='tcp://127.0.0.1:5556'):
self.topics = topics
self.keep_context = context is not None
self.context = context or zmq.Context()
self.endpoint = endpoint
self.pubsub_socket = self.context.socket(zmq.SUB)
Expand All @@ -30,6 +31,8 @@ def iter_messages(self):
yield topic, message

def stop(self):
if self.keep_context:
return
try:
self.context.destroy(0)
except zmq.ZMQError as e:
Expand Down
1 change: 1 addition & 0 deletions circus/watcher.py
Expand Up @@ -365,6 +365,7 @@ def kill_process(self, process, sig=signal.SIGTERM):
self.stderr_redirector.remove_redirection('stderr', process)

self.send_msg("kill", {"process_id": process.wid,
"process_pid": process.pid,
"time": time.time()})
logger.debug("%s: kill process %s", self.name, process.pid)
process.send_signal(sig)
Expand Down
5 changes: 3 additions & 2 deletions examples/dummy_fly.py
Expand Up @@ -27,9 +27,10 @@ def run(self):
print "hello, fly #%s (pid: %s) is alive" % (self.wid, os.getpid())

while self.alive:
a = 10 * 10 * 10 * 10
#a = 10 * 10 * 10 * 10

#time.sleep(0.1)

time.sleep(0.1)

if __name__ == "__main__":
DummyFly(sys.argv[1]).run()

0 comments on commit 653c025

Please sign in to comment.