Skip to content

Commit

Permalink
Progress on cross-thread locks.
Browse files Browse the repository at this point in the history
Tests for semaphore are passing in both pure and compiled code. gevent.tests.test__lock.TestLockMultiThread still exposes a race condition though.
  • Loading branch information
jamadden committed May 18, 2020
1 parent aa16985 commit 85ae2a1
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 63 deletions.
6 changes: 6 additions & 0 deletions docs/changes/issue1437.bugfix
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Make gevent locks that are monkey-patched work across native
threads as well as across greenlets within a single thread. This
is expensive, and not a recommended programming pattern, but it can
happen when using the threadpool. Locks that are only used in a single
thread do not take a performance hit.

The underlying Semaphore always behaves in an atomic fashion (as if
the GIL was not released) when PURE_PYTHON is set. Previously, it only
correctly did so on PyPy.
169 changes: 143 additions & 26 deletions src/gevent/_abstract_linkable.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ class AbstractLinkable(object):
# protocol common to both repeatable events (Event, Semaphore) and
# one-time events (AsyncResult).
#
# With a few careful exceptions, instances of this object can only
# be used from a single thread. The exception is that certain methods
# may be used from multiple threads IFF:
#
# 1. They are documented as safe for that purpose; AND
# 2a. This object is compiled with Cython and thus is holding the GIL
# for the entire duration of the method; OR
# 2b. A subclass ensures that a Python-level native thread lock is held
# for the duration of the method; this is necessary in pure-Python mode.
# The only known implementation of such
# a subclass is for Semaphore.
#
# TODO: As of gevent 1.5, we use the same datastructures and almost
# the same algorithm as Greenlet. See about unifying them more.

Expand Down Expand Up @@ -70,7 +82,9 @@ def __init__(self, hub=None):
# If its false, we only call callbacks as long as ready() returns true.
self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub
# without initializing the hub. However, for multiple-thread safety, as soon
# as a waiting method is entered, even if it won't have to wait, we
# grab the hub.
self.hub = hub

def linkcount(self):
Expand Down Expand Up @@ -148,6 +162,10 @@ def _notify_link_list(self, links):
break

def _notify_links(self, arrived_while_waiting):
# This method must hold the GIL, or be guarded with the lock that guards
# this object. Thus, while we are notifying objects, an object from another
# thread simply cannot arrive and mutate ``_links`` or ``arrived_while_waiting``

# ``arrived_while_waiting`` is a list of greenlet.switch methods
# to call. These were objects that called wait() while we were processing,
# and which would have run *before* those that had actually waited
Expand All @@ -170,7 +188,6 @@ def _notify_links(self, arrived_while_waiting):
# became true that it was once true, even though it may not be
# any more. In that case, we must not keep notifying anyone that's
# newly added after that, even if we go ready again.

try:
self._notify_link_list(self._links)
# Now, those that arrived after we had begun the notification
Expand All @@ -187,39 +204,134 @@ def _notify_links(self, arrived_while_waiting):
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None
# TODO: Maybe we should intelligently reset self.hub to
# free up thread affinity? In case of a pathological situation where
# one object was used from one thread once & first, but usually is
# used by another thread.

# Now we may be ready or not ready. If we're ready, which
# could have happened during the last link we called, then we
# must have more links than we started with. We need to schedule the
# wakeup.
self._check_and_notify()

def __unlink_all(self, obj):
if obj is None:
return

self.unlink(obj)
if self._notifier is not None and self._notifier.args:
try:
self._notifier.args[0].remove(obj)
except ValueError:
pass

def __wait_to_be_notified(self, rawlink): # pylint:disable=too-many-branches
# We've got to watch where we could potentially release the GIL.
# Decisions we make based an the state of this object must be in blocks
# that cannot release the GIL.
resume_this_greenlet = None
watcher = None
current_hub = get_hub()
send = None

while 1:
my_hub = self.hub
if my_hub is current_hub:
break

# We're owned by another hub.
if my_hub.dead: # dead is a property, this could have released the GIL.
# We have the GIL back. Did anything change?
if my_hub is not self.hub:
continue # start over.
# The other hub is dead, so we can take ownership.
self.hub = current_hub
break
# Some other hub owns this object. We must ask it to wake us
# up. We can't use a Python-level ``Lock`` because
# (1) it doesn't support a timeout on all platforms; and
# (2) we don't want to block this hub from running. So we need to
# do so in a way that cooperates with *two* hubs. That's what an
# async watcher is built for.
#
# Allocating and starting the watcher *could* release the GIL.
# with the libev corcext, allocating won't, but starting briefly will.
# With other backends, allocating might, and starting might also.
# So...XXX: Race condition here, tiny though it may be.
watcher = current_hub.loop.async_()
send = watcher.send_ignoring_arg
if rawlink:
# Make direct calls to self.rawlink, the most common case,
# so cython can more easily optimize.
self.rawlink(send)
else:
self._notifier.args[0].append(send)
watcher.start(getcurrent().switch, self) # pylint:disable=undefined-variable
break

