Skip to content

Commit

Permalink
Reworked logic for dispatching events
Browse files Browse the repository at this point in the history
It now uses 2 Prepare handles and a Idle handle. Each prepare handle
is used to send the named or wildcard events respectively, and the Idle
handle is just used to ensure that the loop will not block for i/o when
there are pending events.
  • Loading branch information
saghul committed Nov 9, 2012
1 parent aa24ef7 commit f030956
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 37 deletions.
63 changes: 27 additions & 36 deletions gaffer/events.py
Expand Up @@ -132,10 +132,13 @@ def __init__(self, loop, max_size=200):
self._events = {}
self._wildcards = set()

self._triggered = []
self._queue = deque(maxlen=max_size)
self._wqueue = deque(maxlen=max_size)

self._event_dispatcher = pyuv.Prepare(self.loop)
self._wevent_dispatcher = pyuv.Prepare(self.loop)
self._spinner = pyuv.Idle(self.loop)

def close(self):
""" close the event
Expand All @@ -146,8 +149,9 @@ def close(self):
self._events = {}
self._wildcards = set()

# it will be garbage collected later
self._triggered = []
self._event_dispatcher.close()
self._wevent_dispatcher.close()
self._spinner.close()

def publish(self, evtype, *args, **kwargs):
""" emit an event **evtype**
Expand All @@ -171,49 +175,36 @@ def _publish(self, pattern, evtype, *args, **kwargs):
if pattern in self._events:
self._queue.append((pattern, evtype, args, kwargs))

idle = pyuv.Idle(self.loop)
idle.start(self._send)
self._triggered.append(idle)
if not self._event_dispatcher.active:
self._event_dispatcher.start(self._send)
self._spinner.start(lambda h: h.stop())

def _publish_wildcards(self, evtype, *args, **kwargs):
if self._wildcards:
self._wqueue.append((evtype, args, kwargs))

idle = pyuv.Idle(self.loop)
idle.start(self._send_wildcards)
self._triggered.append(idle)
if not self._wevent_dispatcher.active:
self._wevent_dispatcher.start(self._send_wildcards)
self._spinner.start(lambda h: h.stop())

def _send_wildcards(self, handle):
# find an event to send
try:
evtype, args, kwargs = self._wqueue.popleft()
except IndexError:
return

if self._wildcards:
self._wildcards = self._send_listeners(evtype,
self._wildcards.copy(), *args, **kwargs)

# close the handle and removed it from the list of triggered
self._triggered.remove(handle)
handle.close()
queue, self._wqueue = self._wqueue, deque(maxlen=self._wqueue.maxlen)
for evtype, args, kwargs in queue:
if self._wildcards:
self._wildcards = self._send_listeners(evtype,
self._wildcards.copy(), *args, **kwargs)

self._wevent_dispatcher.stop()

def _send(self, handle):
# find an event to send
try:
pattern, evtype, args, kwargs = self._queue.popleft()
except IndexError:
return

# emit the event to all listeners
if pattern in self._events:
self._events[pattern] = self._send_listeners(evtype,
self._events[pattern].copy(), *args, **kwargs)

# close the handle and removed it from the list of triggered
self._triggered.remove(handle)
handle.close()
queue, self._queue = self._queue, deque(maxlen=self._queue.maxlen)
for pattern, evtype, args, kwargs in queue:
# emit the event to all listeners
if pattern in self._events:
self._events[pattern] = self._send_listeners(evtype,
self._events[pattern].copy(), *args, **kwargs)

self._event_dispatcher.stop()

def _send_listeners(self, evtype, listeners, *args, **kwargs):
to_remove = []
Expand Down
1 change: 0 additions & 1 deletion test/test_events.py
Expand Up @@ -26,7 +26,6 @@ def cb(ev):
assert emitted == [True]
assert "test" in emitter._events
assert emitter._events["test"] == set([(False, cb)])
assert emitter._triggered == []


def test_publish_value():
Expand Down

0 comments on commit f030956

Please sign in to comment.