Skip to content

Commit

Permalink
Merge pull request #1322 from gevent/issue1321
Browse files Browse the repository at this point in the history
Make ThreadPool.join respect the loop.min_sleep_time value.
  • Loading branch information
jamadden committed Nov 29, 2018
2 parents 58fcd58 + 7b4495c commit ce5a9c4
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Expand Up @@ -63,6 +63,10 @@
if the socket was already in use. Now the correct socket.error
should be raised.

- Fix :meth:`gevent.threadpool.ThreadPool.join` raising a
`UserWarning` when using the libuv backend. Reported in
:issue:`1321` by ZeroNet.

1.3.7 (2018-10-12)
==================

Expand Down
2 changes: 1 addition & 1 deletion src/gevent/__init__.py
Expand Up @@ -163,7 +163,7 @@ def __new__(cls, *args, **kwargs):
# outdated on each major release.

def __dependencies_for_freezing(): # pragma: no cover
# pylint:disable=unused-variable
# pylint:disable=unused-import
from gevent import core
from gevent import resolver_thread
from gevent import resolver_ares
Expand Down
4 changes: 1 addition & 3 deletions src/gevent/_ident.py
Expand Up @@ -21,9 +21,7 @@ class ValuedWeakRef(ref):
"""
A weak ref with an associated value.
"""
# This seems entirely spurious; even on Python 2.7
# weakref.ref descends from object
# pylint: disable=slots-on-old-class

__slots__ = ('value',)


Expand Down
20 changes: 15 additions & 5 deletions src/gevent/_interfaces.py
Expand Up @@ -14,6 +14,7 @@
from __future__ import division
from __future__ import print_function

import sys

from gevent._util import Interface
from gevent._util import Attribute
Expand Down Expand Up @@ -46,6 +47,13 @@ class ILoop(Interface):

default = Attribute("Boolean indicating whether this is the default loop")

approx_timer_resolution = Attribute(
"Floating point number of seconds giving (approximately) the minimum "
"resolution of a timer (and hence the minimun value the sleep can sleep for). "
"On libuv, this is fixed by the library, but on libev it is just a guess "
"and the actual value is system dependent."
)

def run(nowait=False, once=False):
"""
Run the event loop.
Expand Down Expand Up @@ -153,12 +161,14 @@ def async_(ref=True, priority=None):
it will be removed in the future.
"""

def child(pid, trace=0, ref=True):
"""
Create a watcher that fires for events on the child with process ID *pid*.
if sys.platform != "win32":

This is platform specific.
"""
def child(pid, trace=0, ref=True):
"""
Create a watcher that fires for events on the child with process ID *pid*.
This is platform specific and not available on Windows.
"""

def stat(path, interval=0.0, ref=True, priority=None):
"""
Expand Down
2 changes: 1 addition & 1 deletion src/gevent/_ssl3.py
Expand Up @@ -99,7 +99,7 @@ def maximum_version(self, value):
super(orig_SSLContext, orig_SSLContext).maximum_version.__set__(self, value)


class _contextawaresock(socket._gevent_sock_class): # Python 2: pylint:disable=slots-on-old-class
class _contextawaresock(socket._gevent_sock_class):
# We have to pass the raw stdlib socket to SSLContext.wrap_socket.
# That method in turn can pass that object on to things like SNI callbacks.
# It wouldn't have access to any of the attributes on the SSLSocket, like
Expand Down
1 change: 0 additions & 1 deletion src/gevent/baseserver.py
Expand Up @@ -291,7 +291,6 @@ def init_socket(self):
It is not supposed to be called by the user, it is called by :meth:`start` before starting
the accept loop."""
pass

@property
def started(self):
Expand Down
12 changes: 12 additions & 0 deletions src/gevent/libev/corecext.pyx
Expand Up @@ -392,6 +392,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# the libev internal pointer to 0, and ev_is_default_loop will
# no longer work.
cdef bint _default
cdef readonly double approx_timer_resolution

