Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement circus.stats to have accurate stats #97

Merged
merged 22 commits into from May 9, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions circus/commands/__init__.py
Expand Up @@ -8,6 +8,7 @@
incrproc,
list,
listen,
listpids,
numprocesses,
numwatchers,
options,
Expand Down
50 changes: 50 additions & 0 deletions circus/commands/listpids.py
@@ -0,0 +1,50 @@
from circus.commands.base import Command
from circus.exc import ArgumentError


class ListPids(Command):
    """\
        Get list of pids in a watcher
        =============================

        ZMQ Message
        -----------


        To get the list of pids in a watcher::

            {
                "command": "listpids",
                "properties": {
                    "name": "nameofwatcher"
                }
            }


        The response returns the sorted list of pids of the watcher.

        Command line
        ------------

        ::

            $ circusctl listpids <name>
    """
    name = "listpids"

    def message(self, *args, **opts):
        """Build the request message; exactly one argument (the watcher
        name) is accepted, otherwise ArgumentError is raised."""
        if len(args) == 1:
            return self.make_message(name=args[0])
        raise ArgumentError("invalid number of arguments")

    def execute(self, arbiter, props):
        """Return ``{"pids": [...]}`` with the watcher's pids in
        ascending order."""
        watcher = self._get_watcher(arbiter, props['name'])
        return {"pids": sorted(watcher.pids.keys())}

    def console_msg(self, msg):
        """Render the pids as a comma-separated string, or delegate to
        ``console_error`` when the reply carries no ``pids`` key."""
        if 'pids' not in msg:
            return self.console_error(msg)
        return ",".join(str(pid) for pid in msg.get('pids'))
3 changes: 3 additions & 0 deletions circus/consumer.py
Expand Up @@ -5,6 +5,7 @@
class CircusConsumer(object):
def __init__(self, topics, context=None, endpoint='tcp://127.0.0.1:5556'):
self.topics = topics
self.keep_context = context is not None
self.context = context or zmq.Context()
self.endpoint = endpoint
self.pubsub_socket = self.context.socket(zmq.SUB)
Expand All @@ -30,6 +31,8 @@ def iter_messages(self):
yield topic, message

def stop(self):
if self.keep_context:
return
try:
self.context.destroy(0)
except zmq.ZMQError as e:
Expand Down
77 changes: 77 additions & 0 deletions circus/stats/__init__.py
@@ -0,0 +1,77 @@

"""
Stats architecture:

* streamer.StatsStreamer listens to circusd events and maintains a list of pids
* collector.StatsCollector runs a pool of threads that compute stats for each
pid in the list. Each stat is pushed in a queue
* publisher.StatsPublisher continuously pushes those stats in a zmq PUB socket
* client.StatsClient is a simple subscriber that can be used to intercept the
stream of stats.
"""
import sys
import argparse
import logging

from circus.stats.streamer import StatsStreamer
from circus import logger
from circus import util


# Maps the lower-cased value of the --log-level option to the matching
# ``logging`` level constant; main() falls back to INFO for unknown names.
LOG_LEVELS = {
"critical": logging.CRITICAL,
"error": logging.ERROR,
"warning": logging.WARNING,
"info": logging.INFO,
"debug": logging.DEBUG}

# Record format and timestamp format handed to logging.Formatter in main().
LOG_FMT = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s"
LOG_DATE_FMT = r"%Y-%m-%d %H:%M:%S"


def main():
    """Run the stats aggregator from the command line.

    Parses the command-line options, configures ``logger`` (level, output
    and record format) and runs a ``StatsStreamer`` built from the three
    endpoints.  The streamer is always stopped before leaving.

    Exits with status 0 when the streamer returns or the user interrupts
    it with Ctrl-C.  Any other exception propagates once the streamer has
    been stopped, so the traceback and a non-zero exit status are kept
    (calling ``sys.exit(0)`` from the ``finally`` clause used to discard
    them).
    """
    parser = argparse.ArgumentParser(
        description='Runs the stats aggregator for Circus')

    parser.add_argument('--endpoint',
                        help='The circusd ZeroMQ socket to connect to',
                        default='tcp://127.0.0.1:5555')

    parser.add_argument('--pubsub',
                        help='The circusd ZeroMQ pub/sub socket to connect to',
                        default='tcp://127.0.0.1:5556')

    parser.add_argument('--statspoint',
                        help='The ZeroMQ pub/sub socket to send data to',
                        default='tcp://127.0.0.1:5557')

    parser.add_argument('--log-level', dest='loglevel', default='info',
                        help="log level")

    parser.add_argument('--log-output', dest='logoutput', default='-',
                        help="log output")

    args = parser.parse_args()

    # configure the logger; unknown level names fall back to INFO
    logger.setLevel(LOG_LEVELS.get(args.loglevel.lower(), logging.INFO))
    if args.logoutput == "-":
        # "-" selects logging's default stream handler
        handler = logging.StreamHandler()
    else:
        handler = logging.FileHandler(args.logoutput)
        # hand the log file descriptor to util.close_on_exec
        util.close_on_exec(handler.stream.fileno())
    handler.setFormatter(logging.Formatter(LOG_FMT, LOG_DATE_FMT))
    logger.addHandler(handler)

    stats = StatsStreamer(args.endpoint, args.pubsub, args.statspoint)
    try:
        stats.start()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the aggregator
        pass
    finally:
        stats.stop()
    sys.exit(0)


