From 133728abb0875a92827a92da52a0f61ca99286b5 Mon Sep 17 00:00:00 2001
From: Ronnie Kolehmainen <ronnie@esn.me>
Date: Thu, 22 Nov 2012 15:44:26 +0100
Subject: [PATCH] Initial commit

---
 README.rst                    |  87 ++++++++++++++++
 metrics/__init__.py           |   3 +
 metrics/graphite.py           | 187 ++++++++++++++++++++++++++++++++++
 metrics/helpers.py            |   9 ++
 metrics/test/__init__.py      |   1 +
 metrics/test/mock_server.py   | 104 +++++++++++++++++++
 metrics/test/test_graphite.py | 145 ++++++++++++++++++++++++++
 requirements.txt              |   2 +
 setup.py                      |  32 ++++++
 9 files changed, 570 insertions(+)
 create mode 100644 README.rst
 create mode 100644 metrics/__init__.py
 create mode 100644 metrics/graphite.py
 create mode 100644 metrics/helpers.py
 create mode 100644 metrics/test/__init__.py
 create mode 100644 metrics/test/mock_server.py
 create mode 100644 metrics/test/test_graphite.py
 create mode 100644 requirements.txt
 create mode 100644 setup.py

diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..4ca4292
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,87 @@
+graphite-pymetrics
+==================
+graphite-pymetrics is a lightweight Python framework which makes it super simple to add application metrics
+that are sent to a remote graphite/carbon server.
+
+All that is needed is this package (which also includes pystatsd) and access to a remote graphite server.
+
+To install it, just run pip as usual::
+
+    $ pip install graphite-pymetrics
+
+Package requirements:
+
+* pystatsd==0.1.6
+* gevent
+
+=====
+Usage
+=====
+Make sure there is a local graphite proxy running - start it at an early point in your application::
+
+    from metrics.graphite import start_graphite_proxy
+    start_graphite_proxy({"host": "graphite.mycompany.com", "port": 2003})
+
+The proxy is pystatsd, a local server that receives UDP packets from the metrics client and periodically
+emits data to graphite over TCP.
+
+~~~~~~~~
+Counters
+~~~~~~~~
+To add a counter for anything anywhere in your code, use Metric.add::
+
+    from metrics import Metric
+
+    Metric.add("foo.bar")
+
+Use the @metric decorator to count specific method invocations::
+
+    from metrics import metric
+
+    @metric("bar.baz")
+    def foo():
+        # do stuff here
+
+~~~~~~
+Timing
+~~~~~~
+There are several ways to log timing. The most naive way is to first measure the time manually and then submit it::
+
+    from metrics import Metric
+    import time
+
+    start = time.time()
+    # do stuff
+    elapsed = time.time() - start
+    Metric.timing("do.stuff", elapsed)
+
+An easier way is to let the metric client keep track of time with Metric.start_timing and call done() on the
+returned timing instance.
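+For example, a minimal sketch (the stats key is just an illustration)::
+
+    from metrics import Metric
+
+    timer = Metric.start_timing("some.heavy.task")
+    # ... do stuff ...
+    timer.done()  # submits the elapsed time
+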
+Following is an example for measuring the time consumed for each endpoint individually
+in a Flask webapp::
+
+    from metrics import Metric
+    from flask import Blueprint, current_app, request, g
+
+    app = Blueprint("myapp", __name__)
+
+    @app.before_request
+    def before_request():
+        try:
+            g.timing = Metric.start_timing(str(request.endpoint))    # start timing
+        except:
+            current_app.logger.error("Unable to time call for endpoint %s" % request.endpoint)
+
+    @app.teardown_request
+    def teardown_request(exc):
+        try:
+            g.timing.done()    # stop timing
+        except:
+            current_app.logger.error("Timing not available")
+
+Similar to the @metric decorator, there is a @timing decorator which is used to measure the time of specific methods::
+
+    from metrics import timing
+
+    @timing("heavy.task")
+    def heavy_task(x, y, z):
+        # do heavy stuff here
diff --git a/metrics/__init__.py b/metrics/__init__.py
new file mode 100644
index 0000000..d93b9d3
--- /dev/null
+++ b/metrics/__init__.py
@@ -0,0 +1,3 @@
+__author__ = 'ronnie'
+
+from graphite import Metric, metric, timing
diff --git a/metrics/graphite.py b/metrics/graphite.py
new file mode 100644
index 0000000..5b45511
--- /dev/null
+++ b/metrics/graphite.py
@@ -0,0 +1,187 @@
+from pystatsd import Client, Server
+from helpers import get_time
+import logging
+import gevent
+
+logger = logging.getLogger("metrics")
+
+# Configuration attributes:
+# * host: the host where graphite/carbon is listening
+# * port: the port where graphite/carbon is listening
+# * namespace (optional): the prefix for all stat keys
+# * debug (optional): for debugging
+_config = {}
+_statsd = None
+
+
+def start_graphite_proxy(config):
+    global _config
+    _config.clear()
+    _config.update(config)
+    host = _config.get("host")
+    port = _config.get("port")
+    if host and port:
+        global _statsd
+        _statsd = Server(pct_threshold=100, debug=_config.get("debug", False))
+
+        def _start():
+            try:
+                _statsd.serve(graphite_host=host, graphite_port=port)
+                logger.info("Metrics server started, emitting stats to graphite at %s:%s" % (host, port))
+            except:
+                logger.warn("Unable to start metrics UDP server at port 8125 - perhaps one is already running?")
+
+        gevent.spawn(_start)
+    else:
+        logger.warn("Graphite is not configured, metrics will not be collected")
+
+
+def stop_graphite_proxy():
+    global _statsd
+    if _statsd:
+        _statsd.stop()
+        _statsd = None
+
+
+def metric(name, delta=1):
+    """
+    Convenience decorator used for incrementing an arbitrary stats key when a function is invoked.
+
+    >>> from metrics import metric
+    >>>
+    >>> @metric("get_toplist")
+    ... def get_toplist():
+    ...     return
+
+    @param name: the stats key
+    @type name: str
+    @param delta: the value to update
+    @type delta: int
+    """
+    def _metric(func):
+        def _wrapper(*args, **kwargs):
+            res = func(*args, **kwargs)
+            Metric.add(name, delta)
+            return res
+
+        return _wrapper
+
+    return _metric
+
+
+def timing(name):
+    """
+    Convenience decorator used for timing a function, i.e. tracking and storing its execution time.
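+    The elapsed time is measured in seconds and converted to milliseconds by Metric.timing.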
+
+    >>> from metrics import timing
+    >>>
+    >>> @timing("execution_time.get_toplist")
+    ... def get_toplist():
+    ...     return
+
+    @param name: the stats key
+    @type name: str
+    """
+    def _timing(func):
+        def _wrapper(*args, **kwargs):
+            start = get_time()
+            res = func(*args, **kwargs)
+            elapsed_time = get_time() - start
+            Metric.timing(name, elapsed_time)
+            return res
+
+        return _wrapper
+
+    return _timing
+
+
+class _Timing(object):
+    def __init__(self, name):
+        self.name = name
+        self.start = get_time()
+
+    def done(self):
+        if not self.start:
+            return  # already submitted once
+        elapsed_time = get_time() - self.start
+        Metric.timing(self.name, elapsed_time)
+        self.start = None  # prevent further submitting
+
+
+class Metric(object):
+    """
+    The metrics client that communicates with graphite via local pystatsd.
+
+    >>> from metrics import Metric
+    >>> Metric.add("foo.bar.baz")
+    >>> Metric.timing("foo.bar.millis", 123)
+    """
+
+    _client = Client()
+
+    @classmethod
+    def _add_namespace(cls, name):
+        namespace = _config.get("namespace")
+        return "%s.%s" % (namespace, name) if namespace else name
+
+    @classmethod
+    def add(cls, name, delta=1):
+        """
+        Updates a stats counter by an arbitrary value (increments by one by default).
+
+        >>> Metric.add("foo.bar.baz")      # increments by one
+        >>> Metric.add("baz.bar.foo", 10)  # adds 10 to the stats counter
+
+        @param name: the stats key
+        @type name: str
+        @param delta: the value to update
+        @type delta: int
+        """
+        if not cls._client:
+            return
+        if not name:
+            return
+        if not delta:
+            return
+
+        cls._client.update_stats(cls._add_namespace(name), delta)
+
+    @classmethod
+    def timing(cls, name, time):
+        """
+        Submits a time value for a given stats key.
+
+        >>> Metric.timing("execution.time.baz", 123)
+
+        @param name: the stats key
+        @type name: str
+        @param time: the time value to submit (in seconds)
+        @type time: int or float
+        """
+        if not cls._client:
+            return
+        if not name:
+            return
+        if time:
+            time = float(time)
+        if not time:
+            return
+
+        millis = int(time * 1000 + .5)
+        cls._client.timing(cls._add_namespace(name), millis)
+
+    @classmethod
+    def start_timing(cls, name):
+        """
+        Starts and returns a timing instance that tracks time for a given stats key. The stats
+        will be updated once done() is invoked on the returned timing instance.
+
+        >>> timer = Metric.start_timing("execution.time.baz")
+        >>> # do stuff here...
+        >>> timer.done()  # submits stats
+
+        @param name: the stats key
+        @type name: str
+        @rtype: _Timing
+        """
+        return _Timing(name)
diff --git a/metrics/helpers.py b/metrics/helpers.py
new file mode 100644
index 0000000..84bdfa9
--- /dev/null
+++ b/metrics/helpers.py
@@ -0,0 +1,9 @@
+import os
+import time
+
+
+def get_time():
+    if os.name == "posix":
+        return time.time()
+    else:
+        return time.clock()
diff --git a/metrics/test/__init__.py b/metrics/test/__init__.py
new file mode 100644
index 0000000..b4f1dc0
--- /dev/null
+++ b/metrics/test/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ronnie'
diff --git a/metrics/test/mock_server.py b/metrics/test/mock_server.py
new file mode 100644
index 0000000..22522ed
--- /dev/null
+++ b/metrics/test/mock_server.py
@@ -0,0 +1,104 @@
+from socket import AF_INET, SOCK_DGRAM, socket
+import re
+import time
+
+#
+# A stripped copy of pystatsd.Server used for integration tests. The server does not
+# send any data to graphite; instead it takes a callback function that will receive
+# the stats lines for verification (lines with empty stats and lines with "numStats" are pruned).
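+#
+# For example, a flushed counter produces a single line such as (timestamp illustrative):
+#
+#     stats.test.add 1.0 1353594266
+#
+# while a flushed timer produces the five "stats.timers.*" lines built from TIMER_MSG below.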
+#
+
+TIMER_MSG = '''stats.timers.%(key)s.lower %(min)s %(ts)s
+stats.timers.%(key)s.count %(count)s %(ts)s
+stats.timers.%(key)s.mean %(mean)s %(ts)s
+stats.timers.%(key)s.upper %(max)s %(ts)s
+stats.timers.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s
+'''
+
+
+class Server(object):
+    def __init__(self, callback):
+        self.callback = callback
+        self.buf = 1024
+        self.pct_threshold = 100
+        self.counters = {}
+        self.timers = {}
+
+    def process(self, data):
+        key, val = data.split(':')
+
+        sample_rate = 1
+        fields = val.split('|')
+        if None == fields[1]:
+            return
+
+        if fields[1] == 'ms':
+            if key not in self.timers:
+                self.timers[key] = []
+            self.timers[key].append(float(fields[0] or 0))
+        else:
+            if len(fields) == 3:
+                sample_rate = float(re.match('^@([\d\.]+)', fields[2]).groups()[0])
+            if key not in self.counters:
+                self.counters[key] = 0
+            self.counters[key] += float(fields[0] or 1) * (1 / sample_rate)
+
+    def flush(self):
+        ts = int(time.time())
+        stat_string = ''
+        for k, v in self.counters.items():
+            v = float(v)
+            if not v:
+                continue
+
+            msg = 'stats.%s %s %s\n' % (k, v, ts)
+            stat_string += msg
+
+            self.counters[k] = 0
+
+        for k, v in self.timers.items():
+            if len(v) > 0:
+                v.sort()
+                count = len(v)
+                min = v[0]
+                max = v[-1]
+
+                mean = min
+                max_threshold = max
+
+                if count > 1:
+                    thresh_index = int((self.pct_threshold / 100.0) * count)
+                    max_threshold = v[thresh_index - 1]
+                    total = sum(v[:thresh_index - 1])
+                    mean = total / thresh_index
+
+                self.timers[k] = []
+
+                stat_string += TIMER_MSG % {
+                    'key': k,
+                    'mean': mean,
+                    'max': max,
+                    'min': min,
+                    'count': count,
+                    'max_threshold': max_threshold,
+                    'pct_threshold': self.pct_threshold,
+                    'ts': ts,
+                }
+
+        self.callback(stat_string)
+
+    def serve(self):
+        addr = ("localhost", 8125)
+        self._sock = socket(AF_INET, SOCK_DGRAM)
+        self._sock.bind(addr)
+
+        try:
+            while True:
+                data, addr = self._sock.recvfrom(self.buf)
+                if data:
+                    self.process(data)
+                    self.flush()
+        except:
+            pass  # hide greenlet errors
+
+    def stop(self):
+        self._sock.close()
diff --git a/metrics/test/test_graphite.py b/metrics/test/test_graphite.py
new file mode 100644
index 0000000..d37883c
--- /dev/null
+++ b/metrics/test/test_graphite.py
@@ -0,0 +1,145 @@
+from gevent import monkey
+monkey.patch_all()
+
+from metrics import Metric, metric, timing
+from mock_server import Server
+import gevent
+import logging
+import unittest
+import sys
+
+logging.basicConfig(stream=sys.stdout)
+logging.root.setLevel(logging.DEBUG)
+logger = logging.getLogger("metrics_test")
+
+NUM_PRIMES = 1000000
+
+lines_for_add = lambda x: x
+lines_for_timing = lambda x: x * 5
+
+
+def get_prime_list(n):
+    """Returns a list of prime numbers from 2 to < n using a sieve algorithm."""
+    if n < 2: return []
+    if n == 2: return [2]
+    s = range(3, n + 1, 2)
+    mroot = n ** 0.5
+    half = len(s)
+    i = 0
+    m = 3
+    while m <= mroot:
+        if s[i]:
+            j = (m * m - 3) // 2
+            s[j] = 0
+            while j < half:
+                s[j] = 0
+                j += m
+        i = i + 1
+        m = 2 * i + 3
+    return [2] + [x for x in s if x]
+
+
+@metric("test.add.decorator")
+def get_prime_list_with_add(n):
+    return get_prime_list(n)
+
+
+@timing("test.timing.decorator")
+def get_prime_list_with_timing(n):
+    return get_prime_list(n)
+
+
+@metric("test.add.decorator.combined")
+@timing("test.timing.decorator.combined")
+def get_prime_list_with_add_and_timing(n):
+    return get_prime_list(n)
+
+
+class MetricsTest(unittest.TestCase):
+
+    def wait_buf(self, num_entries, timeout=10):
+        def wait_loop():
+            while True:
+                if len(self.buf) == num_entries:
+                    return
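+                # poll every 100 ms until the mock server has flushed the expected number of lines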
+                gevent.sleep(0.1)
+
+        greenlet = gevent.spawn(wait_loop)
+        greenlet.join(timeout)
+
+        if len(self.buf) != num_entries:
+            raise Exception("Expected buffer size %d, got %d" % (num_entries, len(self.buf)))
+
+    def setUp(self):
+        """
+        Starts a mock server for every test.
+        """
+        self.buf = []
+
+        def save_to_buf(msg):
+            self.buf.extend([line.strip() for line in msg.split("\n") if line.strip()])
+
+        self.server = Server(save_to_buf)
+        gevent.spawn(self.server.serve)
+        gevent.sleep(1)
+
+    def tearDown(self):
+        """
+        Shuts down the mock server.
+        """
+        self.server.stop()
+        gevent.sleep(1)
+
+    def test_add(self):
+        logger.info("test_add")
+        for x in xrange(10):
+            Metric.add("test.add")
+        self.wait_buf(lines_for_add(10))
+        for line in self.buf:
+            self.assertTrue(line.startswith("stats.test.add 1.0"))
+
+    def test_timing(self):
+        logger.info("test_timing")
+        for x in xrange(10):
+            timer = Metric.start_timing("test.timing")
+            primes = get_prime_list(NUM_PRIMES)
+            timer.done()
+            logger.debug("Got %d primes", len(primes))
+        self.wait_buf(lines_for_timing(10))
+        for line in self.buf:
+            self.assertTrue(line.startswith("stats.timers.test.timing"))
+
+    def test_timing_exact(self):
+        Metric.timing("exact.time", 1.337)
+        self.wait_buf(lines_for_timing(1))
+        stripped_timestamps = [" ".join(line.split(" ")[:-1]) for line in self.buf]
+        self.assertTrue("stats.timers.exact.time.lower 1337.0" in stripped_timestamps)
+        self.assertTrue("stats.timers.exact.time.count 1" in stripped_timestamps)
+        self.assertTrue("stats.timers.exact.time.mean 1337.0" in stripped_timestamps)
+        self.assertTrue("stats.timers.exact.time.upper 1337.0" in stripped_timestamps)
+        self.assertTrue("stats.timers.exact.time.upper_100 1337.0" in stripped_timestamps)
+
+    def test_add_decorator(self):
+        logger.info("test_add_decorator")
+        for x in xrange(10):
+            primes = get_prime_list_with_add(NUM_PRIMES)
+            logger.debug("Got %d primes", len(primes))
+        self.wait_buf(lines_for_add(10))
+        for line in self.buf:
+            self.assertTrue(line.startswith("stats.test.add.decorator 1.0"))
+
+    def test_timing_decorator(self):
+        logger.info("test_timing_decorator")
+        for x in xrange(0, 10):
+            primes = get_prime_list_with_timing(NUM_PRIMES)
+            logger.debug("Got %d primes", len(primes))
+        self.wait_buf(lines_for_timing(10))
+        for line in self.buf:
+            self.assertTrue(line.startswith("stats.timers.test.timing.decorator"))
+
+    def test_nested_decorators(self):
+        logger.info("test_nested_decorators")
+        for x in xrange(0, 10):
+            primes = get_prime_list_with_add_and_timing(NUM_PRIMES)
+            logger.debug("Got %d primes", len(primes))
+        self.wait_buf(lines_for_add(10) + lines_for_timing(10))
+        for line in self.buf:
+            self.assertTrue(line.startswith("stats.test.add.decorator.combined 1.0") or
+                            line.startswith("stats.timers.test.timing.decorator.combined"))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..2da246d
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+pystatsd==0.1.6
+gevent
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..08b81fd
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,32 @@
+from distutils.core import setup
+
+f = open("README.rst")
+try:
+    README = f.read()
+finally:
+    f.close()
+
+setup(
+    name="graphite-pymetrics",
+    version="0.1.0",
+    description="A simple Python metrics framework to use with carbon/graphite.",
+    long_description=README,
+    author="Ronnie Kolehmainen",
+    author_email="ronnie@esn.me",
+    url="http://www.esn.me",
+    download_url="http://github.com/esnme/graphite-pymetrics",
+    license="MIT",
+    platforms=["any"],
+    packages=["metrics", "metrics.test"],
+    requires=["pystatsd(==0.1.6)", "gevent"],
+    classifiers=[
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        "Development Status :: 4 - Beta",
+        "License :: OSI Approved :: MIT License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Programming Language :: Python :: 2.6",
+        "Programming Language :: Python :: 2.7",
+        "Intended Audience :: Developers"
+    ]
+)