Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Fixed #2705: added a `select_for_update()` clause to querysets.

A number of people worked on this patch over the years -- Hawkeye, Colin Grady,
KBS, sakyamuni, anih, jdemoor, and Issak Kelly. Thanks to them all, and
apologies if I missed anyone.

Special thanks to Dan Fairs for picking it up again at the end and seeing this
through to commit.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@16058 bcc190cf-cafb-0310-a4f2-bffc1f526a37
  • Loading branch information...
commit 8f0f73c7b8b110489a1a127cc47e3cabb0eea646 1 parent 99c1794
Jacob Kaplan-Moss authored April 20, 2011
1  AUTHORS
@@ -168,6 +168,7 @@ answer newbie questions, and generally made Django that much better:
168 168
     eriks@win.tue.nl
169 169
     Tomáš Ehrlich <tomas.ehrlich@gmail.com>
170 170
     Dirk Eschler <dirk.eschler@gmx.net>
  171
+    Dan Fairs <dan@fezconsulting.com>
171 172
     Marc Fargas <telenieko@telenieko.com>
172 173
     Szilveszter Farkas <szilveszter.farkas@gmail.com>
173 174
     Grigory Fateyev <greg@dial.com.ru>
11  django/db/backends/__init__.py
@@ -279,6 +279,8 @@ class BaseDatabaseFeatures(object):
279 279
     # integer primary keys.
280 280
     related_fields_match_type = False
281 281
     allow_sliced_subqueries = True
  282
+    has_select_for_update = False
  283
+    has_select_for_update_nowait = False
282 284
 
283 285
     # Does the default test database allow multiple connections?
284 286
     # Usually an indication that the test database is in-memory
@@ -476,6 +478,15 @@ def force_no_ordering(self):
476 478
         """
477 479
         return []
478 480
 
  481
+    def for_update_sql(self, nowait=False):
  482
+        """
  483
+        Returns the FOR UPDATE SQL clause to lock rows for an update operation.
  484
+        """
  485
+        if nowait:
  486
+            return 'FOR UPDATE NOWAIT'
  487
+        else:
  488
+            return 'FOR UPDATE'
  489
+
479 490
     def fulltext_search_sql(self, field_name):
480 491
         """
481 492
         Returns the SQL WHERE clause to use in order to perform a full-text
2  django/db/backends/mysql/base.py
@@ -124,6 +124,8 @@ class DatabaseFeatures(BaseDatabaseFeatures):
124 124
     allows_group_by_pk = True
125 125
     related_fields_match_type = True
126 126
     allow_sliced_subqueries = False
  127
+    has_select_for_update = True
  128
+    has_select_for_update_nowait = False
127 129
     supports_forward_references = False
128 130
     supports_long_model_names = False
129 131
     supports_microsecond_precision = False
2  django/db/backends/oracle/base.py
@@ -70,6 +70,8 @@ class DatabaseFeatures(BaseDatabaseFeatures):
70 70
     needs_datetime_string_cast = False
71 71
     interprets_empty_strings_as_nulls = True
72 72
     uses_savepoints = True
  73
+    has_select_for_update = True
  74
+    has_select_for_update_nowait = True
73 75
     can_return_id_from_insert = True
74 76
     allow_sliced_subqueries = False
75 77
     supports_subqueries_in_group_by = False
3  django/db/backends/postgresql_psycopg2/base.py
@@ -70,6 +70,9 @@ class DatabaseFeatures(BaseDatabaseFeatures):
70 70
     requires_rollback_on_dirty_transaction = True
71 71
     has_real_datatype = True
72 72
     can_defer_constraint_checks = True
  73
+    has_select_for_update = True
  74
+    has_select_for_update_nowait = True
  75
+
73 76
 
74 77
 class DatabaseWrapper(BaseDatabaseWrapper):
75 78
     vendor = 'postgresql'
