-
Notifications
You must be signed in to change notification settings - Fork 935
/
lock.py
372 lines (299 loc) · 11.1 KB
/
lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
"""
Locking primitives.
These include semaphores with arbitrary bounds (:class:`Semaphore` and
its safer subclass :class:`BoundedSemaphore`) and a semaphore with
infinite bounds (:class:`DummySemaphore`), along with a reentrant lock
(:class:`RLock`) with the same API as :class:`threading.RLock`.
"""
from __future__ import absolute_import
from __future__ import print_function
from gevent.hub import getcurrent
from gevent._compat import PURE_PYTHON
# This is the one exception to the rule of where to
# import Semaphore, obviously
from gevent import monkey
from gevent._semaphore import Semaphore
from gevent._semaphore import BoundedSemaphore
__all__ = [
'Semaphore',
'BoundedSemaphore',
'DummySemaphore',
'RLock',
]
# On PyPy, we don't compile the Semaphore class with Cython. Under
# Cython, each individual method holds the GIL for its entire
# duration, ensuring that no other thread can interrupt us in an
# unsafe state (only when we _wait do we call back into Python and
# allow switching threads; this is broken down into the
# _drop_lock_for_switch_out and _acquire_lock_for_switch_in methods).
# Simulate that here through the use of a manual lock. (We use a
# separate lock for each semaphore to allow sys.settrace functions to
# use locks *other* than the one being traced.) This, of course, must
# also hold for PURE_PYTHON mode when no optional C extensions are
# used.
_allocate_lock, _get_ident = monkey.get_original(
('_thread', 'thread'),
('allocate_lock', 'get_ident')
)
def atomic(meth):
def m(self, *args):
with self._atomic:
return meth(self, *args)
return m
class _GILLock(object):
__slots__ = (
'_owned_thread_id',
'_gil',
'_atomic',
'_recursion_depth',
)
# Don't allow re-entry to these functions in a single thread, as
# can happen if a sys.settrace is used. (XXX: What does that even
# mean? Our original implementation that did that has been
# replaced by something more robust)
#
# This is essentially a variant of the (pure-Python) RLock from the
# standard library.
def __init__(self):
self._owned_thread_id = None
self._gil = _allocate_lock()
self._atomic = _allocate_lock()
self._recursion_depth = 0
@atomic
def acquire(self):
current_tid = _get_ident()
if self._owned_thread_id == current_tid:
self._recursion_depth += 1
return True
# Not owned by this thread. Only one thread will make it through this point.
while 1:
self._atomic.release()
try:
self._gil.acquire()
finally:
self._atomic.acquire()
if self._owned_thread_id is None:
break
self._owned_thread_id = current_tid
self._recursion_depth = 1
return True
@atomic
def release(self):
current_tid = _get_ident()
if current_tid != self._owned_thread_id:
raise RuntimeError("%s: Releasing lock not owned by you. You: 0x%x; Owner: 0x%x" % (
self,
current_tid, self._owned_thread_id or 0,
))
self._recursion_depth -= 1
if not self._recursion_depth:
self._owned_thread_id = None
self._gil.release()
def __enter__(self):
self.acquire()
def __exit__(self, t, v, tb):
self.release()
def locked(self):
return self._gil.locked()
class _AtomicSemaphoreMixin(object):
# Behaves as though the GIL was held for the duration of acquire, wait,
# and release, just as if we were in Cython.
#
# acquire, wait, and release all acquire the lock on entry and release it
# on exit. acquire and wait can call _wait, which must release it on entry
# and re-acquire it for them on exit.
#
# Note that this does *NOT*, in-and-of itself, make semaphores safe to use from multiple threads
__slots__ = ()
def __init__(self, *args, **kwargs):
self._lock_lock = _GILLock() # pylint:disable=assigning-non-slot
super(_AtomicSemaphoreMixin, self).__init__(*args, **kwargs)
def _acquire_lock_for_switch_in(self):
self._lock_lock.acquire()
def _drop_lock_for_switch_out(self):
self._lock_lock.release()
def _notify_links(self, arrived_while_waiting):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self)._notify_links(arrived_while_waiting)
def release(self):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self).release()
def acquire(self, blocking=True, timeout=None):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self).acquire(blocking, timeout)
_py3k_acquire = acquire
def wait(self, timeout=None):
with self._lock_lock:
return super(_AtomicSemaphoreMixin, self).wait(timeout)
class _AtomicSemaphore(_AtomicSemaphoreMixin, Semaphore):
__doc__ = Semaphore.__doc__
__slots__ = (
'_lock_lock',
)
class _AtomicBoundedSemaphore(_AtomicSemaphoreMixin, BoundedSemaphore):
__doc__ = BoundedSemaphore.__doc__
__slots__ = (
'_lock_lock',
)
def release(self): # pylint:disable=useless-super-delegation
# This method is duplicated here so that it can get
# properly documented.
return super(_AtomicBoundedSemaphore, self).release()
def _fixup_docstrings():
for c in _AtomicSemaphore, _AtomicBoundedSemaphore:
b = c.__mro__[2]
assert b.__name__.endswith('Semaphore') and 'Atomic' not in b.__name__
assert c.__doc__ == b.__doc__
for m in 'acquire', 'release', 'wait':
c_meth = getattr(c, m)
b_meth = getattr(b, m)
c_meth.__doc__ = b_meth.__doc__
_fixup_docstrings()
del _fixup_docstrings
if PURE_PYTHON:
Semaphore = _AtomicSemaphore
Semaphore.__name__ = 'Semaphore'
BoundedSemaphore = _AtomicBoundedSemaphore
BoundedSemaphore.__name__ = 'BoundedSemaphore'
class DummySemaphore(object):
"""
DummySemaphore(value=None) -> DummySemaphore
An object with the same API as :class:`Semaphore`,
initialized with "infinite" initial value. None of its
methods ever block.
This can be used to parameterize on whether or not to actually
guard access to a potentially limited resource. If the resource is
actually limited, such as a fixed-size thread pool, use a real
:class:`Semaphore`, but if the resource is unbounded, use an
instance of this class. In that way none of the supporting code
needs to change.
Similarly, it can be used to parameterize on whether or not to
enforce mutual exclusion to some underlying object. If the
underlying object is known to be thread-safe itself mutual
exclusion is not needed and a ``DummySemaphore`` can be used, but
if that's not true, use a real ``Semaphore``.
"""
# Internally this is used for exactly the purpose described in the
# documentation. gevent.pool.Pool uses it instead of a Semaphore
# when the pool size is unlimited, and
# gevent.fileobject.FileObjectThread takes a parameter that
# determines whether it should lock around IO to the underlying
# file object.
def __init__(self, value=None):
"""
.. versionchanged:: 1.1rc3
Accept and ignore a *value* argument for compatibility with Semaphore.
"""
def __str__(self):
return '<%s>' % self.__class__.__name__
def locked(self):
"""A DummySemaphore is never locked so this always returns False."""
return False
def ready(self):
"""A DummySemaphore is never locked so this always returns True."""
return True
def release(self):
"""Releasing a dummy semaphore does nothing."""
def rawlink(self, callback):
# XXX should still work and notify?
pass
def unlink(self, callback):
pass
def wait(self, timeout=None): # pylint:disable=unused-argument
"""Waiting for a DummySemaphore returns immediately."""
return 1
def acquire(self, blocking=True, timeout=None):
"""
A DummySemaphore can always be acquired immediately so this always
returns True and ignores its arguments.
.. versionchanged:: 1.1a1
Always return *true*.
"""
# pylint:disable=unused-argument
return True
def __enter__(self):
pass
def __exit__(self, typ, val, tb):
pass
class RLock(object):
"""
A mutex that can be acquired more than once by the same greenlet.
A mutex can only be locked by one greenlet at a time. A single greenlet
can `acquire` the mutex as many times as desired, though. Each call to
`acquire` must be paired with a matching call to `release`.
It is an error for a greenlet that has not acquired the mutex
to release it.
Instances are context managers.
"""
__slots__ = (
'_block',
'_owner',
'_count',
'__weakref__',
)
def __init__(self, hub=None):
"""
.. versionchanged:: 20.5.1
Add the ``hub`` argument.
"""
self._block = Semaphore(1, hub)
self._owner = None
self._count = 0
def __repr__(self):
return "<%s at 0x%x _block=%s _count=%r _owner=%r)>" % (
self.__class__.__name__,
id(self),
self._block,
self._count,
self._owner)
def acquire(self, blocking=True, timeout=None):
"""
Acquire the mutex, blocking if *blocking* is true, for up to
*timeout* seconds.
.. versionchanged:: 1.5a4
Added the *timeout* parameter.
:return: A boolean indicating whether the mutex was acquired.
"""
me = getcurrent()
if self._owner is me:
self._count += 1
return 1
rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
return rc
def __enter__(self):
return self.acquire()
def release(self):
"""
Release the mutex.
Only the greenlet that originally acquired the mutex can
release it.
"""
if self._owner is not getcurrent():
raise RuntimeError("cannot release un-acquired lock. Owner: %r Current: %r" % (
self._owner, getcurrent()
))
self._count = count = self._count - 1 # pylint:disable=consider-using-augmented-assign
if not count:
self._owner = None
self._block.release()
def __exit__(self, typ, value, tb):
self.release()
# Internal methods used by condition variables
def _acquire_restore(self, count_owner):
count, owner = count_owner
self._block.acquire()
self._count = count
self._owner = owner
def _release_save(self):
count = self._count
self._count = 0
owner = self._owner
self._owner = None
self._block.release()
return (count, owner)
def _is_owned(self):
return self._owner is getcurrent()