Skip to content

Commit

Permalink
Move count updates into a locking key to prevent more race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
dcramer committed May 9, 2012
1 parent 27acf81 commit af97d0f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 80 deletions.
106 changes: 60 additions & 46 deletions sentry/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

from django.core.signals import request_finished
from django.db import models, transaction, IntegrityError
from django.db.models import Sum, F
from django.db.models import Sum
from django.db.models.expressions import F, ExpressionNode
from django.db.models.signals import post_save, post_delete, post_init, class_prepared
from django.utils.encoding import force_unicode, smart_str

Expand All @@ -27,7 +28,7 @@
from sentry.tasks.index import index_event
from sentry.utils.cache import cache, Lock
from sentry.utils.dates import utc_to_local, get_sql_date_trunc
from sentry.utils.db import get_db_engine, has_charts
from sentry.utils.db import get_db_engine, has_charts, resolve_expression_node
from sentry.utils.queue import maybe_delay

logger = logging.getLogger('sentry.errors')
Expand All @@ -51,30 +52,6 @@ class BaseManager(models.Manager):
'iexact': lambda x: x.upper(),
}

def get_or_create(self, **kwargs):
"""
A modified version of Django's get_or_create which will create a distributed
lock (using the cache backend) whenever it hits the create clause.
"""
defaults = kwargs.pop('defaults', {})

# before locking attempt to fetch the instance
try:
return self.get(**kwargs), False
except self.model.DoesNotExist:
pass
lock_key = self._make_key('lock', kwargs)

# instance not found, lets grab a lock and attempt to create it
with Lock(lock_key):
# its important we get() before create() to ensure that if
# someone beat us to creating it from the time we did our very
# first .get(), that we get the result back as we cannot
# rely on unique constraints existing
instance, created = super(BaseManager, self).get_or_create(defaults=defaults, **kwargs)

return instance, created

def __init__(self, *args, **kwargs):
self.cache_fields = kwargs.pop('cache_fields', [])
self.cache_ttl = kwargs.pop('cache_ttl', 60 * 5)
Expand Down Expand Up @@ -214,6 +191,56 @@ def get_from_cache(self, **kwargs):

return retval

def get_or_create(self, **kwargs):
"""
A modified version of Django's get_or_create which will create a distributed
lock (using the cache backend) whenever it hits the create clause.
"""
defaults = kwargs.pop('defaults', {})

# before locking attempt to fetch the instance
try:
return self.get(**kwargs), False
except self.model.DoesNotExist:
pass
lock_key = self._make_key('lock', kwargs)

# instance not found, lets grab a lock and attempt to create it
with Lock(lock_key):
# its important we get() before create() to ensure that if
# someone beat us to creating it from the time we did our very
# first .get(), that we get the result back as we cannot
# rely on unique constraints existing
instance, created = super(BaseManager, self).get_or_create(defaults=defaults, **kwargs)

return instance, created

def create_or_update(self, **kwargs):
"""
Similar to get_or_create, either updates a row or creates it.
The result will be (rows affected, False), if the row was not created,
or (instance, True) if the object is new.
"""
defaults = kwargs.pop('defaults', {})

# before locking attempt to fetch the instance
affected = self.filter(**kwargs).update(**defaults)
if affected:
return affected, False
lock_key = self._make_key('lock', kwargs)

# instance not found, lets grab a lock and attempt to create it
with Lock(lock_key) as lock:
if lock.was_locked:
affected = self.filter(**kwargs).update(**defaults)
return affected, False

for k, v in defaults.iteritems():
if isinstance(v, ExpressionNode):
kwargs[k] = resolve_expression_node(self.model(), v)
return super(BaseManager, self).create(**kwargs), True


class ScoreClause(object):
def __init__(self, group):
Expand Down Expand Up @@ -462,7 +489,7 @@ def _create_group(self, event, **kwargs):
))
for g in groups[1:]:
g.delete()
group = groups[0]
group, is_new = groups[0], False

if not is_new:
if group.status == STATUS_RESOLVED:
Expand Down Expand Up @@ -508,29 +535,16 @@ def _create_group(self, event, **kwargs):
'time_spent_count': F('time_spent_count') + 1,
})

