Skip to content

Commit

Permalink
Debug the test failure in TestSemaphoreMultiThread.test_dueling_threa…
Browse files Browse the repository at this point in the history
…ds_with_hub

And fix. Leave a comment about what was discovered. It's somewhat surprising.
  • Loading branch information
jamadden committed Jan 12, 2021
1 parent ca2de79 commit 7eb9b59
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 18 deletions.
21 changes: 21 additions & 0 deletions docs/changes/1739.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Make ``AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.

``AsyncResult`` has *never* been safe to use from multiple threads.
It, like most gevent objects, is intended to work with greenlets from
a single thread. Using ``AsyncResult`` from multiple threads has
undefined semantics. The safest way to communicate between threads is
using an event loop async watcher.

Those undefined semantics changed in recent gevent versions, making it
more likely that an abused ``AsyncResult`` would misbehave in ways
that could cause the program to hang.

Now, when ``AsyncResult`` detects a situation that would hang, it
prints a warning to stderr. Note that this is best-effort, and hangs
are still possible, especially under PyPy 7.3.3.

At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
8 changes: 6 additions & 2 deletions src/gevent/_abstract_linkable.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,16 @@ def _handle_unswitched_notifications(self, unswitched):

# Move this to be a callback in that thread.
# (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being
# thread-safe!)
# thread-safe! Note that the CFFI implementations are definitely
# NOT thread-safe. TODO: Make them? Or an alternative?)
#
# Otherwise, print some error messages.

# TODO: Inline this for individual links. That handles the
# "only while ready" case automatically.
# "only while ready" case automatically. Be careful about locking in that case.
#
# TODO: Add a 'strict' mode that prevents doing this dance, since it's
# inherently not safe.
root_greenlets = None
printed_tb = False
only_while_ready = not self._notify_all
Expand Down
25 changes: 25 additions & 0 deletions src/gevent/_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,31 @@ def __enter__(self):
def __exit__(self, t, v, tb):
self.release()

def _handle_unswitched_notifications(self, unswitched):
# If we fail to switch to a greenlet in another thread to send
# a notification, just re-queue it, in the hopes that the
# other thread will eventually run notifications itself.
#
# We CANNOT do what the ``super()`` does and actually allow
# this notification to get run sometime in the future by
# scheduling a callback in the other thread. The algorithm
# that we use to handle cross-thread locking/unlocking was
# designed before the schedule-a-callback mechanism was
# implemented. If we allow this to be run as a callback, we
# can find ourself the victim of ``InvalidSwitchError`` (or
# worse, silent corruption) because the switch can come at an
# unexpected time: *after* the destination thread has already
# acquired the lock.
#
# This manifests in a fairly reliable test failure,
# ``gevent.tests.test__semaphore``
# ``TestSemaphoreMultiThread.test_dueling_threads_with_hub``,
# but ONLY when running in PURE_PYTHON mode.
#
# TODO: Maybe we can rewrite that part of the algorithm to be friendly to
# running the callbacks?
self._links.extend(unswitched)

def __add_link(self, link):
if not self._notifier:
self.rawlink(link)
Expand Down
50 changes: 36 additions & 14 deletions src/gevent/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
one or more others. It has the same interface as
:class:`threading.Event` but works across greenlets.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
An event object manages an internal flag that can be set to true
with the :meth:`set` method and reset to false with the
:meth:`clear` method. The :meth:`wait` method blocks until the
Expand Down Expand Up @@ -169,20 +173,27 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
"""
A one-time event that stores a value or an exception.
Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception`
is called. Waiters may receive the passed value or exception by calling :meth:`get`
instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset.
Like :class:`Event` it wakes up all the waiters when :meth:`set`
or :meth:`set_exception` is called. Waiters may receive the passed
value or exception by calling :meth:`get` instead of :meth:`wait`.
An :class:`AsyncResult` instance cannot be reset.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as
those made in the future) will return the value:
To pass a value call :meth:`set`. Calls to :meth:`get` (those that
are currently blocking as well as those made in the future) will
return the value::
>>> from gevent.event import AsyncResult
>>> result = AsyncResult()
>>> result.set(100)
>>> result.get()
100
To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception:
To pass an exception call :meth:`set_exception`. This will cause
:meth:`get` to raise that exception::
>>> result = AsyncResult()
>>> result.set_exception(RuntimeError('failure'))
Expand All @@ -191,7 +202,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
...
RuntimeError: failure
:class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target:
:class:`AsyncResult` implements :meth:`__call__` and thus can be
used as :meth:`link` target::
>>> import gevent
>>> result = AsyncResult()
Expand All @@ -203,23 +215,33 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
ZeroDivisionError
.. note::
The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
(those not waiting to be awakened) may run between the current greenlet yielding and
the waiting greenlets being awakened. These details may change in the future.
.. versionchanged:: 1.1
The exact order in which waiting greenlets are awakened is not the same
as in 1.0.
The exact order in which waiting greenlets
are awakened is not the same as in 1.0.
.. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged.
Callbacks :meth:`linked <rawlink>` to this object are required to
be hashable, and duplicates are merged.
.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
Waiting greenlets are now awakened in the order in which they
waited.
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
The low-level ``rawlink`` method
(most users won't use this) now automatically unlinks waiters
before calling them.
"""

__slots__ = ('_value', '_exc_info', '_imap_task_index')
Expand Down
6 changes: 4 additions & 2 deletions src/gevent/tests/test__semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ def do_it(ix):
if not run:
break

sem.acquire(*acquire_args)
sem.release()
acquired = sem.acquire(*acquire_args)
assert acquire_args or acquired
if acquired:
sem.release()
results[ix] = i
if not create_hub:
# We don't artificially create the hub.
Expand Down

0 comments on commit 7eb9b59

Please sign in to comment.