3  django/db/models/manager.py
@@ -164,6 +164,9 @@ def latest(self, *args, **kwargs):
164 164
     def order_by(self, *args, **kwargs):
165 165
         return self.get_query_set().order_by(*args, **kwargs)
166 166
 
  167
+    def select_for_update(self, *args, **kwargs):
  168
+        return self.get_query_set().select_for_update(*args, **kwargs)
  169
+
167 170
     def select_related(self, *args, **kwargs):
168 171
         return self.get_query_set().select_related(*args, **kwargs)
169 172
 
13  django/db/models/query.py
@@ -435,6 +435,7 @@ def delete(self):
435 435
         del_query._for_write = True
436 436
 
437 437
         # Disable non-supported fields.
  438
+        del_query.query.select_for_update = False
438 439
         del_query.query.select_related = False
439 440
         del_query.query.clear_ordering()
440 441
 
@@ -583,6 +584,18 @@ def complex_filter(self, filter_obj):
583 584
         else:
584 585
             return self._filter_or_exclude(None, **filter_obj)
585 586
 
  587
+    def select_for_update(self, **kwargs):
  588
+        """
  589
+        Returns a new QuerySet instance that will select objects with a
  590
+        FOR UPDATE lock.
  591
+        """
  592
+        # Default to false for nowait
  593
+        nowait = kwargs.pop('nowait', False)
  594
+        obj = self._clone()
  595
+        obj.query.select_for_update = True
  596
+        obj.query.select_for_update_nowait = nowait
  597
+        return obj
  598
+
586 599
     def select_related(self, *fields, **kwargs):