if __name__ == '__main__':
    main()
40 changes: 40 additions & 0 deletions circus/stats/client.py
@@ -0,0 +1,40 @@
from circus.consumer import CircusConsumer
import json


class StatsClient(CircusConsumer):
    """Consumer subscribed to the ``stat.`` topics of the stats endpoint."""

    def __init__(self, endpoint='tcp://127.0.0.1:5557', context=None):
        """Subscribe to every topic starting with ``stat.`` on *endpoint*,
        optionally reusing an existing zmq *context*."""
        CircusConsumer.__init__(self, ['stat.'], context, endpoint)

    def iter_messages(self):
        """ Yields tuples of (watcher, pid, stat)

        Topics look like ``stat.<watcher>`` or ``stat.<watcher>.<pid>``.
        *pid* is ``None`` when the topic carries no numeric pid suffix,
        and *stat* is the JSON-decoded payload.  The watcher name may
        itself contain dots: only a trailing all-digit component is
        taken as the pid, everything between the ``stat`` prefix and it
        is the watcher name.
        """
        with self:
            while True:
                topic, stat = self.pubsub_socket.recv_multipart()
                parts = topic.split('.')
                if len(parts) > 2 and parts[-1].isdigit():
                    # stat.<watcher>.<pid>
                    watcher = '.'.join(parts[1:-1])
                    pid = long(parts[-1])
                else:
                    # stat.<watcher>
                    watcher = '.'.join(parts[1:])
                    pid = None
                yield watcher, pid, json.loads(stat)


# Output template for a per-process stat; formatted with a dict providing
# the 'watcher', 'pid', 'cpu' and 'mem' keys.
TMP = ('watcher: %(watcher)s - pid: %(pid)d - cpu: %(cpu)s%% - '
'mem: %(mem)s%%')
# Output template for a stat received without a pid; formatted with a dict
# providing the 'watcher', 'cpu' and 'mem' keys.
TMP2 = ('Summary - watcher: %(watcher)s - cpu: %(cpu)s%% - '
'mem: %(mem)s%%')


if __name__ == '__main__':
    # Small demo: print every stat received until interrupted.
    client = StatsClient()
    try:
        for watcher, pid, stat in client:
            stat['watcher'] = watcher
            if pid is not None:
                stat['pid'] = pid
                print(TMP % stat)
            else:
                print(TMP2 % stat)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to leave the loop
        pass
    finally:
        # always release the client, not only on Ctrl-C
        client.stop()
107 changes: 107 additions & 0 deletions circus/stats/collector.py
@@ -0,0 +1,107 @@
import threading
import Queue
import time

from circus import util
from circus import logger


class StatsWorker(threading.Thread):
    """Daemon thread sampling the stats of every process of one watcher.

    On each pass the worker asks ``get_pids(watcher)`` for the current
    pids, calls ``util.get_info`` on each of them and pushes
    ``(watcher, pid, info)`` into ``results``.  It then pushes one
    ``(watcher, None, aggregate)`` entry summing the pass.

    :param watcher: name of the watcher, passed to ``get_pids``.
    :param results: object with a ``put()`` method receiving the stats.
    :param get_pids: callable returning the pids of a watcher.
    :param delay: pause in seconds after a pass that sampled no process.
    :param interval: ``interval`` argument passed to ``util.get_info``.
    """

    def __init__(self, watcher, results, get_pids, delay=.1, interval=1.):
        threading.Thread.__init__(self)
        self.watcher = watcher
        self.delay = delay
        self.running = False
        self.results = results
        self.interval = interval
        self.daemon = True
        self.get_pids = get_pids

    def _aggregate(self, aggregate):
        """Sum per-pid stats into a single dict.

        *aggregate* maps pid -> info dict with ``cpu`` and ``mem`` keys.
        Returns ``{'pid': [pids], 'cpu': total, 'mem': total}``; ``pid``
        is a real list so the result stays JSON-serialisable.
        """
        # right way to aggregate ?
        stats = list(aggregate.values())
        return {'pid': list(aggregate),
                'cpu': sum(stat['cpu'] for stat in stats),
                'mem': sum(stat['mem'] for stat in stats)}

    def run(self):
        """Sample and publish stats until ``stop()`` is called."""
        self.running = True
        while self.running:
            aggregate = {}

            # sending by pids
            for pid in self.get_pids(self.watcher):
                try:
                    info = util.get_info(pid, interval=self.interval)
                except util.NoSuchProcess:
                    # the process is gone !
                    continue
                except Exception:
                    logger.exception('Failed to get info for %d' % pid)
                    continue
                aggregate[pid] = info
                self.results.put((self.watcher, pid, info))

            # now sending the aggregation
            self.results.put((self.watcher, None, self._aggregate(aggregate)))

            if not aggregate:
                # Nothing was sampled on this pass; pause so the loop
                # does not spin and flood the results queue.
                time.sleep(self.delay)

    def stop(self):
        """Ask the loop in ``run()`` to exit after the current pass."""
        self.running = False


