Skip to content

Commit

Permalink
Merge pull request #1488 from gevent/issue1487
Browse files Browse the repository at this point in the history
Make Semaphores fair.
  • Loading branch information
jamadden committed Dec 6, 2019
2 parents d80b368 + 9d7a12c commit 6115e43
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 89 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
Using spin locks is not recommended, but may have been done in code
written for threads, especially on Python 3. See :issue:`1464`.

- Fix Semaphore (and monkey-patched threading locks) to be fair. This
eliminates the rare potential for starvation of greenlets. As part
of this change, the low-level method ``rawlink`` of Semaphore,
Event, and AsyncResult now always removes the link object before
calling it, so ``unlink`` can sometimes be optimized out. See
:issue:`1487`.

1.5a2 (2019-10-21)
==================

Expand Down
3 changes: 2 additions & 1 deletion src/gevent/__abstract_linkable.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ cdef class AbstractLinkable(object):
cdef readonly SwitchOutGreenletWithLoop hub

cdef _notifier
cdef set _links
cdef list _links
cdef bint _notify_all

cpdef rawlink(self, callback)
cpdef bint ready(self)
cpdef unlink(self, callback)

cdef _check_and_notify(self)
@cython.nonecheck(False)
cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
Expand Down
157 changes: 85 additions & 72 deletions src/gevent/_abstract_linkable.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,44 @@ class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and
# one-time events (AsyncResult).

__slots__ = ('hub', '_links', '_notifier', '_notify_all', '__weakref__')
#
# TODO: As of gevent 1.5, we use the same datastructures and almost
# the same algorithm as Greenlet. See about unifying them more.

__slots__ = (
'hub',
'_links',
'_notifier',
'_notify_all',
'__weakref__'
)

def __init__(self):
# Before this implementation, AsyncResult and Semaphore
# maintained the order of notifications, but Event did not.

# In gevent 1.3, before Semaphore extended this class,
# that was changed to not maintain the order. It was done because
# In gevent 1.3, before Semaphore extended this class, that
# was changed to not maintain the order. It was done because
# Event guaranteed to only call callbacks once (a set) but
# AsyncResult had no such guarantees.

# Semaphore likes to maintain order of callbacks, though,
# so when it was added we went back to a list implementation
# for storing callbacks. But we want to preserve the unique callback
# property, so we manually check.

# We generally don't expect to have so many waiters (for any of those
# objects) that testing membership and removing is a bottleneck.

# In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
# readonly` or simply `cdef` attributes of type `object` can appear to leak if
# a Python subclass is used (this is visible simply by
# instantiating this subclass if _links=[]). Our _links and
# _notifier are such attributes, and gevent.thread subclasses
# this class. Thus, we carefully manage the lifetime of the
# objects we put in these attributes so that, in the normal
# case of a semaphore used correctly (deallocated when it's not
# locked and no one is waiting), the leak goes away (because
# these objects are back to None). This can also be solved on PyPy
# by simply not declaring these objects in the pxd file, but that doesn't work for
# CPython ("No attribute...")
# See https://github.com/gevent/gevent/issues/660
self._links = set()
# AsyncResult had no such guarantees. When Semaphore was
# changed to extend this class, it lost its ordering
# guarantees. Unfortunately, that made it unfair. There are
# rare cases that this can starve a greenlet
# (https://github.com/gevent/gevent/issues/1487) and maybe
# even lead to deadlock (not tested).

# So in gevent 1.5 we go back to maintaining order. But it's
# still important not to make duplicate calls, and it's also
# important to avoid O(n^2) behaviour that can result from
# naive use of a simple list due to the need to handle removed
# links in the _notify_links loop. Cython has special support for
# built-in sets, lists, and dicts, but not ordereddict. Rather than
# use two data structures, or a dict({link: order}), we simply use a
# list and remove objects as we go, keeping track of them so as not to
# have duplicates called. This makes `unlink` O(n), but we can avoid
# calling it in the common case in _wait_core (even so, the number of
# waiters should usually be pretty small)
self._links = []
self._notifier = None
# This is conceptually a class attribute, defined here for ease of access in
# cython. If it's true, when notifiers fire, all existing callbacks are called.
Expand Down Expand Up @@ -95,13 +99,15 @@ def rawlink(self, callback):
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))

self._links.add(callback)
self._links.append(callback)
self._check_and_notify()

def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
self._links.discard(callback)
try:
self._links.remove(callback)
except ValueError:
pass

if not self._links and self._notifier is not None:
# If we currently have one queued, de-queue it.
Expand All @@ -110,49 +116,54 @@ def unlink(self, callback):
# But we can't set it to None in case it was actually running.
self._notifier.stop()