587 600
         """
588 601
         Returns a new QuerySet instance that will select related objects.
15  django/db/models/sql/compiler.py
... ...
@@ -1,11 +1,13 @@
1 1
 from django.core.exceptions import FieldError
2 2
 from django.db import connections
  3
+from django.db import transaction
3 4
 from django.db.backends.util import truncate_name
4 5
 from django.db.models.sql.constants import *
5 6
 from django.db.models.sql.datastructures import EmptyResultSet
6 7
 from django.db.models.sql.expressions import SQLEvaluator
7 8
 from django.db.models.sql.query import get_proxied_model, get_order_dir, \
8 9
      select_related_descend, Query
  10
+from django.db.utils import DatabaseError
9 11
 
10 12
 class SQLCompiler(object):
11 13
     def __init__(self, query, connection, using):
@@ -117,6 +119,14 @@ def as_sql(self, with_limits=True, with_col_aliases=False):
117 119
                         result.append('LIMIT %d' % val)
118 120
                 result.append('OFFSET %d' % self.query.low_mark)
119 121
 
  122
+        if self.query.select_for_update and self.connection.features.has_select_for_update:
  123
+            # If we've been asked for a NOWAIT query but the backend does not support it,
  124
+            # raise a DatabaseError otherwise we could get an unexpected deadlock.
  125
+            nowait = self.query.select_for_update_nowait
  126
+            if nowait and not self.connection.features.has_select_for_update_nowait:
  127
+                raise DatabaseError('NOWAIT is not supported on this database backend.')
  128
+            result.append(self.connection.ops.for_update_sql(nowait=nowait))
  129
+
120 130
         return ' '.join(result), tuple(params)
121 131
 
122 132
     def as_nested_sql(self):
@@ -677,6 +687,11 @@ def results_iter(self):
677 687
         resolve_columns = hasattr(self, 'resolve_columns')
678 688
         fields = None
679 689
         has_aggregate_select = bool(self.query.aggregate_select)
  690
+        # Set transaction dirty if we're using SELECT FOR UPDATE to ensure
  691
+        # a subsequent commit/rollback is executed, so any database locks
  692
+        # are released.
  693
+        if self.query.select_for_update and transaction.is_managed(self.using):
  694
+            transaction.set_dirty(self.using)
680 695
         for rows in self.execute_sql(MULTI):
681 696
             for row in rows:
682 697
                 if resolve_columns:
5  django/db/models/sql/query.py
@@ -125,6 +125,8 @@ def __init__(self, model, where=WhereNode):
125 125
         self.order_by = []
126 126
         self.low_mark, self.high_mark = 0, None  # Used for offset/limit
127 127
         self.distinct = False
  128
+        self.select_for_update = False
  129
+        self.select_for_update_nowait = False
128 130
         self.select_related = False
129 131
         self.related_select_cols = []
130 132
 
@@ -254,6 +256,8 @@ def clone(self, klass=None, memo=None, **kwargs):
254 256
         obj.order_by = self.order_by[:]
255 257
         obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
256 258
         obj.distinct = self.distinct
  259
+        obj.select_for_update = self.select_for_update
  260
+        obj.select_for_update_nowait = self.select_for_update_nowait
257 261
         obj.select_related = self.select_related
258 262
         obj.related_select_cols = []
259 263
         obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
@@ -360,6 +364,7 @@ def get_aggregation(self, using):
360 364
 
361 365
         query.clear_ordering(True)
362 366
         query.clear_limits()
  367
+        query.select_for_update = False
363 368
         query.select_related = False
364 369
         query.related_select_cols = []
365 370
         query.related_select_fields = []
13  docs/ref/databases.txt
@@ -359,6 +359,13 @@ store a timezone-aware ``time`` or ``datetime`` to a
359 359
 :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
360 360
 respectively, a ``ValueError`` is raised rather than truncating data.
361 361
 
  362
+Row locking with ``QuerySet.select_for_update()``
  363
+-------------------------------------------------
  364
+
  365
+MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
  366
+statement. If ``select_for_update()`` is used with ``nowait=True`` then a
  367
+``DatabaseError`` will be raised.
  368
+
362 369
 .. _sqlite-notes:
363 370
 
364 371
 SQLite notes
@@ -493,6 +500,12 @@ If you're getting this error, you can solve it by:
493 500
       This will simply make SQLite wait a bit longer before throwing "database
494 501
       is locked" errors; it won't really do anything to solve them.
495 502
 
  503
+``QuerySet.select_for_update()`` not supported
  504
+----------------------------------------------
  505
+
  506
+SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
  507
+have no effect.
  508
+
496 509
 .. _oracle-notes:
497 510
 
498 511
 Oracle notes
40  docs/ref/models/querysets.txt
@@ -966,6 +966,46 @@ For example::
966 966
     # queries the database with the 'backup' alias
967 967
     >>> Entry.objects.using('backup')
968 968
 
  969
+select_for_update
  970
+~~~~~~~~~~~~~~~~~
  971
+
  972
+.. method:: select_for_update(nowait=False)
  973
+
  974
+.. versionadded:: 1.4
  975
+
  976
+Returns a queryset that will lock rows until the end of the transaction,
  977
+generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
  978
+
  979
+For example::
  980
+
  981
+    entries = Entry.objects.select_for_update().filter(author=request.user)
  982
+
  983
+All matched entries will be locked until the end of the transaction block,
  984
+meaning that other transactions will be prevented from changing or acquiring
  985
+locks on them.
  986
+
  987
+Usually, if another transaction has already acquired a lock on one of the
  988
+selected rows, the query will block until the lock is released. If this is
  989
+not the behaviour you want, call ``select_for_update(nowait=True)``. This will
  990
+make the call non-blocking. If a conflicting lock is already acquired by
  991
+another transaction, ``django.db.utils.DatabaseError`` will be raised when
  992
+the queryset is evaluated.
  993
+
  994
+Note that using ``select_for_update`` will cause the current transaction to be
  995
+set dirty, if under transaction management. This is to ensure that Django issues
  996
+a ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
  997
+UPDATE``.
  998
+
  999
+Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
  1000
+database backends support ``select_for_update()``. However, MySQL has no
  1001
+support for the ``nowait`` argument.
  1002
+
  1003
+Passing ``nowait=True`` to ``select_for_update`` using database backends that
  1004
+do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
  1005
+raised. This is in order to prevent code unexpectedly blocking.
  1006
+
  1007
+Using ``select_for_update`` on backends which do not support
  1008
+``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
969 1009
 
970 1010
 Methods that do not return QuerySets
971 1011
 ------------------------------------
1  tests/modeltests/select_for_update/__init__.py
... ...
@@ -0,0 +1 @@
  1
+#
4  tests/modeltests/select_for_update/models.py
... ...
@@ -0,0 +1,4 @@
  1
+from django.db import models
  2
+
  3
+class Person(models.Model):
  4
+    name = models.CharField(max_length=30)
262  tests/modeltests/select_for_update/tests.py
... ...
@@ -0,0 +1,262 @@
  1
+import sys
  2
+import time
  3
+import unittest
  4
+from django.conf import settings
  5
+from django.db import transaction, connection
  6
+from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
  7
+from django.test import (TransactionTestCase, skipIfDBFeature,
  8
+    skipUnlessDBFeature)
  9
+from django.utils.functional import wraps
  10
+from django.utils import unittest
  11
+
  12
+from models import Person
  13
+
  14
+try:
  15
+    import threading
  16
+    def requires_threading(func):
  17
+        return func
  18
+except ImportError:
  19
+    # Note we can't use dummy_threading here, as our tests will actually
  20
+    # block. We just have to skip the test completely.
  21
+    def requires_threading(func):
  22
+        @wraps(func)
  23
+        def wrapped(*args, **kw):
  24
+            raise unittest.SkipTest('threading required')
  25
+
  26
+class SelectForUpdateTests(TransactionTestCase):
  27
+
  28
+    def setUp(self):
  29
+        transaction.enter_transaction_management(True)
  30
+        transaction.managed(True)
  31
+        self.person = Person.objects.create(name='Reinhardt')
  32
+
  33
+        # We have to commit here so that code in run_select_for_update can
  34
+        # see this data.
  35
+        transaction.commit()
  36
+
  37
+        # We need another database connection to test that one connection
  38
+        # issuing a SELECT ... FOR UPDATE will block.
  39
+        new_connections = ConnectionHandler(settings.DATABASES)
  40
+        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
  41
+
  42
+        # We need to set settings.DEBUG to True so we can capture
  43
+        # the output SQL to examine.
  44
+        self._old_debug = settings.DEBUG
  45
+        settings.DEBUG = True
  46
+
  47
+    def tearDown(self):
  48
+        try:
  49
+            # We don't really care if this fails - some of the tests will set
  50
+            # this in the course of their run.
  51
+            transaction.managed(False)
  52
+            transaction.leave_transaction_management()
  53
+        except transaction.TransactionManagementError:
  54
+            pass
  55
+        self.new_connection.close()
  56
+        settings.DEBUG = self._old_debug
  57
+        try:
  58
+            self.end_blocking_transaction()
  59
+        except (DatabaseError, AttributeError):
  60
+            pass
  61
+
  62
+    def start_blocking_transaction(self):
  63
+        # Start a blocking transaction. At some point,
  64
+        # end_blocking_transaction() should be called.
  65
+        self.cursor = self.new_connection.cursor()
  66
+        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
  67
+            'db_table': Person._meta.db_table,
  68
+            'for_update': self.new_connection.ops.for_update_sql(),
  69
+            }
  70
+        self.cursor.execute(sql, ())
  71
+        result = self.cursor.fetchone()
  72
+
  73
+    def end_blocking_transaction(self):
  74
+        # Roll back the blocking transaction.
  75
+        self.new_connection._rollback()
  76
+
  77
+    def has_for_update_sql(self, tested_connection, nowait=False):
  78
+        # Examine the SQL that was executed to determine whether it
  79
+        # contains the 'SELECT..FOR UPDATE' stanza.
  80
+        for_update_sql = tested_connection.ops.for_update_sql(nowait)
  81
+        sql = tested_connection.queries[-1]['sql']
  82
+        return bool(sql.find(for_update_sql) > -1)
  83
+
  84
+    def check_exc(self, exc):
  85
+        self.failUnless(isinstance(exc, DatabaseError))
  86
+
  87
+    @skipUnlessDBFeature('has_select_for_update')
  88
+    def test_for_update_sql_generated(self):
  89
+        """
  90
