Permalink
Browse files

Improved Hubs stuff

Fixed many unit tests... we can run most of the networking stuff now
  • Loading branch information...
1 parent cecd6e3 commit 8a2189d11d2af87c5674735cf7d55d8fcdd4fbaa Alvaro committed Nov 15, 2012
Showing with 57 additions and 138 deletions.
  1. +1 −1 evy/__init__.py
  2. +1 −8 evy/debug.py
  3. +32 −22 evy/hubs/hub.py
  4. +6 −7 evy/hubs/poller.py
  5. +16 −30 evy/uv/watchers.py
  6. +0 −2 tests/stdlib/all.py
  7. +1 −68 tests/test_green_socket.py
View
@@ -45,7 +45,7 @@
spawn_after = greenthread.spawn_after
kill = greenthread.kill
- Timeout = timeout.Timeout
+ Timeout = TimeoutError = timeout.Timeout
with_timeout = timeout.with_timeout
GreenPool = greenpool.GreenPool
View
@@ -42,7 +42,7 @@
__all__ = ['spew', 'unspew', 'format_hub_listeners', 'format_hub_timers',
'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions',
- 'hub_prevent_multiple_readers', 'hub_timer_stacks',
+ 'hub_timer_stacks',
'hub_blocking_detection']
_token_splitter = re.compile('\W+')
@@ -154,13 +154,6 @@ def hub_timer_stacks (state = False):
timer._g_debug = state
-
-def hub_prevent_multiple_readers (state = True):
- from evy.hubs import hub
-
- hub.g_prevent_multiple_readers = state
-
-
def hub_exceptions (state = True):
"""Toggles whether the hub prints exceptions that are raised from its
timers. This can be useful to see how greenthreads are terminating.
View
@@ -33,10 +33,6 @@
import sys
import os
-from itertools import ifilter
-
-from weakref import WeakValueDictionary
-
from evy.uv.interface import libuv, handle_unref
from evy.uv.interface import ffi
from evy.uv import watchers
@@ -100,7 +96,6 @@ class Hub(object):
"""
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
-
READ = READ
WRITE = WRITE
@@ -124,8 +119,7 @@ def __init__ (self, clock = time.time, ptr = None, default = True):
self.running = False
self.timers = set()
- self.pollers = set()
- self.pollers_by_fd = WeakValueDictionary()
+ self.pollers = {}
self.debug_exceptions = True
self.debug_blocking = False
@@ -178,16 +172,22 @@ def add (self, evtype, fileno, cb):
:param fileno: the file number of the file of interest.
:param cb: callback which will be called when the file is ready for reading/writing.
"""
- if fileno in self.pollers_by_fd:
- p = self.pollers_by_fd[fileno]
+ if fileno in self.pollers:
+ p = self.pollers[fileno]
+
+ ## check we do not have another callback on the same descriptor and event
+ if p.notify_readable and evtype is READ:
+ raise RuntimeError('there is already %s reading from descriptor %d' % (str(p), fileno))
+ if p.notify_writable and evtype is WRITE:
+ raise RuntimeError('there is already %s writing to descriptor %d' % (str(p), fileno))
+
p.start(self, evtype, cb, fileno)
else:
p = poller.Poller(fileno)
p.start(self, evtype, cb, fileno)
## register the poller
- self.pollers.add(p)
- self.pollers_by_fd[fileno] = p
+ self.pollers[fileno] = p
return p
@@ -205,14 +205,18 @@ def remove_descriptor (self, fileno):
Completely remove all watchers for this *fileno*. For internal use only.
"""
try:
- p = self.pollers_by_fd[fileno]
+ p = self.pollers[fileno]
except KeyError:
return
- else:
- # invoke the callback in the poller and destroy it
- p(READ) and p(WRITE)
+
+ try:
+ # invoke the callbacks in the poller and destroy it
+ p(READ)
+ p(WRITE)
+ finally:
self._poller_canceled(p)
+
def set_timer_exceptions (self, value):
"""
Debug exceptions
@@ -492,15 +496,17 @@ def _poller_canceled (self, p):
assert p and isinstance(p, poller.Poller)
fileno = p.fileno
- try:
- poller.destroy()
+ p.destroy()
+
+ try:
## remove all references to the poller...
- self.pollers.remove(timer)
- del self.pollers_by_fd[fileno]
- except (AttributeError, TypeError):
+ del self.pollers[fileno]
+ except KeyError:
pass
+ assert fileno not in self.pollers
+
def forget_poller(self, poller):
"""
@@ -725,7 +731,11 @@ def signal_received(self, signal):
## readers and writers
##
def get_readers(self):
- return list(ifilter(lambda i: i.notify_readable, self.pollers))
+ return [x for x in self.pollers.values() if x.notify_readable]
def get_writers(self):
- return list(ifilter(lambda i: i.notify_writable, self.pollers))
+ return [x for x in self.pollers.values() if x.notify_writable]
+
+ def __repr__(self):
+ retval = "Hub(%d pollers, %d timers, %d active)" % (self.poller_count, self.timers_count, self.num_active)
+ return retval
View
@@ -63,11 +63,11 @@ def __init__(self, fileno, persistent = False, **kw):
self.fileno = fileno
self.persistent = persistent
- self.impl = None
self.started = False
self.read_callback = kw.pop('_read_callback', None)
self.write_callback = kw.pop('_write_callback', None)
- self.impl = watchers.Poll(get_hub(), fileno)
+ self.hub = kw.pop('_hub', get_hub())
+ self.impl = watchers.Poll(self.hub, fileno)
if _g_debug:
import traceback, cStringIO
@@ -128,7 +128,7 @@ def cancel(self):
except AttributeError:
pass
- get_hub()._poller_canceled(self)
+ self.hub._poller_canceled(self)
def destroy(self):
"""
@@ -138,9 +138,9 @@ def destroy(self):
"""
self.read_callback = self.write_callback = None
- assert self.impl
- self.impl.stop()
- del self.impl
+ if self.impl is not None:
+ self.impl.stop()
+ self.impl = None
def forget(self):
"""
@@ -149,7 +149,6 @@ def forget(self):
"""
get_hub().forget_poller(self)
-
@property
def notify_readable(self):
return self.read_callback is not None
View
@@ -87,28 +87,11 @@ def _run_callback(self, handle, *args):
It will call the callback provided on the start() method
"""
- #uv_handle = self._cast_libuv_handle(handle)
-
if self.callback:
try:
self.callback()
except:
- try:
- self.hub.handle_error(self, *sys.exc_info())
- finally:
- #if revents & (libuv.UV_READABLE | libuv.UV_WRITABLE):
- # # /* poll watcher: not stopping it may cause the failing callback to be called repeatedly */
- # try:
- # self.stop()
- # except:
- # self.hub.handle_error(self, *sys.exc_info())
- # return
- pass
-
- # callbacks' self.active differs from uv_is_active(...) at this point. don't use it!
- #if not handle_is_active(uv_handle):
- # self.stop()
-
+ self.hub.handle_error(self, *sys.exc_info())
##
## references
@@ -159,7 +142,8 @@ def start(self, callback, *args, **kwargs):
:param callback: callback to invoke when the watcher is done
:param args: arguments for calling the callback
"""
- if callback: self.callback = partial(callback, *args, **kwargs)
+ if callback:
+ self.callback = partial(callback, *args, **kwargs)
self._libuv_unref()
@@ -171,18 +155,11 @@ def stop(self):
"""
Stop the watcher
"""
- if self._flags & 2:
- libuv.uv_ref(self.hub._uv_ptr)
- self._flags &= ~2
-
- if self._stop_func: self._stop_func()
+ if self._stop_func:
+ self._stop_func()
self.callback = None
- if self._flags & 1:
- # Py_DECREF(<PyObjectPtr>self)
- self._flags &= ~1
-
@property
def active(self):
return handle_is_active(self._uv_handle)
@@ -211,6 +188,10 @@ def _cast_libuv_handle(self, handle):
return ffi.cast(self.libuv_handle_type, handle)
+ def __repr__(self):
+ retval = "%s (%d)" % (type(self).__name__, self.active)
+ return retval
+
class Poll(Watcher):
"""
This watcher is used to watch file descriptors for readability and writability, similar to the
@@ -327,8 +308,13 @@ def _set_events(self, events):
events = property(_get_events, _set_events)
- def __repr__ (self):
- return "%s(%r, %r, %r)" % (type(self).__name__, self.events, self.fileno, self.callback)
+ def __repr__(self):
+ events = ''
+ if self.read_callback: events += 'R'
+ if self.write_callback: events += 'W'
+
+ retval = "Poller (%d, '%s')" % (self.fileno, events)
+ return retval
__str__ = __repr__
View
@@ -5,8 +5,6 @@
from evy import debug
-debug.hub_prevent_multiple_readers(False)
-
def restart_hub ():
from evy import hubs
View
@@ -499,19 +499,15 @@ def sender (evt):
server.close()
client.close()
- @skip_with_pyevent
def test_raised_multiple_readers (self):
- debug.hub_prevent_multiple_readers(True)
def handle (sock, addr):
sock.recv(1)
sock.sendall("a")
raise evy.StopServe()
listener = evy.listen(('127.0.0.1', 0))
- server = evy.spawn(evy.serve,
- listener,
- handle)
+ server = evy.spawn(evy.serve, listener, handle)
def reader (s):
s.recv(1)
@@ -523,8 +519,6 @@ def reader (s):
s.sendall('b')
a.wait()
- @skip_with_pyevent
- @skip_if(using_epoll_hub)
def test_closure (self):
def spam_to_me (address):
sock = evy.connect(address)
@@ -676,67 +670,6 @@ def test_truncate (self):
self.assertEquals(f.tell(), 9)
-class TestGreenIoLong(LimitedTestCase):
- TEST_TIMEOUT = 10 # the test here might take a while depending on the OS
-
- @skip_with_pyevent
- def test_multiple_readers (self, clibufsize = False):
- debug.hub_prevent_multiple_readers(False)
- recvsize = 2 * min_buf_size()
- sendsize = 10 * recvsize
- # test that we can have multiple coroutines reading
- # from the same fd. We make no guarantees about which one gets which
- # bytes, but they should both get at least some
- def reader (sock, results):
- while True:
- data = sock.recv(recvsize)
- if not data:
- break
- results.append(data)
-
- results1 = []
- results2 = []
- listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- listener.bind(('127.0.0.1', 0))
- listener.listen(50)
-
- def server ():
- (sock, addr) = listener.accept()
- sock = bufsized(sock)
- try:
- c1 = evy.spawn(reader, sock, results1)
- c2 = evy.spawn(reader, sock, results2)
- try:
- c1.wait()
- c2.wait()
- finally:
- c1.kill()
- c2.kill()
- finally:
- sock.close()
-
- server_coro = evy.spawn(server)
- client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- client.connect(('127.0.0.1', listener.getsockname()[1]))
- if clibufsize:
- bufsized(client, size = sendsize)
- else:
- bufsized(client)
- client.sendall(s2b('*') * sendsize)
- client.close()
- server_coro.wait()
- listener.close()
- self.assert_(len(results1) > 0)
- self.assert_(len(results2) > 0)
- debug.hub_prevent_multiple_readers()
-
- @skipped # by rdw because it fails but it's not clear how to make it pass
- @skip_with_pyevent
- def test_multiple_readers2 (self):
- self.test_multiple_readers(clibufsize = True)
-
-
class TestGreenIoStarvation(LimitedTestCase):
# fixme: this doesn't succeed, because of evy's predetermined
# ordering. two processes, one with server, one with client evys

0 comments on commit 8a2189d

Please sign in to comment.