/
tests.py
389 lines (330 loc) · 15 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
from __future__ import unicode_literals
from unittest import skipIf, skipUnless, SkipTest
from django.db import (connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError,
IntegrityError)
from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError
from django.test import TransactionTestCase, skipUnlessDBFeature
from django.test.utils import override_settings, IgnoreDeprecationWarningsMixin
from .models import Mod, M2mA, M2mB, SubMod
class ModelInheritanceTests(TransactionTestCase):
available_apps = ['transactions_regress']
def test_save(self):
# First, create a SubMod, then try to save another with conflicting
# cnt field. The problem was that transactions were committed after
# every parent save when not in managed transaction. As the cnt
# conflict is in the second model, we can check if the first save
# was committed or not.
SubMod(fld=1, cnt=1).save()
# We should have committed the transaction for the above - assert this.
connection.rollback()
self.assertEqual(SubMod.objects.count(), 1)
try:
SubMod(fld=2, cnt=1).save()
except IntegrityError:
connection.rollback()
self.assertEqual(SubMod.objects.count(), 1)
self.assertEqual(Mod.objects.count(), 1)
class TestTransactionClosing(IgnoreDeprecationWarningsMixin, TransactionTestCase):
"""
Tests to make sure that transactions are properly closed
when they should be, and aren't left pending after operations
have been performed in them. Refs #9964.
"""
available_apps = [
'transactions_regress',
'django.contrib.auth',
'django.contrib.contenttypes',
]
def test_raw_committed_on_success(self):
"""
Make sure a transaction consisting of raw SQL execution gets
committed by the commit_on_success decorator.
"""
@commit_on_success
def raw_sql():
"Write a record using raw sql under a commit_on_success decorator"
cursor = connection.cursor()
cursor.execute("INSERT into transactions_regress_mod (fld) values (18)")
raw_sql()
# Rollback so that if the decorator didn't commit, the record is unwritten
transaction.rollback()
self.assertEqual(Mod.objects.count(), 1)
# Check that the record is in the DB
obj = Mod.objects.all()[0]
self.assertEqual(obj.fld, 18)
def test_commit_manually_enforced(self):
"""
Make sure that under commit_manually, even "read-only" transaction require closure
(commit or rollback), and a transaction left pending is treated as an error.
"""
@commit_manually
def non_comitter():
"Execute a managed transaction with read-only operations and fail to commit"
Mod.objects.count()
self.assertRaises(TransactionManagementError, non_comitter)
def test_commit_manually_commit_ok(self):
"""
Test that under commit_manually, a committed transaction is accepted by the transaction
management mechanisms
"""
@commit_manually
def committer():
"""
Perform a database query, then commit the transaction
"""
Mod.objects.count()
transaction.commit()
try:
committer()
except TransactionManagementError:
self.fail("Commit did not clear the transaction state")
def test_commit_manually_rollback_ok(self):
"""
Test that under commit_manually, a rolled-back transaction is accepted by the transaction
management mechanisms
"""
@commit_manually
def roller_back():
"""
Perform a database query, then rollback the transaction
"""
Mod.objects.count()
transaction.rollback()
try:
roller_back()
except TransactionManagementError:
self.fail("Rollback did not clear the transaction state")
def test_commit_manually_enforced_after_commit(self):
"""
Test that under commit_manually, if a transaction is committed and an operation is
performed later, we still require the new transaction to be closed
"""
@commit_manually
def fake_committer():
"Query, commit, then query again, leaving with a pending transaction"
Mod.objects.count()
transaction.commit()
Mod.objects.count()
self.assertRaises(TransactionManagementError, fake_committer)
@skipUnlessDBFeature('supports_transactions')
def test_reuse_cursor_reference(self):
"""
Make sure transaction closure is enforced even when the queries are performed
through a single cursor reference retrieved in the beginning
(this is to show why it is wrong to set the transaction dirty only when a cursor
is fetched from the connection).
"""
@commit_on_success
def reuse_cursor_ref():
"""
Fetch a cursor, perform an query, rollback to close the transaction,
then write a record (in a new transaction) using the same cursor object
(reference). All this under commit_on_success, so the second insert should
be committed.
"""
cursor = connection.cursor()
cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
transaction.rollback()
cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
reuse_cursor_ref()
# Rollback so that if the decorator didn't commit, the record is unwritten
transaction.rollback()
self.assertEqual(Mod.objects.count(), 1)
obj = Mod.objects.all()[0]
self.assertEqual(obj.fld, 2)
def test_failing_query_transaction_closed(self):
"""
Make sure that under commit_on_success, a transaction is rolled back even if
the first database-modifying operation fails.
This is prompted by http://code.djangoproject.com/ticket/6669 (and based on sample
code posted there to exemplify the problem): Before Django 1.3,
transactions were only marked "dirty" by the save() function after it successfully
wrote the object to the database.
"""
from django.contrib.auth.models import User
@transaction.commit_on_success
def create_system_user():
"Create a user in a transaction"
user = User.objects.create_user(username='system', password='iamr00t',
email='root@SITENAME.com')
# Redundant, just makes sure the user id was read back from DB
Mod.objects.create(fld=user.pk)
# Create a user
create_system_user()
with self.assertRaises(DatabaseError):
# The second call to create_system_user should fail for violating
# a unique constraint (it's trying to re-create the same user)
create_system_user()
# Try to read the database. If the last transaction was indeed closed,
# this should cause no problems
User.objects.all()[0]
@override_settings(DEBUG=True)
def test_failing_query_transaction_closed_debug(self):
"""
Regression for #6669. Same test as above, with DEBUG=True.
"""
self.test_failing_query_transaction_closed()
@skipIf(connection.vendor == 'sqlite'
and connection.settings_dict['TEST_NAME'] in (None, '', ':memory:'),
"Cannot establish two connections to an in-memory SQLite database.")
class TestNewConnection(IgnoreDeprecationWarningsMixin, TransactionTestCase):
"""
Check that new connections don't have special behaviour.
"""
available_apps = ['transactions_regress']
def setUp(self):
self._old_backend = connections[DEFAULT_DB_ALIAS]
settings = self._old_backend.settings_dict.copy()
new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
connections[DEFAULT_DB_ALIAS] = new_backend
def tearDown(self):
try:
connections[DEFAULT_DB_ALIAS].abort()
connections[DEFAULT_DB_ALIAS].close()
finally:
connections[DEFAULT_DB_ALIAS] = self._old_backend
def test_commit(self):
"""
Users are allowed to commit and rollback connections.
"""
connection.set_autocommit(False)
try:
# The starting value is False, not None.
self.assertIs(connection._dirty, False)
list(Mod.objects.all())
self.assertTrue(connection.is_dirty())
connection.commit()
self.assertFalse(connection.is_dirty())
list(Mod.objects.all())
self.assertTrue(connection.is_dirty())
connection.rollback()
self.assertFalse(connection.is_dirty())
finally:
connection.set_autocommit(True)
def test_enter_exit_management(self):
orig_dirty = connection._dirty
connection.enter_transaction_management()
connection.leave_transaction_management()
self.assertEqual(orig_dirty, connection._dirty)
@skipUnless(connection.vendor == 'postgresql',
"This test only valid for PostgreSQL")
class TestPostgresAutocommitAndIsolation(IgnoreDeprecationWarningsMixin, TransactionTestCase):
"""
Tests to make sure psycopg2's autocommit mode and isolation level
is restored after entering and leaving transaction management.
Refs #16047, #18130.
"""
available_apps = ['transactions_regress']
def setUp(self):
from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
ISOLATION_LEVEL_SERIALIZABLE,
TRANSACTION_STATUS_IDLE)
self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT
self._serializable = ISOLATION_LEVEL_SERIALIZABLE
self._idle = TRANSACTION_STATUS_IDLE
# We want a clean backend with autocommit = True, so
# first we need to do a bit of work to have that.
self._old_backend = connections[DEFAULT_DB_ALIAS]
settings = self._old_backend.settings_dict.copy()
opts = settings['OPTIONS'].copy()
opts['isolation_level'] = ISOLATION_LEVEL_SERIALIZABLE
settings['OPTIONS'] = opts
new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
connections[DEFAULT_DB_ALIAS] = new_backend
def tearDown(self):
try:
connections[DEFAULT_DB_ALIAS].abort()
finally:
connections[DEFAULT_DB_ALIAS].close()
connections[DEFAULT_DB_ALIAS] = self._old_backend
def test_initial_autocommit_state(self):
# Autocommit is activated when the connection is created.
connection.cursor().close()
self.assertTrue(connection.autocommit)
def test_transaction_management(self):
transaction.enter_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
transaction.leave_transaction_management()
self.assertTrue(connection.autocommit)
def test_transaction_stacking(self):
transaction.enter_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
transaction.enter_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
transaction.leave_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
transaction.leave_transaction_management()
self.assertTrue(connection.autocommit)
def test_enter_autocommit(self):
transaction.enter_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
list(Mod.objects.all())
self.assertTrue(transaction.is_dirty())
# Enter autocommit mode again.
transaction.enter_transaction_management(False)
self.assertFalse(transaction.is_dirty())
self.assertEqual(
connection.connection.get_transaction_status(),
self._idle)
list(Mod.objects.all())
self.assertFalse(transaction.is_dirty())
transaction.leave_transaction_management()
self.assertFalse(connection.autocommit)
self.assertEqual(connection.isolation_level, self._serializable)
transaction.leave_transaction_management()
self.assertTrue(connection.autocommit)
class TestManyToManyAddTransaction(IgnoreDeprecationWarningsMixin, TransactionTestCase):
available_apps = ['transactions_regress']
def test_manyrelated_add_commit(self):
"Test for https://code.djangoproject.com/ticket/16818"
a = M2mA.objects.create()
b = M2mB.objects.create(fld=10)
a.others.add(b)
# We're in a TransactionTestCase and have not changed transaction
# behavior from default of "autocommit", so this rollback should not
# actually do anything. If it does in fact undo our add, that's a bug
# that the bulk insert was not auto-committed.
transaction.rollback()
self.assertEqual(a.others.count(), 1)
class SavepointTest(IgnoreDeprecationWarningsMixin, TransactionTestCase):
available_apps = ['transactions_regress']
@skipIf(connection.vendor == 'sqlite',
"SQLite doesn't support savepoints in managed mode")
@skipUnlessDBFeature('uses_savepoints')
def test_savepoint_commit(self):
@commit_manually
def work():
mod = Mod.objects.create(fld=1)
pk = mod.pk
sid = transaction.savepoint()
Mod.objects.filter(pk=pk).update(fld=10)
transaction.savepoint_commit(sid)
mod2 = Mod.objects.get(pk=pk)
transaction.commit()
self.assertEqual(mod2.fld, 10)
work()
@skipIf(connection.vendor == 'sqlite',
"SQLite doesn't support savepoints in managed mode")
@skipUnlessDBFeature('uses_savepoints')
def test_savepoint_rollback(self):
# _mysql_storage_engine issues a query and as such can't be applied in
# a skipIf decorator since that would execute the query on module load.
if (connection.vendor == 'mysql' and
connection.features._mysql_storage_engine == 'MyISAM'):
raise SkipTest("MyISAM MySQL storage engine doesn't support savepoints")
@commit_manually
def work():
mod = Mod.objects.create(fld=1)
pk = mod.pk
sid = transaction.savepoint()
Mod.objects.filter(pk=pk).update(fld=20)
transaction.savepoint_rollback(sid)
mod2 = Mod.objects.get(pk=pk)
transaction.commit()
self.assertEqual(mod2.fld, 1)
work()