Skip to content

Commit

Permalink
Add loop.run_callback_threadsafe and use it in AbstractLinkable
Browse files Browse the repository at this point in the history
This makes cross-thread wakeups of Event and AsyncResult objects much faster and doesn't rely on the target loop spinning as this wakes it up.

Add tests for this.

Fixes #1735
  • Loading branch information
jamadden committed Jan 15, 2021
1 parent 1e5f1bb commit 0612994
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/api/gevent.hub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ All implementations of the loop provide a common minimum interface.

.. autointerface:: gevent._interfaces.ILoop
.. autointerface:: gevent._interfaces.IWatcher
.. autointerface:: gevent._interfaces.ICallback

Utilities
=========
Expand Down
8 changes: 6 additions & 2 deletions docs/changes/1739.misc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Make ``AsyncResult`` print a warning when it detects improper
Make ``gevent.event.AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.

``AsyncResult`` has *never* been safe to use from multiple threads.
Expand All @@ -18,4 +18,8 @@ are still possible, especially under PyPy 7.3.3.
At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
successfully across threads, this may restore your luck. In addition,
cross-thread wakeups are faster. Note that the gevent hub now uses an
extra file descriptor to implement this.

Similar changes apply to ``gevent.event.Event`` (see :issue:`1735`).
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def run_setup(ext_modules):
'docs': [
'repoze.sphinx.autointerface',
'sphinxcontrib-programoutput',
'zope.schema',
],
# To the extent possible, we should work to make sure
# our tests run, at least a basic set, without any of
Expand Down
12 changes: 8 additions & 4 deletions src/gevent/_abstract_linkable.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def get_roots_and_hubs():
return {
x.parent: x
for x in get_objects()
if isinstance(x, Hub)
# Make sure to only find hubs that have a loop
# and aren't destroyed. If we don't do that, we can
# get an old hub that no longer works leading to issues in
# combined test cases.
if isinstance(x, Hub) and x.loop is not None
}


Expand Down Expand Up @@ -393,9 +397,9 @@ def _handle_unswitched_notifications(self, unswitched):
root_greenlets = get_roots_and_hubs()
hub = root_greenlets.get(glet)

if hub is not None:
hub.loop.run_callback(link, self)
if hub is None:
if hub is not None and hub.loop is not None:
hub.loop.run_callback_threadsafe(link, self)
if hub is None or hub.loop is None:
# We couldn't handle it
self.__print_unswitched_warning(link, printed_tb)
printed_tb = True
Expand Down
8 changes: 7 additions & 1 deletion src/gevent/_ffi/callback.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from __future__ import absolute_import, print_function
from __future__ import absolute_import
from __future__ import print_function

from zope.interface import implementer

from gevent._interfaces import ICallback

__all__ = [
'callback',
]


@implementer(ICallback)
class callback(object):

__slots__ = ('callback', 'args')
Expand Down
16 changes: 12 additions & 4 deletions src/gevent/_ffi/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ class AbstractLoop(object):
_default = None

_keepaliveset = _DiscardedSet()
_threadsafe_async = None

def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
Expand All @@ -405,7 +406,6 @@ def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._keepaliveset = set()
self._init_loop_and_aux_watchers(flags, default)


def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._ptr = self._init_loop(flags, default)

Expand Down Expand Up @@ -436,6 +436,8 @@ def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._timer0.data = self._handle_to_self
self._init_callback_timer()

self._threadsafe_async = self.async_(ref=False)
self._threadsafe_async.start(lambda: None)
# TODO: We may be able to do something nicer and use the existing python_callback
# combined with onerror and the class check/timer/prepare to simplify things
# and unify our handling
Expand Down Expand Up @@ -546,7 +548,9 @@ def _run_callbacks(self): # pylint:disable=too-many-branches
self.starting_timer_may_update_loop_time = False

def _stop_aux_watchers(self):
raise NotImplementedError()
if self._threadsafe_async is not None:
self._threadsafe_async.close()
self._threadsafe_async = None

def destroy(self):
ptr = self.ptr
Expand Down Expand Up @@ -739,9 +743,13 @@ def run_callback(self, func, *args):
# _run_callbacks), this could happen almost immediately,
# without the loop cycling.
cb = callback(func, args)
self._callbacks.append(cb)
self._setup_for_run_callback()
self._callbacks.append(cb) # Relying on the GIL for this to be threadsafe
self._setup_for_run_callback() # XXX: This may not be threadsafe.
return cb