affected = group.messagecountbyminute_set.filter(
group.messagecountbyminute_set.create_or_update(
project=project,
date=normalized_datetime,
).update(**update_kwargs)
if not affected:
group.messagecountbyminute_set.create(
project=project,
date=normalized_datetime,
times_seen=1,
time_spent_total=time_spent or 0,
time_spent_count=time_spent and 1 or 0,
)
defaults=update_kwargs
)

affected = project.projectcountbyminute_set.filter(
project.projectcountbyminute_set.create_or_update(
date=normalized_datetime,
).update(**update_kwargs)
if not affected:
project.projectcountbyminute_set.create(
date=normalized_datetime,
times_seen=1,
time_spent_total=time_spent or 0,
time_spent_count=time_spent and 1 or 0,
)
defaults=update_kwargs
)

http = event.interfaces.get('sentry.interfaces.Http')
if http:
Expand Down
2 changes: 2 additions & 0 deletions sentry/utils/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ def __enter__(self):
attempt = 0
max_attempts = self.timeout / delay
got_lock = None
self.was_locked = False
while not got_lock and attempt < max_attempts:
got_lock = cache.add(lock_key, '', self.timeout)
if not got_lock:
self.was_locked = True
time.sleep(delay)
attempt += 1

Expand Down
34 changes: 34 additions & 0 deletions sentry/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
"""
import django
import logging
import operator


from django.conf import settings as django_settings
from django.db.models.expressions import ExpressionNode, F


class InstanceManager(object):
Expand Down Expand Up @@ -86,3 +89,34 @@ def has_charts(db):
if engine.startswith('sqlite'):
return False
return True

EXPRESSION_NODE_CALLBACKS = {
ExpressionNode.ADD: operator.add,
ExpressionNode.SUB: operator.sub,
ExpressionNode.MUL: operator.mul,
ExpressionNode.DIV: operator.div,
ExpressionNode.MOD: operator.mod,
ExpressionNode.AND: operator.and_,
ExpressionNode.OR: operator.or_,
}


class CannotResolve(Exception):
pass


def resolve_expression_node(instance, node):
def _resolve(instance, node):
if isinstance(node, F):
return getattr(instance, node.name)
elif isinstance(node, ExpressionNode):
return resolve_expression_node(instance, node)
return node

op = EXPRESSION_NODE_CALLBACKS.get(node.connector, None)
if not op:
raise CannotResolve
runner = _resolve(instance, node.children[0])
for n in node.children[1:]:
runner = op(runner, _resolve(instance, n))
return runner
36 changes: 2 additions & 34 deletions sentry/utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,16 @@

import base64
import logging
import operator

from django.db import models, router
from django.db.models import signals
from django.db.models.expressions import ExpressionNode, F
from django.db.models.expressions import ExpressionNode

from sentry.utils import cache
from sentry.utils.compat import pickle
from sentry.utils.db import resolve_expression_node

logger = logging.getLogger(__name__)

EXPRESSION_NODE_CALLBACKS = {
ExpressionNode.ADD: operator.add,
ExpressionNode.SUB: operator.sub,
ExpressionNode.MUL: operator.mul,
ExpressionNode.DIV: operator.div,
ExpressionNode.MOD: operator.mod,
ExpressionNode.AND: operator.and_,
ExpressionNode.OR: operator.or_,
}


class CannotResolve(Exception):
pass


def resolve_expression_node(instance, node):
def _resolve(instance, node):
if isinstance(node, F):
return getattr(instance, node.name)
elif isinstance(node, ExpressionNode):
return resolve_expression_node(instance, node)
return node

op = EXPRESSION_NODE_CALLBACKS.get(node.connector, None)
if not op:
raise CannotResolve
runner = _resolve(instance, node.children[0])
for n in node.children[1:]:
runner = op(runner, _resolve(instance, n))
return runner


def update(self, using=None, **kwargs):
"""
Expand Down

0 comments on commit af97d0f

Please sign in to comment.