implement circus.stats to have accurate stats #97

Merged
merged 22 commits into from

3 participants

@tarekziade
Owner

From psutil doc:

When interval is 0.0 or None compares system CPU times elapsed since last call or module import, returning immediately. In this case is recommended for accuracy that this function be called with at least 0.1 seconds between calls.

This basically means we're not able to provide accurate CPU percentages because:

  • we don't control the stream of hits on stats/dstats, so we might get more than one call per 0.1 seconds
  • we loop on the children without setting an interval
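
For reference, a minimal sketch of the psutil behaviour described above (get_cpu_percent() is the psutil 0.x spelling; newer versions call it cpu_percent()):

import os
import psutil

p = psutil.Process(os.getpid())

# interval=0.1: psutil waits 0.1 s and compares the CPU times itself,
# so even a single call returns a meaningful percentage.
accurate = p.get_cpu_percent(interval=0.1)

# interval=None or 0.0: returns immediately, comparing against the CPU
# times recorded at the previous call. Two calls less than 0.1 s apart
# (e.g. two clients hitting stats at the same time) make the second
# value unreliable.
first = p.get_cpu_percent(interval=None)
second = p.get_cpu_percent(interval=None)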

I propose the following refactoring:

  • build the stats in a background thread
  • create a stats module that provides a backend for a stats database
  • serve back the same values in a 0.1 second span to all clients
  • propose a pluggable backend so we may store the data in different backends with various levels of persistency depending on the needs (memcached, membase, redis, etc.)
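
As a rough illustration of what "pluggable backend" could mean here (class and method names are made up, not an existing API):

class StatsBackend(object):
    """Interface a stats backend would implement."""

    def insert(self, pid, info):
        raise NotImplementedError

    def last(self, pid, count=1):
        raise NotImplementedError


class MemoryBackend(StatsBackend):
    """Default backend: a plain mapping kept in memory."""

    def __init__(self, max_entries=100):
        self.max_entries = max_entries
        self._data = {}

    def insert(self, pid, info):
        entries = self._data.setdefault(pid, [])
        entries.append(info)
        del entries[:-self.max_entries]   # keep only the most recent entries

    def last(self, pid, count=1):
        return self._data.get(pid, [])[-count:]
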
@tarekziade
Owner

Writes

The background thread does the following:

  • loop over each alive pid and do a get_info() call with an interval of 0.1 in get_cpu_percent()
  • send the data to the backend.

Possible backends:

  • mapping in memory (default)
  • redis
  • membase/memcached

The DB has these config options:

  • max_entries: maximum number of collected entries per pid.
  • check_alive_interval: interval in seconds to check if a pid is gone. When a pid is gone, the data collected on it is wiped.
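
A hypothetical sketch of that background writer, using the two options above (get_pids() and the in-memory store are illustrative, not existing circus code):

import time
import threading
from collections import defaultdict

import psutil

stats = defaultdict(list)      # pid -> recent samples
MAX_ENTRIES = 100              # the max_entries option
CHECK_ALIVE_INTERVAL = 5       # the check_alive_interval option, in seconds


def collect(get_pids):
    last_check = time.time()
    while True:
        for pid in get_pids():
            try:
                # interval=0.1 lets psutil compute an accurate percentage
                cpu = psutil.Process(pid).get_cpu_percent(interval=0.1)
            except psutil.NoSuchProcess:
                continue
            entries = stats[pid]
            entries.append(cpu)
            del entries[:-MAX_ENTRIES]
        # periodically wipe the data of pids that are gone
        if time.time() - last_check > CHECK_ALIVE_INTERVAL:
            alive = set(get_pids())
            for pid in list(stats):
                if pid not in alive:
                    del stats[pid]
            last_check = time.time()

# started once at boot, e.g.:
# threading.Thread(target=collect, args=(get_alive_pids,)).start()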

Reads

  • If the database is in memory, the circusctl and circushttpd read calls are done via zmq as usual.
  • If the database is redis or membase, the circusctl and circushttpd read calls are done directly on that DB.

In that case the circusctl and circushttpd read calls are done against the database instead of interacting with circusd via zmq.

The stats/dstats return the last entry or a batch.
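
To illustrate "last entry or a batch" (stats being whatever mapping the background writer fills; purely illustrative):

def read_stats(pid, batch=False):
    entries = stats[pid]        # list filled by the background writer
    if batch:
        return entries[-10:]    # a recent batch
    return entries[-1]          # just the last entry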

@benoitc

So I'm not sure what the problem is with waiting an interval of 0.1 ms (or
s?). Each client request to circus is handled by zeromq, and a 0.1 interval
isn't a big deal.

Anyway, by thread do you mean a Python thread? I think if we add a feature
in circus to collect process information and send it to different
logging backends, we should go for a full OS process managed by circus.
The process supervision & command dispatching is already a full-time job
and we shouldn't add more complexity to it imo.

So circus would open a process named, for example, circus-statds, which
will do the job of collecting process information and sending it to the
backends. This process gets process and watcher updates using the
pub/sub feed already used by the flapping feature.

The backend list sounds good. I would add a zeromq PUB/SUB backend though,
and we can probably add loggly & relic backends too.

About the way circushttpd collects the data, I think I would by default
only read realtime data without storing it. For persistence we could
have the possibility to manage multiple logging backends at the same
time, so the info would be dispatched to each backend. The reason for
that is that the default circushttpd doesn't need to provide the history
imo. And if we want it, maybe we can propose a plugin system allowing
people to add new items to the UI?

Note 1: I think this is a good way to introduce priority (#64) to the
process watching, so any "circus" processes (statds and why not the
flapping) would be managed with priority.

Note 2: we can make the use of a system process optional with a fallback
on a thread. So you may use a greenlet for that.

Thoughts?

@tarekziade
Owner

So I'm not sure what the problem is with waiting an interval of 0.1 ms
(or s?). Each client request to circus is handled by zeromq, and a
0.1 interval isn't a big deal.

The problem is the following -- if we want something that's accurate, it's better to collect the CPU times per PID ourselves instead of having the clients trigger this calculation.

And have a small, short-lived cache of the last 100 values, for instance.

The clients should simply pick a value, or a batch of values, that has already been made available by the server, which maintains the stats per PID.

This is also more scalable because you control the server load spent calculating the CPU percentages: exactly one call per pid every 0.1 s, no matter how many requests the clients make.

So 1000 clients calling the stats API at the same time will just get a value that was calculated once.

If you do it once per client, the server is going to spend 1000 * 0.1 s doing the maths, and even if it's asynchronous, that's still a lot of load for something that could be calculated just once.
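
One way to picture the "calculate once, serve to everyone" idea (a throttling sketch only, not the actual design -- in the proposal the background thread refreshes the values itself):

import time

_cache = {'when': 0.0, 'value': None}


def get_stats(compute):
    """Return cached stats; run the expensive psutil pass at most every 0.1 s."""
    now = time.time()
    if now - _cache['when'] >= 0.1:
        _cache['value'] = compute()
        _cache['when'] = now
    return _cache['value']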

The more I think about it, the more I think it's overkill to propose backends for this. Once we have the small cached batches, anyone can build any tool by polling via "circusctl stats".

I also agree that we could propose those stats via a dedicated pub/sub, why not, as long as we regulate what we send on it, like a heartbeat.

Anyway, by thread do you mean a Python thread? I think if we add
a feature in circus to collect process information and send it to different
logging backends, we should go for a full OS process managed by circus.

A thread takes way fewer resources than a process. I am not sure I understand the benefit of spawning another process
here.

The process supervision & command dispatching is already a full-time job
and we shouldn't add more complexity to it imo.

Dealing with yet another process to collect things means that you need to set up data exchange between circusd and that process. That adds, imo, more complexity than having a background thread that picks up the info it needs directly from memory.

What makes you say it's more complex to have a loop in a thread, as opposed to inter-process communication for this?

The reason for that is that the default circushttpd doesn't need
to provide the history imo.

Not a history per se, but a short-lived list of values, like the last 100. That's enough for all our needs since we don't want to keep more persistent data.

And if we want it, maybe we can propose a
plugin system allowing people to add new items to the UI?

Yeah cool, maybe unrelated though.

@benoitc
@tarekziade
Owner

I am OK with doing it as a process as long as:

  • we don't set up yet another complex protocol to exchange data.
  • circusd-stats is launched by circusd

I just thought about a way to do it in another process without having to worry about inter-process communication.

circusd-stats could simply take as an argument the PID of its parent, and from there get the full list of running processes, since they are all children of circusd. It's then easy to build a database keyed by PID.
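
A minimal sketch of that idea (psutil 0.x spells the call get_children(); newer versions use children()):

import sys
import psutil

circusd_pid = int(sys.argv[1])        # passed by circusd at launch

database = {}
for proc in psutil.Process(circusd_pid).get_children():
    database[proc.pid] = []           # stats get appended here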

The circusctl stats and dstats commands can then query the database.

We need to think about a simple default backend that can be reached from circusctl on every client request. Maybe a memory-mapped file? http://docs.python.org/library/mmap.html

@benoitc
@tarekziade
Owner

But you won't have realtime changes. What problem do you see in reusing the pub/sub feed
we already have for the flapping?

Why do you want to use a pub/sub to get the pid list? I don't get it. What's the benefit, since the circusd-stats program can get the list directly? It's just added complexity.

Plus, if you want to do pub/sub, you need to constantly push the stats or the pid list into the pub/sub channel from circusd -- we are back to square one in that case: when and how will you do it in circusd? In a background thread?

We can't assume that circusctl will always be used on the machine where circusd is launched.
Or do you mean between circusd-stats and circusd?

Between circusd-stats and circusd. circusctl calls 'stats' and 'dstats' against circusd. This last part remains unchanged; circusd becomes a client of the DB.

@tarekziade
Owner

Pseudo code of circusd-stats:

import zmq
import collections

database = collections.defaultdict(list)

@background                 # runs in a background thread
def build_stats():
    while True:
        # get_parent_process() is circusd, seen through psutil
        for pid in get_parent_process().get_children():
            database[pid].append(get_info(pid))

def listen_to_queries():
    socket = bind(zmq.REP, "ipc://stats")
    while True:
        stat_query = socket.recv()
        socket.send(read_stats(stat_query))


build_stats()
listen_to_queries()

In circusd, the existing stats command:

def stats_command():
    socket = connect(zmq.REQ, "ipc://stats")
    socket.send('stats for pid')
    return socket.recv()
@benoitc
@benoitc

I forgot to add that with the above design, everything is done in the main thread of circus-statds:

  • The wakeup method would collect the stats
  • The handle_message method would manage the lists of pids and watchers to handle.
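
In code, that would look roughly like this (a sketch only; ZMQStream for the pub/sub side is an assumption, and the method bodies are left empty):

from zmq.eventloop import ioloop, zmqstream

class Controller(object):
    def __init__(self, sub_socket, loop):
        # pid / watcher updates arrive on the circusd pub/sub feed
        self.stream = zmqstream.ZMQStream(sub_socket, loop)
        self.stream.on_recv(self.handle_message)
        # periodic stats collection
        self.caller = ioloop.PeriodicCallback(self.wakeup, 100, loop)
        self.caller.start()

    def wakeup(self):
        pass        # collect the stats here

    def handle_message(self, msg):
        pass        # update the lists of pids and watchers
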
@tarekziade
Owner

OK, so to summarize:

  • circusd-stats gets pinged via zmq every time a change occurs in the configuration, to maintain a list of active PIDs
  • circusd-stats has a loop to build stats continuously against a list of PIDs (in the background)
  • circusd-stats builds a stats DB
  • circusd is a client of this stats DB, either via a dedicated REQ/REP channel or via mmap
  • the circusd-stats process is started by circusd.

Not sure if DB is the right term. But if you mean exchanging data from circusctl to circusd,
a map looks good, or maybe just a simple file?

A file means we will increase the number of FDs we'll use just to read/write stats data.

Since we don't really care about persistency here, if the data is lost it's not a big deal, since it's very short-term. It's also very small.

That's why I was thinking about mmap. But after more thought, maybe just keeping a mapping in memory is enough, and we can serve its values via a zmq REQ/REP channel to anyone who wants stats (circusctl included).

That also covers our need for a way to publish the stats: anyone can build an application that reads the values by calling the dedicated zmq REQ/REP channel.

@benoitc
@tarekziade
Owner

The wakeup method would collect the stats via ioloop.PeriodicCallback

Is this PeriodicCallback call going to block the loop? If so, this is a problem because building the stats can be time consuming (0.1 s per PID, so we can only do 10 pids per second).

If PeriodicCallback is blocking, we should do this work in parallel to make sure circusd-stats stays responsive to the configuration changes and to the stats read calls.

@benoitc
@tarekziade
Owner

doesn't need to be in background in my example. Just having an event loop is enough.

See my previous question about blocking.

@benoitc
@benoitc
@tarekziade
Owner

It isn't :)

From my test, unless I have made a mistake, PeriodicCallback blocks the main thread:

from zmq.eventloop.ioloop import PeriodicCallback, IOLoop
import time

loop = IOLoop()

def _buildstats():
    print '======> buildstats started'
    time.sleep(10)          # building the stats
    # for pid in pids:
    #    build_stat(pid)    # takes 0.1
    print '<====== buildstats ended'


def _getcalls():
    print '======> getcall started'
    print 'getting stuff from ZMQ'
    print 'answering stats read queries'
    time.sleep(.5)
    print '<====== getcall ended'


callback2 = PeriodicCallback(_getcalls, 500, loop)
callback2.start()
callback = PeriodicCallback(_buildstats, 100, loop)
callback.start()
loop.start()

This means that while building the stats, we are blocking the two other events:

  • the stats queries from clients
  • the pub/sub events from circusd

In other words, if the stats take 10 seconds to build, a client might wait 10 seconds.

I see 3 ways to solve this:

  • cut the stats building work into very small pieces so each callback takes just 0.1 seconds (noooo Twisted style :( )
  • use gevent (nooo dependency!)
  • use a thread ... :)
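
For the third option, a variation of the snippet above where the stats building happens in a plain thread, so the periodic callback keeps firing (same made-up _buildstats/_getcalls content as before):

import threading
import time
from zmq.eventloop.ioloop import PeriodicCallback, IOLoop

loop = IOLoop()

def _buildstats():
    while True:
        print '======> buildstats started'
        time.sleep(10)          # building the stats, as before
        print '<====== buildstats ended'

def _getcalls():
    print 'answering stats read queries'    # still fires every 500 ms

# the stats work is moved out of the loop, into a daemon thread
worker = threading.Thread(target=_buildstats)
worker.daemon = True
worker.start()

callback = PeriodicCallback(_getcalls, 500, loop)
callback.start()
loop.start()
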
@benoitc
@tarekziade
Owner

Of course it will block here. Like with any scheduler. If you sleep 10 seconds in a thread it will also block it.

The big difference is that when the thread sleeps or does something, it won't block the main thread, which is still able to send and receive things.

Now, more than doing it in a thread, what you want is having a way to parallelize the data gathering.
I think we could use a pool of threads (or greenlets if you want, but this isn't needed).
Each thread would collect the data for a process at a time.

How is this different from what I have just said? I said we need to collect the data from a thread...

So I guess you are now convinced about pushing the stats collection into the background.

So you will define a pool of workers in the controller. A worker can be a simple function.

Sure yeah, one thread per watcher works for me.

[...]

Then we have full parallelization with a round robin.

Round robin? If we start one thread per watcher and point them at the same mapping, everything will be done in parallel.

Last step will be having a way to append the data in the right order which can be done in an ordered queue.

I don't see why we need an ordered queue. If each thread is responsible for a watcher, there will be no conflict; each will just update its own data.

The readers can read the data in parallel.

Hope it answers your question.

Well, I am glad you now agree on using a thread to avoid blocking the main loop.

@benoitc
@benoitc

By different, I mean the implementation is quite different. The purpose is, I guess, the same: parallelizing (which wasn't possible with only one thread). I don't think such a thing is really needed in most cases, but the better we provide, the better it is.

Also, I don't think we need to pass data to circusd at all (using mmap or anything else). Reading the zeromq doc makes me think we could have the stats sent to a pub/sub with different patterns:

namewatcher.stats = global
namewatcher.stats.pid = stats of a process
namewatcher.stats.pid.child = stats of a child

I think it will greatly ease any code you add later to handle different backends. Thoughts?

@tarekziade
Owner

OK, so to summarize, you want the main thread to build the stats, and threads to answer stats requests and handle interactions with circusd.

I think the other way is better because:

  • we can have a single SUB socket that loops on all events. The thing we want to parallelize is not the reading of the events from circusd. It's perfectly fine to read those synchronously. It's very fast. We don't need several threads for this.

  • we can't share the same zmq socket between threads

  • the thing we really want to do in the background, without disturbing the zmq work, is the stats work on PIDs

  • we may or may not need more than one thread to do the PID stats, and it's fine to parallelize here.

So +1 for a pool of threads, but threads that do stats calculations against a list of watchers or simply a list of pids.

The main loop is then a simple io loop that feeds the pid list from circusd events and answers client stats queries.
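
Roughly, as a sketch (get_info() and aggregate() are placeholders here, not final code):

import threading
from collections import defaultdict

stats = defaultdict(list)     # watcher -> aggregated samples
pids = defaultdict(list)      # watcher -> pids, fed by the SUB loop


class WatcherStatsThread(threading.Thread):
    """One worker per watcher, computing stats for its pids."""

    def __init__(self, watcher):
        threading.Thread.__init__(self)
        self.watcher = watcher
        self.daemon = True

    def run(self):
        while True:
            # pacing comes from the 0.1 s interval inside get_info()
            infos = [get_info(pid, interval=0.1) for pid in pids[self.watcher]]
            stats[self.watcher].append(aggregate(infos))


# main thread: a plain zmq loop that
#   - SUBscribes to circusd events and updates `pids`
#   - REPlies to stats queries by reading `stats`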

@benoitc
@tarekziade
Owner

Sorry, you lost me now.

StatWorker is a thread and seems to collect stats on pids, but I don't understand why it's interacting with zmq to do this.

  1. loop listening to circusd events to know when a watcher is created and when a pid is added

Great, and this loop maintains a list of PIDs, like [122342, 45465, 56474].

  2. workers collecting data for each watcher. Watchers are associated with them via a PULL socket.

No, you don't need a PULL socket to collect a process stat using psutil. You just call get_info(PID).

So the threads are just looping on the list of PIDs and feeding the stats.

  3. workers collect data and send it to a backend.

I guess yes: they simply push the values into a dictionary in memory, so not really a backend:

stats[PID].append(get_info(PID))

Then circusd-stats can provide a REQ/REP socket for people to query this stats mapping (including circusctl)

For the last part I was suggesting to send the data first to another pub/sub that could be used by each backend
to store the data and by circushttpd.

Not needed: circushttpd can call circusd-stats to get the mapping.

@benoitc
@tarekziade tarekziade Merge pull request #105 from benoitc/master
add new properties `options` andd `start` to the `add_watcher` command
1762dc2
@tarekziade tarekziade was assigned
circus/commands/listpids.py
((28 lines not shown))
+
+ ::
+
+ $ circusctl listpids <name>
+ """
+ name = "listpids"
+
+ def message(self, *args, **opts):
+ if len(args) != 1:
+ raise ArgumentError("invalid number of arguments")
+
+ return self.make_message(name=args[0])
+
+ def execute(self, arbiter, props):
+ watcher = self._get_watcher(arbiter, props['name'])
+ pids = [process.pid for process in watcher.processes.values()]
@benoitc
benoitc added a note

you don't need to do that. Returning the keys of watcher.pids is enough.

@benoitc

Tried to start the last version and got this error:

$ ./bin/circusd-stats 
Traceback (most recent call last):
  File "./bin/circusd-stats", line 8, in <module>
    load_entry_point('circus==0.4', 'console_scripts', 'circusd-stats')()
  File "/Users/benoitc/work/circus/circus/stats/__init__.py", line 69, in main
    args.poolsize)
AttributeError: 'Namespace' object has no attribute 'poolsize'

I have some general notes on that code that I would be happy to discuss or complete if needed.

  1. We should make the controller use an ioloop:
    • rather than using the Consumer object, we should use a ZmqStream directly there.
    • we could also use a ZmqStream that gets requests. A request would open a thread or greenlet in a pool.

The rationale behind this is to make it more asynchronous and more efficient.

  2. We could have a fixed number of threads instead of one per watcher, to reduce the memory and CPU usage.

  3. Reading the doc, apparently we don't need to use an interval each time, but can instead get the values in a non-blocking fashion (p.get_cpu_times()) every 0.1 s. That can be done using an ioloop in each thread.

  4. The data aggregation can be done once we have collected all the info from the pids. Instead of using a queue in the controller, we could pass the info to a dict in the controller. Only the latest aggregated value would be kept.

Let me know what you think.

@tarekziade
Owner

a06a732 fixes the error

I don't follow #1 because you are talking about an ioloop and a "controller", then about the client CircusConsumer class, which is a pub/sub client. Also, what do you call a request here? You should point to the code and use the class names, so I can follow.

We could have a fixed number of threads instead of using one / watcher to reduce the memory and CPU usage

How would that work, since for a given watcher we need to build the stats from all pids before we can return an aggregate stat? If we don't do this in the same thread, we'd need to add synchronization logic somewhere, based on a way to signal that all of a watcher's process stats have been collected, to make the final stat.

Make a demo based on pseudo-code for this.

Reading the doc, apparently we don't need to use an interval each time

It's the same effect: either you run every 0.1 seconds, or you call it with a 0.1 interval value. Why do you want to use an ioloop to collect stats here again? It seems that you want to put ioloops everywhere, even where we don't deal with zeromq.

Here it's just threads collecting data. I don't want to add an ioloop to the mix; I don't see any advantage, since an ioloop is based on socket events.

The data aggregation can be done once we have collected all the info from the pids.

How do you know when this happens: "once we have collected all the info"?

@benoitc

I'm using the same wording as my previous post. Of course it can be done in the same thread for one watcher; it's easier. But one thread could handle multiple watchers. Waiting for the processing of each pid for one stat is just a matter of:

accumulator
for process in processes:
    add to accumulator
aggregate

I won't post pseudo code; I will just implement it in a branch to make it easier to understand. Pseudo code is in a post above.

Calling a loop on all processes every 0.1 s isn't the same as waiting 0.1 s for each process.

ioloops react on socket events, yes. But you have the pub/sub socket for that.

@tarekziade
Owner

I'm using the same wording as my previous post

Sorry, but you are talking about a 'controller'; I don't know what that is here.

One thread could handle multiple watchers. Waiting for the processing of each pid for one stat is just a matter of:

Like https://github.com/mozilla-services/circus/blob/issue-97/circus/stats/collector.py#L34 but looping on several watchers? Sure, yeah.

I won't post pseudo code. will just implement it in a branch to make it easier to understand.

Please do your branch against this branch; don't start from scratch.

Pseudo code is in a post above.

Which post exactly?

ioloops react on socket events, yes. But you have the pub/sub socket for that.

Again, this is quite vague. Can you point to the place in the code where you see a missing ioloop?

@benoitc

Controller refers to the main thread controlling the workers and messages.

The branch will of course be based on this one. The changes aren't so big ;)

@tarekziade
Owner

@benoitc did you have time to poke at this? I'd like to merge so we can hook up the socket.io UI.

Also: to activate circusd-stats, I propose that we add a new option in the ini file: stats_endpoint. When present, the arbiter adds one extra watcher that runs a single circusd-stats process

@benoitc

I started to work on it. I will have a working version sometime this week.

I'm actually -1 on any merge into the main branch anyway.

Also, why socket.io? Why not sockjs, which has a less cluttered API?

@tarekziade
Owner

I started to work on it. I will have a working version sometime this week.

OK, I will only merge on Friday to give you time to finish. After that, it will be an update on master, which is OK since you said the changes were only small.

Also, why socket.io? Why not sockjs, which has a less cluttered API?

sockjs does not seem to have any momentum, and I did not find a Python-based lib to use it.

You should talk about this in #109.

@benoitc
@tarekziade
Owner

this code is not good enough to be merged. CPU usage is too high.

It's not if you change the delays. It's not clear to me what you want to do. You sounded like you wanted to improve the code, so that can be done anytime before the release.

why not continue the work in the branch?

Because we have other things to build on top of it. I have waited a week already.

I will merge changes, that isn't a problem.

Improving the master is not an issue either

@benoitc

This is not about improvements. We really need to fix the CPU usage before it lands in master. I'm on my iPhone. Will detail soon.

@tarekziade
Owner

@benoitc yes, please enlighten us with your corrections ;) but before Friday please

@benoitc

anything better than using 20% of CPU to collect stats will be correct at this point.

@tarekziade
Owner

@benoitc I am creating an issue for you "optimizing CPU usage", so you can focus on this

@tarekziade
Owner

Bug #114 was created for your optimization work. I am now merging this work into master.

@tarekziade tarekziade merged commit 34eb1ac into from
Commits on Apr 27, 2012
  1. @ametaireau

    commitalacon

    ametaireau authored
Commits on Apr 28, 2012
  1. @tarekziade

    Merge pull request #105 from benoitc/master

    tarekziade authored
    add new properties `options` andd `start` to the `add_watcher` command
Commits on May 2, 2012
  1. @tarekziade

    added the listpids command

    tarekziade authored
  2. @tarekziade
  3. @tarekziade
Commits on May 3, 2012
  1. @tarekziade
  2. @tarekziade

    adding logging

    tarekziade authored
  3. @tarekziade

    added and hooked a client

    tarekziade authored
  4. @tarekziade

    saner stats

    tarekziade authored
  5. @tarekziade

    now publishing by watcher

    tarekziade authored
  6. @tarekziade

    added the listpids command

    tarekziade authored
  7. @tarekziade
  8. @tarekziade
  9. @tarekziade
  10. @tarekziade

    adding logging

    tarekziade authored
  11. @tarekziade

    added and hooked a client

    tarekziade authored
  12. @tarekziade

    saner stats

    tarekziade authored
  13. @tarekziade

    now publishing by watcher

    tarekziade authored
  14. @tarekziade

    merged

    tarekziade authored
  15. @tarekziade

    using one thread per watcher and sendin aggregate values per watcher …

    tarekziade authored
    …- the pub pattern is now stats.WATCHER.PID
  16. @tarekziade

    using .pids instead

    tarekziade authored
  17. @tarekziade
1  circus/commands/__init__.py
@@ -8,6 +8,7 @@
incrproc,
list,
listen,
+ listpids,
numprocesses,
numwatchers,
options,
50 circus/commands/listpids.py
@@ -0,0 +1,50 @@
+from circus.commands.base import Command
+from circus.exc import ArgumentError
+
+
+class ListPids(Command):
+ """\
+ Get list of pids in a watcher
+ =============================
+
+ ZMQ Message
+ -----------
+
+
+ To get the list of pid in a watcher::
+
+ {
+ "command": "listpids",
+ "properties": {
+ "name": "nameofwatcher",
+ }
+ }
+
+
+ The response return the list asked.
+
+ Command line
+ ------------
+
+ ::
+
+ $ circusctl listpids <name>
+ """
+ name = "listpids"
+
+ def message(self, *args, **opts):
+ if len(args) != 1:
+ raise ArgumentError("invalid number of arguments")
+
+ return self.make_message(name=args[0])
+
+ def execute(self, arbiter, props):
+ watcher = self._get_watcher(arbiter, props['name'])
+ pids = watcher.pids.keys()
+ pids.sort()
+ return {"pids": pids}
+
+ def console_msg(self, msg):
+ if 'pids' in msg:
+ return ",".join([str(pid) for pid in msg.get('pids')])
+ return self.console_error(msg)
3  circus/consumer.py
@@ -5,6 +5,7 @@
class CircusConsumer(object):
def __init__(self, topics, context=None, endpoint='tcp://127.0.0.1:5556'):
self.topics = topics
+ self.keep_context = context is not None
self.context = context or zmq.Context()
self.endpoint = endpoint
self.pubsub_socket = self.context.socket(zmq.SUB)
@@ -30,6 +31,8 @@ def iter_messages(self):
yield topic, message
def stop(self):
+ if self.keep_context:
+ return
try:
self.context.destroy(0)
except zmq.ZMQError as e:
77 circus/stats/__init__.py
@@ -0,0 +1,77 @@
+
+"""
+Stats architecture:
+
+ * streamer.StatsStreamer listens to circusd events and maintain a list of pids
+ * collector.StatsCollector runs a pool of threads that compute stats for each
+ pid in the list. Each stat is pushed in a queue
+ * publisher.StatsPublisher continuously pushes those stats in a zmq PUB socket
+ * client.StatsClient is a simple subscriber that can be used to intercept the
+ stream of stats.
+"""
+import sys
+import argparse
+import logging
+
+from circus.stats.streamer import StatsStreamer
+from circus import logger
+from circus import util
+
+
+LOG_LEVELS = {
+ "critical": logging.CRITICAL,
+ "error": logging.ERROR,
+ "warning": logging.WARNING,
+ "info": logging.INFO,
+ "debug": logging.DEBUG}
+
+LOG_FMT = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s"
+LOG_DATE_FMT = r"%Y-%m-%d %H:%M:%S"
+
+
+def main():
+ desc = 'Runs the stats aggregator for Circus'
+ parser = argparse.ArgumentParser(description=desc)
+
+ parser.add_argument('--endpoint',
+ help='The circusd ZeroMQ socket to connect to',
+ default='tcp://127.0.0.1:5555')
+
+ parser.add_argument('--pubsub',
+ help='The circusd ZeroMQ pub/sub socket to connect to',
+ default='tcp://127.0.0.1:5556')
+
+ parser.add_argument('--statspoint',
+ help='The ZeroMQ pub/sub socket to send data to',
+ default='tcp://127.0.0.1:5557')
+
+ parser.add_argument('--log-level', dest='loglevel', default='info',
+ help="log level")
+
+ parser.add_argument('--log-output', dest='logoutput', default='-',
+ help="log output")
+
+ args = parser.parse_args()
+
+ # configure the logger
+ loglevel = LOG_LEVELS.get(args.loglevel.lower(), logging.INFO)
+ logger.setLevel(loglevel)
+ if args.logoutput == "-":
+ h = logging.StreamHandler()
+ else:
+ h = logging.FileHandler(args.logoutput)
+ util.close_on_exec(h.stream.fileno())
+ fmt = logging.Formatter(LOG_FMT, LOG_DATE_FMT)
+ h.setFormatter(fmt)
+ logger.addHandler(h)
+
+ stats = StatsStreamer(args.endpoint, args.pubsub, args.statspoint)
+ try:
+ stats.start()
+ finally:
+ stats.stop()
+ sys.exit(0)
+
+
+if __name__ == '__main__':
+ main()
40 circus/stats/client.py
@@ -0,0 +1,40 @@
+from circus.consumer import CircusConsumer
+import json
+
+
+class StatsClient(CircusConsumer):
+ def __init__(self, endpoint='tcp://127.0.0.1:5557', context=None):
+ CircusConsumer.__init__(self, ['stat.'], context, endpoint)
+
+ def iter_messages(self):
+ """ Yields tuples of (watcher, pid, stat)"""
+ with self:
+ while True:
+ topic, stat = self.pubsub_socket.recv_multipart()
+ topic = topic.split('.')
+ if len(topic) == 3:
+ __, watcher, pid = topic
+ yield watcher, long(pid), json.loads(stat)
+ else:
+ __, watcher = topic
+ yield watcher, None, json.loads(stat)
+
+
+TMP = ('watcher: %(watcher)s - pid: %(pid)d - cpu: %(cpu)s%% - '
+ 'mem: %(mem)s%%')
+TMP2 = ('Summary - watcher: %(watcher)s - cpu: %(cpu)s%% - '
+ 'mem: %(mem)s%%')
+
+
+if __name__ == '__main__':
+ client = StatsClient()
+ try:
+ for watcher, pid, stat in client:
+ stat['watcher'] = watcher
+ if pid is not None:
+ stat['pid'] = pid
+ print TMP % stat
+ else:
+ print TMP2 % stat
+ except KeyboardInterrupt:
+ client.stop()
107 circus/stats/collector.py
@@ -0,0 +1,107 @@
+import threading
+import Queue
+import time
+
+from circus import util
+from circus import logger
+
+
+class StatsWorker(threading.Thread):
+ def __init__(self, watcher, results, get_pids, delay=.1, interval=1.):
+ threading.Thread.__init__(self)
+ self.watcher = watcher
+ self.delay = delay
+ self.running = False
+ self.results = results
+ self.interval = interval
+ self.daemon = True
+ self.get_pids = get_pids
+
+ def _aggregate(self, aggregate):
+ res = {'pid': aggregate.keys()}
+ # right way to aggregate ?
+ stats = aggregate.values()
+ res['cpu'] = sum([stat['cpu'] for stat in stats])
+ res['mem'] = sum([stat['mem'] for stat in stats])
+ return res
+
+ def run(self):
+ self.running = True
+ while self.running:
+ aggregate = {}
+
+ # sending by pids
+ for pid in self.get_pids(self.watcher):
+ try:
+ info = util.get_info(pid, interval=self.interval)
+ aggregate[pid] = info
+ except util.NoSuchProcess:
+ # the process is gone !
+ pass
+ except Exception:
+ logger.exception('Failed to get info for %d' % pid)
+ else:
+ self.results.put((self.watcher, pid, info))
+
+ # now sending the aggregation
+ self.results.put((self.watcher, None, self._aggregate(aggregate)))
+
+ def stop(self):
+ self.running = False
+
+
+class StatsCollector(threading.Thread):
+
+ def __init__(self, streamer, check_delay=1.):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.streamer = streamer
+ self.running = False
+ self.results = Queue.Queue()
+ self.workers = {}
+ self.check_delay = check_delay
+
+ def run(self):
+ self.running = True
+ logger.debug('Starting the collector with %d workers' %
+ len(self.workers))
+
+ # starting the workers
+ for watcher in self.streamer.get_watchers():
+ worker = StatsWorker(watcher, self.streamer.results,
+ self.streamer.get_pids)
+ self.workers[watcher] = worker
+ worker.start()
+
+ # now will maintain the list of watchers : if a watcher
+ # is added or removed, we add or remove a thread here
+ #
+ # XXX use some all() and any() here
+ while self.running:
+ current = self.workers.keys()
+ current.sort()
+ watchers = self.streamer.get_watchers()
+ watchers.sort()
+ if watchers != current:
+ # something has changed
+ for watcher in watchers:
+ # added one
+ if watcher not in current:
+ worker = StatsWorker(watcher, self.streamer.results,
+ self.streamer.get_pids)
+ self.workers[watcher] = worker
+ worker.start()
+ for watcher in current:
+ if watcher not in watchers:
+ # one is gone
+ self.workers[watcher].stop()
+ del self.workers[watcher]
+
+ # just sleep for a bit
+ time.sleep(self.check_delay)
+
+ def stop(self):
+ self.running = False
+ for worker in self.workers.values():
+ worker.stop()
+ logger.debug('Collector stopped')
44 circus/stats/publisher.py
@@ -0,0 +1,44 @@
+import threading
+import zmq
+import Queue
+import json
+
+from circus import logger
+
+
+class StatsPublisher(threading.Thread):
+ def __init__(self, streamer, stats_endpoint='tcp://127.0.0.1:5557',
+ delay=0.1, context=None):
+ threading.Thread.__init__(self)
+ self.streamer = streamer
+ self.running = False
+ self.daemon = True
+ self.delay = delay
+ self.ctx = context or zmq.Context()
+ self.destroy_context = context is None
+ self.stats_endpoint = stats_endpoint
+ self.socket = self.ctx.socket(zmq.PUB)
+ self.socket.bind(self.stats_endpoint)
+ self.socket.linger = 0
+
+ def run(self):
+ self.running = True
+ results = self.streamer.results
+ logger.debug('Starting the Publisher')
+ while self.running:
+ try:
+ watcher, pid, stat = results.get(timeout=self.delay)
+ topic = b'stat.%s' % str(watcher)
+ if pid is not None:
+ topic += '.%d' % pid
+ self.socket.send_multipart([topic, json.dumps(stat)])
+ except Queue.Empty:
+ pass
+ except Exception:
+ logger.exception('Failed to some data from the queue')
+
+ def stop(self):
+ self.running = False
+ if self.destroy_context:
+ self.ctx.destroy(0)
+ logger.debug('Publisher stopped')
111 circus/stats/streamer.py
@@ -0,0 +1,111 @@
+from collections import defaultdict
+import zmq
+import json
+import threading
+import Queue
+from itertools import chain
+
+from circus.consumer import CircusConsumer
+from circus.commands import get_commands
+from circus.client import CircusClient
+from circus.stats.collector import StatsCollector
+from circus.stats.publisher import StatsPublisher
+from circus import logger
+
+
+class StatsStreamer(object):
+ def __init__(self, endpoint, pubsub_endoint, stats_endpoint):
+ self.topic = 'watcher.'
+ self.ctx = zmq.Context()
+ self.consumer = CircusConsumer([self.topic], context=self.ctx,
+ endpoint=pubsub_endoint)
+ self.client = CircusClient(context=self.ctx, endpoint=endpoint)
+ self.cmds = get_commands()
+ self.watchers = defaultdict(list)
+ self._pids = defaultdict(list)
+ self.running = False
+ self.stopped = False
+ self.lock = threading.RLock()
+ self.results = Queue.Queue()
+ self.stats = StatsCollector(self)
+ self.publisher = StatsPublisher(self, stats_endpoint, context=self.ctx)
+
+ def get_watchers(self):
+ return self._pids.keys()
+
+ def get_pids(self, watcher=None):
+ if watcher is not None:
+ return self._pids[watcher]
+ return chain(self._pid.values())
+
+ def _init(self):
+ with self.lock:
+ self.stopped = False
+ self._pids.clear()
+ # getting the initial list of watchers/pids
+ msg = self.cmds['list'].make_message()
+ res = self.client.call(msg)
+ for watcher in res['watchers']:
+ msg = self.cmds['listpids'].make_message(name=watcher)
+ res = self.client.call(msg)
+ for pid in res['pids']:
+ if pid in self._pids[watcher]:
+ continue
+ self._pids[watcher].append(pid)
+
+ def remove_pid(self, watcher, pid):
+ logger.debug('Removing %d from %s' % (pid, watcher))
+ if pid in self._pids[watcher]:
+ with self.lock:
+ self._pids[watcher].remove(pid)
+
+ def append_pid(self, watcher, pid):
+ logger.debug('Adding %d in %s' % (pid, watcher))
+ if pid in self._pids[watcher]:
+ return
+ with self.lock:
+ self._pids[watcher].append(pid)
+
+ def start(self):
+ logger.info('Starting the stats streamer')
+ self._init()
+ logger.debug('Initial list is ' + str(self._pids))
+ self.running = True
+ self.stats.start()
+ self.publisher.start()
+
+ logger.debug('Now looping to get circusd events')
+ while self.running:
+ # now hooked into the stream
+ try:
+ for topic, msg in self.consumer:
+ __, watcher, action = topic.split('.')
+ msg = json.loads(msg)
+ if action != 'start' and self.stopped:
+ self._init()
+
+ if action in ('reap', 'kill'):
+ # a process was reaped
+ pid = msg['process_pid']
+ self.remove_pid(watcher, pid)
+ elif action == 'spawn':
+ pid = msg['process_pid']
+ self.append_pid(watcher, pid)
+ elif action == 'start':
+ self._init()
+ elif action == 'stop':
+ # nothing to do
+ self.stopped = True
+ else:
+ logger.debug('Unknown action: %r' % action)
+ logger.debug(msg)
+
+ except Exception:
+ logger.exception('Failed to treat %r' % msg)
+
+ def stop(self):
+ self.running = False
+ self.publisher.stop()
+ self.stats.stop()
+ self.ctx.destroy(0)
+ logger.info('Stats streamer stopped')
6 circus/util.py
@@ -91,12 +91,14 @@ def bytes2human(n):
def get_info(process=None, interval=0):
- """Return information about a process.
+ """Return information about a process. (can be an pid or a Process object)
- If process is None, will return the information about the current process
+ If process is None, will return the information about the current process.
"""
if process is None:
process = Process(os.getpid())
+ elif isinstance(process, int):
+ process = Process(process)
info = {}
try:
1  circus/watcher.py
@@ -365,6 +365,7 @@ def kill_process(self, process, sig=signal.SIGTERM):
self.stderr_redirector.remove_redirection('stderr', process)
self.send_msg("kill", {"process_id": process.wid,
+ "process_pid": process.pid,
"time": time.time()})
logger.debug("%s: kill process %s", self.name, process.pid)
process.send_signal(sig)
1  examples/dummy_fly.py
@@ -29,6 +29,7 @@ def run(self):
while self.alive:
a = 10 * 10 * 10 * 10
+
#time.sleep(0.1)
if __name__ == "__main__":
1  setup.py
@@ -35,6 +35,7 @@
entry_points="""
[console_scripts]
circusd = circus.circusd:main
+ circusd-stats = circus.stats:main
circusctl = circus.circusctl:main
circushttpd = circus.web.circushttpd:main
""")