Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ronniekk committed Nov 22, 2012
0 parents commit 133728a
Show file tree
Hide file tree
Showing 9 changed files with 570 additions and 0 deletions.
87 changes: 87 additions & 0 deletions README.rst
@@ -0,0 +1,87 @@
graphite-pymetrics
==================
graphite-pymetrics is a lightweight Python framework which makes it super simple to add application metrics
that are sent to a remote graphite/carbon server.

All that is needed is this package (which also includes pystatsd) and access to a remote graphite server.

To install it just run Pip as usual::

$ pip install graphite-pymetrics

Package requirements:
* pystatsd==0.1.6
* gevent

=====
Usage
=====
Make sure there is a local graphite proxy running - start it at an early point in your application:

from metrics.graphite import start_graphite_proxy
start_graphite_proxy({"host": "graphite.mycompany.com", "port": 2003})

The proxy is pystatsd, a local server that receives UDP packets from the metrics client and periodically
emits data to graphite over TCP.

~~~~~~~~
Counters
~~~~~~~~
To add a counter for anything anywhere in your code, use Metric.add:

from metrics import Metric

Metric.add("foo.bar")

Use the @metric decorator to count specific method invocations:

from metrics import metric

@metric("bar.baz")
def foo():
# do stuff here

~~~~~~
Timing
~~~~~~
There are several ways to log timing. The most naive way is to first measure time manually and then submit it:

from metrics import Metric
import time

start = time.time()
# do stuff
elapsed = time.time() - start
Metric.timing("do.stuff", elapsed)

An easier way is to let the metric client keep track of time with Metric.start_timing and call done() on the
returned timing instance. Following is an example for measuring time consumed for every endpoint individually
in a Flask webapp:

from metrics import Metric
from flask import Blueprint, current_app, request, g

app = Blueprint("myapp", __name__)

@app.before_request
def before_request():
try:
g.timing = Metric.start_timing(str(request.endpoint)) # start timing
except:
current_app.logger.error("Unable to time call for 'request.endpoint'")

@app.teardown_request
def teardown_request(exc):
try:
g.timing.done() # stop timing
except:
current_app.logger.error("Timing not available")

Similar to the @metric decorator there is a @timing decorator which is used to measure time for specific methods:

from metrics import timing

@timing("heavy.task")
def heavy_task(x, y, z):
# do heavy stuff here

3 changes: 3 additions & 0 deletions metrics/__init__.py
@@ -0,0 +1,3 @@
__author__ = 'ronnie'

from graphite import Metric, metric, timing
187 changes: 187 additions & 0 deletions metrics/graphite.py
@@ -0,0 +1,187 @@
import functools
import logging

import gevent
from pystatsd import Client, Server

from helpers import get_time

logger = logging.getLogger("metrics")

# Configuration attributes:
# * host: the host where graphite/carbon is listening
# * port: the port where graphite/carbon is listening
# * namespace (optional): the prefix for all stat keys
# * debug (optional): for debugging
_config = {}
_statsd = None


def start_graphite_proxy(config):
    """
    Starts the local pystatsd proxy that receives UDP metrics and forwards
    them to the remote graphite/carbon server.
    @param config: configuration dict; "host" and "port" identify the remote
        graphite/carbon server (required), "namespace" and "debug" are optional
    @type config: dict
    """
    global _config
    _config.clear()
    _config.update(config)
    host = _config.get("host")
    port = _config.get("port")
    if not (host and port):
        # warn() is a deprecated alias of warning()
        logger.warning("Graphite is not configured, metrics will not be collected")
        return

    global _statsd
    _statsd = Server(pct_threshold=100, debug=_config.get("debug", False))

    def _start():
        try:
            # Log before serve(): serve() blocks for the proxy's lifetime,
            # so logging after it (as before) never announced a running server.
            logger.info("Metrics server started, emitting stats to graphite at %s:%s", host, port)
            _statsd.serve(graphite_host=host, graphite_port=port)
        except Exception:
            # Narrowed from a bare except so greenlet kills are not swallowed.
            logger.warning("Unable to start metrics UDP server at port 8125 - perhaps one is already running?")

    gevent.spawn(_start)


def stop_graphite_proxy():
    """Stops the local pystatsd proxy, if one is currently running."""
    global _statsd
    if not _statsd:
        return
    _statsd.stop()
    _statsd = None


