-
Notifications
You must be signed in to change notification settings - Fork 943
/
test_uow.py
132 lines (109 loc) · 4.46 KB
/
test_uow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# pylint: disable=broad-except, too-many-arguments
import threading
import time
import traceback
from typing import List
from unittest.mock import Mock
import pytest
from allocation.domain import model
from allocation.service_layer import unit_of_work
from ..random_refs import random_sku, random_batchref, random_orderid
# Module-level pytest mark: every test in this file uses the "mappers" fixture.
pytestmark = pytest.mark.usefixtures("mappers")
def insert_batch(session, ref, sku, qty, eta, product_version=1):
    """Insert one product row and one batch row for it using raw SQL.

    The statements are executed on ``session`` but not committed here.

    Args:
        session: object whose ``execute(sql, params)`` runs a statement.
        ref: value stored in ``batches.reference``.
        sku: SKU stored on both the product row and the batch row.
        qty: value stored in ``batches._purchased_quantity``.
        eta: value stored in ``batches.eta``.
        product_version: value stored in ``products.version_number``.
    """
    product_params = {"sku": sku, "version": product_version}
    session.execute(
        "INSERT INTO products (sku, version_number) VALUES (:sku, :version)",
        product_params,
    )

    batch_params = {"ref": ref, "sku": sku, "qty": qty, "eta": eta}
    session.execute(
        "INSERT INTO batches (reference, sku, _purchased_quantity, eta)"
        " VALUES (:ref, :sku, :qty, :eta)",
        batch_params,
    )
def get_allocated_batch_ref(session, orderid, sku):
    """Return the reference of the batch that an order line is allocated to.

    Looks up the order line id for ``orderid``/``sku``, then the reference of
    the batch joined to it through ``allocations``. Each query must yield
    exactly one single-column row; otherwise the unpacking raises.
    """
    order_line_query = "SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku"
    ((orderlineid,),) = session.execute(
        order_line_query,
        {"orderid": orderid, "sku": sku},
    )

    batch_ref_query = (
        "SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id"
        " WHERE orderline_id=:orderlineid"
    )
    ((allocated_ref,),) = session.execute(
        batch_ref_query,
        {"orderlineid": orderlineid},
    )
    return allocated_ref
def test_uow_can_retrieve_a_batch_and_allocate_to_it(sqlite_session_factory):
    """A product fetched through the unit of work can be allocated to and committed."""
    sku = "HIPSTER-WORKBENCH"

    # Seed one batch through a plain session; the same session reads back below.
    setup_session = sqlite_session_factory()
    insert_batch(setup_session, "batch1", sku, 100, None)
    setup_session.commit()

    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    with uow:
        workbench = uow.products.get(sku=sku)
        workbench.allocate(model.OrderLine("o1", sku, 10))
        uow.commit()

    assert get_allocated_batch_ref(setup_session, "o1", sku) == "batch1"
def test_rolls_back_uncommitted_work_by_default(sqlite_session_factory):
    """Rows inserted inside a unit of work that never commits are not visible afterwards."""
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    with uow:
        # Deliberately no uow.commit() before the block exits.
        insert_batch(uow.session, "batch1", "MEDIUM-PLINTH", 100, None)

    fresh_session = sqlite_session_factory()
    assert list(fresh_session.execute('SELECT * FROM "batches"')) == []
def test_rolls_back_on_error(sqlite_session_factory):
    """An exception raised inside the unit of work leaves no batch rows behind."""

    class MyException(Exception):
        """Marker error raised on purpose inside the unit of work."""

    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    # pytest.raises is the outer context, so the unit of work exits first and
    # the exception then reaches pytest.raises.
    with pytest.raises(MyException), uow:
        insert_batch(uow.session, "batch1", "LARGE-FORK", 100, None)
        raise MyException()

    fresh_session = sqlite_session_factory()
    assert list(fresh_session.execute('SELECT * FROM "batches"')) == []
def try_to_allocate(orderid, sku, exceptions, session_factory):
    """Try to allocate a 10-unit order line for ``sku`` inside its own unit of work.

    Any exception raised while creating or using the unit of work is printed
    with its traceback and appended to ``exceptions`` instead of propagating.

    Args:
        orderid: order id used for the new order line.
        sku: SKU of the product to fetch and allocate against.
        exceptions: list that collects any exception raised.
        session_factory: passed to ``SqlAlchemyUnitOfWork``.
    """
    order_line = model.OrderLine(orderid, sku, 10)
    try:
        with unit_of_work.SqlAlchemyUnitOfWork(session_factory) as uow:
            uow.products.get(sku=sku).allocate(order_line)
            # Pause between allocating and committing; presumably so that
            # concurrent callers overlap -- confirm.
            time.sleep(0.2)
            uow.commit()
    except Exception as error:  # pylint: disable=broad-except
        trace = traceback.format_exc()
        print(trace)
        exceptions.append(error)
def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
    """Two concurrent allocations against one product must not both commit.

    Two threads each try to allocate a different order to the same SKU. The
    test expects exactly one of them to fail with a serialization error, the
    product's version number to go from 1 to 2, and exactly one allocation
    row to exist for the SKU.

    Args:
        postgres_session_factory: fixture called to create a session and
            passed to ``SqlAlchemyUnitOfWork`` in each worker thread.
    """
    sku, batch = random_sku(), random_batchref()
    session = postgres_session_factory()
    insert_batch(session, batch, sku, 100, eta=None, product_version=1)
    session.commit()

    order1, order2 = random_orderid(1), random_orderid(2)
    exceptions = []  # type: List[Exception]

    # Hand the arguments to Thread directly rather than binding lambdas to names.
    threads = [
        threading.Thread(
            target=try_to_allocate,
            args=(orderid, sku, exceptions, postgres_session_factory),
        )
        for orderid in (order1, order2)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    [[version]] = session.execute(
        "SELECT version_number FROM products WHERE sku=:sku",
        dict(sku=sku),
    )
    assert version == 2

    # Unpacking fails unless exactly one worker recorded an exception.
    [exception] = exceptions
    assert "could not serialize access due to concurrent update" in str(exception)

    orders = session.execute(
        "SELECT orderid FROM allocations"
        " JOIN batches ON allocations.batch_id = batches.id"
        " JOIN order_lines ON allocations.orderline_id = order_lines.id"
        " WHERE order_lines.sku=:sku",
        dict(sku=sku),
    )
    # Count the fetched rows: ``rowcount`` is documented for UPDATE/DELETE and
    # is not a reliable row counter for SELECT results across DBAPIs.
    assert len(orders.fetchall()) == 1

    # NOTE(review): presumably a smoke check that a fresh unit of work is
    # still usable after the failed transaction -- confirm intent.
    with unit_of_work.SqlAlchemyUnitOfWork(postgres_session_factory) as uow:
        uow.session.execute("select 1")