Permalink
Browse files

Standardize events objects

Also simplify WrappedEvents works.
  • Loading branch information...
1 parent ae784b2 commit 55478434460c5951097639b9c096e98bb39c0af8 @bombela bombela committed May 18, 2012
Showing with 63 additions and 22 deletions.
  1. +28 −8 zerorpc/channel.py
  2. +20 −10 zerorpc/events.py
  3. +15 −4 zerorpc/heartbeat.py
View
@@ -102,6 +102,10 @@ def channel(self, from_event=None):
def active_channels(self):
return self._active_channels
+ @property
+ def context(self):
+ return self._events.context
+
class Channel(object):
@@ -128,28 +132,33 @@ def close(self):
del self._multiplexer._active_channels[self._channel_id]
self._channel_id = None
- def emit(self, name, args, xheader={}):
+ def create_event(self, name, args, xheader={}):
event = self._multiplexer.create_event(name, args, xheader)
-
if self._channel_id is None:
self._channel_id = event.header['message_id']
self._multiplexer._active_channels[self._channel_id] = self
else:
event.header['response_to'] = self._channel_id
+ return event
+
+ def emit(self, name, args, xheader={}):
+ event = self.create_event(name, args, xheader)
+ self._multiplexer.emit_event(event, self._zmqid)
-# TODO debug middleware
-# print time.time(), 'channel emit', event
+ def emit_event(self, event):
self._multiplexer.emit_event(event, self._zmqid)
def recv(self, timeout=None):
try:
event = self._queue.get(timeout=timeout)
except gevent.queue.Empty:
raise TimeoutExpired(timeout)
-# TODO debug middleware
-# print time.time(), 'channel recv', event
return event
+ @property
+ def context(self):
+ return self._multiplexer.context
+
class BufferedChannel(object):
@@ -193,20 +202,27 @@ def _recver(self):
else:
self._input_queue.put(event)
- def emit(self, name, args, xheader={}, block=True, timeout=None):
+ def create_event(self, name, args, xheader={}):
+ return self._channel.create_event(name, args, xheader)
+
+ def emit_event(self, event, block=True, timeout=None):
if self._remote_queue_open_slots == 0:
if not block:
return False
self._remote_can_recv.clear()
self._remote_can_recv.wait(timeout=timeout)
self._remote_queue_open_slots -= 1
try:
- self._channel.emit(name, args, xheader)
+ self._channel.emit_event(event)
except:
self._remote_queue_open_slots += 1
raise
return True
+ def emit(self, name, args, xheader={}, block=True, timeout=None):
+ event = self.create_event(name, args, xheader)
+ return self.emit_event(event, block, timeout)
+
def _request_data(self):
open_slots = self._input_queue_size - self._input_queue_reserved
self._input_queue_reserved += open_slots
@@ -230,3 +246,7 @@ def recv(self, timeout=None):
@property
def channel(self):
return self._channel
+
+ @property
+ def context(self):
+ return self._channel.context
View
@@ -113,7 +113,7 @@ def __init__(self, name, args, context, header=None):
self._name = name
self._args = args
if header is None:
- context = context or Context.get_instance()
+ context = context
self._header = {
'message_id': context.new_msgid(),
'v': 3
@@ -257,6 +257,10 @@ def recv(self):
def setsockopt(self, *args):
return self._socket.setsockopt(*args)
+ @property
+ def context(self):
+ return self._context
+
class WrappedEvents(object):
@@ -271,18 +275,24 @@ def recv_is_available(self):
return self._channel.recv_is_available
def create_event(self, name, args, xheader={}):
- wrapped_event = Event(name, args, None)
- wrapped_event.header.update(xheader)
- return wrapped_event
+ event = Event(name, args, self._channel.context)
+ event.header.update(xheader)
+ return event
def emit_event(self, event, identity=None):
- return self._channel.emit('w', event.pack())
+ event_payload = (event.header, event.name, event.args)
+ wrapper_event = self._channel.create_event('w', event_payload)
+ self._channel.emit_event(wrapper_event)
def emit(self, name, args, xheader={}):
- wrapped_event = self.create_event(name, args, xheader)
- return self._channel.emit('w', wrapped_event.pack())
+ wrapper_event = self.create_event(name, args, xheader)
+ self.emit_event(wrapper_event)
def recv(self, timeout=None):
- event = self._channel.recv()
- wrapped_event = Event.unpack(event.args)
- return wrapped_event
+ wrapper_event = self._channel.recv()
+ (header, name, args) = wrapper_event.args
+ return Event(name, args, None, header)
+
+ @property
+ def context(self):
+ return self._channel.context
View
@@ -95,12 +95,19 @@ def _lost_remote_exception(self):
return LostRemote('Lost remote after {0}s heartbeat'.format(
self._heartbeat_freq * 2))
- def emit(self, name, args, xheader={}):
- if self._lost_remote:
- raise self._lost_remote_exception()
+ def create_event(self, name, args, xheader={}):
if self._compat_v2 and name == '_zpc_more':
name = '_zpc_hb'
- self._channel.emit(name, args, xheader)
+ return self._channel.create_event(name, args, xheader)
+
+ def emit_event(self, event):
+ if self._lost_remote:
+ raise self._lost_remote_exception()
+ self._channel.emit_event(event)
+
+ def emit(self, name, args, xheader={}):
+ event = self.create_event(name, args, xheader)
+ self.emit_event(event)
def recv(self, timeout=None):
if self._lost_remote:
@@ -116,3 +123,7 @@ def recv(self, timeout=None):
@property
def channel(self):
return self._channel
+
+ @property
+ def context(self):
+ return self._channel.context

0 comments on commit 5547843

Please sign in to comment.