From 26bd24917857f2f743fde58f9aad1eeb187fd0aa Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 1 Mar 2013 09:00:14 +0100 Subject: [PATCH 1/3] simplify the event emitter This change simplify the event listener code by sending wildcards and patterns subscribed all at once. There is no real reason to send them in a different loop also the way we trigger thevent loop using an async worker may be a problem when both a wildcard and a simple event were subcscribed. --- gaffer/events.py | 112 +++++++++++++++++++++-------------------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/gaffer/events.py b/gaffer/events.py index 3ee0c8a..b7a1287 100644 --- a/gaffer/events.py +++ b/gaffer/events.py @@ -138,7 +138,6 @@ def __init__(self, loop, max_size=200): self._wqueue = deque(maxlen=max_size) self._event_dispatcher = pyuv.Prepare(self.loop) - self._wevent_dispatcher = pyuv.Prepare(self.loop) self._spinner = pyuv.Idle(self.loop) def close(self): @@ -155,9 +154,6 @@ def close(self): if not self._event_dispatcher.closed: self._event_dispatcher.close() - if not self._wevent_dispatcher.closed: - self._wevent_dispatcher.close() - if not self._spinner.closed: self._spinner.close() @@ -168,71 +164,19 @@ def publish(self, evtype, *args, **kwargs): """ if "." in evtype: parts = evtype.split(".") - self._publish(parts[0], evtype, *args, **kwargs) + self._queue.append((parts[0], evtype, args, kwargs)) key = [] for part in parts: key.append(part) - self._publish(".".join(key), evtype, *args, **kwargs) + self._queue.append((".".join(key), evtype, args, kwargs)) else: - self._publish(evtype, evtype, *args, **kwargs) + self._queue.append((evtype, evtype, args, kwargs)) # emit the event for wildcards events - self._publish_wildcards(evtype, *args, **kwargs) - - def _publish(self, pattern, evtype, *args, **kwargs): - if pattern in self._events: - self._queue.append((pattern, evtype, args, kwargs)) - - if not self._event_dispatcher.active: - self._event_dispatcher.start(self._send) - self._spinner.start(lambda h: h.stop()) - - def _publish_wildcards(self, evtype, *args, **kwargs): - if self._wildcards: - self._wqueue.append((evtype, args, kwargs)) - - if not self._wevent_dispatcher.active: - self._wevent_dispatcher.start(self._send_wildcards) - self._spinner.start(lambda h: h.stop()) - - def _send_wildcards(self, handle): - queue, self._wqueue = self._wqueue, deque(maxlen=self._wqueue.maxlen) - for evtype, args, kwargs in queue: - if self._wildcards: - self._wildcards = self._send_listeners(evtype, - self._wildcards.copy(), *args, **kwargs) - - self._wevent_dispatcher.stop() - - def _send(self, handle): - queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen) - for pattern, evtype, args, kwargs in queue: - # emit the event to all listeners - if pattern in self._events: - self._events[pattern] = self._send_listeners(evtype, - self._events[pattern].copy(), *args, **kwargs) - - self._event_dispatcher.stop() - - def _send_listeners(self, evtype, listeners, *args, **kwargs): - to_remove = [] - for once, listener in listeners: - try: - listener(evtype, *args, **kwargs) - except Exception: # we ignore all exception - to_remove.append(listener) + self._wqueue.append((evtype, args, kwargs)) - if once: - # once event - to_remove.append(listener) - - if to_remove: - for listener in to_remove: - try: - listeners.remove((True, listener)) - except KeyError: - pass - return listeners + # send the event for later + self._dispatch_event() def subscribe(self, evtype, listener, once=False): @@ -277,3 +221,47 @@ def unsubscribe_all(self, events=[]): self._wildcards = set() else: self._events[evtype] = set() + + ### private methods + + def _dispatch_event(self): + if not self._event_dispatcher.active: + self._event_dispatcher.start(self._send) + self._spinner.start(lambda h: h.stop()) + + def _send(self, handle): + queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen) + wqueue, self._wqueue = self._wqueue, deque(maxlen=self._wqueue.maxlen) + + for evtype, args, kwargs in wqueue: + if self._wildcards: + self._wildcards = self._send_listeners(evtype, + self._wildcards.copy(), *args, **kwargs) + + for pattern, evtype, args, kwargs in queue: + # emit the event to all listeners + if pattern in self._events: + self._events[pattern] = self._send_listeners(evtype, + self._events[pattern].copy(), *args, **kwargs) + + self._event_dispatcher.stop() + + def _send_listeners(self, evtype, listeners, *args, **kwargs): + to_remove = [] + for once, listener in listeners: + try: + listener(evtype, *args, **kwargs) + except Exception: # we ignore all exception + to_remove.append(listener) + + if once: + # once event + to_remove.append(listener) + + if to_remove: + for listener in to_remove: + try: + listeners.remove((True, listener)) + except KeyError: + pass + return listeners From e102937b22e49b8eed67701996d95255f1324594 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 1 Mar 2013 09:49:47 +0100 Subject: [PATCH 2/3] make sure we trigger all events. following @saghul idea, start the spinner, pass it a dummy callback (lambda h: None) and stop it at the end of _send. This will prevent the loop from blocking for i/o on a given iteration if there are pending events, that is, until the prepare handle is run. Als unref the prepare handle to not block the loop. --- gaffer/events.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/gaffer/events.py b/gaffer/events.py index b7a1287..da3e3d6 100644 --- a/gaffer/events.py +++ b/gaffer/events.py @@ -138,6 +138,9 @@ def __init__(self, loop, max_size=200): self._wqueue = deque(maxlen=max_size) self._event_dispatcher = pyuv.Prepare(self.loop) + self._event_dispatcher.start(self._send) + self._event_dispatcher.unref() + self._spinner = pyuv.Idle(self.loop) def close(self): @@ -225,9 +228,7 @@ def unsubscribe_all(self, events=[]): ### private methods def _dispatch_event(self): - if not self._event_dispatcher.active: - self._event_dispatcher.start(self._send) - self._spinner.start(lambda h: h.stop()) + self._spinner.start(lambda h: None) def _send(self, handle): queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen) @@ -244,7 +245,7 @@ def _send(self, handle): self._events[pattern] = self._send_listeners(evtype, self._events[pattern].copy(), *args, **kwargs) - self._event_dispatcher.stop() + self._spinner.stop() def _send_listeners(self, evtype, listeners, *args, **kwargs): to_remove = [] From c9df573099115166d31c6cd399ff93d8d61a1c90 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 1 Mar 2013 10:08:08 +0100 Subject: [PATCH 3/3] simplify the way we retrieve the vents don't reassing the queue event. While I'm here log exception callbacks. --- gaffer/events.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/gaffer/events.py b/gaffer/events.py index da3e3d6..b49a419 100644 --- a/gaffer/events.py +++ b/gaffer/events.py @@ -118,11 +118,15 @@ """ from collections import deque +import logging import pyuv from .loop import patch_loop +LOGGER = logging.getLogger("gaffer") + + class EventEmitter(object): """ Many events happend in gaffer. For example a process will emist the events "start", "stop", "exit". @@ -231,15 +235,17 @@ def _dispatch_event(self): self._spinner.start(lambda h: None) def _send(self, handle): - queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen) - wqueue, self._wqueue = self._wqueue, deque(maxlen=self._wqueue.maxlen) + lwqueue = len(self._wqueue) + lqueue = len(self._queue) - for evtype, args, kwargs in wqueue: + for i in range(lwqueue): + evtype, args, kwargs = self._wqueue.popleft() if self._wildcards: self._wildcards = self._send_listeners(evtype, self._wildcards.copy(), *args, **kwargs) - for pattern, evtype, args, kwargs in queue: + for i in range(lqueue): + pattern, evtype, args, kwargs = self._queue.popleft() # emit the event to all listeners if pattern in self._events: self._events[pattern] = self._send_listeners(evtype, @@ -253,6 +259,8 @@ def _send_listeners(self, evtype, listeners, *args, **kwargs): try: listener(evtype, *args, **kwargs) except Exception: # we ignore all exception + LOGGER.exception('exception calling listener callback for %r', + self) to_remove.append(listener) if once: