Skip to content

Commit

Permalink
stats logger
Browse files Browse the repository at this point in the history
  • Loading branch information
gilles committed May 24, 2012
1 parent 7ddadca commit 88d6e1c
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 0 deletions.
117 changes: 117 additions & 0 deletions pycassa/logging/pool_stats_logger.py
@@ -0,0 +1,117 @@
import pycassa_logger
import logging
import threading
import functools

def sync(lock_name):
def wrapper(f):
@functools.wraps(f)
def wrapped(self, *args, **kwargs):
lock = getattr(self, lock_name)
try:
lock.acquire()
return f(self, *args, **kwargs)
finally:
lock.release()

return wrapped

return wrapper


class StatsLogger(object):
"""
Basic stats logger, just increment counts, you can then plot as COUNTER or DERIVED (RRD)
or apply derivative (graphite). Except for 'opened' which tracks the currently opened connections.
Usage:
pool = ConnectionPool(...)
stats_logger = StatsLogger()
pool.add_listener(stats_logger)
get your stats by calling stats_logger.stats and push them to your metrics system
"""

def __init__(self):
self._stats = {
'created': {
'success': 0,
'failure': 0,
},
'checked_out': 0,
'checked_in': 0,
'opened': {
'current': 0,
'max': 0
},
'disposed': {
'success': 0,
'failure': 0
},
'recycled': 0,
'failed': 0,
'list': 0,
'at_max': 0
}

#some callbacks are already locked by pool_lock, it's just simpler to have a global here for all operations
self.lock = threading.Lock()

def name_changed(self, new_logger):
self.logger = new_logger

@sync('lock')
def connection_created(self, dic):
level = pycassa_logger.levels[dic.get('level', 'info')]
if level <= logging.INFO:
self._stats['created']['success'] += 1
else:
self._stats['created']['failure'] += 1

@sync('lock')
def connection_checked_out(self, dic):
self._stats['checked_out'] += 1
self._update_opened(1)

@sync('lock')
def connection_checked_in(self, dic):
self._stats['checked_in'] += 1
self._update_opened(-1)

def _update_opened(self, value):
self._stats['opened']['current'] += value
if self._stats['opened']['current'] > self._stats['opened']['max']:
self._stats['opened']['max'] = self._stats['opened']['current']

@sync('lock')
def connection_disposed(self, dic):
level = pycassa_logger.levels[dic.get('level', 'info')]
if level <= logging.INFO:
self._stats['disposed']['success'] += 1
else:
self._stats['disposed']['failure'] += 1

@sync('lock')
def connection_recycled(self, dic):
self._stats['recycled'] += 1

@sync('lock')
def connection_failed(self, dic):
self._stats['failed'] += 1

@sync('lock')
def obtained_server_list(self, dic):
self._stats['list'] += 1

@sync('lock')
def pool_disposed(self, dic):
pass

@sync('lock')
def pool_at_max(self, dic):
self._stats['at_max'] += 1

@property
def stats(self):
return self._stats
134 changes: 134 additions & 0 deletions tests/test_pool_logger.py
@@ -0,0 +1,134 @@
from unittest import TestCase
from nose.tools import assert_equal
from numpy.testing.utils import assert_raises

from pycassa.logging.pool_stats_logger import StatsLogger
from pycassa.pool import ConnectionPool, NoConnectionAvailable, InvalidRequestError

__author__ = 'gilles'

_credentials = {'username': 'jsmith', 'password': 'havebadpass'}

class TestStatsLogger(TestCase):
def __init__(self, methodName='runTest'):
super(TestStatsLogger, self).__init__(methodName)

def setUp(self):
super(TestStatsLogger, self).setUp()
self.logger = StatsLogger()

def test_empty(self):
assert_equal(self.logger.stats, self.logger._stats)

def test_connection_created(self):
self.logger.connection_created({'level': 'info'})
self.logger.connection_created({'level': 'error'})

