Skip to content

Commit

Permalink
Improve safety for libuv async and idle watchers.
Browse files Browse the repository at this point in the history
Fixes #1489.
  • Loading branch information
jamadden committed Dec 11, 2019
1 parent 6f73477 commit 6bed079
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 76 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Expand Up @@ -46,6 +46,11 @@
- Make ``gevent.pywsgi`` support ``Connection: keep-alive`` in
HTTP/1.0. Based on :pr:`1331` by tanchuhan.

- Fix a potential crash using ``gevent.idle()`` when using libuv. See
:issue:`1489`.

- Fix some potential crashes using libuv async watchers.

1.5a2 (2019-10-21)
==================

Expand Down
4 changes: 2 additions & 2 deletions src/gevent/_ffi/loop.py
Expand Up @@ -123,8 +123,8 @@ def python_callback(self, handle, revents):
args = _NOARGS
if args and args[0] == GEVENT_CORE_EVENTS:
args = (revents, ) + args[1:]
#print("Calling function", the_watcher.callback, args)
the_watcher.callback(*args)
#_dbg("Calling function", the_watcher.callback, args)
the_watcher.callback(*args) # None here means we weren't started
except: # pylint:disable=bare-except
_dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
# It's possible for ``the_watcher`` to be undefined (UnboundLocalError)
Expand Down
10 changes: 6 additions & 4 deletions src/gevent/_ffi/watcher.py
Expand Up @@ -364,7 +364,6 @@ def __del__(self):
# may fail if __init__ did; will be harmlessly printed
self.close()


def __repr__(self):
formats = self._format()
result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
Expand Down Expand Up @@ -398,7 +397,7 @@ def ref(self):
raise NotImplementedError()

def _get_callback(self):
return self._callback
return self._callback if '_callback' in self.__dict__ else None

def _set_callback(self, cb):
if not callable(cb) and cb is not None:
Expand Down Expand Up @@ -435,15 +434,18 @@ def start(self, callback, *args):
self._watcher_ffi_start_unref()

def stop(self):
if self._callback is None:
if self.callback is None:
assert self.loop is None or self not in self.loop._keepaliveset
return
self.callback = None
# Only after setting the signal to make this idempotent do
# we move ahead.
self._watcher_ffi_stop_ref()
self._watcher_ffi_stop()
self.loop._keepaliveset.discard(self)
self._handle = None
self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member
self.callback = None

self.args = None

def _get_priority(self):
Expand Down
59 changes: 19 additions & 40 deletions src/gevent/_threading.py
Expand Up @@ -25,29 +25,16 @@
# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
# pylint:disable=assignment-from-no-return


class _Condition(object):
# pylint:disable=method-hidden

def __init__(self, lock):
self.__lock = lock
self.__waiters = []

# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# No need to special case for _release_save and
# _acquire_restore; those are only used for RLock, and
# we don't use those.

def __enter__(self):
return self.__lock.__enter__()
Expand All @@ -58,36 +45,28 @@ def __exit__(self, t, v, tb):
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

def _release_save(self):
self.__lock.release() # No state to save

def _acquire_restore(self, x): # pylint:disable=unused-argument
self.__lock.acquire() # Ignore saved state

def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if __lock doesn't have _is_owned().
if self.__lock.acquire(0):
self.__lock.release()
return False
return True

def wait(self):
# The condition MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
saved_state = self._release_save()
# This variable is for the monitoring utils to know that
# this is an idle frame and shouldn't be counted.
gevent_threadpool_worker_idle = True # pylint:disable=unused-variable
try: # restore state no matter what (e.g., KeyboardInterrupt)

# Our __lock MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
self.__lock.release()

try:
waiter.acquire() # Block on the native lock
finally:
self._acquire_restore(saved_state)
self.__lock.acquire()

# just good form to release the lock we're holding before it goes
# out of scope
waiter.release()

def notify_one(self):
# The condition MUST be owned, but we don't check that.
# The lock SHOULD be owned, but we don't check that.
try:
waiter = self.__waiters.pop()
except IndexError:
Expand Down Expand Up @@ -154,15 +133,15 @@ def full(self):
def put(self, item):
"""Put an item into the queue.
"""
with self._not_empty:
with self._mutex:
self._queue.append(item)
self.unfinished_tasks += 1
self._not_empty.notify_one()

