Skip to content

Commit

Permalink
Document stream API
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed Feb 14, 2015
1 parent 480d744 commit 1fdaca3
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ CHANGES

* Add zmq_type to tr.get_extra_info()

* Add zmq streams

0.5.2 (2014-10-09)
^^^^^^^^^^^^^^^^^^

Expand Down
2 changes: 1 addition & 1 deletion aiozmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'create_zmq_connection',
'version_info', 'version')

__version__ = '0.5.3'
__version__ = '0.6.0'

version = __version__ + ' , Python ' + sys.version

Expand Down
34 changes: 14 additions & 20 deletions aiozmq/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ def create_zmq_stream(zmq_type, *, bind=None, connect=None,
except protocol_factory; most common are positional host and port,
with various optional keyword arguments following.
Additional optional keyword arguments are loop (to set the event loop
instance to use) and limit (to set the buffer limit passed to the
StreamReader).
Additional optional keyword arguments are loop (to set the event
loop instance to use) and high_read, low_read, high_write,
low_write -- high and low watermarks for reading and writing
respectively.
(If you want to customize the StreamReader and/or
StreamReaderProtocol classes, just copy the code -- there's
really nothing special here except some convenience.)
"""
if loop is None:
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -109,12 +107,15 @@ def msg_received(self, msg):
class ZmqStream:
"""Wraps a ZmqTransport.
This exposes write(), getsockopt(), setsockopt(), connect(),
disconnect(), connections(), bind(), unbind(), bindings(),
subscribe(), unsubscribe(), subscriptions(), get_extra_info() and
close(). It adds drain() which returns an optional Future on
which you can wait for flow control. It also adds a transport
property which references the ZmqTransport directly.
Has write() method and read() coroutine for writing and reading
ZMQ messages.
It adds drain() coroutine which can be used for waiting for flow
control.
It also adds a transport property which references the
ZmqTransport directly.
"""

def __init__(self, loop, *, high=None, low=None):
Expand Down Expand Up @@ -144,19 +145,12 @@ def get_extra_info(self, name, default=None):

@asyncio.coroutine
def drain(self):
"""This method has an unusual return value.
"""Flush the write buffer.
The intended use is to write
w.write(data)
yield from w.drain()
When there's nothing to wait for, drain() returns (), and the
yield-from continues immediately. When the transport buffer
is full (the protocol is paused), drain() creates and returns
a Future and the yield-from will block until that Future is
completed, which will happen when the buffer is (partially)
drained and the protocol is resumed.
"""
if self._exception is not None:
raise self._exception
Expand Down
6 changes: 3 additions & 3 deletions docs/core.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.. _aiozmq-core:

:mod:`aiozmq` --- Core API
==========================
Core API
========


.. module:: aiozmq
Expand Down Expand Up @@ -66,7 +66,7 @@ create_zmq_connection


ZmqTransport
---------------------
------------

.. class:: ZmqTransport

Expand Down
7 changes: 7 additions & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Simple DEALER-ROUTER pair implemented on Core level

.. literalinclude:: ../examples/core_dealer_router.py

.. _aiozmq-examples-stream-dealer-router:

DEALER-ROUTER pair implemented with streams
-------------------------------------------

.. literalinclude:: ../examples/stream_dealer_router.py

.. _aiozmq-examples-rpc-rpc:

Remote Procedure Call
Expand Down
36 changes: 34 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,36 @@ Feel free to improve this package and send a pull request to GitHub_.
Getting Started
---------------

Low-level request-reply example::

import asyncio
import aiozmq
import zmq

@asyncio.coroutine
def go():
router = yield from aiozmq.create_zmq_stream(
zmq.ROUTER,
bind='tcp://127.0.0.1:*')

addr = list(router.transport.bindings())[0]
dealer = yield from aiozmq.create_zmq_stream(
zmq.DEALER,
connect=addr)

for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
data = yield from router.read()
router.write(data)
answer = yield from dealer.read()
print(answer)
dealer.close()
router.close()

asyncio.get_event_loop().run_until_complete(go())


Example of RPC usage::

import aiozmq.rpc
Expand All @@ -117,9 +147,10 @@ Example of RPC usage::

asyncio.get_event_loop().run_until_complete(go())

.. note:: To execute the example you need to :ref:`install
.. note:: To execute the last example you need to :ref:`install
msgpack<aiozmq-install-msgpack>` first.


Indices and tables
==================

Expand All @@ -129,7 +160,8 @@ Indices and tables

.. toctree::

core
stream
rpc
core
examples
glossary
6 changes: 3 additions & 3 deletions docs/rpc.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
.. _aiozmq-rpc:

==============================================
:mod:`aiozmq.rpc` --- Remote Procedure Calls
==============================================
======================
Remote Procedure Calls
======================

.. module:: aiozmq.rpc
:synopsis: RPC for ZeroMQ transports
Expand Down
195 changes: 195 additions & 0 deletions docs/stream.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
.. _aiozmq-stream:

Streams API
===========================

.. currentmodule:: aiozmq

.. versionadded:: 0.6

While *aiozmq* library is built on top of low-level
:class:`ZmqTransport` and :class:`ZmqProtocol` API it provides a more
convinient way also.

Please take a look on example::

import asyncio
import aiozmq
import zmq

@asyncio.coroutine
def go():
router = yield from aiozmq.create_zmq_stream(
zmq.ROUTER,
bind='tcp://127.0.0.1:*')

addr = list(router.transport.bindings())[0]
dealer = yield from aiozmq.create_zmq_stream(
zmq.DEALER,
connect=addr)

for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
data = yield from router.read()
router.write(data)
answer = yield from dealer.read()
print(answer)
dealer.close()
router.close()

asyncio.get_event_loop().run_until_complete(go())

The code creates two streams for request and response part of
:term:`ZeroMQ` connection and sends message through the wire with
waiting for response.

create_zmq_stream
-----------------

.. function:: create_zmq_stream(zmq_type, *, bind=None, connect=None, \
loop=None, zmq_sock=None, \
high_read=None, low_read=None, \
high_write=None, low_write=None)

A wrapper for :func:`create_zmq_connection` returning a ZeroMQ
stream (:class:`ZmqStream` instance).

The arguments are all the usual arguments to
:func:`create_zmq_connection` plus high and low watermarks for
reading and writing messages.

This function is a :ref:`coroutine <coroutine>`.

:param int zmq_type: a type of :term:`ZeroMQ` socket
(*zmq.REQ*, *zmq.REP*, *zmq.PUB*, *zmq.SUB*, zmq.PAIR*,
*zmq.DEALER*, *zmq.ROUTER*, *zmq.PULL*, *zmq.PUSH*, etc.)

:param bind: endpoints specification.

Every :term:`endpoint` generates call to
:meth:`ZmqTransport.bind` for accepting connections from
specified endpoint.

Other side should use *connect* parameter to connect to this
transport.
:type bind: str or iterable of strings

:param connect: endpoints specification.

Every :term:`endpoint` generates call to
:meth:`ZmqTransport.connect` for connecting transport to
specified endpoint.

Other side should use bind parameter to wait for incoming
connections.
:type connect: str or iterable of strings

:param zmq.Socket zmq_sock: a preexisting zmq socket that
will be passed to returned
transport.

:param asyncio.AbstractEventLoop loop: optional event loop
instance, ``None`` for
default event loop.

:param int high_read: high-watermark for reading from
:term:`ZeroMQ` socket. ``None`` by default
(no limits).

:param int low_read: low-watermark for reading from
:term:`ZeroMQ` socket. ``None`` by default
(no limits).

:param int high_write: high-watermark for writing into
:term:`ZeroMQ` socket. ``None`` by default
(no limits).

:param int low_write: low-watermark for writing into
:term:`ZeroMQ` socket. ``None`` by default
(no limits).

:return: ZeroMQ stream object, :class:`ZmqStream` instance.


ZmqStream
---------

.. class:: ZmqStream

A class for sending and receiving :term:`ZeroMQ` messages.

.. attribute:: transport

:class:`ZmqTransport` instance, used for the stream.

.. method:: at_closing()

Return ``True`` if the buffer is empty and :meth:`feed_closing`
was called.

.. method:: close()

Close the stream and underlying :term:`ZeroMQ` socket.

.. method:: drain()

Wait until the write buffer of the underlying transport is flushed.

The intended use is to write::

w.write(data)
yield from w.drain()

When the transport buffer is full (the protocol is paused), block until
the buffer is (partially) drained and the protocol is resumed. When there
is nothing to wait for, the yield-from continues immediately.

This method is a :ref:`coroutine <coroutine>`.

.. method:: exception()

Get the stream exception.

.. method:: get_extra_info(name, default=None)

Return optional transport information: see
:meth:`asyncio.BaseTransport.get_extra_info`.

.. method:: read()

Read one :term:`ZeroMQ` message from the wire and return it.

.. method:: write(msg)

Writes message *msg* into :term:`ZeroMQ` socket.

:param msg: a sequence (:class:`tuple` or :class:`list`),
containing multipart message daata.

*Internal API*

.. method:: set_exception(exc)

Set the exception to *exc*. The exception may be retrieved by
:meth:`exception` call or raised by next :meth:`read`, *the
private method*.

.. method:: set_transport(transport)

Set the transport to *transport*, *the private method*.

.. method:: set_read_buffer_limits(high=None, low=None)

Set read buffer limits, *the private method*.

.. method:: feed_closing()

Feed the socket closing signal, *the private method*.

.. method:: feed_msg(msg)

Feed *msg* message to the stream's internal buffer. Any
operations waiting for the data will be resumed.

*The private method*.

0 comments on commit 1fdaca3

Please sign in to comment.