def metric(name, delta=1):
    """
    Convenience decorator used for incrementing an arbitrary stats key when a function is invoked.
    >>> from metrics import metric
    >>>
    >>> @metric("get_toplist")
    ... def get_toplist():
    ...     return
    @param name: the stats key
    @type name: str
    @param delta: the value to update
    @type delta: int
    """
    def _metric(func):
        # functools.wraps preserves the wrapped function's __name__/__doc__,
        # which the bare wrapper previously clobbered with "_wrapper".
        @functools.wraps(func)
        def _wrapper(*args, **kwargs):
            res = func(*args, **kwargs)
            # counted after the call, so an invocation that raises is not counted
            Metric.add(name, delta)
            return res

        return _wrapper

    return _metric


def timing(name):
    """
    Convenience decorator used for timing a function, i.e. track and store the execution time.
    >>> from metrics import timing
    >>>
    >>> @timing("execution_time.get_toplist")
    ... def get_toplist():
    ...     return
    @param name: the stats key
    @type name: str
    """
    def _timing(func):
        # functools.wraps preserves the wrapped function's __name__/__doc__,
        # which the bare wrapper previously clobbered with "_wrapper".
        @functools.wraps(func)
        def _wrapper(*args, **kwargs):
            start = get_time()
            res = func(*args, **kwargs)
            elapsed_time = get_time() - start
            Metric.timing(name, elapsed_time)
            return res

        return _wrapper

    return _timing


class _Timing(object):
    """
    One-shot timer handle created via Metric.start_timing(). The elapsed
    time is submitted when done() is invoked; later calls are no-ops.
    """
    def __init__(self, name):
        # the stats key the elapsed time will be reported under
        self.name = name
        # start timestamp; set to None once the timing has been submitted
        self.start = get_time()

    def done(self):
        # `is None` instead of truthiness: a valid start timestamp of 0.0
        # (time.clock() can return 0.0 on its first call) must not be
        # mistaken for "already submitted"
        if self.start is None:
            return  # already submitted once
        elapsed_time = get_time() - self.start
        Metric.timing(self.name, elapsed_time)
        self.start = None  # prevent further submitting


class Metric(object):
    """
    The metrics client that communicates with graphite via local pystatsd.
    >>> from metrics import Metric
    >>> Metric.add("foo.bar.baz")
    >>> Metric.timing("foo.bar.millis", 123)
    """

    _client = Client()

    @classmethod
    def _add_namespace(cls, name):
        # Prefix the key with the configured namespace, if any.
        prefix = _config.get("namespace")
        if not prefix:
            return name
        return "%s.%s" % (prefix, name)

    @classmethod
    def add(cls, name, delta=1):
        """
        Updates a stats counter by arbitrary value (increments by one by default).
        >>> Metric.add("foo.bar.baz") # increments by one
        >>> Metric.add("baz.bar.foo", 10) # adds 10 to the stats counter
        @param name: the stats key
        @type name: str
        @param delta: the value to update
        @type delta: int
        """
        # Silently skip when there is no client, no key or a no-op delta.
        if cls._client and name and delta:
            cls._client.update_stats(cls._add_namespace(name), delta)

    @classmethod
    def timing(cls, name, time):
        """
        Submits time value for a given stats key.
        >>> Metric.timing("execution.time.baz", 123)
        @param name: the stats key
        @type name: str
        @param time: the time value to submit (in seconds)
        @type time: int or float
        """
        if not (cls._client and name):
            return
        seconds = float(time) if time else time
        if not seconds:
            return  # nothing to report for a missing/zero duration

        # graphite expects milliseconds; round half away from zero
        millis = int(seconds * 1000 + .5)
        cls._client.timing(cls._add_namespace(name), millis)

    @classmethod
    def start_timing(cls, name):
        """
        Starts and returns a timing instance that tracks time for a given stats key. The stats
        will be updated once done() is invoked on the returned timing instance.
        >>> timer = Metric.start_timing("execution.time.baz")
        >>> # do stuff here...
        >>> timer.done() # submits stats
        @param name: the stats key
        @type name: str
        @rtype: _Timing
        """
        return _Timing(name)
9 changes: 9 additions & 0 deletions metrics/helpers.py
@@ -0,0 +1,9 @@
import os
import time


def get_time():
    """
    Return a float timestamp in seconds, used for measuring elapsed time.
    NOTE(review): only consumed for deltas in this package (metrics.graphite),
    so the non-posix branch was switched from time.clock() - removed in
    Python 3.8 - to its recommended replacement time.perf_counter().
    """
    if os.name == "posix":
        return time.time()
    return time.perf_counter()
