diff --git a/docs/api/gevent.hub.rst b/docs/api/gevent.hub.rst index 9797ede63..36ffc970a 100644 --- a/docs/api/gevent.hub.rst +++ b/docs/api/gevent.hub.rst @@ -30,6 +30,7 @@ All implementations of the loop provide a common minimum interface. .. autointerface:: gevent._interfaces.ILoop .. autointerface:: gevent._interfaces.IWatcher +.. autointerface:: gevent._interfaces.ICallback Utilities ========= diff --git a/docs/changes/1739.misc b/docs/changes/1739.misc index d4edc7c13..be957f368 100644 --- a/docs/changes/1739.misc +++ b/docs/changes/1739.misc @@ -1,4 +1,4 @@ -Make ``AsyncResult`` print a warning when it detects improper +Make ``gevent.event.AsyncResult`` print a warning when it detects improper cross-thread usage instead of hanging. ``AsyncResult`` has *never* been safe to use from multiple threads. @@ -18,4 +18,8 @@ are still possible, especially under PyPy 7.3.3. At the same time, ``AsyncResult`` is tuned to behave more like it did in older versions, meaning that the hang is once again much less likely. If you were getting lucky and using ``AsyncResult`` -successfully across threads, this may restore your luck. +successfully across threads, this may restore your luck. In addition, +cross-thread wakeups are faster. Note that the gevent hub now uses an +extra file descriptor to implement this. + +Similar changes apply to ``gevent.event.Event`` (see :issue:`1735`). 
diff --git a/setup.py b/setup.py index fd9e0d508..62882c73d 100755 --- a/setup.py +++ b/setup.py @@ -406,6 +406,7 @@ def run_setup(ext_modules): 'docs': [ 'repoze.sphinx.autointerface', 'sphinxcontrib-programoutput', + 'zope.schema', ], # To the extent possible, we should work to make sure # our tests run, at least a basic set, without any of diff --git a/src/gevent/_abstract_linkable.py b/src/gevent/_abstract_linkable.py index 14ba4c66d..c475a1209 100644 --- a/src/gevent/_abstract_linkable.py +++ b/src/gevent/_abstract_linkable.py @@ -47,7 +47,11 @@ def get_roots_and_hubs(): return { x.parent: x for x in get_objects() - if isinstance(x, Hub) + # Make sure to only find hubs that have a loop + # and aren't destroyed. If we don't do that, we can + # get an old hub that no longer works leading to issues in + # combined test cases. + if isinstance(x, Hub) and x.loop is not None } @@ -393,9 +397,9 @@ def _handle_unswitched_notifications(self, unswitched): root_greenlets = get_roots_and_hubs() hub = root_greenlets.get(glet) - if hub is not None: - hub.loop.run_callback(link, self) - if hub is None: + if hub is not None and hub.loop is not None: + hub.loop.run_callback_threadsafe(link, self) + if hub is None or hub.loop is None: # We couldn't handle it self.__print_unswitched_warning(link, printed_tb) printed_tb = True diff --git a/src/gevent/_ffi/callback.py b/src/gevent/_ffi/callback.py index e3b2d1afc..1b0d4f1be 100644 --- a/src/gevent/_ffi/callback.py +++ b/src/gevent/_ffi/callback.py @@ -1,10 +1,16 @@ -from __future__ import absolute_import, print_function +from __future__ import absolute_import +from __future__ import print_function + +from zope.interface import implementer + +from gevent._interfaces import ICallback __all__ = [ 'callback', ] +@implementer(ICallback) class callback(object): __slots__ = ('callback', 'args') diff --git a/src/gevent/_ffi/loop.py b/src/gevent/_ffi/loop.py index 44b858eb4..8dd3e8c06 100644 --- a/src/gevent/_ffi/loop.py +++ 
b/src/gevent/_ffi/loop.py @@ -392,6 +392,7 @@ class AbstractLoop(object): _default = None _keepaliveset = _DiscardedSet() + _threadsafe_async = None def __init__(self, ffi, lib, watchers, flags=None, default=None): self._ffi = ffi @@ -405,7 +406,6 @@ def __init__(self, ffi, lib, watchers, flags=None, default=None): self._keepaliveset = set() self._init_loop_and_aux_watchers(flags, default) - def _init_loop_and_aux_watchers(self, flags=None, default=None): self._ptr = self._init_loop(flags, default) @@ -436,6 +436,8 @@ def _init_loop_and_aux_watchers(self, flags=None, default=None): self._timer0.data = self._handle_to_self self._init_callback_timer() + self._threadsafe_async = self.async_(ref=False) + self._threadsafe_async.start(lambda: None) # TODO: We may be able to do something nicer and use the existing python_callback # combined with onerror and the class check/timer/prepare to simplify things # and unify our handling @@ -546,7 +548,9 @@ def _run_callbacks(self): # pylint:disable=too-many-branches self.starting_timer_may_update_loop_time = False def _stop_aux_watchers(self): - raise NotImplementedError() + if self._threadsafe_async is not None: + self._threadsafe_async.close() + self._threadsafe_async = None def destroy(self): ptr = self.ptr @@ -739,9 +743,13 @@ def run_callback(self, func, *args): # _run_callbacks), this could happen almost immediately, # without the loop cycling. cb = callback(func, args) - self._callbacks.append(cb) - self._setup_for_run_callback() + self._callbacks.append(cb) # Relying on the GIL for this to be threadsafe + self._setup_for_run_callback() # XXX: This may not be threadsafe. 
+ return cb + def run_callback_threadsafe(self, func, *args): + cb = self.run_callback(func, *args) + self._threadsafe_async.send() return cb def _format(self): diff --git a/src/gevent/_gevent_c_abstract_linkable.pxd b/src/gevent/_gevent_c_abstract_linkable.pxd index 0f1be96c9..d2ec604b6 100644 --- a/src/gevent/_gevent_c_abstract_linkable.pxd +++ b/src/gevent/_gevent_c_abstract_linkable.pxd @@ -69,6 +69,7 @@ cdef class AbstractLinkable(object): @cython.nonecheck(False) cpdef _notify_links(self, list arrived_while_waiting) + @cython.locals(hub=SwitchOutGreenletWithLoop) cdef _handle_unswitched_notifications(self, list unswitched) cdef __print_unswitched_warning(self, link, bint printed_tb) diff --git a/src/gevent/_interfaces.py b/src/gevent/_interfaces.py index 119cdf855..05a5051f7 100644 --- a/src/gevent/_interfaces.py +++ b/src/gevent/_interfaces.py @@ -19,12 +19,31 @@ from zope.interface import Interface from zope.interface import Attribute +try: + from zope import schema +except ImportError: # pragma: no cover + class _Field(Attribute): + __allowed_kw__ = ('readonly', 'min',) + def __init__(self, description, required=False, **kwargs): + description = "%s (required? %s)" % (description, required) + for k in self.__allowed_kw__: + kwargs.pop(k, None) + if kwargs: + raise TypeError("Unexpected keyword arguments: %r" % (kwargs,)) + Attribute.__init__(self, description) + + class schema(object): + Bool = _Field + Float = _Field + + # pylint:disable=no-method-argument, unused-argument, no-self-argument # pylint:disable=inherit-non-class __all__ = [ 'ILoop', 'IWatcher', + 'ICallback', ] class ILoop(Interface): @@ -46,13 +65,20 @@ class ILoop(Interface): this watcher is still started. *priority* is event loop specific. 
""" - default = Attribute("Boolean indicating whether this is the default loop") + default = schema.Bool( + description=u"Boolean indicating whether this is the default loop", + required=True, + readonly=True, + ) - approx_timer_resolution = Attribute( - "Floating point number of seconds giving (approximately) the minimum " + approx_timer_resolution = schema.Float( + description=u"Floating point number of seconds giving (approximately) the minimum " "resolution of a timer (and hence the minimun value the sleep can sleep for). " "On libuv, this is fixed by the library, but on libev it is just a guess " - "and the actual value is system dependent." + "and the actual value is system dependent.", + required=True, + min=0.0, + readonly=True, ) def run(nowait=False, once=False): @@ -160,7 +186,7 @@ def fork(ref=True, priority=None): """ Create a watcher that fires when the process forks. - Availability: POSIX + Availability: Unix. """ def async_(ref=True, priority=None): @@ -182,6 +208,8 @@ def child(pid, trace=0, ref=True): Create a watcher that fires for events on the child with process ID *pid*. This is platform specific and not available on Windows. + + Availability: Unix. """ def stat(path, interval=0.0, ref=True, priority=None): @@ -196,8 +224,29 @@ def run_callback(func, *args): """ Run the *func* passing it *args* at the next opportune moment. - This is a way of handing control to the event loop and deferring - an action. + The next opportune moment may be the next iteration of the event loop, + the current iteration, or some other time in the future. + + Returns a :class:`ICallback` object. See that documentation for + important caveats. + + .. seealso:: :meth:`asyncio.loop.call_soon` + The :mod:`asyncio` equivalent. + """ + + def run_callback_threadsafe(func, *args): + """ + Like :meth:`run_callback`, but for use from *outside* the + thread that is running this loop. 
+ + This not only schedules the *func* to run, it also causes the + loop to notice that the *func* has been scheduled (e.g., it causes + the loop to wake up). + + .. versionadded:: NEXT + + .. seealso:: :meth:`asyncio.loop.call_soon_threadsafe` + The :mod:`asyncio` equivalent. """ class IWatcher(Interface): @@ -242,3 +291,25 @@ def close(): undefined. You should dispose of any references you have to it after calling this method. """ + +class ICallback(Interface): + """ + Represents a function that will be run some time in the future. + + Callback functions run in the hub, and as such they cannot use + gevent's blocking API; any exception they raise cannot be caught. + """ + + pending = schema.Bool(description="Is this callback still waiting to be run?", + readonly=True) + + def stop(): + """ + If this object is still `pending`, cause it to + no longer be `pending`; the function will not be run. + """ + + def close(): + """ + An alias of `stop`. + """ diff --git a/src/gevent/_semaphore.py b/src/gevent/_semaphore.py index 9f70a4d32..b86cea68b 100644 --- a/src/gevent/_semaphore.py +++ b/src/gevent/_semaphore.py @@ -405,7 +405,7 @@ def __acquire_using_other_hub(self, owning_hub, timeout): thread_lock.acquire() results = [] - owning_hub.loop.run_callback( + owning_hub.loop.run_callback_threadsafe( spawn_raw, self.__acquire_from_other_thread_cb, results, diff --git a/src/gevent/libev/corecext.pyx b/src/gevent/libev/corecext.pyx index 919f3c328..7fbf723e9 100644 --- a/src/gevent/libev/corecext.pyx +++ b/src/gevent/libev/corecext.pyx @@ -392,6 +392,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: ## embedded struct members cdef libev.ev_prepare _prepare cdef libev.ev_timer _timer0 + cdef libev.ev_async _threadsafe_async # We'll only actually start this timer if we're on Windows, # but it doesn't hurt to compile it in on all platforms. 
cdef libev.ev_timer _periodic_signal_checker @@ -421,6 +422,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: libev.ev_timer_init(&self._timer0, gevent_noop, 0.0, 0.0) + libev.ev_async_init(&self._threadsafe_async, + gevent_noop) cdef unsigned int c_flags if ptr: @@ -454,6 +457,10 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: libev.ev_prepare_start(self._ptr, &self._prepare) libev.ev_unref(self._ptr) + libev.ev_async_start(self._ptr, &self._threadsafe_async) + libev.ev_unref(self._ptr) + + def __init__(self, object flags=None, object default=None, libev.intptr_t ptr=0): self._callbacks = CallbackFIFO() # See libev.corecffi for this attribute. @@ -507,6 +514,9 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: if libev.ev_is_active(&self._periodic_signal_checker): libev.ev_ref(ptr) libev.ev_timer_stop(ptr, &self._periodic_signal_checker) + if libev.ev_is_active(&self._threadsafe_async): + libev.ev_ref(ptr) + libev.ev_async_stop(ptr, &self._threadsafe_async) def destroy(self): cdef libev.ev_loop* ptr = self._ptr @@ -520,8 +530,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: # else with it will likely cause a crash. return # Mark as destroyed - libev.ev_set_userdata(ptr, NULL) self._stop_watchers(ptr) + libev.ev_set_userdata(ptr, NULL) if SYSERR_CALLBACK == self._handle_syserr: set_syserr_cb(None) libev.ev_loop_destroy(ptr) @@ -715,6 +725,12 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: libev.ev_ref(self._ptr) return cb + def run_callback_threadsafe(self, func, *args): + # We rely on the GIL to make this threadsafe. 
+ cb = self.run_callback(func, *args) + libev.ev_async_send(self._ptr, &self._threadsafe_async) + return cb + def _format(self): if not self._ptr: return 'destroyed' @@ -775,15 +791,17 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: # Explicitly not EV_USE_SIGNALFD raise AttributeError("sigfd") -try: - from zope.interface import classImplements -except ImportError: - pass -else: - # XXX: This invokes the side-table lookup, we would - # prefer to have it stored directly on the class. - from gevent._interfaces import ILoop - classImplements(loop, ILoop) + +from zope.interface import classImplements + +# XXX: This invokes the side-table lookup, we would +# prefer to have it stored directly on the class. That means we +# need a class variable ``__implemented__``, but that's hard in +# Cython +from gevent._interfaces import ILoop +from gevent._interfaces import ICallback +classImplements(loop, ILoop) +classImplements(callback, ICallback) # about readonly _flags attribute: # bit #1 set if object owns Python reference to itself (Py_INCREF was diff --git a/src/gevent/libev/corecffi.py b/src/gevent/libev/corecffi.py index aa96cc821..d40e874de 100644 --- a/src/gevent/libev/corecffi.py +++ b/src/gevent/libev/corecffi.py @@ -284,6 +284,7 @@ def _start_callback_timer(self): libev.ev_timer_start(self._ptr, self._timer0) def _stop_aux_watchers(self): + super(loop, self)._stop_aux_watchers() if libev.ev_is_active(self._prepare): self.ref() libev.ev_prepare_stop(self._ptr, self._prepare) diff --git a/src/gevent/libuv/loop.py b/src/gevent/libuv/loop.py index 21fc96067..877e82db6 100644 --- a/src/gevent/libuv/loop.py +++ b/src/gevent/libuv/loop.py @@ -334,6 +334,7 @@ def _start_callback_timer(self): def _stop_aux_watchers(self): + super(loop, self)._stop_aux_watchers() assert self._prepare assert self._check assert self._signal_idle @@ -456,7 +457,8 @@ def unref(self): pass def break_(self, how=None): - libuv.uv_stop(self.ptr) + if self.ptr: + 
libuv.uv_stop(self.ptr) def reinit(self): # TODO: How to implement? We probably have to simply diff --git a/src/gevent/tests/test__event.py b/src/gevent/tests/test__event.py index bf0dc7896..824749f69 100644 --- a/src/gevent/tests/test__event.py +++ b/src/gevent/tests/test__event.py @@ -140,6 +140,26 @@ def test_nonblocking_get(self): self.assertRaises(gevent.Timeout, ar.get, block=False) self.assertRaises(gevent.Timeout, ar.get_nowait) +class TestAsyncResultCrossThread(greentest.TestCase): + + def _makeOne(self): + return AsyncResult() + + def _setOne(self, one): + one.set('from main') + + BG_WAIT_DELAY = 60 + + def _check_pypy_switch(self): + # On PyPy 7.3.3, switching to the main greenlet of a thread from a + # different thread silently does nothing. We can't detect the cross-thread + # switch, and so this test breaks + # https://foss.heptapod.net/pypy/pypy/-/issues/3381 + if greentest.PYPY: + import sys + if sys.pypy_version_info[:3] <= (7, 3, 3): # pylint:disable=no-member + self.skipTest("PyPy bug: https://foss.heptapod.net/pypy/pypy/-/issues/3381") + @greentest.ignores_leakcheck def test_cross_thread_use(self, timed_wait=False, wait_in_bg=False): # Issue 1739. @@ -153,15 +173,10 @@ def test_cross_thread_use(self, timed_wait=False, wait_in_bg=False): from threading import Thread as NativeThread from threading import Event as NativeEvent - # On PyPy 7.3.3, switching to the main greenlet of a thread from a - # different thread silently does nothing. 
We can't detect the cross-thread - # switch, and so this test breaks - # https://foss.heptapod.net/pypy/pypy/-/issues/3381 - if not wait_in_bg and greentest.PYPY: - import sys - if sys.pypy_version_info[:3] <= (7, 3, 3): # pylint:disable=no-member - self.skipTest("PyPy bug: https://foss.heptapod.net/pypy/pypy/-/issues/3381") + if not wait_in_bg: + self._check_pypy_switch() + test = self class Thread(NativeThread): def __init__(self): NativeThread.__init__(self) @@ -169,7 +184,7 @@ def __init__(self): self.running_event = NativeEvent() self.finished_event = NativeEvent() - self.async_result = AsyncResult() + self.async_result = test._makeOne() self.result = '' def run(self): @@ -182,10 +197,12 @@ def spin(): def work(): self.running_event.set() - # XXX: If we use a timed wait(), the bug doesn't manifest. - # Why not? + # If we use a timed wait(), the bug doesn't manifest. + # This is probably because the loop wakes up to handle the timer, + # and notices the callback. + # See https://github.com/gevent/gevent/issues/1735 if timed_wait: - self.result = self.async_result.wait(DELAY * 5) + self.result = self.async_result.wait(test.BG_WAIT_DELAY) else: self.result = self.async_result.wait() @@ -207,12 +224,15 @@ def work(): thread.start() try: thread.running_event.wait() - thread.async_result.set('from main') + self._setOne(thread.async_result) thread.finished_event.wait(DELAY * 5) finally: thread.join(DELAY * 15) - self.assertEqual(thread.result, 'from main') + self._check_result(thread.result) + + def _check_result(self, result): + self.assertEqual(result, 'from main') def test_cross_thread_use_bg(self): self.test_cross_thread_use(timed_wait=False, wait_in_bg=True) @@ -223,6 +243,57 @@ def test_cross_thread_use_timed(self): def test_cross_thread_use_timed_bg(self): self.test_cross_thread_use(timed_wait=True, wait_in_bg=True) + @greentest.ignores_leakcheck + def test_cross_thread_use_set_in_bg(self): + self.assertNotMonkeyPatched() # Need real threads, event objects + 
from threading import Thread as NativeThread + from threading import Event as NativeEvent + + self._check_pypy_switch() + test = self + class Thread(NativeThread): + def __init__(self): + NativeThread.__init__(self) + self.daemon = True + self.running_event = NativeEvent() + self.finished_event = NativeEvent() + + self.async_result = test._makeOne() + self.result = '' + + def run(self): + self.running_event.set() + test._setOne(self.async_result) + + self.finished_event.set() + gevent.get_hub().destroy(destroy_loop=True) + + thread = Thread() + try: + glet = gevent.spawn(thread.start) + result = thread.async_result.wait(self.BG_WAIT_DELAY) + finally: + thread.join(DELAY * 15) + glet.join(DELAY) + self._check_result(result) + + @greentest.ignores_leakcheck + def test_cross_thread_use_set_in_bg2(self): + # Do it again to make sure it works multiple times. + self.test_cross_thread_use_set_in_bg() + +class TestEventCrossThread(TestAsyncResultCrossThread): + + def _makeOne(self): + return Event() + + def _setOne(self, one): + one.set() + + def _check_result(self, result): + self.assertTrue(result) + + class TestAsyncResultAsLinkTarget(greentest.TestCase): error_fatal = False diff --git a/src/gevent/tests/test__hub.py b/src/gevent/tests/test__hub.py index 26d192237..5ffa0cea2 100644 --- a/src/gevent/tests/test__hub.py +++ b/src/gevent/tests/test__hub.py @@ -337,6 +337,24 @@ def test_implemensts_ILoop(self): verify.verifyObject(ILoop, loop) + def test_callback_implements_ICallback(self): + from gevent.testing import verify + from gevent._interfaces import ICallback + + loop = get_hub().loop + + cb = loop.run_callback(lambda: None) + verify.verifyObject(ICallback, cb) + + def test_callback_ts_implements_ICallback(self): + from gevent.testing import verify + from gevent._interfaces import ICallback + + loop = get_hub().loop + + cb = loop.run_callback_threadsafe(lambda: None) + verify.verifyObject(ICallback, cb) + class TestHandleError(unittest.TestCase):