Permalink
Browse files

Initial work on buffers and base sentry app

  • Loading branch information...
1 parent 7d4c727 commit 30a68e1335da76893e0d02906cbb5b92c96ac1a8 @dcramer dcramer committed May 10, 2012
View
@@ -0,0 +1,17 @@
+"""
+sentry.app
+~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+from sentry.conf import settings
+from sentry.utils.imports import import_string
+
+
+def get_buffer(path, options):
+ cls = import_string(path)
+ return cls(**options)
+
+buffer = get_buffer(settings.BUFFER, settings.BUFFER_OPTIONS)
@@ -0,0 +1,9 @@
+"""
+sentry.buffer
+~~~~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+from .base import Buffer
View
@@ -0,0 +1,37 @@
+"""
+sentry.buffer.base
+~~~~~~~~~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+from django.db.models import F
+from sentry.utils.queue import maybe_delay
+from sentry.tasks.process_buffer import process_incr
+
+
+class Buffer(object):
+ """
+ Buffers act as temporary stores for counters. The default implementation is just a passthru and
+ does not actually buffer anything.
+
+ A useful example might be a Redis buffer. Each time an event gets updated, we send several
+ add events which just store a key and increment its value. Additionally they fire off a task
+ to the queue. That task eventually runs and gets the current update value. If the value is
+ empty, it does nothing, otherwise it updates the row in the database.
+
+ This is useful in situations where a single event might be happening so fast that the queue cant
+ keep up with the updates.
+ """
+ def __init__(self, **options):
+ pass
+
+ def incr(self, model, columns, filters):
+ """
+ >>> incr(Group, columns={'times_seen': 1}, filters={'pk': group.pk})
+ """
+ maybe_delay(process_incr, model=model, columns=columns, filters=filters)
+
+ def process(self, model, columns, filters):
+ model.objects.filter(**filters).update(**dict((c, F(c) + v) for c, v in columns.iteritems()))
View
@@ -0,0 +1,58 @@
+"""
+sentry.buffer.redis
+~~~~~~~~~~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+from django.db import models
+from hashlib import md5
+from nydus.db import create_cluster
+from sentry.buffer import Buffer
+
+
+class RedisBuffer(Buffer):
+ def __init__(self, hosts=None, router=None, **options):
+ super(RedisBuffer, self).__init__(**options)
+ if hosts is None:
+ hosts = {
+ 0: {} # localhost / default
+ }
+ self.conn = create_cluster({
+ 'engine': 'nydus.db.backends.redis.Redis',
+ 'router': router,
+ 'hosts': hosts,
+ })
+
+ def _map_column(self, model, column, value):
+ if isinstance(value, models.Model):
+ value = value.pk
+ else:
+ value = unicode(value)
+ return value
+
+ def _make_key(self, model, filters, column):
+ """
+ Returns a Redis-compatible key for the model given filters.
+ """
+ return '%s:%s:%s' % (model._meta,
+ md5('&'.join('%s=%s' % (k, self._map_column(model, k, v)) for k, v in sorted(filters.iteritems()))).hexdigest(),
+ column)
+
+ def incr(self, model, columns, filters):
+ with self.conn.map() as conn:
+ for column, amount in columns.iteritems():
+ conn.incr(self._make_key(model, filters, column), amount)
+ super(RedisBuffer, self).incr(model, columns, filters)
+
+ def process(self, model, columns, filters):
+ results = {}
+ with self.conn.map() as conn:
+ for column, amount in columns.iteritems():
+ results[column] = conn.getset(self._make_key(model, filters, column), 0)
+
+ results = dict((k, int(v)) for k, v in results.iteritems() if int(v or 0) > 0)
+ if not results:
+ return
+ super(RedisBuffer, self).process(model, results, filters)
View
@@ -157,4 +157,8 @@
CACHE_BACKEND = 'dummy://'
# The maximum number of events which can be requested as JSON
-MAX_JSON_RESULTS=1000
+MAX_JSON_RESULTS = 1000
+
+# Buffer backend to use
+BUFFER = 'sentry.buffer.Buffer'
+BUFFER_OPTIONS = {}
View
@@ -20,11 +20,12 @@
from sentry.conf import settings
from sentry.exceptions import InvalidInterface, InvalidData, InvalidTimestamp
-from sentry.models import Group, Project, ProjectKey, TeamMember, Team
+from sentry.models import Project, ProjectKey, TeamMember, Team
from sentry.plugins import plugins
from sentry.tasks.store import store_event
from sentry.utils import is_float, json
from sentry.utils.auth import get_signature, parse_auth_header
+from sentry.utils.imports import import_string
from sentry.utils.queue import maybe_delay
logger = logging.getLogger('sentry.errors.coreapi')
@@ -306,7 +307,7 @@ def validate_data(project, data, client=None):
raise InvalidInterface('%r is not a valid interface name' % k)
try:
- interface = Group.objects.module_cache[k]
+ interface = import_string(k)
except (ImportError, AttributeError), e:
raise InvalidInterface('%r is not a valid interface name: %s' % (k, e))
View
@@ -283,17 +283,6 @@ def time_limit(silence): # ~ 3600 per hour
return settings.MAX_SAMPLE_TIME
-class ModuleProxyCache(dict):
- def __missing__(self, key):
- module, class_name = key.rsplit('.', 1)
-
- handler = getattr(__import__(module, {}, {}, [class_name], -1), class_name)
-
- self[key] = handler
-
- return handler
-
-
class ChartMixin(object):
def get_chart_data(self, instance, max_days=90):
if hasattr(instance, '_state'):
@@ -332,10 +321,6 @@ def get_chart_data(self, instance, max_days=90):
class GroupManager(BaseManager, ChartMixin):
use_for_related_fields = True
- def __init__(self, *args, **kwargs):
- super(GroupManager, self).__init__(*args, **kwargs)
- self.module_cache = ModuleProxyCache()
-
@transaction.commit_on_success
def from_kwargs(self, project, **kwargs):
# TODO: this function is way too damn long and needs refactored
@@ -0,0 +1,19 @@
+"""
+sentry.tasks.process_buffer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+from celery.task import task
+
+
+@task(ignore_result=True)
+def process_incr(**kwargs):
+ """
+ Processes a buffer event.
+ """
+ from sentry.app import buffer
+
+ buffer.process(**kwargs)
View
@@ -0,0 +1,29 @@
+"""
+sentry.utils.imports
+~~~~~~~~~~~~~~~~~~~~
+
+:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details.
+:license: BSD, see LICENSE for more details.
+"""
+
+
+class ModuleProxyCache(dict):
+ def __missing__(self, key):
+ module, class_name = key.rsplit('.', 1)
+
+ handler = getattr(__import__(module, {}, {}, [class_name], -1), class_name)
+
+ self[key] = handler
+
+ return handler
+
+_cache = ModuleProxyCache()
+
+
+def import_string(path):
+ """
+ Path must be module.path.ClassName
+
+ >>> cls = import_string('sentry.models.Group')
+ """
+ return _cache[path]
View
@@ -34,12 +34,15 @@
pass
tests_require = [
- 'nose==1.1.2',
'django-nose==0.1.3',
- 'mock==0.8.0',
'eventlet==0.9.16',
+ 'nose==1.1.2',
+ 'nydus==0.8.1',
+ 'mock==0.8.0',
+ 'redis',
]
+
install_requires = [
'cssutils>=0.9.9',
'BeautifulSoup>=3.2.1',
View
@@ -2,10 +2,12 @@ class BrokenRequestMiddleware(object):
def process_request(self, request):
raise ImportError('request')
+
class BrokenResponseMiddleware(object):
def process_response(self, request, response):
raise ImportError('response')
+
class BrokenViewMiddleware(object):
def process_view(self, request, func, args, kwargs):
- raise ImportError('view')
+ raise ImportError('view')
No changes.
View
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+from sentry import app
+from tests.base import TestCase
+
+
+class AppTest(TestCase):
+ def test_buffer_is_a_buffer(self):
+ from sentry.buffer.base import Buffer
+ self.assertEquals(type(app.buffer), Buffer)
No changes.
No changes.
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+import mock
+
+from sentry.buffer.base import Buffer
+from sentry.models import Group, Project
+from sentry.tasks.process_buffer import process_incr
+from tests.base import TestCase
+
+
+class BufferTest(TestCase):
+ def setUp(self):
+ self.buf = Buffer()
+
+ @mock.patch('sentry.buffer.base.maybe_delay')
+ def test_incr_delays_task(self, maybe_delay):
+ model = mock.Mock()
+ columns = {'times_seen': 1}
+ filters = {'pk': 1}
+ self.buf.incr(model, columns, filters)
+ maybe_delay.assert_called_once_with(process_incr, model=model, columns=columns, filters=filters)
+
+ def test_process_saves_data(self):
+ group = Group.objects.create(project=Project(id=1))
+ columns = {'times_seen': 1}
+ filters = {'pk': group.pk}
+ self.buf.process(Group, columns, filters)
+ self.assertEquals(Group.objects.get(pk=group.pk).times_seen, group.times_seen + 1)
No changes.
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+import mock
+
+from sentry.buffer.redis import RedisBuffer
+from sentry.models import Group, Project
+from sentry.tasks.process_buffer import process_incr
+from tests.base import TestCase
+
+
+class RedisBufferTest(TestCase):
+ def setUp(self):
+ self.buf = RedisBuffer(hosts={
+ 0: {'db': 9}
+ })
+ self.buf.conn.flushdb()
+
+ def test_map_column_handles_foreignkeys(self):
+ self.assertEquals(self.buf._map_column(Group, 'project', Project(id=1)), 1)
+
+ def test_make_key_response(self):
+ column = 'times_seen'
+ filters = {'pk': 1}
+ self.assertEquals(self.buf._make_key(Group, filters, column), 'sentry.group:88b48b31b5f100719c64316596b10b0f:times_seen')
+
+ @mock.patch('sentry.buffer.redis.RedisBuffer._make_key', mock.Mock(return_value='foo'))
+ @mock.patch('sentry.buffer.base.maybe_delay')
+ def test_incr_delays_task(self, maybe_delay):
+ model = mock.Mock()
+ columns = {'times_seen': 1}
+ filters = {'pk': 1}
+ self.buf.incr(model, columns, filters)
+ maybe_delay.assert_called_once_with(process_incr, model=model, columns=columns, filters=filters)
+
+ @mock.patch('sentry.buffer.redis.RedisBuffer._make_key', mock.Mock(return_value='foo'))
+ @mock.patch('sentry.buffer.base.maybe_delay', mock.Mock())
+ def test_incr_does_buffer_to_conn(self):
+ model = mock.Mock()
+ columns = {'times_seen': 1}
+ filters = {'pk': 1}
+ self.buf.incr(model, columns, filters)
+ self.assertEquals(self.buf.conn.get('foo'), '1')
+
+ @mock.patch('sentry.buffer.redis.RedisBuffer._make_key', mock.Mock(return_value='foo'))
+ @mock.patch('sentry.buffer.base.Buffer.process')
+ def test_process_does_not_save_empty_results(self, process):
+ group = Group.objects.create(project=Project(id=1))
+ columns = {'times_seen': 1}
+ filters = {'pk': group.pk}
+ self.buf.process(Group, columns, filters)
+ self.assertFalse(process.called)
+
+ @mock.patch('sentry.buffer.redis.RedisBuffer._make_key', mock.Mock(return_value='foo'))
+ @mock.patch('sentry.buffer.base.Buffer.process')
+ def test_process_does_save_call_with_results(self, process):
+ group = Group.objects.create(project=Project(id=1))
+ columns = {'times_seen': 1}
+ filters = {'pk': group.pk}
+ self.buf.conn.set('foo', 2)
+ self.buf.process(Group, columns, filters)
+ process.assert_called_once_with(Group, {'times_seen': 2}, filters)
+
+ @mock.patch('sentry.buffer.redis.RedisBuffer._make_key', mock.Mock(return_value='foo'))
+ @mock.patch('sentry.buffer.base.Buffer.process')
+ def test_process_does_clear_buffer(self, process):
+ group = Group.objects.create(project=Project(id=1))
+ columns = {'times_seen': 1}
+ filters = {'pk': group.pk}
+ self.buf.conn.set('foo', 2)
+ self.buf.process(Group, columns, filters)
+ self.assertEquals(self.buf.conn.get('foo'), '0')

0 comments on commit 30a68e1

Please sign in to comment.