1 change: 1 addition & 0 deletions metrics/test/__init__.py
@@ -0,0 +1 @@
__author__ = 'ronnie'
104 changes: 104 additions & 0 deletions metrics/test/mock_server.py
@@ -0,0 +1,104 @@
from socket import AF_INET, SOCK_DGRAM, socket
import re
import time

#
# A stripped copy of pystatsd.Server used for integration tests. The server does not
# send any data to graphite, instead it takes a callback function that will receive
# the stats lines for verification (lines with empty stats and lines with "numStats" are pruned).
#

TIMER_MSG = '''stats.timers.%(key)s.lower %(min)s %(ts)s
stats.timers.%(key)s.count %(count)s %(ts)s
stats.timers.%(key)s.mean %(mean)s %(ts)s
stats.timers.%(key)s.upper %(max)s %(ts)s
stats.timers.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s
'''


class Server(object):
    """
    Mock statsd UDP server for integration tests: collects counters and
    timers, and flush() renders them to the callback instead of graphite.
    """

    def __init__(self, callback):
        # callback receives the rendered stats lines on every flush()
        self.callback = callback
        self.buf = 1024  # max datagram size read per recvfrom
        self.pct_threshold = 100
        self.counters = {}
        self.timers = {}

    def process(self, data):
        """
        Parse one statsd datagram ("key:value|type[|@rate]") and update the
        in-memory counters/timers. Datagrams without a ":" or a "|type" field
        are dropped: previously such packets raised (the `None == fields[1]`
        guard could never be true since split() never yields None), and the
        exception terminated the receive loop in serve().
        """
        key, _, val = data.partition(':')
        fields = val.split('|')
        if len(fields) < 2 or not fields[1]:
            return  # malformed packet - drop instead of raising

        if fields[1] == 'ms':
            # timer sample; value is in milliseconds
            self.timers.setdefault(key, []).append(float(fields[0] or 0))
        else:
            sample_rate = 1
            if len(fields) == 3:
                # optional "@0.5"-style sample rate suffix (raw string: avoids
                # the invalid \d escape warning of the original pattern)
                sample_rate = float(re.match(r'^@([\d\.]+)', fields[2]).groups()[0])
            if key not in self.counters:
                self.counters[key] = 0
            # scale by the inverse sample rate, as real statsd does
            self.counters[key] += float(fields[0] or 1) * (1 / sample_rate)

    def flush(self):
        """Render the collected stats and hand the lines to the callback."""
        ts = int(time.time())
        stat_string = ''
        for k, v in self.counters.items():
            v = float(v)
            if not v:
                continue  # prune empty stats
            stat_string += 'stats.%s %s %s\n' % (k, v, ts)
            self.counters[k] = 0

        for k, v in self.timers.items():
            if len(v) > 0:
                v.sort()
                count = len(v)
                v_min = v[0]  # renamed from `min`/`max`: don't shadow builtins
                v_max = v[-1]

                mean = v_min
                max_threshold = v_max

                if count > 1:
                    thresh_index = int((self.pct_threshold / 100.0) * count)
                    max_threshold = v[thresh_index - 1]
                    # NOTE(review): mirrors pystatsd, which excludes the last
                    # sample from the mean (v[:thresh_index - 1]); kept as-is
                    # so verification output matches the real server.
                    total = sum(v[:thresh_index - 1])
                    mean = total / thresh_index

                self.timers[k] = []

                stat_string += TIMER_MSG % {
                    'key': k,
                    'mean': mean,
                    'max': v_max,
                    'min': v_min,
                    'count': count,
                    'max_threshold': max_threshold,
                    'pct_threshold': self.pct_threshold,
                    'ts': ts,
                }

        self.callback(stat_string)

    def serve(self):
        """Bind UDP localhost:8125 and process datagrams until stop()."""
        addr = ("localhost", 8125)
        self._sock = socket(AF_INET, SOCK_DGRAM)
        self._sock.bind(addr)

        try:
            while True:
                data, addr = self._sock.recvfrom(self.buf)
                if data:
                    # recvfrom returns bytes under Python 3; decode before parsing
                    self.process(data.decode("utf-8"))
                self.flush()
        except:
            pass  # hide greenlet errors

    def stop(self):
        self._sock.close()

0 comments on commit 133728a

Please sign in to comment.