+        Test that the backend's FOR UPDATE variant appears in
  91
+        generated SQL when select_for_update is invoked.
  92
+        """
  93
+        list(Person.objects.all().select_for_update())
  94
+        self.assertTrue(self.has_for_update_sql(connection))
  95
+
  96
+    @skipUnlessDBFeature('has_select_for_update_nowait')
  97
+    def test_for_update_sql_generated_nowait(self):
  98
+        """
  99
+        Test that the backend's FOR UPDATE NOWAIT variant appears in
  100
+        generated SQL when select_for_update is invoked.
  101
+        """
  102
+        list(Person.objects.all().select_for_update(nowait=True))
  103
+        self.assertTrue(self.has_for_update_sql(connection, nowait=True))
  104
+
  105
+    # In Python 2.6 beta and some final releases, exceptions raised in __len__
  106
+    # are swallowed (Python issue 1242657), so these cases return an empty
  107
+    # list, rather than raising an exception. Not a lot we can do about that,
  108
+    # unfortunately, due to the way Python handles list() calls internally.
  109
+    # Thus, we skip this test for Python 2.6.
  110
+    @requires_threading
  111
+    @skipUnlessDBFeature('has_select_for_update_nowait')
  112
+    @unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6")
  113
+    def test_nowait_raises_error_on_block(self):
  114
+        """
  115