def run_callback_threadsafe(self, func, *args):
cb = self.run_callback(func, *args)
self._threadsafe_async.send()
return cb

def _format(self):
Expand Down
1 change: 1 addition & 0 deletions src/gevent/_gevent_c_abstract_linkable.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)

@cython.locals(hub=SwitchOutGreenletWithLoop)
cdef _handle_unswitched_notifications(self, list unswitched)
cdef __print_unswitched_warning(self, link, bint printed_tb)

Expand Down
85 changes: 78 additions & 7 deletions src/gevent/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,31 @@
from zope.interface import Interface
from zope.interface import Attribute

try:
from zope import schema
except ImportError: # pragma: no cover
class _Field(Attribute):
__allowed_kw__ = ('readonly', 'min',)
def __init__(self, description, required=False, **kwargs):
description = "%s (required? %s)" % (description, required)
for k in self.__allowed_kw__:
kwargs.pop(k, None)
if kwargs:
raise TypeError("Unexpected keyword arguments: %r" % (kwargs,))
Attribute.__init__(self, description)

class schema(object):
Bool = _Field
Float = _Field


# pylint:disable=no-method-argument, unused-argument, no-self-argument
# pylint:disable=inherit-non-class

__all__ = [
'ILoop',
'IWatcher',
'ICallback',
]

class ILoop(Interface):
Expand All @@ -46,13 +65,20 @@ class ILoop(Interface):
this watcher is still started. *priority* is event loop specific.
"""

default = Attribute("Boolean indicating whether this is the default loop")
default = schema.Bool(
description=u"Boolean indicating whether this is the default loop",
required=True,
readonly=True,
)

approx_timer_resolution = Attribute(
"Floating point number of seconds giving (approximately) the minimum "
approx_timer_resolution = schema.Float(
description=u"Floating point number of seconds giving (approximately) the minimum "
"resolution of a timer (and hence the minimun value the sleep can sleep for). "
"On libuv, this is fixed by the library, but on libev it is just a guess "
"and the actual value is system dependent."
"and the actual value is system dependent.",
required=True,
min=0.0,
readonly=True,
)

def run(nowait=False, once=False):
Expand Down Expand Up @@ -160,7 +186,7 @@ def fork(ref=True, priority=None):
"""
Create a watcher that fires when the process forks.
Availability: POSIX
Availability: Unix.
"""

def async_(ref=True, priority=None):
Expand All @@ -182,6 +208,8 @@ def child(pid, trace=0, ref=True):
Create a watcher that fires for events on the child with process ID *pid*.
This is platform specific and not available on Windows.
Availability: Unix.
"""

def stat(path, interval=0.0, ref=True, priority=None):
Expand All @@ -196,8 +224,29 @@ def run_callback(func, *args):
"""
Run the *func* passing it *args* at the next opportune moment.
This is a way of handing control to the event loop and deferring
an action.
The next opportune moment may be the next iteration of the event loop,
the current iteration, or some other time in the future.
Returns a :class:`ICallback` object. See that documentation for
important caveats.
.. seealso:: :meth:`asyncio.loop.call_soon`
The :mod:`asyncio` equivalent.
"""

def run_callback_threadsafe(func, *args):
"""
Like :meth:`run_callback`, but for use from *outside* the
thread that is running this loop.
This not only schedules the *func* to run, it also causes the
loop to notice that the *func* has been scheduled (e.g., it causes
the loop to wake up).
.. versionadded:: NEXT
.. seealso:: :meth:`asyncio.loop.call_soon_threadsafe`
The :mod:`asyncio` equivalent.
"""

