Skip to content

Commit

Permalink
Make loops automatically break when their owning hub is destroyed.
Browse files Browse the repository at this point in the history
And stop running callbacks.

This fixes #1686 and fixes #1669.
  • Loading branch information
jamadden committed Dec 22, 2020
1 parent a403cb7 commit a9df92a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/gevent/_ffi/loop.py
Expand Up @@ -14,6 +14,7 @@
from gevent._ffi import TRACE
from gevent._ffi.callback import callback
from gevent._compat import PYPY
from gevent.exceptions import HubDestroyed

from gevent import getswitchinterval

Expand Down Expand Up @@ -601,6 +602,11 @@ def _handle_syserr(self, message, errno):
self.handle_error(None, SystemError, SystemError(message), None)

def handle_error(self, context, type, value, tb):
if type is HubDestroyed:
self._callbacks.clear()
self.break_()
return

handle_error = None
error_handler = self.error_handler
if error_handler is not None:
Expand Down
18 changes: 18 additions & 0 deletions src/gevent/exceptions.py
Expand Up @@ -10,6 +10,7 @@
from __future__ import division
from __future__ import print_function

from greenlet import GreenletExit

__all__ = [
'LoopExit',
Expand Down Expand Up @@ -116,3 +117,20 @@ class InvalidThreadUseError(RuntimeError):
.. versionadded:: 1.5a3
"""


class HubDestroyed(GreenletExit):
"""
Internal exception, raised when we're trying to destroy the
hub and we want the loop to stop running callbacks now.
This must not be subclassed; the type is tested by identity.
Clients outside of gevent must not raise this exception.
.. versionadded:: NEXT
"""

def __init__(self, destroy_loop):
GreenletExit.__init__(self, destroy_loop)
self.destroy_loop = destroy_loop
15 changes: 12 additions & 3 deletions src/gevent/hub.py
Expand Up @@ -31,6 +31,7 @@

from gevent._config import config as GEVENT_CONFIG
from gevent._compat import thread_mod_name
from gevent._compat import reraise
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
Expand All @@ -54,6 +55,7 @@


from gevent.exceptions import LoopExit
from gevent.exceptions import HubDestroyed

from gevent._waiter import Waiter

Expand All @@ -64,6 +66,7 @@
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.



def spawn_raw(function, *args, **kwargs):
"""
Create a new :class:`greenlet.greenlet` object and schedule it to
Expand Down Expand Up @@ -529,6 +532,11 @@ def handle_error(self, context, type, value, tb):
"""
type, value, tb = self._normalize_exception(type, value, tb)

if type is HubDestroyed:
# We must continue propagating this for it to properly
# exit.
reraise(type, value, tb)

if not issubclass(type, self.NOT_ERROR):
self.print_exception(context, type, value, tb)
if context is None or issubclass(type, self.SYSTEM_ERROR):
Expand Down Expand Up @@ -773,6 +781,9 @@ def destroy(self, destroy_loop=None):
is running in. If the hub is destroyed by a different thread
after a ``fork()``, for example, expect some garbage to leak.
"""
if destroy_loop is None:
destroy_loop = not self.loop.default

if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill()
self.periodic_monitoring_thread = None
Expand All @@ -791,7 +802,7 @@ def destroy(self, destroy_loop=None):
# loop; if we destroy the loop and then switch into the hub,
# things will go VERY, VERY wrong.
try:
self.throw(GreenletExit)
self.throw(HubDestroyed(destroy_loop))
except LoopExit:
# Expected.
pass
Expand All @@ -801,8 +812,6 @@ def destroy(self, destroy_loop=None):
# in this case.
pass

if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
if get_loop() is self.loop:
# Don't let anyone try to reuse this
Expand Down
12 changes: 10 additions & 2 deletions src/gevent/libev/corecext.pyx
Expand Up @@ -41,6 +41,7 @@ import os
import traceback
import signal as signalmodule
from gevent import getswitchinterval
from gevent.exceptions import HubDestroyed


__all__ = ['get_version',
Expand Down Expand Up @@ -334,6 +335,10 @@ cdef class CallbackFIFO(object):
self.head = None
self.tail = None

cdef inline clear(self):
self.head = None
self.tail = None

cdef inline callback popleft(self):
cdef callback head = self.head
self.head = head.next
Expand All @@ -342,7 +347,6 @@ cdef class CallbackFIFO(object):
head.next = None
return head


cdef inline append(self, callback new_tail):
assert not new_tail.next
if self.tail is None:
Expand All @@ -353,7 +357,6 @@ cdef class CallbackFIFO(object):
return
self.tail = self.head


assert self.head is not None
old_tail = self.tail
old_tail.next = new_tail
Expand Down Expand Up @@ -560,6 +563,11 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
cpdef handle_error(self, context, type, value, tb):
cdef object handle_error
cdef object error_handler = self.error_handler
if type is HubDestroyed:
self._callbacks.clear()
self.break_()
return

if error_handler is not None:
# we do want to do getattr every time so that setting Hub.handle_error property just works
handle_error = getattr(error_handler, 'handle_error', error_handler)
Expand Down
85 changes: 85 additions & 0 deletions src/gevent/tests/test__issue1686.py
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
Tests for https://github.com/gevent/gevent/issues/1686
which is about destroying a hub when there are active
callbacks or IO in operation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import unittest

from gevent import testing as greentest

# Don't let the testrunner put us in a process with other
# tests; we are strict on the state of the hub and greenlets.
# pragma: testrunner-no-combine

@greentest.skipOnWindows("Uses os.fork")
class TestDestroyInChildWithActiveSpawn(unittest.TestCase):

def test(self): # pylint:disable=too-many-locals
# If this test is broken, there are a few failure modes.
# - In the original examples, the parent process just hangs, because the
# child has raced ahead, spawned the greenlet and read the data. When the
# greenlet goes to read in the parent, it blocks, and the hub and loop
# wait for it.
# - Here, our child detects the greenlet ran when it shouldn't and
# raises an error, which translates to a non-zero exit status,
# which the parent checks for and fails by raising an exception before
# returning control to the hub. We can replicate the hang by removing the
# assertion in the child.
from time import sleep as hang

from gevent import get_hub
from gevent import spawn
from gevent.socket import wait_read
from gevent.os import nb_read
from gevent.os import nb_write
from gevent.os import make_nonblocking
from gevent.os import fork
from gevent.os import waitpid

pipe_read_fd, pipe_write_fd = os.pipe()
make_nonblocking(pipe_read_fd)
make_nonblocking(pipe_write_fd)

run = []

def reader():
run.append(1)
return nb_read(pipe_read_fd, 4096)

# Put data in the pipe
DATA = b'test'
nb_write(pipe_write_fd, DATA)
# Make sure we're ready to read it
wait_read(pipe_read_fd)

# Schedule a greenlet to start
reader = spawn(reader)

hub = get_hub()
pid = fork()
if pid == 0:
# Child destroys the hub. The reader should not have run.
hub.destroy(destroy_loop=True)
self.assertFalse(run)
os._exit(0)
return

# The parent.
# Briefly prevent us from spinning our event loop.
hang(0.5)
wait_child_result = waitpid(pid, 0)
self.assertEqual(wait_child_result, (pid, 0))
# We should get the data; the greenlet only runs in the parent.
data = reader.get()
self.assertEqual(run, [1])
self.assertEqual(data, DATA)


if __name__ == '__main__':
greentest.main()

0 comments on commit a9df92a

Please sign in to comment.