forked from MagicStack/asyncpg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_transaction.py
141 lines (101 loc) · 4.54 KB
/
test_transaction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (C) 2016-present the ayncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncpg
from asyncpg import _testbase as tb
class TestTransaction(tb.ConnectedTestCase):
async def test_transaction_regular(self):
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction()
self.assertIsNone(self.con._top_xact)
with self.assertRaises(ZeroDivisionError):
async with tr as with_tr:
self.assertIs(self.con._top_xact, tr)
# We don't return the transaction object from __aenter__,
# to make it harder for people to use '.rollback()' and
# '.commit()' from within an 'async with' block.
self.assertIsNone(with_tr)
await self.con.execute('''
CREATE TABLE mytab (a int);
''')
1 / 0
self.assertIsNone(self.con._top_xact)
with self.assertRaisesRegex(asyncpg.PostgresError,
'"mytab" does not exist'):
await self.con.prepare('''
SELECT * FROM mytab
''')
async def test_transaction_nested(self):
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction()
self.assertIsNone(self.con._top_xact)
with self.assertRaises(ZeroDivisionError):
async with tr:
self.assertIs(self.con._top_xact, tr)
await self.con.execute('''
CREATE TABLE mytab (a int);
''')
async with self.con.transaction():
self.assertIs(self.con._top_xact, tr)
await self.con.execute('''
INSERT INTO mytab (a) VALUES (1), (2);
''')
self.assertIs(self.con._top_xact, tr)
with self.assertRaises(ZeroDivisionError):
in_tr = self.con.transaction()
async with in_tr:
self.assertIs(self.con._top_xact, tr)
await self.con.execute('''
INSERT INTO mytab (a) VALUES (3), (4);
''')
1 / 0
st = await self.con.prepare('SELECT * FROM mytab;')
recs = []
async for rec in st.cursor():
recs.append(rec)
self.assertEqual(len(recs), 2)
self.assertEqual(recs[0][0], 1)
self.assertEqual(recs[1][0], 2)
self.assertIs(self.con._top_xact, tr)
1 / 0
self.assertIs(self.con._top_xact, None)
with self.assertRaisesRegex(asyncpg.PostgresError,
'"mytab" does not exist'):
await self.con.prepare('''
SELECT * FROM mytab
''')
async def test_transaction_interface_errors(self):
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction(readonly=True, isolation='serializable')
with self.assertRaisesRegex(asyncpg.InterfaceError,
'cannot start; .* already started'):
async with tr:
await tr.start()
self.assertTrue(repr(tr).startswith(
'<asyncpg.Transaction state:rolledback serializable readonly'))
self.assertIsNone(self.con._top_xact)
with self.assertRaisesRegex(asyncpg.InterfaceError,
'cannot start; .* already rolled back'):
async with tr:
pass
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction()
with self.assertRaisesRegex(asyncpg.InterfaceError,
'cannot manually commit.*async with'):
async with tr:
await tr.commit()
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction()
with self.assertRaisesRegex(asyncpg.InterfaceError,
'cannot manually rollback.*async with'):
async with tr:
await tr.rollback()
self.assertIsNone(self.con._top_xact)
tr = self.con.transaction()
with self.assertRaisesRegex(asyncpg.InterfaceError,
'cannot enter context:.*async with'):
async with tr:
async with tr:
pass