class IWatcher(Interface):
Expand Down Expand Up @@ -242,3 +291,25 @@ def close():
undefined. You should dispose of any references you have to it
after calling this method.
"""

class ICallback(Interface):
"""
Represents a function that will be run some time in the future.
Callback functions run in the hub, and as such they cannot use
gevent's blocking API; any exception they raise cannot be caught.
"""

pending = schema.Bool(description="Has this callback run yet?",
readonly=True)

def stop():
"""
If this object is still `pending`, cause it to
no longer be `pending`; the function will not be run.
"""

def close():
"""
An alias of `stop`.
"""
2 changes: 1 addition & 1 deletion src/gevent/_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def __acquire_using_other_hub(self, owning_hub, timeout):
thread_lock.acquire()
results = []

owning_hub.loop.run_callback(
owning_hub.loop.run_callback_threadsafe(
spawn_raw,
self.__acquire_from_other_thread_cb,
results,
Expand Down
38 changes: 28 additions & 10 deletions src/gevent/libev/corecext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
## embedded struct members
cdef libev.ev_prepare _prepare
cdef libev.ev_timer _timer0
cdef libev.ev_async _threadsafe_async
# We'll only actually start this timer if we're on Windows,
# but it doesn't hurt to compile it in on all platforms.
cdef libev.ev_timer _periodic_signal_checker
Expand Down Expand Up @@ -421,6 +422,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_timer_init(&self._timer0,
<void*>gevent_noop,
0.0, 0.0)
libev.ev_async_init(&self._threadsafe_async,
<void*>gevent_noop)

cdef unsigned int c_flags
if ptr:
Expand Down Expand Up @@ -454,6 +457,10 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_prepare_start(self._ptr, &self._prepare)
libev.ev_unref(self._ptr)

libev.ev_async_start(self._ptr, &self._threadsafe_async)
libev.ev_unref(self._ptr)


def __init__(self, object flags=None, object default=None, libev.intptr_t ptr=0):
self._callbacks = CallbackFIFO()
# See libev.corecffi for this attribute.
Expand Down Expand Up @@ -507,6 +514,9 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
if libev.ev_is_active(&self._periodic_signal_checker):
libev.ev_ref(ptr)
libev.ev_timer_stop(ptr, &self._periodic_signal_checker)
if libev.ev_is_active(&self._threadsafe_async):
libev.ev_ref(ptr)
libev.ev_async_stop(ptr, &self._threadsafe_async)

def destroy(self):
cdef libev.ev_loop* ptr = self._ptr
Expand All @@ -520,8 +530,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# else with it will likely cause a crash.
return
# Mark as destroyed
libev.ev_set_userdata(ptr, NULL)
self._stop_watchers(ptr)
libev.ev_set_userdata(ptr, NULL)
if SYSERR_CALLBACK == self._handle_syserr:
set_syserr_cb(None)
libev.ev_loop_destroy(ptr)
Expand Down Expand Up @@ -715,6 +725,12 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
libev.ev_ref(self._ptr)
return cb

def run_callback_threadsafe(self, func, *args):
# We rely on the GIL to make this threadsafe.
cb = self.run_callback(func, *args)
libev.ev_async_send(self._ptr, &self._threadsafe_async)
return cb

def _format(self):
if not self._ptr:
return 'destroyed'
Expand Down Expand Up @@ -775,15 +791,17 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# Explicitly not EV_USE_SIGNALFD
raise AttributeError("sigfd")

try:
from zope.interface import classImplements
except ImportError:
pass
else:
# XXX: This invokes the side-table lookup, we would
# prefer to have it stored directly on the class.
from gevent._interfaces import ILoop
classImplements(loop, ILoop)

from zope.interface import classImplements

# XXX: This invokes the side-table lookup, we would
# prefer to have it stored directly on the class. That means we
# need a class variable ``__implemented__``, but that's hard in
# Cython
from gevent._interfaces import ILoop
from gevent._interfaces import ICallback
classImplements(loop, ILoop)
classImplements(callback, ICallback)

# about readonly _flags attribute:
# bit #1 set if object owns Python reference to itself (Py_INCREF was
Expand Down
1 change: 1 addition & 0 deletions src/gevent/libev/corecffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def _start_callback_timer(self):
libev.ev_timer_start(self._ptr, self._timer0)

def _stop_aux_watchers(self):
super(loop, self)._stop_aux_watchers()
if libev.ev_is_active(self._prepare):
self.ref()
libev.ev_prepare_stop(self._ptr, self._prepare)
Expand Down
4 changes: 3 additions & 1 deletion src/gevent/libuv/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def _start_callback_timer(self):


def _stop_aux_watchers(self):
super(loop, self)._stop_aux_watchers()
assert self._prepare
assert self._check
assert self._signal_idle
Expand Down Expand Up @@ -456,7 +457,8 @@ def unref(self):
pass

def break_(self, how=None):
libuv.uv_stop(self.ptr)
if self.ptr:
libuv.uv_stop(self.ptr)

def reinit(self):
# TODO: How to implement? We probably have to simply
Expand Down

0 comments on commit 0612994

Please sign in to comment.