def get(self):
"""Remove and return an item from the queue.
"""
with self._not_empty:
with self._mutex:
while not self._queue:
self._not_empty.wait()
item = self._queue.popleft()
Expand Down
8 changes: 4 additions & 4 deletions src/gevent/hub.py
Expand Up @@ -171,10 +171,10 @@ def idle(priority=0):
.. seealso:: :func:`sleep`
"""
hub = _get_hub_noargs()
watcher = hub.loop.idle()
if priority:
watcher.priority = priority
hub.wait(watcher)
with hub.loop.idle() as watcher:
if priority:
watcher.priority = priority
hub.wait(watcher)


def kill(greenlet, exception=GreenletExit):
Expand Down
1 change: 0 additions & 1 deletion src/gevent/libuv/_corecffi_cdef.c
Expand Up @@ -138,7 +138,6 @@ struct uv_async_s {
struct uv_loop_s* loop;
uv_handle_type type;
void *data;
void (*async_cb)(struct uv_async_s *);
GEVENT_STRUCT_DONE _;
};

Expand Down
49 changes: 29 additions & 20 deletions src/gevent/libuv/watcher.py
Expand Up @@ -16,6 +16,8 @@
from gevent._ffi import watcher as _base
from gevent._ffi import _dbg

# A set of uv_handle_t* CFFI objects. Kept around
# to keep the memory alive until libuv is done with them.
_closing_watchers = set()

# In debug mode, it would be nice to be able to clear the memory of
Expand All @@ -27,7 +29,9 @@
# crash) suggesting either that we're writing on memory that doesn't
# belong to us, somehow, or that we haven't actually lost all
# references...
_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(_closing_watchers.remove)
_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(
_closing_watchers.remove
)


_events = [(libuv.UV_READABLE, "READ"),
Expand Down Expand Up @@ -125,6 +129,8 @@ def _watcher_ffi_close(cls, ffi_watcher):
# but that don't in CFFI without a cast. But be careful what we use the cast
# for, don't pass it back to C.
ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher)
ffi_handle_watcher.data = ffi.NULL

if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher):
# If the type isn't set, we were never properly initialized,
# and trying to close it results in libuv terminating the process.
Expand All @@ -133,9 +139,6 @@ def _watcher_ffi_close(cls, ffi_watcher):
_closing_watchers.add(ffi_watcher)
libuv.uv_close(ffi_watcher, libuv._uv_close_callback)

ffi_handle_watcher.data = ffi.NULL


def _watcher_ffi_set_init_ref(self, ref):
self.ref = ref

Expand Down Expand Up @@ -548,33 +551,39 @@ def _set_waitpid_status(self, pid, status):
class async_(_base.AsyncMixin, watcher):
_watcher_callback_name = '_gevent_async_callback0'

# libuv async watchers are different than all other watchers:
# They don't have a separate start/stop method (presumably
# because of race conditions). Simply initing them places them
# into the active queue.
#
# In the past, we sent a NULL C callback to the watcher, trusting
# that no one would call send() without actually starting us (or after
# closing us); doing so would crash. But we don't want to delay
# initing the struct because it will crash in uv_close() when we get GC'd,
# and send() will also crash. Plus that complicates our lifecycle (managing
# the memory).
#
# Now, we always init the correct C callback, and use a dummy
# Python callback that gets replaced when we are started and
# stopped. This prevents mistakes from being crashes.
_callback = lambda: None

def _watcher_ffi_init(self, args):
# It's dangerous to have a raw, non-initted struct
# around; it will crash in uv_close() when we get GC'd,
# and send() will also crash.
# NOTE: uv_async_init is NOT idempotent. Calling it more than
# once adds the uv_async_t to the internal queue multiple times,
# and uv_close only cleans up one of them, meaning that we tend to
# crash. Thus we have to be very careful not to allow that.
return self._watcher_init(self.loop.ptr, self._watcher, ffi.NULL)
return self._watcher_init(self.loop.ptr, self._watcher,
self._watcher_callback)

def _watcher_ffi_start(self):
# we're created in a started state, but we didn't provide a
# callback (because if we did and we don't have a value in our
# callback attribute, then python_callback would crash.) Note that
# uv_async_t->async_cb is not technically documented as public.
self._watcher.async_cb = self._watcher_callback
pass

def _watcher_ffi_stop(self):
self._watcher.async_cb = ffi.NULL
# We have to unref this because we're setting the cb behind libuv's
# back, basically: once a async watcher is started, it can't ever be
# stopped through libuv interfaces, so it would never lose its active
# status, and thus if it stays reffed it would keep the event loop
# from exiting.
self._watcher_ffi_unref()
pass

def send(self):
assert self._callback is not async_._callback, "Sending to a closed watcher"
if libuv.uv_is_closing(self._watcher):
raise Exception("Closing handle")
libuv.uv_async_send(self._watcher)
Expand Down
23 changes: 18 additions & 5 deletions src/gevent/threadpool.py
Expand Up @@ -201,7 +201,10 @@ def spawn(self, func, *args, **kwargs):
"""
Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
Waits until a slot is available. Creates a new thread if necessary.
Waits until a slot is available. Creates a new native thread if necessary.
This must only be called from the native thread that owns this object's
hub.
:return: A :class:`gevent.event.AsyncResult`.
"""
Expand All @@ -219,12 +222,13 @@ def spawn(self, func, *args, **kwargs):
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
assert self.hub == get_hub()
thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result))
self.adjust()
except:
if thread_result is not None:
thread_result.destroy()
thread_result.destroy_in_main_thread()
semaphore.release()
raise
return result
Expand Down Expand Up @@ -370,8 +374,14 @@ def exception(self):
return self.exc_info[1] if self.exc_info else None

def _on_async(self):
self.async_watcher.stop()
self.async_watcher.close()
# Called in the hub thread.

aw = self.async_watcher
self.async_watcher = _FakeAsync

aw.stop()
aw.close()


# Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded
Expand All @@ -393,7 +403,10 @@ def _on_async(self):
if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None)

def destroy(self):
def destroy_in_main_thread(self):
"""
This must only be called from the thread running the hub.
"""
self.async_watcher.stop()
self.async_watcher.close()
self.async_watcher = _FakeAsync
Expand Down

0 comments on commit 6bed079

Please sign in to comment.