def __cinit__(self, object flags=None, object default=None, libev.intptr_t ptr=0):
self.starting_timer_may_update_loop_time = 0
Expand Down Expand Up @@ -440,6 +441,8 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:

def __init__(self, object flags=None, object default=None, libev.intptr_t ptr=0):
self._callbacks = CallbackFIFO()
# See libev.corecffi for this attribute.
self.approx_timer_resolution = 0.00001

cdef _run_callbacks(self):
cdef callback cb
Expand Down Expand Up @@ -745,6 +748,15 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
# Explicitly not EV_USE_SIGNALFD
raise AttributeError("sigfd")

try:
from zope.interface import classImplements
except ImportError:
pass
else:
# XXX: This invokes the side-table lookup, we would
# prefer to have it stored directly on the class.
from gevent._interfaces import ILoop
classImplements(loop, ILoop)

# about readonly _flags attribute:
# bit #1 set if object owns Python reference to itself (Py_INCREF was
Expand Down
13 changes: 11 additions & 2 deletions src/gevent/libev/corecffi.py
Expand Up @@ -208,6 +208,16 @@ def time():
class loop(AbstractLoop):
# pylint:disable=too-many-public-methods

# libuv parameters simply won't accept anything lower than 1ms
# (0.001s), but libev takes fractional seconds. In practice, on
# one machine, libev can sleep for very small periods of time:
#
# sleep(0.00001) -> 0.000024
# sleep(0.0001) -> 0.000156
# sleep(0.001) -> 0.00136 (which is comparable to libuv)

approx_timer_resolution = 0.00001

error_handler = None

_CHECK_POINTER = 'struct ev_check *'
Expand All @@ -218,8 +228,7 @@ class loop(AbstractLoop):

def __init__(self, flags=None, default=None):
AbstractLoop.__init__(self, ffi, libev, _watchers, flags, default)
self._default = True if libev.ev_is_default_loop(self._ptr) else False

self._default = bool(libev.ev_is_default_loop(self._ptr))

def _init_loop(self, flags, default):
c_flags = _flags_to_int(flags)
Expand Down
6 changes: 3 additions & 3 deletions src/gevent/libev/watcher.py
Expand Up @@ -100,7 +100,7 @@ def _watcher_ffi_unref(self):
self._flags |= 2 # now we've told libev

def _get_ref(self):
return False if self._flags & 4 else True
return not self._flags & 4

def _set_ref(self, value):
if value:
Expand Down Expand Up @@ -144,7 +144,7 @@ def feed(self, revents, callback, *args):

@property
def pending(self):
return True if self._watcher and libev.ev_is_pending(self._watcher) else False
return bool(self._watcher and libev.ev_is_pending(self._watcher))


class io(_base.IoMixin, watcher):
Expand Down Expand Up @@ -218,7 +218,7 @@ def send(self):

@property
def pending(self):
return True if libev.ev_async_pending(self._watcher) else False
return bool(libev.ev_async_pending(self._watcher))

# Provide BWC for those that have async
locals()['async'] = async_
Expand Down
7 changes: 4 additions & 3 deletions src/gevent/libuv/loop.py
Expand Up @@ -79,9 +79,10 @@ def supported_backends():
@implementer(ILoop)
class loop(AbstractLoop):

# XXX: Undocumented. Maybe better named 'timer_resolution'? We can't
# know this in general on libev
min_sleep_time = 0.001 # 1ms
# libuv parameters simply won't accept anything lower than 1ms. In
# practice, looping on gevent.sleep(0.001) takes about 0.00138 s
# (+- 0.000036s)
approx_timer_resolution = 0.001 # 1ms

error_handler = None

Expand Down
2 changes: 1 addition & 1 deletion src/gevent/libuv/watcher.py
Expand Up @@ -174,7 +174,7 @@ def _get_ref(self):
# Convert 1/0 to True/False
if self._watcher is None:
return None
return True if libuv.uv_has_ref(self._watcher) else False
return bool(libuv.uv_has_ref(self._watcher))

