Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zmqstream #31

Merged
9 commits merged into from Oct 11, 2010
59 changes: 51 additions & 8 deletions zmq/eventloop/zmqstream.py
Expand Up @@ -16,8 +16,10 @@
"""A utility class to send to and recv from a non-blocking socket."""

import logging
import time

import zmq
from zmq.core.socket import json, pickle

import ioloop
try:
from queue import Queue
Expand All @@ -29,18 +31,18 @@ class ZMQStream(object):

For use with zmq.eventloop.ioloop

There are 3 main methods:
There are 4 main methods:
on_recv(callback,copy=True):
register a callback to be run every time the socket has something to receive
on_send(callback):
register a callback to be run every time you call send
on_err(callback):
register a callback to be run every time there is an error
send(msg, callback=None)
send(self, msg, flags=0, copy=False, callback=None):
perform a send that will trigger the callback
if callback is passed, on_send is also called

There is also send_multipart()
There are also send_multipart(), send_json, send_pyobj

Two other methods for deactivating the callbacks:
stop_on_recv():
Expand All @@ -51,6 +53,13 @@ class ZMQStream(object):
turn off the error callback

All of which simply call on_<evt>(None).

The entire socket interface, excluding direct recv methods, is also
provided, primarily through direct-linking the methods.
e.g.
>>> stream.bind is stream.socket.bind
True


"""

Expand All @@ -73,9 +82,13 @@ def __init__(self, socket, io_loop=None):

# shortcircuit some socket methods
self.bind = self.socket.bind
self.bind_to_random_port = self.socket.bind_to_random_port
self.connect = self.socket.connect
self.setsockopt = self.socket.setsockopt
self.getsockopt = self.socket.getsockopt
self.setsockopt_unicode = self.socket.setsockopt_unicode
self.getsockopt_unicode = self.socket.getsockopt_unicode


def stop_on_recv(self):
"""Disable callback and automatic receiving."""
Expand Down Expand Up @@ -184,15 +197,42 @@ def send_multipart(self, msg, flags=0, copy=False, callback=None):
# noop callback
self.on_send(lambda *args: None)

def send_unicode(self, u, flags=0, encoding='utf-8', callback=None):
"""Send a unicode message with an encoding.
See zmq.socket.send_unicode for details.
"""
if not isinstance(u, basestring):
raise TypeError("unicode/str objects only")
return self.send(u.encode(encoding), flags=flags, callback=callback)

def send_json(self, obj, flags=0, callback=None):
"""Send json-serialized version of an object.
See zmq.socket.send_json for details.
"""
if json is None:
raise ImportError('cjson, json or simplejson library is required.')
else:
msg = json.dumps(obj)
return self.send(msg, flags=flags, callback=callback)

def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
"""Send a Python object as a message using pickle to serialize.

See zmq.socket.send_json for details.
"""
msg = pickle.dumps(obj, protocol)
return self.send(msg, flags, callback=callback)

def set_close_callback(self, callback):
"""Call the given callback when the stream is closed."""
self._close_callback = callback

def close(self):
"""Close this stream."""
if self.socket is not None:
self.io_loop.remove_handler(self.socket)
self.socket.close()
dc = ioloop.DelayedCallback(self.socket.close, 100, self.io_loop)
dc.start()
self.socket = None
if self._close_callback:
self._run_callback(self._close_callback)
Expand Down Expand Up @@ -271,10 +311,13 @@ def _handle_send(self):
return

msg = self._send_queue.get()
queue = self.socket.send_multipart(*msg)
try:
status = self.socket.send_multipart(*msg)
except zmq.ZMQError, e:
status = e
if self._send_callback:
callback = self._send_callback
self._run_callback(callback, msg, queue)
self._run_callback(callback, msg, status)

# unregister from event loop:
if not self.sending():
Expand Down