Skip to content

Commit

Permalink
now publishing by watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade committed May 3, 2012
1 parent 354747e commit 1da2492
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 37 deletions.
22 changes: 15 additions & 7 deletions circus/stats/client.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
from circus.consumer import CircusConsumer
import json


class StatsClient(CircusConsumer):
def __init__(self, endpoint='tcp://127.0.0.1:5557', context=None):
CircusConsumer.__init__(self, ['pid.'], context, endpoint)
CircusConsumer.__init__(self, ['stat.'], context, endpoint)

def iter_messages(self):
""" Yields tuples of (pid, info)"""
""" Yields tuples of (watcher, pid, stat)"""
with self:
while True:
topic, info = self.pubsub_socket.recv_multipart()
pid = topic.split('.')[-1]
yield long(pid), info
topic, stat = self.pubsub_socket.recv_multipart()
__, watcher, pid = topic.split('.')
yield watcher, long(pid), json.loads(stat)


TMP = ('watcher: %(watcher)s - pid: %(pid)d - cpu: %(cpu)s%% - '
'mem: %(mem)s%%')


if __name__ == '__main__':
client = StatsClient()
try:
for pid, info in client:
print '%d: %s' % (pid, info)

for watcher, pid, stat in client:
stat['watcher'] = watcher
stat['pid'] = pid
print TMP % stat
except KeyboardInterrupt:
client.stop()
11 changes: 6 additions & 5 deletions circus/stats/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ def run(self):
self.running = True
while self.running:
try:
pid = self.queue.get(timeout=self.delay)
watcher, pid = self.queue.get(timeout=self.delay)
try:
info = util.get_info(pid, interval=self.interval)
except util.NoSuchProcess:
# the process is gone !
pass
else:
self.results.put(('pid.%d' % pid, info))
self.results.put((watcher, pid, info))
except Queue.Empty:
pass
except Exception:
Expand Down Expand Up @@ -57,9 +57,10 @@ def run(self):
worker.start()

while self.running:
# filling a working queue with all pids
for pid in self.streamer.get_pids():
self.queue.put(pid)
# filling a working queue with pids ordered by watchers
for watcher in self.streamer.get_watchers():
for pid in self.streamer.get_pids(watcher):
self.queue.put((watcher, pid))

def stop(self):
self.running = False
Expand Down
5 changes: 3 additions & 2 deletions circus/stats/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ def run(self):
logger.debug('Starting the Publisher')
while self.running:
try:
pid, info = results.get(timeout=self.delay)
self.socket.send_multipart([pid, json.dumps(info)])
watcher, pid, stat = results.get(timeout=self.delay)
topic = b'stat.%s.%d' % (str(watcher), pid)
self.socket.send_multipart([topic, json.dumps(stat)])
except Queue.Empty:
pass
except Exception:
Expand Down
44 changes: 23 additions & 21 deletions circus/stats/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import threading
import Queue
from itertools import chain

from circus.consumer import CircusConsumer
from circus.commands import get_commands
Expand All @@ -21,44 +22,49 @@ def __init__(self, endpoint, pubsub_endoint, stats_endpoint, pool_size=1):
self.client = CircusClient(context=self.ctx, endpoint=endpoint)
self.cmds = get_commands()
self.watchers = defaultdict(list)
self._pids = []
self._pids = defaultdict(list)
self.running = False
self.stopped = False
self.lock = threading.RLock()
self.results = Queue.Queue()
self.stats = StatsCollector(self, pool_size)
self.publisher = StatsPublisher(self, stats_endpoint, context=self.ctx)

def get_pids(self):
return self._pids
def get_watchers(self):
return self._pids.keys()

def get_pids(self, watcher=None):
if watcher is not None:
return self._pids[watcher]
return chain(self._pid.values())

def _init(self):
with self.lock:
self.stopped = False
self._pids = []
self._pids.clear()
# getting the initial list of watchers/pids
msg = self.cmds['list'].make_message()
res = self.client.call(msg)
for watcher in res['watchers']:
msg = self.cmds['listpids'].make_message(name=watcher)
res = self.client.call(msg)
for pid in res['pids']:
if pid in self._pids:
if pid in self._pids[watcher]:
continue
self._pids.append(pid)
self._pids[watcher].append(pid)

def remove_pid(self, pid):
logger.debug('Removing %d' % pid)
if pid in self._pids:
def remove_pid(self, watcher, pid):
logger.debug('Removing %d from %s' % (pid, watcher))
if pid in self._pids[watcher]:
with self.lock:
self._pids.remove(pid)
self._pids[watcher].remove(pid)

def append_pid(self, pid):
logger.debug('Adding %d' % pid)
if pid in self._pids:
def append_pid(self, watcher, pid):
logger.debug('Adding %d in %s' % (pid, watcher))
if pid in self._pids[watcher]:
return
with self.lock:
self._pids.append(pid)
self._pids[watcher].append(pid)

def start(self):
logger.info('Starting the stats streamer')
Expand All @@ -73,22 +79,18 @@ def start(self):
# now hooked into the stream
try:
for topic, msg in self.consumer:
__, name, action = topic.split('.')
__, watcher, action = topic.split('.')
msg = json.loads(msg)
if action != 'start' and self.stopped:
self._init()

if action in ('reap', 'kill'):
# a process was reaped
pid = msg['process_pid']
if pid in self._pids:
with self.lock:
self._pids.remove(pid)
self.remove_pid(watcher, pid)
elif action == 'spawn':
pid = msg['process_pid']
if pid not in self._pids:
with self.lock:
self._pids.append(pid)
self.append_pid(watcher, pid)
elif action == 'start':
self._init()
elif action == 'stop':
Expand Down
4 changes: 2 additions & 2 deletions examples/dummy_fly.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ def run(self):
print "hello, fly #%s (pid: %s) is alive" % (self.wid, os.getpid())

while self.alive:
#a = 10 * 10 * 10 * 10
a = 10 * 10 * 10 * 10


time.sleep(0.1)
#time.sleep(0.1)

if __name__ == "__main__":
DummyFly(sys.argv[1]).run()

0 comments on commit 1da2492

Please sign in to comment.