+        If nowait is specified, we expect an error to be raised rather
  116
+        than blocking.
  117
+        """
  118
+        self.start_blocking_transaction()
  119
+        status = []
  120
+        thread = threading.Thread(
  121
+            target=self.run_select_for_update,
  122
+            args=(status,),
  123
+            kwargs={'nowait': True},
  124
+        )
  125
+
  126
+        thread.start()
  127
+        time.sleep(1)
  128
+        thread.join()
  129
+        self.end_blocking_transaction()
  130
+        self.check_exc(status[-1])
  131
+
  132
+    @skipIfDBFeature('has_select_for_update_nowait')
  133
+    @skipUnlessDBFeature('has_select_for_update')
  134
+    def test_unsupported_nowait_raises_error(self):
  135
+        """
  136
+        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
  137
+        that supports FOR UPDATE but not NOWAIT, then we should find
  138
+        that a DatabaseError is raised.
  139
+        """
  140
+        self.assertRaises(
  141
+            DatabaseError,
  142
+            list,
  143
+            Person.objects.all().select_for_update(nowait=True)
  144
+        )
  145
+
  146
+    def run_select_for_update(self, status, nowait=False):
  147
+        """
  148
+        Utility method that runs a SELECT FOR UPDATE against all
  149
+        Person instances. After the select_for_update, it attempts
  150
+        to update the name of the only record, save, and commit.
  151
+
  152
+        In general, this will be run in a separate thread.
  153
+        """
  154
+        status.append('started')
  155
+        try:
  156
+            # We need to enter transaction management again, as this is done on
  157
+            # per-thread basis
  158
+            transaction.enter_transaction_management(True)
  159
+            transaction.managed(True)
  160
+            people = list(
  161
+                Person.objects.all().select_for_update(nowait=nowait)
  162
+            )
  163
+            people[0].name = 'Fred'
  164
+            people[0].save()
  165
+            transaction.commit()
  166
+        except DatabaseError, e:
  167
+            status.append(e)
  168
+        except Exception, e:
  169
+            raise
  170
+
  171
+    @requires_threading
  172
+    @skipUnlessDBFeature('has_select_for_update')
  173
+    @skipUnlessDBFeature('supports_transactions')
  174
+    def test_block(self):
  175
+        """
  176