def _notify_links(self):
# We release self._notifier here. We are called by it
# at the end of the loop, and it is now false in a boolean way (as soon
# as this method returns).
notifier = self._notifier
# We were ready() at the time this callback was scheduled;
# we may not be anymore, and that status may change during
# callback processing. Some of our subclasses will want to
# notify everyone that the status was once true, even though it
# may not be anymore.
todo = set(self._links)
# Early links are allowed to remove later links, and links
# are allowed to add more links.
#
# We were ready() at the time this callback was scheduled; we
# may not be anymore, and that status may change during
# callback processing. Some of our subclasses (Event) will
# want to notify everyone who was registered when the status
# became true that it was once true, even though it may not be
# anymore. In that case, we must not keep notifying anyone that's
# newly added after that, even if we go ready again.
final_link = self._links[-1]
only_while_ready = not self._notify_all
done = set() # of ids
try:
for link in todo:
if not self._notify_all and not self.ready():
while self._links: # remember this can be mutated
if only_while_ready and not self.ready():
break

if link not in self._links:
# Been removed already by some previous link. OK, fine.
link = self._links.pop(0) # Cython optimizes using list internals
id_link = id(link)
if id_link in done:
continue
done.add(id_link)
try:
link(self)
except: # pylint:disable=bare-except
# We're running in the hub, so getcurrent() returns
# a hub.
self.hub.handle_error((link, self), *sys.exc_info()) # pylint:disable=undefined-variable
finally:
if getattr(link, 'auto_unlink', None):
# This attribute can avoid having to keep a reference to the function
# *in* the function, which is a cycle
self.unlink(link)
# We're running in the hub, errors must not escape.
self.hub.handle_error((link, self), *sys.exc_info())

if link is final_link:
break
finally:
# We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None

# Our set of active links changed, and we were told to stop on the first
# time we went unready. See if we're ready, and if so, go around
# again.
if not self._notify_all and todo != self._links:
self._check_and_notify()
# Now we may be ready or not ready. If we're ready, which
# could have happened during the last link we called, then we
# must have more links than we started with. We need to schedule the
# wakeup.
self._check_and_notify()

def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling
Expand All @@ -161,23 +172,25 @@ def _wait_core(self, timeout, catch=Timeout):
# Returns a true value if the wait succeeded without timing out.
switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch)
try:
with Timeout._start_new_or_dummy(timeout) as timer:
try:
if self.hub is None:
self.hub = get_hub()
result = self.hub.switch()
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
return True
except catch as ex:
if ex is not timer:
raise
# test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness
return False
finally:
self.unlink(switch)
with Timeout._start_new_or_dummy(timeout) as timer:
try:
if self.hub is None:
self.hub = get_hub()
result = self.hub.switch()
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
# If we got here, we were automatically unlinked already.
return True
except catch as ex:
self.unlink(switch)
if ex is not timer:
raise
# test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness
return False
except:
self.unlink(switch)
raise

def _wait_return_value(self, waited, wait_success):
# pylint:disable=unused-argument
Expand Down
8 changes: 1 addition & 7 deletions src/gevent/_ffi/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,13 @@
]


# For times when *args is captured but often not passed (empty),
# we can avoid keeping the new tuple that was created for *args
# around by using a constant.
_NOARGS = ()


class callback(object):

__slots__ = ('callback', 'args')

def __init__(self, cb, args):
self.callback = cb
self.args = args or _NOARGS
self.args = args

def stop(self):
self.callback = None
Expand Down
6 changes: 6 additions & 0 deletions src/gevent/_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
The order in which waiters are awakened is not specified. It was not
specified previously, but usually went in FIFO order.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
"""

def __init__(self, value=1):
Expand Down
3 changes: 3 additions & 0 deletions src/gevent/baseserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def set_handle(self, handle):
raise TypeError("'handle' must be provided")

def _start_accepting_if_started(self, _event=None):
    """Resume accepting connections, but only if the server is started.

    ``_event`` is accepted and ignored so that this method can be
    registered directly as a link callback, which is invoked with a
    single argument (the object that fired).
    """
    # Deliberately silent: this runs as a callback, and library code
    # must not write diagnostics to stdout on every invocation.
    if self.started:
        self.start_accepting()

Expand Down Expand Up @@ -209,6 +210,8 @@ def _do_read(self):
for _ in xrange(self.max_accept):
if self.full():
self.stop_accepting()
if self.pool is not None:
self.pool._semaphore.rawlink(self._start_accepting_if_started)
return
try:
args = self.do_read()
Expand Down
13 changes: 12 additions & 1 deletion src/gevent/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
.. note::
The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in an
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
(those not waiting to be awakened) may run between the current greenlet yielding and
the waiting greenlets being awakened. These details may change in the future.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
"""

__slots__ = ('_flag',)
Expand Down Expand Up @@ -181,6 +187,11 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
.. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
"""

__slots__ = ('_value', '_exc_info', '_imap_task_index')
Expand Down
Loading

0 comments on commit 6115e43

Please sign in to comment.