def _set_ref(self, value):
if value:
Expand Down
3 changes: 0 additions & 3 deletions src/gevent/lock.py
Expand Up @@ -158,7 +158,6 @@ def __init__(self, value=None):
.. versionchanged:: 1.1rc3
Accept and ignore a *value* argument for compatibility with Semaphore.
"""
pass

def __str__(self):
return '<%s>' % self.__class__.__name__
Expand All @@ -169,7 +168,6 @@ def locked(self):

def release(self):
"""Releasing a dummy semaphore does nothing."""
pass

def rawlink(self, callback):
# XXX should still work and notify?
Expand All @@ -180,7 +178,6 @@ def unlink(self, callback):

def wait(self, timeout=None):
"""Waiting for a DummySemaphore returns immediately."""
pass

def acquire(self, blocking=True, timeout=None):
"""
Expand Down
1 change: 0 additions & 1 deletion src/gevent/pool.py
Expand Up @@ -488,7 +488,6 @@ def wait_available(self, timeout=None):
In this implementation, because there are no limits on the number
of tracked greenlets, this will always return immediately.
"""
pass

# MappingMixin methods

Expand Down
1 change: 0 additions & 1 deletion src/gevent/pywsgi.py
Expand Up @@ -1188,7 +1188,6 @@ def write(self, msg):

def flush(self):
"No-op; required to be a file-like object"
pass

def writelines(self, lines):
for line in lines:
Expand Down
2 changes: 1 addition & 1 deletion src/gevent/tests/test___monkey_patching.py
Expand Up @@ -92,7 +92,7 @@ def TESTRUNNER(tests=None):
if tests and not sys.platform.startswith("win"):
atexit.register(os.system, 'rm -f */@test*')

basic_args = [sys.executable, '-u', '-W', 'ignore', '-m' 'gevent.testing.monkey_test']
basic_args = [sys.executable, '-u', '-W', 'ignore', '-m', 'gevent.testing.monkey_test']
for filename in tests:
if filename in version_tests:
util.log("Overriding %s from %s with file from %s", filename, test_dir, version_test_dir)
Expand Down
14 changes: 13 additions & 1 deletion src/gevent/tests/test__hub.py
Expand Up @@ -21,6 +21,7 @@

import re
import time
import unittest

import gevent.testing as greentest
import gevent.testing.timing
Expand Down Expand Up @@ -211,7 +212,7 @@ def test_blocking_this_thread(self):

# We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure.
gevent.sleep(0.0001)
gevent.sleep(hub.loop.approx_timer_resolution)
assert hub.exception_stream is stream
try:
time.sleep(0.3) # Thrice the default
Expand Down Expand Up @@ -316,5 +317,16 @@ def task():
self.assertIn('PeriodicMonitoringThread', data)


class TestLoopInterface(unittest.TestCase):

def test_implemensts_ILoop(self):
from zope.interface import verify
from gevent._interfaces import ILoop

loop = get_hub().loop

verify.verifyObject(ILoop, loop)


if __name__ == '__main__':
greentest.main()
4 changes: 2 additions & 2 deletions src/gevent/threadpool.py
Expand Up @@ -114,7 +114,7 @@ def _set_size(self, size):
self.manager.kill()
while self._size < size:
self._add_thread()
delay = getattr(self.hub.loop, 'min_sleep_time', 0.0001) # For libuv
delay = self.hub.loop.approx_timer_resolution
while self._size > size:
while self._size - size > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
Expand Down Expand Up @@ -150,7 +150,7 @@ def _on_fork(self):

def join(self):
"""Waits until all outstanding tasks have been completed."""
delay = 0.0005
delay = max(0.0005, self.hub.loop.approx_timer_resolution)
while self.task_queue.unfinished_tasks > 0:
sleep(delay)
delay = min(delay * 2, .05)
Expand Down

0 comments on commit ce5a9c4

Please sign in to comment.