-
Notifications
You must be signed in to change notification settings - Fork 27
/
db_mutex.py
136 lines (112 loc) · 4.55 KB
/
db_mutex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from datetime import timedelta
import functools
import logging
from django.conf import settings
from django.db import transaction, IntegrityError
from django.utils import timezone
from .exceptions import DBMutexError, DBMutexTimeoutError
from .models import DBMutex
LOG = logging.getLogger(__name__)
class db_mutex(object):
"""
An object that acts as a context manager and a function decorator for acquiring a
DB mutex lock.
"""
mutex_ttl_seconds_settings_key = 'DB_MUTEX_TTL_SECONDS'
def __init__(self, lock_id, suppress_acquisition_exceptions=False):
"""
This context manager/function decorator can be used in the following way
.. code-block:: python
from db_mutex.db_mutex import db_mutex
# Lock a critical section of code
try:
with db_mutex('lock_id'):
# Run critical code here
pass
except DBMutexError:
print('Could not obtain lock')
except DBMutexTimeoutError:
print('Task completed but the lock timed out')
# Lock a function
@db_mutex('lock_id'):
def critical_function():
# Critical code goes here
pass
try:
critical_function()
except DBMutexError:
print('Could not obtain lock')
except DBMutexTimeoutError:
print('Task completed but the lock timed out')
:type lock_id: str
:param lock_id: The ID of the lock one is trying to acquire
:type suppress_acquisition_exceptions: bool
:param suppress_acquisition_exceptions: Suppress exceptions when acquiring the lock and instead
log an error message. Note that this is only applicable when using this as a decorator and
not a context manager.
:raises:
* :class:`DBMutexError <db_mutex.exceptions.DBMutexError>` when the lock cannot be obtained
* :class:`DBMutexTimeoutError <db_mutex.exceptions.DBMutexTimeoutError>` when the
lock was deleted during execution
"""
self.lock_id = lock_id
self.lock = None
self.suppress_acquisition_exceptions = suppress_acquisition_exceptions
def get_mutex_ttl_seconds(self):
"""
Returns a TTL for mutex locks. It defaults to 30 minutes. If the user specifies None
as the TTL, locks never expire.
:rtype: int
:returns: the mutex's ttl in seconds
"""
return getattr(settings, self.mutex_ttl_seconds_settings_key, timedelta(minutes=30).total_seconds())
def delete_expired_locks(self):
"""
Deletes all expired mutex locks if a ttl is provided.
"""
ttl_seconds = self.get_mutex_ttl_seconds()
if ttl_seconds is not None:
DBMutex.objects.filter(creation_time__lte=timezone.now() - timedelta(seconds=ttl_seconds)).delete()
def __call__(self, func):
return self.decorate_callable(func)
def __enter__(self):
self.start()
def __exit__(self, *args):
self.stop()
def start(self):
"""
Acquires the db mutex lock. Takes the necessary steps to delete any stale locks.
Throws a DBMutexError if it can't acquire the lock.
"""
# Delete any expired locks first
self.delete_expired_locks()
try:
with transaction.atomic():
self.lock = DBMutex.objects.create(lock_id=self.lock_id)
except IntegrityError:
raise DBMutexError('Could not acquire lock: {0}'.format(self.lock_id))
def stop(self):
"""
Releases the db mutex lock. Throws an error if the lock was released before the function finished.
"""
if not DBMutex.objects.filter(id=self.lock.id).exists():
raise DBMutexTimeoutError('Lock {0} expired before function completed'.format(self.lock_id))
else:
self.lock.delete()
def decorate_callable(self, func):
"""
Decorates a function with the db_mutex decorator by using this class as a context manager around
it.
"""
def wrapper(*args, **kwargs):
try:
with self:
result = func(*args, **kwargs)
return result
except DBMutexError as e:
if self.suppress_acquisition_exceptions:
LOG.error(e)
else:
raise e
functools.update_wrapper(wrapper, func)
return wrapper