class StatsCollector(threading.Thread):
    """Daemon thread keeping one ``StatsWorker`` per watcher.

    The set of watchers is read from ``streamer.get_watchers()`` every
    ``check_delay`` seconds.  A worker is started for each new watcher
    and the worker of each vanished watcher is stopped.  Workers push
    their stats into ``streamer.results``.

    :param streamer: object providing ``get_watchers()``, ``get_pids``
        and ``results``.
    :param check_delay: seconds to sleep between two checks.
    """

    def __init__(self, streamer, check_delay=1.):
        threading.Thread.__init__(self)
        self.daemon = True
        self.streamer = streamer
        self.running = False
        # Not read in this class (workers use streamer.results); kept
        # because it is a public attribute.
        self.results = Queue.Queue()
        self.workers = {}
        self.check_delay = check_delay

    def _sync_workers(self):
        """Start workers for new watchers, stop those of removed ones."""
        # Work on copies: the list returned by the streamer is not sorted
        # or otherwise modified here.
        watchers = set(self.streamer.get_watchers())
        current = set(self.workers)

        # added ones
        for watcher in sorted(watchers - current):
            worker = StatsWorker(watcher, self.streamer.results,
                                 self.streamer.get_pids)
            self.workers[watcher] = worker
            worker.start()

        # gone ones
        for watcher in sorted(current - watchers):
            self.workers.pop(watcher).stop()

    def run(self):
        """Start the initial workers, then track watcher changes until
        ``stop()`` is called."""
        self.running = True

        # starting the workers first so the log reports the real count
        self._sync_workers()
        logger.debug('Starting the collector with %d workers' %
                     len(self.workers))

        # now will maintain the list of watchers : if a watcher
        # is added or removed, we add or remove a thread here
        while self.running:
            self._sync_workers()

            # just sleep for a bit
            time.sleep(self.check_delay)

    def stop(self):
        """Stop the collector loop and every worker."""
        self.running = False
        # iterate over a snapshot of the current workers
        for worker in list(self.workers.values()):
            worker.stop()
        logger.debug('Collector stopped')
44 changes: 44 additions & 0 deletions circus/stats/publisher.py
@@ -0,0 +1,44 @@
import threading
import zmq
import Queue
import json

from circus import logger


class StatsPublisher(threading.Thread):
    """Daemon thread publishing queued stats on a zmq PUB socket.

    Entries ``(watcher, pid, stat)`` are read from ``streamer.results``
    and sent as two-part messages: the topic ``stat.<watcher>`` (with
    ``.<pid>`` appended when *pid* is not ``None``) and the JSON-encoded
    *stat*.

    :param streamer: object exposing the ``results`` queue.
    :param stats_endpoint: endpoint the PUB socket binds to.
    :param delay: timeout in seconds of each queue read.
    :param context: zmq context to reuse; when omitted a private context
        is created and destroyed by ``stop()``.
    """

    def __init__(self, streamer, stats_endpoint='tcp://127.0.0.1:5557',
                 delay=0.1, context=None):
        threading.Thread.__init__(self)
        self.streamer = streamer
        self.running = False
        self.daemon = True
        self.delay = delay
        self.ctx = context or zmq.Context()
        # only a context created here is ours to destroy
        self.destroy_context = context is None
        self.stats_endpoint = stats_endpoint
        self.socket = self.ctx.socket(zmq.PUB)
        self.socket.bind(self.stats_endpoint)
        self.socket.linger = 0

    def run(self):
        """Publish queued stats until ``stop()`` is called, then close
        the socket."""
        self.running = True
        results = self.streamer.results
        logger.debug('Starting the Publisher')
        try:
            while self.running:
                try:
                    watcher, pid, stat = results.get(timeout=self.delay)
                    topic = b'stat.%s' % str(watcher)
                    if pid is not None:
                        topic += '.%d' % pid
                    self.socket.send_multipart([topic, json.dumps(stat)])
                except Queue.Empty:
                    pass
                except Exception:
                    if not self.running:
                        # stop() tore the socket/context down under us;
                        # this is part of the shutdown, not an error
                        break
                    logger.exception(
                        'Failed to send some data from the queue')
        finally:
            # Release the socket even when the context is shared and
            # therefore not destroyed by stop().
            self.socket.close()

    def stop(self):
        """Stop publishing and release the zmq resources owned here."""
        self.running = False
        if not self.is_alive():
            # run() is not there to close the socket for us
            self.socket.close()
        if self.destroy_context:
            self.ctx.destroy(0)
        logger.debug('Publisher stopped')