Permalink
Browse files

Fixed #19861 -- Transaction ._dirty flag improvement

There were a couple of errors in ._dirty flag handling:
  * It started as None, but was never reset to None.
  * The _dirty flag was sometimes used to indicate if the connection
    was inside transaction management, but this was not done
    consistently. This also meant the flag had three separate values.
  * The None value had a special meaning, causing for example inability
    to commit() on new connection unless enter/leave tx management was
    done.
  * The _dirty was tracking "connection in transaction" state, but only
    in managed transactions.
  * Some tests never reset the transaction state of the used connection.
  * And some additional less important changes.

This commit has some potential for regressions, but as the above list
shows, the current situation isn't perfect either.
  • Loading branch information...
1 parent 2108941 commit 50328f0a618674b7143d86acaa7016c5293e9774 @akaariai committed Feb 20, 2013
@@ -41,7 +41,10 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS,
# Transaction related attributes
self.transaction_state = []
self.savepoint_state = 0
- self._dirty = None
+ # Tracks if the connection is believed to be in transaction. This is
+ # set somewhat aggressively, as the DBAPI doesn't make it easy to
+ # deduce if the connection is in transaction or not.
+ self._dirty = False
self._thread_ident = thread.get_ident()
self.allow_thread_sharing = allow_thread_sharing
@@ -118,8 +121,7 @@ def abort(self):
stack.
"""
if self._dirty:
- self._rollback()
- self._dirty = False
+ self.rollback()
while self.transaction_state:
self.leave_transaction_management()
@@ -137,9 +139,6 @@ def enter_transaction_management(self, managed=True):
self.transaction_state.append(self.transaction_state[-1])
else:
self.transaction_state.append(settings.TRANSACTIONS_MANAGED)
-
- if self._dirty is None:
- self._dirty = False
self._enter_transaction_management(managed)
def leave_transaction_management(self):
@@ -153,14 +152,16 @@ def leave_transaction_management(self):
else:
raise TransactionManagementError(
"This code isn't under transaction management")
+ # The _leave_transaction_management hook can change the dirty flag,
+ # so memoize it.
+ dirty = self._dirty
# We will pass the next status (after leaving the previous state
# behind) to subclass hook.
self._leave_transaction_management(self.is_managed())
- if self._dirty:
+ if dirty:
self.rollback()
raise TransactionManagementError(
"Transaction managed block ended with pending COMMIT/ROLLBACK")
- self._dirty = False
def validate_thread_sharing(self):
"""
@@ -190,22 +191,15 @@ def set_dirty(self):
to decide in a managed block of code to decide whether there are open
changes waiting for commit.
"""
- if self._dirty is not None:
- self._dirty = True
- else:
- raise TransactionManagementError("This code isn't under transaction "
- "management")
+ self._dirty = True
def set_clean(self):
"""
Resets a dirty flag for the current thread and code streak. This can be used
to decide in a managed block of code to decide whether a commit or rollback
should happen.
"""
- if self._dirty is not None:
- self._dirty = False
- else:
- raise TransactionManagementError("This code isn't under transaction management")
+ self._dirty = False
self.clean_savepoints()
def clean_savepoints(self):
@@ -233,8 +227,7 @@ def managed(self, flag=True):
if top:
top[-1] = flag
if not flag and self.is_dirty():
- self._commit()
- self.set_clean()
+ self.commit()
else:
raise TransactionManagementError("This code isn't under transaction "
"management")
@@ -245,7 +238,7 @@ def commit_unless_managed(self):
"""
self.validate_thread_sharing()
if not self.is_managed():
- self._commit()
+ self.commit()
self.clean_savepoints()
else:
self.set_dirty()
@@ -256,7 +249,7 @@ def rollback_unless_managed(self):
"""
self.validate_thread_sharing()
if not self.is_managed():
- self._rollback()
+ self.rollback()
else:
self.set_dirty()
@@ -343,6 +336,7 @@ def close(self):
if self.connection is not None:
self.connection.close()
self.connection = None
+ self.set_clean()
def cursor(self):
self.validate_thread_sharing()
@@ -485,14 +479,13 @@ def supports_transactions(self):
self.connection.managed(True)
cursor = self.connection.cursor()
cursor.execute('CREATE TABLE ROLLBACK_TEST (X INT)')
- self.connection._commit()
+ self.connection.commit()
cursor.execute('INSERT INTO ROLLBACK_TEST (X) VALUES (8)')
- self.connection._rollback()
+ self.connection.rollback()
cursor.execute('SELECT COUNT(X) FROM ROLLBACK_TEST')
count, = cursor.fetchone()
cursor.execute('DROP TABLE ROLLBACK_TEST')
- self.connection._commit()
- self.connection._dirty = False
+ self.connection.commit()
finally:
self.connection.leave_transaction_management()
return count == 0
@@ -385,8 +385,8 @@ def _create_test_db(self, verbosity, autoclobber):
# Create the test database and connect to it. We need to autocommit
# if the database supports it because PostgreSQL doesn't allow
# CREATE/DROP DATABASE statements within transactions.
- cursor = self.connection.cursor()
self._prepare_for_test_db_ddl()
+ cursor = self.connection.cursor()
try:
cursor.execute(
"CREATE DATABASE %s %s" % (qn(test_database_name), suffix))
@@ -149,6 +149,8 @@ def close(self):
exc_info=sys.exc_info()
)
raise
+ finally:
+ self.set_clean()
@cached_property
def pg_version(self):
@@ -233,10 +235,17 @@ def _set_isolation_level(self, level):
try:
if self.connection is not None:
self.connection.set_isolation_level(level)
+ if level == psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT:
+ self.set_clean()
finally:
self.isolation_level = level
self.features.uses_savepoints = bool(level)
+ def set_dirty(self):
+ if ((self.transaction_state and self.transaction_state[-1]) or
+ not self.features.uses_autocommit):
+ super(DatabaseWrapper, self).set_dirty()
+
def _commit(self):
if self.connection is not None:
try:
@@ -82,6 +82,8 @@ def set_autocommit(self):
def _prepare_for_test_db_ddl(self):
"""Rollback and close the active transaction."""
+ # Make sure there is an open connection.
+ self.connection.cursor()
self.connection.connection.rollback()
self.connection.connection.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
View
@@ -19,13 +19,9 @@ def __init__(self, cursor, db):
self.cursor = cursor
self.db = db
- def set_dirty(self):
- if self.db.is_managed():
- self.db.set_dirty()
-
def __getattr__(self, attr):
if attr in ('execute', 'executemany', 'callproc'):
- self.set_dirty()
+ self.db.set_dirty()
return getattr(self.cursor, attr)
def __iter__(self):
@@ -35,7 +31,7 @@ def __iter__(self):
class CursorDebugWrapper(CursorWrapper):
def execute(self, sql, params=()):
- self.set_dirty()
+ self.db.set_dirty()
start = time()
try:
return self.cursor.execute(sql, params)
@@ -52,7 +48,7 @@ def execute(self, sql, params=()):
)
def executemany(self, sql, param_list):
- self.set_dirty()
+ self.db.set_dirty()
start = time()
try:
return self.cursor.executemany(sql, param_list)
@@ -687,11 +687,6 @@ def results_iter(self):
resolve_columns = hasattr(self, 'resolve_columns')
fields = None
has_aggregate_select = bool(self.query.aggregate_select)
- # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
- # a subsequent commit/rollback is executed, so any database locks
- # are released.
- if self.query.select_for_update and transaction.is_managed(self.using):
- transaction.set_dirty(self.using)
for rows in self.execute_sql(MULTI):
for row in rows:
if resolve_columns:
@@ -1249,11 +1249,6 @@ make the call non-blocking. If a conflicting lock is already acquired by
another transaction, :exc:`~django.db.DatabaseError` will be raised when the
queryset is evaluated.
-Note that using ``select_for_update()`` will cause the current transaction to be
-considered dirty, if under transaction management. This is to ensure that
-Django issues a ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the
-``SELECT FOR UPDATE``.
-
Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` database
backends support ``select_for_update()``. However, MySQL has no support for the
``nowait`` argument. Obviously, users of external third-party backends should
@@ -23,11 +23,13 @@ def setUp(self):
# Put both DB connections into managed transaction mode
transaction.enter_transaction_management()
transaction.managed(True)
- self.conn2._enter_transaction_management(True)
+ self.conn2.enter_transaction_management()
+ self.conn2.managed(True)
def tearDown(self):
# Close down the second connection.
transaction.leave_transaction_management()
+ self.conn2.abort()
self.conn2.close()
@skipUnlessDBFeature('test_db_allows_multiple_connections')
View
@@ -683,6 +683,9 @@ def setUp(self):
self.response = HttpResponse()
self.response.status_code = 200
+ def tearDown(self):
+ transaction.abort()
+
def test_request(self):
TransactionMiddleware().process_request(self.request)
self.assertTrue(transaction.is_managed())
@@ -697,19 +700,23 @@ def test_managed_response(self):
self.assertEqual(Band.objects.count(), 1)
def test_unmanaged_response(self):
+ transaction.enter_transaction_management()
transaction.managed(False)
+ self.assertEqual(Band.objects.count(), 0)
TransactionMiddleware().process_response(self.request, self.response)
self.assertFalse(transaction.is_managed())
- self.assertFalse(transaction.is_dirty())
+ # The transaction middleware doesn't commit/rollback if management
+ # has been disabled.
+ self.assertTrue(transaction.is_dirty())
def test_exception(self):
transaction.enter_transaction_management()
transaction.managed(True)
Band.objects.create(name='The Beatles')
self.assertTrue(transaction.is_dirty())
TransactionMiddleware().process_exception(self.request, None)
- self.assertEqual(Band.objects.count(), 0)
self.assertFalse(transaction.is_dirty())
+ self.assertEqual(Band.objects.count(), 0)
def test_failing_commit(self):
# It is possible that connection.commit() fails. Check that
@@ -724,8 +731,8 @@ def raise_exception():
self.assertTrue(transaction.is_dirty())
with self.assertRaises(IntegrityError):
TransactionMiddleware().process_response(self.request, None)
- self.assertEqual(Band.objects.count(), 0)
self.assertFalse(transaction.is_dirty())
+ self.assertEqual(Band.objects.count(), 0)
self.assertFalse(transaction.is_managed())
finally:
del connections[DEFAULT_DB_ALIAS].commit
@@ -24,7 +24,7 @@
class SelectForUpdateTests(TransactionTestCase):
def setUp(self):
- transaction.enter_transaction_management(True)
+ transaction.enter_transaction_management()
transaction.managed(True)
self.person = Person.objects.create(name='Reinhardt')
@@ -48,9 +48,8 @@ def tearDown(self):
try:
# We don't really care if this fails - some of the tests will set
# this in the course of their run.
- transaction.managed(False)
- transaction.leave_transaction_management()
- self.new_connection.leave_transaction_management()
+ transaction.abort()
+ self.new_connection.abort()
except transaction.TransactionManagementError:
pass
self.new_connection.close()
@@ -73,7 +72,7 @@ def start_blocking_transaction(self):
def end_blocking_transaction(self):
# Roll back the blocking transaction.
- self.new_connection._rollback()
+ self.new_connection.rollback()
def has_for_update_sql(self, tested_connection, nowait=False):
# Examine the SQL that was executed to determine whether it
@@ -119,6 +118,7 @@ def test_nowait_raises_error_on_block(self):
"""
self.start_blocking_transaction()
status = []
+
thread = threading.Thread(
target=self.run_select_for_update,
args=(status,),
@@ -164,7 +164,7 @@ def run_select_for_update(self, status, nowait=False):
try:
# We need to enter transaction management again, as this is done on
# per-thread basis
- transaction.enter_transaction_management(True)
+ transaction.enter_transaction_management()
transaction.managed(True)
people = list(
Person.objects.all().select_for_update(nowait=nowait)
@@ -177,6 +177,7 @@ def run_select_for_update(self, status, nowait=False):
finally:
# This method is run in a separate thread. It uses its own
# database connection. Close it without waiting for the GC.
+ transaction.abort()
connection.close()
@requires_threading
@@ -271,13 +272,3 @@ def test_transaction_dirty_managed(self):
"""
people = list(Person.objects.select_for_update())
self.assertTrue(transaction.is_dirty())
-
- @skipUnlessDBFeature('has_select_for_update')
- def test_transaction_not_dirty_unmanaged(self):
- """ If we're not under txn management, the txn will never be
- marked as dirty.
- """
- transaction.managed(False)
- transaction.leave_transaction_management()
- people = list(Person.objects.select_for_update())
- self.assertFalse(transaction.is_dirty())
@@ -165,7 +165,6 @@ class TransactionRollbackTests(TransactionTestCase):
def execute_bad_sql(self):
cursor = connection.cursor()
cursor.execute("INSERT INTO transactions_reporter (first_name, last_name) VALUES ('Douglas', 'Adams');")
- transaction.set_dirty()
@skipUnlessDBFeature('requires_rollback_on_dirty_transaction')
def test_bad_sql(self):
@@ -306,5 +305,4 @@ def test_bad_sql(self):
with transaction.commit_on_success():
cursor = connection.cursor()
cursor.execute("INSERT INTO transactions_reporter (first_name, last_name) VALUES ('Douglas', 'Adams');")
- transaction.set_dirty()
transaction.rollback()
Oops, something went wrong.

0 comments on commit 50328f0

Please sign in to comment.