if self.hub is current_hub:
resume_this_greenlet = getcurrent().switch # pylint:disable=undefined-variable
if rawlink:
self.rawlink(resume_this_greenlet)
else:
self._notifier.args[0].append(resume_this_greenlet)

try:
self._drop_lock_for_switch_out()
result = current_hub.switch() # Probably releases
# If we got here, we were automatically unlinked already.
resume_this_greenlet = None
if result is not self: # pragma: no cover
raise InvalidSwitchError(
'Invalid switch into %s.wait(): %r' % (
self.__class__.__name__,
result,
)
)
finally:
self._acquire_lock_for_switch_in()
self.__unlink_all(resume_this_greenlet)
self.__unlink_all(send)
if watcher is not None:
watcher.stop()
watcher.close()

def _acquire_lock_for_switch_in(self):
return

def _drop_lock_for_switch_out(self):
return

def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling
# switching and linking. If *catch* is set to (),
# a timeout that elapses will be allowed to be raised.
# Returns a true value if the wait succeeded without timing out.
switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch)
with Timeout._start_new_or_dummy(timeout) as timer:
"""
The core of the wait implementation, handling switching and
linking.
This method is safe to call from multiple threads; it must be holding
the GIL for the entire duration, or be protected by a Python-level
lock for that to be true.
``self.hub`` must be initialized before entering this method.
The hub that is set is considered the owner and cannot be changed.
If *catch* is set to ``()``, a timeout that elapses will be
allowed to be raised.
:return: A true value if the wait succeeded without timing out.
That is, a true return value means we were notified and control
resumed in this greenlet.
"""
with Timeout._start_new_or_dummy(timeout) as timer: # Might release
try:
if self.hub is None:
self.hub = get_hub()
result = self.hub.switch()
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
# If we got here, we were automatically unlinked already.
self.__wait_to_be_notified(True) # Use rawlink()
return True
except catch as ex:
self.unlink(switch)
if ex is not timer:
raise
# test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness
return False
except:
self.unlink(switch)
raise

def _wait_return_value(self, waited, wait_success):
# pylint:disable=unused-argument
Expand All @@ -228,16 +340,21 @@ def _wait_return_value(self, waited, wait_success):
return None # pragma: no cover all extent subclasses override

def _wait(self, timeout=None):
if self.ready():
"""
This method is safe to call from multiple threads, providing
the conditions laid out in the class documentation are met.
"""
# Watch where we could potentially release the GIL.
if self.hub is None: # no release
self.hub = get_hub() # might releases.

if self.ready(): # *might* release, if overridden in Python.
result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none
if self._notifier:
# We're already notifying waiters; one of them must have run
# and switched to us.
switch = getcurrent().switch # pylint:disable=undefined-variable
self._notifier.args[0].append(switch)
switch_result = self.hub.switch()
if switch_result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
# and switched to this greenlet, which arrived here. Alternately,
# we could be in a separate thread (but we're holding the GIL/object lock)
self.__wait_to_be_notified(False) # Use self._notifier.args[0] instead of self.rawlink

return result

Expand Down
9 changes: 9 additions & 0 deletions src/gevent/_ffi/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,15 @@ class AsyncMixin(object):
def send(self):
raise NotImplementedError()

def send_ignoring_arg(self, _ignored):
"""
Calling compatibility with ``greenlet.switch(arg)``
as used by waiters that have ``rawlink``.
This is an advanced method, not usually needed.
"""
return self.send()

@property
def pending(self):
raise NotImplementedError()
Expand Down
7 changes: 6 additions & 1 deletion src/gevent/_gevent_c_abstract_linkable.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback)

cdef _check_and_notify(self)
cdef __wait_to_be_notified(self, bint rawlink)
cdef __unlink_all(self, obj)

@cython.nonecheck(False)
cdef _notify_link_list(self, list links)

@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)

cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self)

cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cpdef _wait(self, timeout=*)
cdef _wait(self, timeout=*)
3 changes: 3 additions & 0 deletions src/gevent/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ def get(self, block=True, timeout=None):
# Not ready and not blocking, so immediately timeout
raise Timeout()

if self.hub is None: # pylint:disable=access-member-before-definition
self.hub = get_hub() # pylint:disable=attribute-defined-outside-init

# Wait, raising a timeout that elapses
self._wait_core(timeout, ())

Expand Down
3 changes: 3 additions & 0 deletions src/gevent/libev/corecext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,9 @@ cdef public class async_(watcher) [object PyGeventAsyncObject, type PyGeventAsyn
_check_loop(self.loop)
libev.ev_async_send(self.loop._ptr, &self._watcher)

def send_ignoring_arg(self, _ignored):
return self.send()

async = async_

cdef start_and_stop child_ss = make_ss(<void*>libev.ev_child_start, <void*>libev.ev_child_stop)
Expand Down

0 comments on commit 85ae2a1

Please sign in to comment.