stats = self.logger.stats
assert_equal(stats['created']['success'], 1)
assert_equal(stats['created']['failure'], 1)

def test_connection_checked(self):
self.logger.connection_checked_out({})
self.logger.connection_checked_out({})
self.logger.connection_checked_in({})
stats = self.logger.stats
assert_equal(stats['checked_out'], 2)
assert_equal(stats['checked_in'], 1)
assert_equal(stats['opened'], {'current': 1, 'max': 2})

def test_connection_disposed(self):
self.logger.connection_disposed({'level': 'info'})
self.logger.connection_disposed({'level': 'error'})

stats = self.logger.stats
assert_equal(stats['disposed']['success'], 1)
assert_equal(stats['disposed']['failure'], 1)

def test_connection_recycled(self):
self.logger.connection_recycled({})
stats = self.logger.stats
assert_equal(stats['recycled'], 1)

def test_connection_failed(self):
self.logger.connection_failed({})
stats = self.logger.stats
assert_equal(stats['failed'], 1)

def test_obtained_server_list(self):
self.logger.obtained_server_list({})
stats = self.logger.stats
assert_equal(stats['list'], 1)

def test_pool_at_max(self):
self.logger.pool_at_max({})
stats = self.logger.stats
assert_equal(stats['at_max'], 1)


class TestInPool(TestCase):
def __init__(self, methodName='runTest'):
super(TestInPool, self).__init__(methodName)

def test_pool(self):
listener = StatsLogger()
pool = ConnectionPool(pool_size=5, max_overflow=5, recycle=10000,
prefill=True, pool_timeout=0.1, timeout=1,
keyspace='PycassaTestKeyspace', credentials=_credentials,
listeners=[listener], use_threadlocal=False)
conns = []
for i in range(10):
conns.append(pool.get())
assert_equal(listener.stats['created']['success'], 10)
assert_equal(listener.stats['created']['failure'], 0)
assert_equal(listener.stats['checked_out'], 10)
assert_equal(listener.stats['opened'], {'current': 10, 'max': 10})

# Pool is maxed out now
assert_raises(NoConnectionAvailable, pool.get)
assert_equal(listener.stats['created']['success'], 10)
assert_equal(listener.stats['checked_out'], 10)
assert_equal(listener.stats['opened'], {'current': 10, 'max': 10})
assert_equal(listener.stats['at_max'], 1)

for i in range(0, 5):
pool.return_conn(conns[i])
assert_equal(listener.stats['disposed']['success'], 0)
assert_equal(listener.stats['checked_in'], 5)
assert_equal(listener.stats['opened'], {'current': 5, 'max': 10})

for i in range(5, 10):
pool.return_conn(conns[i])
assert_equal(listener.stats['disposed']['success'], 5)
assert_equal(listener.stats['checked_in'], 10)

conns = []

# These connections should come from the pool
for i in range(5):
conns.append(pool.get())
assert_equal(listener.stats['created']['success'], 10)
assert_equal(listener.stats['checked_out'], 15)

# But these will need to be made
for i in range(5):
conns.append(pool.get())
assert_equal(listener.stats['created']['success'], 15)
assert_equal(listener.stats['checked_out'], 20)

assert_equal(listener.stats['disposed']['success'], 5)
for i in range(10):
conns[i].return_to_pool()
assert_equal(listener.stats['checked_in'], 20)
assert_equal(listener.stats['disposed']['success'], 10)

assert_raises(InvalidRequestError, conns[0].return_to_pool)
assert_equal(listener.stats['checked_in'], 20)
assert_equal(listener.stats['disposed']['success'], 10)

print "in test:", id(conns[-1])
assert_raises(InvalidRequestError, conns[-1].return_to_pool)
assert_equal(listener.stats['checked_in'], 20)
assert_equal(listener.stats['disposed']['success'], 10)

pool.dispose()

0 comments on commit 88d6e1c

Please sign in to comment.