From 7aa94eb3bc410bfcb0719f0d71a27f4509e6406c Mon Sep 17 00:00:00 2001 From: Alvaro Date: Mon, 12 Nov 2012 13:11:42 +0100 Subject: [PATCH] Forget abou some timers (specially, about timeouts) Some docs --- .idea/evy.iml | 2 +- .idea/misc.xml | 2 +- evy/hubs/hub.py | 47 +++++++++++++++++++++++++++++++--------------- evy/hubs/timer.py | 33 ++++++++++++++++++++------------ evy/timeout.py | 27 +++++++++++++++----------- evy/uv/watchers.py | 15 ++++++++++----- tests/__init__.py | 14 -------------- tests/test_hub.py | 41 ++++++++++++++++++++++------------------ 8 files changed, 104 insertions(+), 77 deletions(-) diff --git a/.idea/evy.iml b/.idea/evy.iml index 134eb68..1f92d3e 100644 --- a/.idea/evy.iml +++ b/.idea/evy.iml @@ -2,7 +2,7 @@ - + diff --git a/.idea/misc.xml b/.idea/misc.xml index d72bd45..aad9379 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,6 +3,6 @@ $APPLICATION_HOME_DIR$/lib/pycharm.jar!/resources/html5-schema/html5.rnc - + diff --git a/evy/hubs/hub.py b/evy/hubs/hub.py index 18872c4..d7abcc9 100644 --- a/evy/hubs/hub.py +++ b/evy/hubs/hub.py @@ -204,6 +204,9 @@ def add (self, evtype, fileno, cb): The *cb* argument is the callback which will be called when the file is ready for reading/writing. """ + if fileno < 0: + raise ValueError('invalid file descriptor: %d' % (fileno)) + listener = self.lclass(evtype, fileno, cb) bucket = self.listeners[evtype] if fileno in bucket: @@ -317,7 +320,7 @@ def wait (self, seconds = None): :param seconds: the amount of seconds to wait :type seconds: integer """ - timer = Timer(self, seconds) + timer = Timer(self, seconds * 1000) timer.start(lambda x: None) try: @@ -448,11 +451,18 @@ def squelch_timer_exception (self, timer, exc_info): ## def add_timer (self, timer): + """ + Add a timer in the hub + + :param timer: + :param unreferenced: if True, we unreference the timer, so the loop does not wait until it is triggered + :return: + """ # scheduled_time = self.clock() + timer.seconds # self.next_timers.append((scheduled_time, timer)) # return scheduled_time # store the pyevent timer object so that we can cancel later - eventtimer = Timer(self, timer.seconds) + eventtimer = Timer(self, timer.seconds * 1000) timer.impltimer = eventtimer eventtimer.start(self.timer_finished, timer) @@ -465,19 +475,26 @@ def timer_canceled (self, timer): # self.next_timers = [t for t in self.next_timers if not t[1].called] # heapq.heapify(self.timers) - #timer.impltimer.stop() - del timer.impltimer - #try: - # timer.impltimer.stop() - # del timer.impltimer - #except (AttributeError, TypeError): - # pass - #finally: - # super(Hub, self).timer_canceled(timer) + try: + #timer.impltimer.stop() + del timer.impltimer + except (AttributeError, TypeError): + pass def timer_finished (self, timer): pass + def forget_timer(self, timer): + """ + Let the hub forget about a timer, so we do not keep the loop running forever until + this timer triggers... + """ + try: + self.unref(timer.impltimer.handle) + except (AttributeError, TypeError): + pass + + def prepare_timers (self): heappush = heapq.heappush t = self.timers @@ -629,7 +646,7 @@ def default_loop(self): ## references ## - def ref(self): + def ref(self, handle): """ The event loop only runs as long as there are active watchers. This system works by having every watcher increase the reference count of the event loop when it is started and decreasing @@ -638,9 +655,9 @@ def ref(self): :return: None """ - libuv.uv_ref(self._uv_ptr) + libuv.uv_ref(handle) - def unref(self): + def unref(self, handle): """ This method can be used with interval timers. You might have a garbage collector which runs every X seconds, or your network service might send a heartbeat to others periodically, but @@ -651,7 +668,7 @@ def unref(self): :return: None """ - libuv.uv_unref(self._uv_ptr) + libuv.uv_unref(handle) def now(self): diff --git a/evy/hubs/timer.py b/evy/hubs/timer.py index 356fca5..2c5b41b 100644 --- a/evy/hubs/timer.py +++ b/evy/hubs/timer.py @@ -27,6 +27,8 @@ # THE SOFTWARE. # +from functools import partial + from evy.support import greenlets as greenlet from evy.hubs import get_hub @@ -54,8 +56,11 @@ def __init__(self, seconds, cb, *args, **kw): calling timer.schedule() or runloop.add_timer(timer). """ self.seconds = seconds - self.tpl = cb, args, kw + + self.callback = kw.pop('callback', partial(cb, *args, **kw)) + self.called = False + if _g_debug: import traceback, cStringIO self.traceback = cStringIO.StringIO() @@ -67,33 +72,38 @@ def pending(self): def __repr__(self): secs = getattr(self, 'seconds', None) - cb, args, kw = getattr(self, 'tpl', (None, None, None)) - retval = "Timer(%s, %s, *%s, **%s)" % (secs, cb, args, kw) + cb = getattr(self, 'callback', None) + retval = "Timer(%s, %s)" % (secs, cb) if _g_debug and hasattr(self, 'traceback'): retval += '\n' + self.traceback.getvalue() return retval def copy(self): - cb, args, kw = self.tpl - return self.__class__(self.seconds, cb, *args, **kw) + return self.__class__(self.seconds, callback = self.callback) def schedule(self): """ - Schedule this timer to run in the current runloop. + Schedule this timer to run in the current loop. """ self.called = False self.scheduled_time = get_hub().add_timer(self) return self + def forget(self): + """ + Let the hub forget about this timer, so we do not keep the loop running forever until + the timer triggers. + """ + get_hub().forget_timer(self) + def __call__(self, *args): if not self.called: self.called = True - cb, args, kw = self.tpl try: - cb(*args, **kw) + self.callback() finally: try: - del self.tpl + del self.callback except AttributeError: pass @@ -106,7 +116,7 @@ def cancel(self): self.called = True get_hub().timer_canceled(self) try: - del self.tpl + del self.callback except AttributeError: pass @@ -134,8 +144,7 @@ def __call__(self, *args): self.called = True if self.greenlet is not None and self.greenlet.dead: return - cb, args, kw = self.tpl - cb(*args, **kw) + self.callback() def cancel(self): """ diff --git a/evy/timeout.py b/evy/timeout.py index c5b1ba7..05b23c9 100644 --- a/evy/timeout.py +++ b/evy/timeout.py @@ -58,19 +58,24 @@ def __init__ (self, seconds = None, exception = None): self.start() def start (self): - """Schedule the timeout. This is called on construction, so + """ + Schedule the timeout. This is called on construction, so it should not be called explicitly, unless the timer has been - canceled.""" - assert not self.pending,\ - '%r is already started; to restart it, cancel it first' % self - if self.seconds is None: # "fake" timeout (never expires) + canceled. + """ + assert not self.pending, '%r is already started; to restart it, cancel it first' % self + + if self.seconds is None: + # "fake" timeout (never expires) self.timer = None - elif self.exception is None or isinstance(self.exception, bool): # timeout that raises self - self.timer = get_hub().schedule_call_global( - self.seconds, greenlet.getcurrent().throw, self) - else: # regular timeout with user-provided exception - self.timer = get_hub().schedule_call_global( - self.seconds, greenlet.getcurrent().throw, self.exception) + else: + hub = get_hub() + if self.exception is None or isinstance(self.exception, bool): # timeout that raises self + self.timer = hub.schedule_call_global(self.seconds, greenlet.getcurrent().throw, self) + else: # regular timeout with user-provided exception + self.timer = hub.schedule_call_global(self.seconds, greenlet.getcurrent().throw, self.exception) + self.timer.forget() + return self @property diff --git a/evy/uv/watchers.py b/evy/uv/watchers.py index a505e2b..a7afd3f 100644 --- a/evy/uv/watchers.py +++ b/evy/uv/watchers.py @@ -32,7 +32,7 @@ from functools import partial -from evy.uv.interface import libuv, ffi, handle_is_active +from evy.uv.interface import libuv, ffi, handle_is_active, cast_to_handle @@ -68,7 +68,6 @@ def __init__(self, _hub, ref = True): ## .. and another one for stopping it if self.libuv_stop_this_watcher: - assert self._uv_handle self._stop_func = partial(self.libuv_stop_this_watcher, self._uv_handle) def _run_callback(self, handle, *args): @@ -177,9 +176,16 @@ def active(self): return handle_is_active(self._uv_handle) ## - ## handles (internal) + ## handles ## + @property + def handle(self): + """ + Return the uv_handle for this watcher + """ + return cast_to_handle(self._uv_handle) + def _new_libuv_handle(self): """ Return a new libuv C handle for this watcher @@ -343,8 +349,7 @@ def start(self, callback, *args, **kwargs): self._libuv_unref() - if update: - libuv.uv_update_time(self.hub._uv_ptr) + if update: libuv.uv_update_time(self.hub._uv_ptr) libuv.uv_timer_start(self._uv_handle, self._cb, self._after, self._repeat) diff --git a/tests/__init__.py b/tests/__init__.py index d3b0d6f..4230e19 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -83,20 +83,6 @@ def wrapped (*a, **kw): return skipped_wrapper -def requires_twisted (func): - """ Decorator that skips a test if Twisted is not present.""" - - def requirement (_f): - from evy.hubs import get_hub - - try: - return 'Twisted' in type(get_hub()).__name__ - except Exception: - return False - - return skip_unless(requirement)(func) - - def using_pyevent (_f): from evy.hubs import get_hub diff --git a/tests/test_hub.py b/tests/test_hub.py index 644257f..fd0ec8a 100644 --- a/tests/test_hub.py +++ b/tests/test_hub.py @@ -34,7 +34,6 @@ import time import evy from evy import hubs -from evy.green import socket DELAY = 0.001 @@ -53,8 +52,8 @@ def test_cancel_immediate (self): for i in xrange(2000): t = hubs.get_hub().schedule_call_global(60, noop) t.cancel() - self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) + # there should be fewer than 1000 new timers and canceled self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) @@ -68,12 +67,11 @@ def test_cancel_accumulated (self): for i in xrange(2000): t = hubs.get_hub().schedule_call_global(60, noop) evy.sleep() - self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) t.cancel() - self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1, hub.timers) + self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1, hub.timers) # there should be fewer than 1000 new timers and canceled + self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers) self.assert_less_than_equal(hub.timers_canceled, 1000) @@ -91,16 +89,15 @@ def test_cancel_proportion (self): t2 = hubs.get_hub().schedule_call_global(60, noop) t3 = hubs.get_hub().schedule_call_global(60, noop) evy.sleep() - self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) t.cancel() - self.assert_less_than_equal(hub.timers_canceled, - hub.get_timers_count() + 1) + self.assert_less_than_equal(hub.timers_canceled, hub.get_timers_count() + 1) + uncanceled_timers.append(t2) uncanceled_timers.append(t3) # 3000 new timers, plus a few extras - self.assert_less_than_equal(stimers + 3000, - stimers + hub.get_timers_count()) + self.assert_less_than_equal(stimers + 3000, stimers + hub.get_timers_count()) + self.assertEqual(hub.timers_canceled, 1000) for t in uncanceled_timers: t.cancel() @@ -253,8 +250,10 @@ def test_suspend_doesnt_crash (self): shutil.rmtree(self.tempdir) + + class TestBadFilenos(LimitedTestCase): - @skip_with_pyevent + def test_repeated_selects (self): from evy.green import select @@ -264,6 +263,8 @@ def test_repeated_selects (self): from tests.test_patcher import ProcessBase + + class TestFork(ProcessBase): @skip_with_pyevent def test_fork (self): @@ -305,9 +306,11 @@ class CustomException(Exception): pass def test_kill (self): - """ Checks that killing a process after the hub runloop dies does + """ + Checks that killing a process after the hub runloop dies does not immediately return to hub greenlet's parent and schedule a - redundant timer. """ + redundant timer. + """ hub = hubs.get_hub() def dummyproc (): @@ -331,9 +334,11 @@ def dummyproc (): self.assertRaises(self.CustomException, hub.switch) def test_parent (self): - """ Checks that a terminating greenthread whose parent + """ + Checks that a terminating greenthread whose parent was a previous, now-defunct hub greenlet returns execution to - the hub runloop and not the hub greenlet's parent. """ + the hub runloop and not the hub greenlet's parent. + """ hub = hubs.get_hub() def dummyproc ():