Add 3.11 stdlib tests; let the ThreadPool threads exit when idle.
The idle-exit behavior is because 3.11's test_socket.BasicTCPTest.testDetach() was always failing, complaining about a leaked thread. It turns out that thread came from the default threadpool used for DNS resolution.
Something must have changed to cause the first DNS resolution to occur during that test, and hence the first thread to be spawned. Exiting when idle is opt-in for all thread pools, with the default thread pool opting
in by default; a config setting allows that to be changed.

Most 3.11 tests pass, except for some in test_subprocess where new functionality was added.
jamadden committed Oct 7, 2022
1 parent fdc4dd5 commit e17e386
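(Illustration, not part of the commit.) With the default thread-based resolver, the first DNS call is what lazily spawns a worker in the hub's implicit threadpool, which is why the leaked-thread complaint appeared; a rough sketch:

    from gevent import socket
    from gevent.hub import get_hub

    print(get_hub().threadpool.size)      # 0: the pool exists, but no workers yet
    socket.getaddrinfo('localhost', 80)   # the first resolution runs on the pool
    print(get_hub().threadpool.size)      # typically 1: a worker thread was spawned
    # With this change, that worker exits again after the configured idle timeout.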
Showing 57 changed files with 28,822 additions and 36 deletions.
5 changes: 5 additions & 0 deletions docs/changes/1867.feature
@@ -11,3 +11,8 @@ Some platforms may or may not have binary wheels at this time.
with a final release of greenlet 2.0 that still
supports those legacy versions, but that may not be
possible; this may be the final release to support them.

:class:`gevent.threadpool.ThreadPool` can now optionally expire idle
threads. This is used by default in the implicit thread pool used for
DNS requests and other user-submitted tasks; other uses of a
thread pool need to opt in to this.
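A hedged sketch of opting an explicitly created pool in, using the ``idle_task_timeout`` constructor keyword that the ``hub.py`` hunk below passes along:

    from gevent.threadpool import ThreadPool

    pool = ThreadPool(5, idle_task_timeout=10.0)   # workers may exit after ~10s idle
    result = pool.apply(pow, (2, 10))              # 1024, computed on a native thread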
18 changes: 18 additions & 0 deletions src/gevent/_config.py
@@ -340,6 +340,24 @@ class Threadpool(ImportableSetting, Setting):

default = 'gevent.threadpool.ThreadPool'

class ThreadpoolIdleTaskTimeout(FloatSettingMixin, Setting):
document = True
name = 'threadpool_idle_task_timeout'
environment_key = 'GEVENT_THREADPOOL_IDLE_TASK_TIMEOUT'

desc = """\
How long threads in the default threadpool (used for
DNS by default) are allowed to be idle before exiting.
Use -1 for no timeout.
.. versionadded:: NEXT
"""

# This value is picked pretty much arbitrarily.
# We want to balance performance (keeping threads around)
# with memory/cpu usage (letting threads go).
default = 5.0

class Loop(ImportableSetting, Setting):

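For completeness, a hedged sketch of tuning the new setting, assuming it is exposed on ``gevent.config`` under its ``name`` like the other settings in this file; it must be set before the hub creates its threadpool:

    import gevent

    # Keep idle workers in the default (DNS) pool alive for 30s instead of 5s.
    gevent.config.threadpool_idle_task_timeout = 30.0

    # Equivalently, set GEVENT_THREADPOOL_IDLE_TASK_TIMEOUT=30 in the environment
    # before gevent is imported; use -1 to disable the timeout entirely.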
5 changes: 5 additions & 0 deletions src/gevent/_socket3.py
@@ -147,6 +147,11 @@ def __init_common(self):
def __getattr__(self, name):
return getattr(self._sock, name)

def _accept(self):
# Python 3.11 started checking for this method on the class object,
# so we need to explicitly delegate.
return self._sock._accept()

if hasattr(_socket, 'SOCK_NONBLOCK'):
# Only defined under Linux
@property
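A small stand-alone illustration (not gevent code) of why the explicit delegation is needed: an instance-level ``__getattr__`` fallback is invisible to any lookup performed against the class object itself:

    class _Wrapped(object):
        def _accept(self):
            return 'fd and address'

    class _Proxy(object):
        def __init__(self):
            self._sock = _Wrapped()
        def __getattr__(self, name):       # instance lookups fall back to the wrapped socket
            return getattr(self._sock, name)

    p = _Proxy()
    print(p._accept())                     # works, via __getattr__
    print(hasattr(_Proxy, '_accept'))      # False: a check on the class misses it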
101 changes: 84 additions & 17 deletions src/gevent/_threading.py
@@ -9,11 +9,13 @@

from gevent import monkey
from gevent._compat import thread_mod_name
from gevent._compat import PY3


__all__ = [
'Lock',
'Queue',
'EmptyTimeout',
]


@@ -22,7 +24,35 @@
])


# We want to support timeouts on locks. In this way, we can allow idle threads to
# expire from a thread pool. On Python 3, this is native behaviour; on Python 2,
# we have to emulate it. For Python 3, we want this to have the lowest possible overhead,
# so we'd prefer to use a direct call, rather than go through a wrapper. But we also
# don't want to allocate locks at import time because..., so we swizzle out the method
# at runtime.
#
#
# In all cases, a timeout value of -1 means "infinite". Sigh.
if PY3:
def acquire_with_timeout(lock, timeout=-1):
globals()['acquire_with_timeout'] = type(lock).acquire
return lock.acquire(timeout=timeout)
else:
def acquire_with_timeout(lock, timeout=-1,
_time=monkey.get_original('time', 'time'),
_sleep=monkey.get_original('time', 'sleep')):
deadline = _time() + timeout if timeout != -1 else None
while 1:
if lock.acquire(False): # Can we acquire non-blocking?
return True
if deadline is not None and _time() >= deadline:
return False
_sleep(0.005)

class _Condition(object):
# We could use libuv's ``uv_cond_wait`` to implement this whole
# class and get native timeouts and native performance everywhere.

# pylint:disable=method-hidden

__slots__ = (
@@ -31,6 +61,9 @@ class _Condition(object):
)

def __init__(self, lock):
# This lock is used to protect our own data structures;
# calls to ``wait`` and ``notify_one`` *must* be holding this
# lock.
self._lock = lock
self._waiters = []

@@ -47,29 +80,44 @@ def __exit__(self, t, v, tb):
def __repr__(self):
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

def wait(self, wait_lock):
# TODO: It would be good to support timeouts here so that we can
# let idle threadpool threads die. Under Python 3, ``Lock.acquire``
# has that ability, but Python 2 doesn't expose that. We could use
# libuv's ``uv_cond_wait`` to implement this whole class and get timeouts
# everywhere.

def wait(self, wait_lock, timeout=-1, _wait_for_notify=acquire_with_timeout):
# This variable is for the monitoring utils to know that
# this is an idle frame and shouldn't be counted.
gevent_threadpool_worker_idle = True # pylint:disable=unused-variable

# Our ``_lock`` MUST be owned, but we don't check that.
# The ``wait_lock`` must be *un*owned.
# The _lock must be held.
# The ``wait_lock`` must be *un*owned, so the timeout doesn't apply there.
# Take that lock now.
wait_lock.acquire()
self._waiters.append(wait_lock)
self._lock.release()

self._lock.release()
try:
wait_lock.acquire() # Block on the native lock
# We're already holding this native lock, so when we try to acquire it again,
# that won't work and we'll block until someone calls notify_one() (which might
# have already happened).
notified = _wait_for_notify(wait_lock, timeout)
finally:
self._lock.acquire()

wait_lock.release()
# Now that we've acquired _lock again, no one can call notify_one(), or this
# method.
if not notified:
# We need to come out of the waiters list, if we're still there; it's
# possible that between the call to _acquire() returning False
# and the time that we acquired _lock, someone did a ``notify_one``
# and released the lock. For that reason, do a non-blocking acquire().
notified = wait_lock.acquire(False)
if not notified:
# Well narf. No go. We must still be in the waiters list, so take us out.
self._waiters.remove(wait_lock)
# We didn't get notified, but we're still holding a lock that we
# need to release.
wait_lock.release()
else:
# We got notified, so we need to reset.
wait_lock.release()
return notified

def notify_one(self):
# The lock SHOULD be owned, but we don't check that.
@@ -84,9 +132,13 @@ def notify_one(self):
# is free to be scheduled and resume.
waiter.release()

class EmptyTimeout(Exception):
"""Raised from :meth:`Queue.get` if no item is available in the timeout."""


class Queue(object):
"""Create a queue object.
"""
Create a queue object.
The queue is always infinite size.
"""
@@ -124,7 +176,11 @@ def task_done(self):
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
raise ValueError(
'task_done() called too many times; %s remaining tasks' % (
self.unfinished_tasks
)
)
self.unfinished_tasks = unfinished

def qsize(self, len=len):
@@ -147,15 +203,26 @@ def put(self, item):
self.unfinished_tasks += 1
self._not_empty.notify_one()

def get(self, cookie):
"""Remove and return an item from the queue.
def get(self, cookie, timeout=-1):
"""
Remove and return an item from the queue.
If *timeout* is given, and is not -1, then we will
attempt to wait for only that many seconds to get an item.
If those seconds elapse and no item has become available,
raises :class:`EmptyTimeout`.
"""
with self._mutex:
while not self._queue:
# Temporarily release our mutex and wait for someone
# to wake us up. There *should* be an item in the queue
# after that.
self._not_empty.wait(cookie)
notified = self._not_empty.wait(cookie, timeout)
# Ok, we're holding the mutex again, so our state is guaranteed stable.
# It's possible that in the brief window where we didn't hold the lock,
# someone put something in the queue, and if so, we can take it.
if not notified and not self._queue:
raise EmptyTimeout
item = self._queue.popleft()
return item

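A hedged sketch of the new timeout path in ``gevent._threading`` (internal plumbing; the real caller is a threadpool worker), using a bare native lock as a stand-in for the worker's wait cookie:

    from gevent import _threading

    queue = _threading.Queue()
    cookie = _threading.Lock()                  # stand-in for a worker's wait lock

    try:
        task = queue.get(cookie, timeout=5.0)   # wait at most 5s for work
    except _threading.EmptyTimeout:
        task = None                             # idle for 5s; the worker may exit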
6 changes: 5 additions & 1 deletion src/gevent/hub.py
@@ -862,7 +862,11 @@ def threadpool_class(self):
def _get_threadpool(self):
if self._threadpool is None:
# pylint:disable=not-callable
self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
self._threadpool = self.threadpool_class(
self.threadpool_size,
hub=self,
idle_task_timeout=GEVENT_CONFIG.threadpool_idle_task_timeout
)
return self._threadpool

def _set_threadpool(self, value):
6 changes: 6 additions & 0 deletions src/gevent/socket.py
@@ -85,12 +85,18 @@ def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=N
Add the *all_errors* argument. This only has meaning on Python 3.11;
it is a programming error to pass it on earlier versions.
"""
# Sigh. This function is a near-copy of the CPython implementation.
# Even though we simplified some things, it's still a little complex to
# cope with error handling, which got even more complicated in 3.11.
# pylint:disable=too-many-locals,too-many-branches

all_errors = False
if PY311:
all_errors = kwargs.pop('all_errors', False)
if kwargs:
raise TypeError("Too many keyword arguments to create_connection", kwargs)


host, port = address
exceptions = []
# getaddrinfo is documented as returning a list, but our interface
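A hedged sketch of the ``all_errors`` behavior handled here: on Python 3.11 it makes every per-address failure surface together (CPython groups them into an ExceptionGroup), and it must not be passed on earlier versions:

    import sys
    from gevent import socket

    kwargs = {'all_errors': True} if sys.version_info >= (3, 11) else {}
    try:
        sock = socket.create_connection(('192.0.2.1', 9), timeout=1, **kwargs)
    except Exception as exc:
        # An ExceptionGroup on 3.11 with all_errors=True; a single OSError otherwise.
        print(type(exc).__name__, exc)
    else:
        sock.close()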
9 changes: 9 additions & 0 deletions src/gevent/subprocess.py
@@ -629,6 +629,10 @@ class Popen(object):
.. versionchanged:: 21.12.0
Added the ``pipesize`` argument for compatibility with Python 3.10.
This is ignored on all platforms.
.. versionchanged:: NEXT
Added the ``process_group`` and ``check`` arguments for compatibility with
Python 3.11.
"""

if GenericAlias is not None:
@@ -657,6 +661,11 @@ def __init__(self, args,
umask=-1,
# Added in 3.10, but ignored.
pipesize=-1,
# Added in 3.11
process_group=None,
# 3.11: check added, but not documented as new (at least as of
# 3.11rc2)
check=None,
# gevent additions
threadpool=None):

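A hedged sketch of the new ``process_group`` argument (a POSIX-only Python 3.11 feature; assuming gevent forwards it like its CPython counterpart, and using ``sleep`` purely as a placeholder command):

    import sys
    from gevent import subprocess

    if sys.version_info >= (3, 11):
        # Put the child in its own process group, as with os.setpgid(0, 0) in the child.
        proc = subprocess.Popen(['sleep', '1'], process_group=0)
        proc.wait()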
67 changes: 52 additions & 15 deletions src/gevent/testing/monkey_test.py
@@ -15,7 +15,6 @@
monkey.patch_all()

from .sysinfo import PY3
from .sysinfo import PY36
from .patched_tests_setup import disable_tests_in_source
from . import support
from . import resources
@@ -24,7 +23,7 @@


# This uses the internal built-in function ``_thread._count()``,
# which we don't monkey-patch, so it returns inaccurate information.
# which we don't/can't monkey-patch, so it returns inaccurate information.
def threading_setup():
if PY3:
return (1, ())
@@ -36,19 +35,57 @@ def threading_cleanup(*_args):
support.threading_setup = threading_setup
support.threading_cleanup = threading_cleanup

if PY36:
# On all versions of Python 3.6+, this also uses ``_thread._count()``,
# meaning it suffers from inaccuracies,
# and test_socket.py constantly fails with an extra thread
# on some random test. We disable it entirely.
# XXX: Figure out how to make a *definition* in ./support.py actually
# override the original in test.support, without having to
# manually set it
import contextlib
@contextlib.contextmanager
def wait_threads_exit(timeout=None): # pylint:disable=unused-argument
yield
support.wait_threads_exit = wait_threads_exit

# On all versions of Python 3.6+, this also uses ``_thread._count()``,
# meaning it suffers from inaccuracies,
# and test_socket.py constantly fails with an extra thread
# on some random test. We disable it entirely.
# XXX: Figure out how to make a *definition* in ./support.py actually
# override the original in test.support, without having to
# manually set it
#
import contextlib
@contextlib.contextmanager
def wait_threads_exit(timeout=None): # pylint:disable=unused-argument
yield
support.wait_threads_exit = wait_threads_exit

# On Python 3.11, they changed the way that they deal with this,
# meaning that this method no longer works. (Actually, it's not
# clear that any of our patches to `support` are doing anything on
# Python 3 at all? They certainly aren't on 3.11.) This was a good
# thing, as it led to adding the timeout value for the threadpool
# idle threads. But...the default of 5s meant that many tests in
# test_socket were each taking at least 5s to run, leading to the
# whole thing exceeding the allowed test timeout. We could set the
# GEVENT_THREADPOOL_IDLE_TASK_TIMEOUT env variable to a smaller
# value, and although that might stress the system nicely, it's
# not indicative of what end users see. And it's still hard to get
# a correct value.
#
# So try harder to make sure our patches apply.
#
# If this fails, the symptoms are very long-running tests that can be resolved
# by setting that TASK_TIMEOUT value small, and/or setting GEVENT_RESOLVER=block;
# there may also be warnings about dangling threads, or failures
# from wait_threads_exit.
try:
from test import support as ts
except ImportError:
pass
else:
ts.threading_setup = threading_setup
ts.threading_cleanup = threading_cleanup
ts.wait_threads_exit = wait_threads_exit

try:
from test.support import threading_helper
except ImportError:
pass
else:
threading_helper.wait_threads_exit = wait_threads_exit
threading_helper.threading_setup = threading_setup
threading_helper.threading_cleanup = threading_cleanup

# Configure allowed resources
resources.setup_resources()
11 changes: 11 additions & 0 deletions src/gevent/testing/patched_tests_setup.py
@@ -33,6 +33,7 @@
from .sysinfo import PY38
from .sysinfo import PY39
from .sysinfo import PY310
from .sysinfo import PY311

from .sysinfo import WIN
from .sysinfo import OSX
@@ -1462,6 +1463,16 @@ def test(*args, **kwargs):
'test_threading.SubinterpThreadingTests.test_threads_join_2',
]

if PY311:
disabled_tests += [
# CPython issue #27718: This wants to require all objects to
# have a __module__ of 'signal' because pydoc. Obviously our patches don't.
'test_signal.GenericTests.test_functions_module_attr',
# 3.11 added subprocess._USE_VFORK and subprocess._USE_POSIX_SPAWN.
# We don't support either of those (although USE_VFORK might be possible?)
'test_subprocess.ProcessTestCase.test__use_vfork',
]

if TRAVIS:
disabled_tests += [
# These tests frequently break when we try to use newer Travis CI images,
