Permalink
Browse files

Fixes in the Hubs code...

  • Loading branch information...
1 parent d9bf97e commit 29d015217f8af9b9077b7f52e18b02d1d832cfd9 Alvaro committed Nov 15, 2012
Showing with 109 additions and 91 deletions.
  1. +46 −35 evy/hubs/hub.py
  2. +63 −56 evy/hubs/poller.py
View
81 evy/hubs/hub.py
@@ -33,7 +33,9 @@
import sys
import os
-from weakref import WeakKeyDictionary
+from itertools import ifilter
+
+from weakref import WeakValueDictionary
from evy.uv.interface import libuv, handle_unref
from evy.uv.interface import ffi
@@ -72,16 +74,16 @@ def alarm_signal (seconds):
-READ = "read"
-WRITE = "write"
+READ = libuv.UV_READABLE
+WRITE = libuv.UV_WRITABLE
def alarm_handler (signum, frame):
import inspect
raise RuntimeError("Blocking detector ALARMED at" + str(inspect.getframeinfo(frame)))
-def signal_checker(uv_prepare_handle, status):
+def _signal_checker(uv_prepare_handle, status):
pass # XXX: how do I check for signals from pure python??
@@ -123,7 +125,7 @@ def __init__ (self, clock = time.time, ptr = None, default = True):
self.timers = set()
self.pollers = set()
- self.pollers_by_fd = WeakKeyDictionary()
+ self.pollers_by_fd = WeakValueDictionary()
self.debug_exceptions = True
self.debug_blocking = False
@@ -143,7 +145,7 @@ def __init__ (self, clock = time.time, ptr = None, default = True):
raise SystemError("uv_default_loop() failed")
self._signal_checker = ffi.new("uv_prepare_t *")
- self._signal_checker_cb = ffi.callback("void(*)(uv_prepare_t *, int)", signal_checker)
+ self._signal_checker_cb = ffi.callback("void(*)(uv_prepare_t *, int)", _signal_checker)
libuv.uv_prepare_init(self._uv_ptr, self._signal_checker)
libuv.uv_prepare_start(self._signal_checker, self._signal_checker_cb)
@@ -176,8 +178,18 @@ def add (self, evtype, fileno, cb):
:param fileno: the file number of the file of interest.
:param cb: callback which will be called when the file is ready for reading/writing.
"""
- p = poller.Poller(fileno, evtype, cb, fileno)
- return self.add_poller(p)
+ if fileno in self.pollers_by_fd:
+ p = self.pollers_by_fd[fileno]
+ p.start(self, evtype, cb, fileno)
+ else:
+ p = poller.Poller(fileno)
+ p.start(self, evtype, cb, fileno)
+
+ ## register the poller
+ self.pollers.add(p)
+ self.pollers_by_fd[fileno] = p
+
+ return p
def remove (self, p):
"""
@@ -192,11 +204,14 @@ def remove_descriptor (self, fileno):
"""
Completely remove all watchers for this *fileno*. For internal use only.
"""
- p = self.pollers_by_fd[fileno]
-
- # invoke the callback in the poller and destroy it
- p()
- self._poller_canceled(p)
+ try:
+ p = self.pollers_by_fd[fileno]
+ except KeyError:
+ return
+ else:
+ # invoke the callback in the poller and destroy it
+ p(READ) and p(WRITE)
+ self._poller_canceled(p)
def set_timer_exceptions (self, value):
"""
@@ -408,7 +423,7 @@ def _timer_canceled (self, timer):
try:
self.timers.remove(timer)
- except ValueError:
+ except KeyError:
pass
def _timer_triggered (self, timer):
@@ -449,33 +464,16 @@ def timers_count(self):
## pollers
##
- def add_poller (self, p):
- """
- Add a timer in the hub
- :param timer:
- :param unreferenced: if True, we unreference the timer, so the loop does not wait until it is triggered
- :return:
- """
- assert isinstance(p, poller.Poller)
-
- event_poller = watchers.Poller(self)
- event_poller.impltimer = event_poller
- event_poller.start(self._poller_triggered, p)
-
- ## register the poller
- self.pollers.add(event_poller)
- self.pollers_by_fd[event_poller.fileno] = event_poller
-
- def _poller_triggered (self, p):
+ def _poller_triggered (self, evtype, p):
"""
Performs the poller trigger
:param poller: the poller that has been triggered
:return: nothing
"""
try:
- p()
+ p(evtype)
except self.SYSTEM_EXCEPTIONS:
self.interrupted = True
except:
@@ -491,9 +489,13 @@ def _poller_canceled (self, p):
:param poller: the poller that has been canceled
:return: nothing
"""
+ assert p and isinstance(p, poller.Poller)
+
fileno = p.fileno
try:
poller.destroy()
+
+ ## remove all references to the poller...
self.pollers.remove(timer)
del self.pollers_by_fd[fileno]
except (AttributeError, TypeError):
@@ -616,8 +618,8 @@ def WatcherType(self):
from evy.uv.watchers import Watcher
return Watcher
- def io(self, fd, events, ref=True):
- return watchers.Poll(self, fd, events, ref)
+ def poller(self, fd, ref=True):
+ return watchers.Poll(self, fd, ref)
def timer(self, after, repeat=0.0, ref=True):
return watchers.Timer(self, after, repeat, ref)
@@ -718,3 +720,12 @@ def signal_received(self, signal):
self.interrupted = True
print "signal_received(): TODO"
+
+ ##
+ ## readers and writers
+ ##
+ def get_readers(self):
+ return list(ifilter(lambda i: i.notify_readable, self.pollers))
+
+ def get_writers(self):
+ return list(ifilter(lambda i: i.notify_writable, self.pollers))
View
119 evy/hubs/poller.py
@@ -27,10 +27,16 @@
from functools import partial
from evy.hubs import get_hub
-from evy.hubs.hub import READ, WRITE
from evy.uv import watchers
from evy.uv.interface import libuv
+
+__all__ = [
+ 'Poller',
+ ]
+
+
+
## If true, captures a stack trace for each poller when constructed. This is
## useful for debugging leaking pollers, to find out where the poller was set up.
_g_debug = False
@@ -41,39 +47,27 @@ class Poller(object):
A I/O poller
"""
- def __init__(self, fileno, evtype, persistent = False, read_cb = None, write_cb = None, **kw):
+ def __init__(self, fileno, persistent = False, **kw):
"""
Create a poller.
:param fileno: the file descriptor we are going to poll
- :param cb: The callback to call when we have detected we can use this file descriptor for reading or writting
+ :param cb: the callback to call when we have detected we can read/write from this file descriptor
:param *args: the arguments to pass to cb
:param **kw: the keyword arguments to pass to cb
This poller will not be run unless it is scheduled in a hub by get_hub().add_poller(poller).
"""
-
if fileno < 0:
raise ValueError('invalid file descriptor: %d' % (fileno))
- else:
- self.fileno = fileno
+ self.fileno = fileno
self.persistent = persistent
-
- if '_read_callback' in kw: self.read_callback = kw.pop('_read_callback')
- else: self.read_callback = partial(read_cb, fileno)
-
- if '_write_callback' in kw: self.write_callback = kw.pop('_write_callback')
- else: self.write_callback = partial(write_cb, fileno)
-
- if '_events' in kw:
- self._events = kw.pop('_events')
- else:
- self._events = 0
- if evtype == READ:
- self._events = libuv.UV_READABLE
- elif evtype == WRITE:
- self._events = libuv.UV_WRITABLE
+ self.impl = None
+ self.started = False
+ self.read_callback = kw.pop('_read_callback', None)
+ self.write_callback = kw.pop('_write_callback', None)
+ self.impl = watchers.Poll(get_hub(), fileno)
if _g_debug:
import traceback, cStringIO
@@ -82,28 +76,55 @@ def __init__(self, fileno, evtype, persistent = False, read_cb = None, write_cb
def __repr__(self):
- secs = getattr(self, 'seconds', None)
- cb = getattr(self, 'callback', None)
- retval = "Poller(%s, %s)" % (secs, cb)
+
+ events = ''
+ if self.read_callback: events += 'R'
+ if self.write_callback: events += 'W'
+
+ retval = "Poller(%d, '%s')" % (self.fileno, events)
+
if _g_debug and hasattr(self, 'traceback'):
retval += '\n' + self.traceback.getvalue()
return retval
def copy(self):
- return self.__class__(self.fileno, None,
+ return self.__class__(self.fileno,
persistent = self.persistent,
- _events = self._events,
_read_callback = self.read_callback,
_write_callback = self.write_callback)
+ def start(self, hub, event, cb, *args):
+ """
+ Start the poller for an event on that file descriptor
+
+ :param hub: the hub where this watcher is registered
+ :param cb: the callback
+ :param args: the arguments for the callback
+ :return: the underlying watcher
+ """
+ assert self.impl is not None
+ assert event in [libuv.UV_READABLE, libuv.UV_WRITABLE]
+
+ try:
+ self.impl.start(event, hub._poller_triggered, event, self)
+ except:
+ pass
+ else:
+ cb = partial(cb, *args)
+ if event is libuv.UV_READABLE: self.read_callback = cb
+ else: self.write_callback = cb
+
+ return self.impl
+
+
def cancel(self):
"""
Prevent this poller from being called. If the poller has already
been called or canceled, has no effect.
"""
try:
- if self._events & libuv.UV_READABLE: del self.read_callback
- if self._events & libuv.UV_WRITEABLE: del self.write_callback
+ if self.notify_readable: self.read_callback = None
+ if self.notify_writable: self.write_callback = None
except AttributeError:
pass
@@ -115,8 +136,11 @@ def destroy(self):
Invoke this method when this poller is no longer used
"""
- self.implpoller.stop()
- del self.implpoller
+ self.read_callback = self.write_callback = None
+
+ assert self.impl
+ self.impl.stop()
+ del self.impl
def forget(self):
"""
@@ -126,38 +150,21 @@ def forget(self):
get_hub().forget_poller(self)
- ##
- ## the events we are watching
- ##
-
- def _set_events(self, event):
- new_events = self._events
- if event == READ: new_events |= libuv.UV_READABLE
- elif event == WRITE: new_events |= libuv.UV_WRITABLE
-
- ## TODO: update the events we are polling here...
-
- def _get_events(self):
- return self._events
-
- events = property(_get_events, _set_events)
+ @property
+ def notify_readable(self):
+ return self.read_callback is not None
+ @property
+ def notify_writable(self):
+ return self.write_callback is not None
##
## callbacks
##
- def __call__(self, *args):
- try:
- if self._events & libuv.UV_READABLE: self.read_callback()
- if self._events & libuv.UV_WRITEABLE: self.write_callback()
- finally:
- try:
- if not self.persistent:
- if self._events & libuv.UV_READABLE: del self.read_callback
- if self._events & libuv.UV_WRITEABLE: del self.write_callback
- except AttributeError:
- pass
+ def __call__(self, evtype):
+ if self.notify_readable and evtype is libuv.UV_READABLE: self.read_callback()
+ if self.notify_writable and evtype is libuv.UV_WRITABLE: self.write_callback()
# No default ordering in 3.x. heapq uses <
# FIXME should full set be added?

0 comments on commit 29d0152

Please sign in to comment.