Skip to content

Commit

Permalink
Merge pull request #23 from benoitc/fix/events
Browse files Browse the repository at this point in the history
simplify the event emitter
  • Loading branch information
benoitc committed Mar 1, 2013
2 parents 3ca6317 + c9df573 commit 7d237b5
Showing 1 changed file with 59 additions and 62 deletions.
121 changes: 59 additions & 62 deletions gaffer/events.py
Expand Up @@ -118,11 +118,15 @@
"""

from collections import deque
import logging

import pyuv

from .loop import patch_loop

LOGGER = logging.getLogger("gaffer")


class EventEmitter(object):
""" Many events happend in gaffer. For example a process will emist
the events "start", "stop", "exit".
Expand All @@ -138,7 +142,9 @@ def __init__(self, loop, max_size=200):
self._wqueue = deque(maxlen=max_size)

self._event_dispatcher = pyuv.Prepare(self.loop)
self._wevent_dispatcher = pyuv.Prepare(self.loop)
self._event_dispatcher.start(self._send)
self._event_dispatcher.unref()

self._spinner = pyuv.Idle(self.loop)

def close(self):
Expand All @@ -155,9 +161,6 @@ def close(self):
if not self._event_dispatcher.closed:
self._event_dispatcher.close()

if not self._wevent_dispatcher.closed:
self._wevent_dispatcher.close()

if not self._spinner.closed:
self._spinner.close()

Expand All @@ -168,71 +171,19 @@ def publish(self, evtype, *args, **kwargs):
"""
if "." in evtype:
parts = evtype.split(".")
self._publish(parts[0], evtype, *args, **kwargs)
self._queue.append((parts[0], evtype, args, kwargs))
key = []
for part in parts:
key.append(part)
self._publish(".".join(key), evtype, *args, **kwargs)
self._queue.append((".".join(key), evtype, args, kwargs))
else:
self._publish(evtype, evtype, *args, **kwargs)
self._queue.append((evtype, evtype, args, kwargs))

# emit the event for wildcards events
self._publish_wildcards(evtype, *args, **kwargs)

def _publish(self, pattern, evtype, *args, **kwargs):
if pattern in self._events:
self._queue.append((pattern, evtype, args, kwargs))
self._wqueue.append((evtype, args, kwargs))

if not self._event_dispatcher.active:
self._event_dispatcher.start(self._send)
self._spinner.start(lambda h: h.stop())

def _publish_wildcards(self, evtype, *args, **kwargs):
if self._wildcards:
self._wqueue.append((evtype, args, kwargs))

if not self._wevent_dispatcher.active:
self._wevent_dispatcher.start(self._send_wildcards)
self._spinner.start(lambda h: h.stop())

def _send_wildcards(self, handle):
queue, self._wqueue = self._wqueue, deque(maxlen=self._wqueue.maxlen)
for evtype, args, kwargs in queue:
if self._wildcards:
self._wildcards = self._send_listeners(evtype,
self._wildcards.copy(), *args, **kwargs)

self._wevent_dispatcher.stop()

def _send(self, handle):
queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen)
for pattern, evtype, args, kwargs in queue:
# emit the event to all listeners
if pattern in self._events:
self._events[pattern] = self._send_listeners(evtype,
self._events[pattern].copy(), *args, **kwargs)

self._event_dispatcher.stop()

def _send_listeners(self, evtype, listeners, *args, **kwargs):
to_remove = []
for once, listener in listeners:
try:
listener(evtype, *args, **kwargs)
except Exception: # we ignore all exception
to_remove.append(listener)

if once:
# once event
to_remove.append(listener)

if to_remove:
for listener in to_remove:
try:
listeners.remove((True, listener))
except KeyError:
pass
return listeners
# send the event for later
self._dispatch_event()


def subscribe(self, evtype, listener, once=False):
Expand Down Expand Up @@ -277,3 +228,49 @@ def unsubscribe_all(self, events=[]):
self._wildcards = set()
else:
self._events[evtype] = set()

### private methods

def _dispatch_event(self):
self._spinner.start(lambda h: None)

def _send(self, handle):
lwqueue = len(self._wqueue)
lqueue = len(self._queue)

for i in range(lwqueue):
evtype, args, kwargs = self._wqueue.popleft()
if self._wildcards:
self._wildcards = self._send_listeners(evtype,
self._wildcards.copy(), *args, **kwargs)

for i in range(lqueue):
pattern, evtype, args, kwargs = self._queue.popleft()
# emit the event to all listeners
if pattern in self._events:
self._events[pattern] = self._send_listeners(evtype,
self._events[pattern].copy(), *args, **kwargs)

self._spinner.stop()

def _send_listeners(self, evtype, listeners, *args, **kwargs):
to_remove = []
for once, listener in listeners:
try:
listener(evtype, *args, **kwargs)
except Exception: # we ignore all exception
LOGGER.exception('exception calling listener callback for %r',
self)
to_remove.append(listener)

if once:
# once event
to_remove.append(listener)

if to_remove:
for listener in to_remove:
try:
listeners.remove((True, listener))
except KeyError:
pass
return listeners

0 comments on commit 7d237b5

Please sign in to comment.