Skip to content

Commit

Permalink
Add db.atomic decorator with test
Browse files Browse the repository at this point in the history
  • Loading branch information
epandurski committed Feb 9, 2019
1 parent 290b882 commit 630b24e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 16 deletions.
64 changes: 48 additions & 16 deletions flask_signalbus/atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,56 @@ class model(_ModelUtilitiesMixin, model):
declarative_base._flask_signalbus_sa = self
return declarative_base

def atomic(self, func):
"""A decorator that wraps a function in an atomic block.
Example::
@atomic
def f():
write_to_db('a message')
return 'OK'
assert f() == 'OK'
This code defines the function `f`, which is wrapped in an
atomic block. Wrapping a function in an atomic block gives us
two guarantees:
1. The database transaction will be automatically comited if the
function returns normally, and automatically rolled back if the
function raises exception.
2. If a transaction serialization error occurs during the
execution of the function, the function will re-executed.
(This may happen several times.)
"""

@wraps(func)
def wrapper(*args, **kwargs):
session = self.session
session_info = session.info
assert not session_info.get(_ATOMIC_FLAG_SESSION_INFO_KEY), \
'"atomic" blocks can not be nested'
f = retry_on_deadlock(session)(func)
session_info[_ATOMIC_FLAG_SESSION_INFO_KEY] = True
try:
result = f(*args, **kwargs)
session.commit()
return result
except Exception:
session.rollback()
raise
finally:
session_info[_ATOMIC_FLAG_SESSION_INFO_KEY] = False

return wrapper

def execute_atomic(self, __func__, *args, **kwargs):
"""A decorator that executes a function in an atomic block.
For example::
Example::
@execute_atomic
def result():
Expand Down Expand Up @@ -116,21 +162,7 @@ def result():
"""

session = self.session
session_info = session.info
assert not session_info.get(_ATOMIC_FLAG_SESSION_INFO_KEY), \
'"execute_atomic" calls can not be nested'
func = retry_on_deadlock(session)(__func__)
session_info[_ATOMIC_FLAG_SESSION_INFO_KEY] = True
try:
result = func(*args, **kwargs)
session.commit()
return result
except Exception:
session.rollback()
raise
finally:
session_info[_ATOMIC_FLAG_SESSION_INFO_KEY] = False
return self.atomic(__func__)(*args, **kwargs)

def modification(self, func):
"""Raise assertion error if `func` is called outside of atomic block.
Expand Down
20 changes: 20 additions & 0 deletions tests/test_atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,26 @@
from flask_signalbus.utils import DBSerializationError


def test_atomic(atomic_db):
db = atomic_db
commit = Mock()
rollback = Mock()
db.session.commit = commit
db.session.rollback = rollback

@db.atomic
def f(x):
return 1 / x

with pytest.raises(ZeroDivisionError):
f(0)
commit.assert_not_called()
rollback.assert_called_once()
assert f(1) == 1
commit.assert_called_once()
rollback.assert_called_once()


def test_execute_atomic(atomic_db):
db = atomic_db
commit = Mock()
Expand Down

0 comments on commit 630b24e

Please sign in to comment.