+        Check that a thread running a select_for_update that
  177
+        accesses rows being touched by a similar operation
  178
+        on another connection blocks correctly.
  179
+        """
  180
+        # First, let's start the transaction in our thread.
  181
+        self.start_blocking_transaction()
  182
+
  183
+        # Now, try it again using the ORM's select_for_update
  184
+        # facility. Do this in a separate thread.
  185
+        status = []
  186
+        thread = threading.Thread(
  187
+            target=self.run_select_for_update, args=(status,)
  188
+        )
  189
+
  190
+        # The thread should immediately block, but we'll sleep
  191
+        # for a bit to make sure.
  192
+        thread.start()
  193
+        sanity_count = 0
  194
+        while len(status) != 1 and sanity_count < 10:
  195
+            sanity_count += 1
  196
+            time.sleep(1)
  197
+        if sanity_count >= 10:
  198
+            raise ValueError, 'Thread did not run and block'
  199
+
  200
+        # Check the person hasn't been updated. Since this isn't
  201
+        # using FOR UPDATE, it won't block.
  202
+        p = Person.objects.get(pk=self.person.pk)
  203
+        self.assertEqual('Reinhardt', p.name)
  204
+
  205
+        # When we end our blocking transaction, our thread should
  206
+        # be able to continue.
  207
+        self.end_blocking_transaction()
  208
+        thread.join(5.0)
  209
+
  210
+        # Check the thread has finished. Assuming it has, we should
  211
+        # find that it has updated the person's name.
  212
+        self.failIf(thread.isAlive())
  213
+        p = Person.objects.get(pk=self.person.pk)
  214
+        self.assertEqual('Fred', p.name)
  215
+
  216
+    @requires_threading
  217
+    @skipUnlessDBFeature('has_select_for_update')
  218
+    def test_raw_lock_not_available(self):
  219
+        """
  220
+        Check that running a raw query which can't obtain a FOR UPDATE lock
  221
+        raises the correct exception
  222
+        """
  223
+        self.start_blocking_transaction()
  224
+        def raw(status):
  225
+            try:
  226
+                list(
  227
+                    Person.objects.raw(
  228
+                        'SELECT * FROM %s %s' % (
  229
+                            Person._meta.db_table,
  230
+                            connection.ops.for_update_sql(nowait=True)
  231
+                        )
  232
+                    )
  233
+                )
  234
+            except DatabaseError, e:
  235
+                status.append(e)
  236
+        status = []
  237
+        thread = threading.Thread(target=raw, kwargs={'status': status})
  238
+        thread.start()
  239
+        time.sleep(1)
  240
+        thread.join()
  241
+        self.end_blocking_transaction()
  242
+        self.check_exc(status[-1])
  243
+
  244
+    @skipUnlessDBFeature('has_select_for_update')
  245
+    def test_transaction_dirty_managed(self):
  246
+        """ Check that a select_for_update sets the transaction to be
  247
+        dirty when executed under txn management. Setting the txn dirty
  248
+        means that it will be either committed or rolled back by Django,
  249
+        which will release any locks held by the SELECT FOR UPDATE.
  250
+        """
  251
+        people = list(Person.objects.select_for_update())
  252
+        self.assertTrue(transaction.is_dirty())
  253
+
  254
+    @skipUnlessDBFeature('has_select_for_update')
  255
+    def test_transaction_not_dirty_unmanaged(self):
  256
+        """ If we're not under txn management, the txn will never be
  257
+        marked as dirty.
  258
+        """
  259
+        transaction.managed(False)
  260
+        transaction.leave_transaction_management()
  261
+        people = list(Person.objects.select_for_update())
  262
+        self.assertFalse(transaction.is_dirty())

0 notes on commit 8f0f73c

Please sign in to comment.
Something went wrong with that request. Please try again.