Skip to content

Latest commit

 

History

History
683 lines (442 loc) · 21.2 KB

core.rst

File metadata and controls

683 lines (442 loc) · 21.2 KB

Core API

aiozmq

aiozmq

create_zmq_connection

create_zmq_connection(protocol_factory, zmq_type, *, bind=None, connect=None, zmq_sock=None, loop=None)

Create a ZeroMQ connection.

This method is a coroutine <coroutine>.

If you don't use bind or connect params you can do it later by ZmqTransport.bind and ZmqTransport.connect calls.

param callable protocol_factory

a factory that instantiates ~ZmqProtocol object.

param int zmq_type

a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)

param bind

endpoints specification.

Every endpoint generates call to ZmqTransport.bind for accepting connections from specified endpoint.

Other side should use connect parameter to connect to this transport.

type bind

str or iterable of strings

param connect

endpoints specification.

Every endpoint generates call to ZmqTransport.connect for connecting transport to specified endpoint.

Other side should use bind parameter to wait for incoming connections.

type connect

str or iterable of strings

param zmq.Socket zmq_sock

a preexisting zmq socket that will be passed to returned transport.

param asyncio.AbstractEventLoop loop

optional event loop instance, None for default event loop.

return

a pair of (transport, protocol) where transport supports ~ZmqTransport interface.

rtype

tuple

0.5

ZmqTransport

Transport for ZeroMQ connections. Implements asyncio.BaseTransport interface.

End user should never create ~ZmqTransport objects directly, he gets it by yield from aiozmq.create_zmq_connection() call.

get_extra_info(key, default=None)

Return optional transport information if name is present otherwise return default.

ZmqTransport supports the only valid key: "zmq_socket". The value is zmq.Socket instance.

param str name

name of info record.

param default

default value

close()

Close the transport.

Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol's ~ZmqProtocol.connection_lost method will (eventually) called with None as its argument.

write(data)

Write message to the transport.

param data

iterable to send as multipart message.

This does not block; it buffers the data and arranges for it to be sent out asynchronously.

abort()

Close the transport immediately.

Buffered data will be lost. No more data will be received. The protocol's ~ZmqProtocol.connection_lost method will (eventually) be called with None as it's argument.

getsockopt(option)

Get ZeroMQ socket option.

param int option

a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.

For list of available options please see: http://api.zeromq.org/master:zmq-getsockopt

return

option value

raise OSError

if call to ZeroMQ was unsuccessful.

setsockopt(option, value)

Set ZeroMQ socket option.

param int option

a constant like zmq.SUBSCRIBE, zmq.UNSUBSCRIBE, zmq.TYPE etc.

param value

a new option value, it's type depend of option name.

For list of available options please see: http://api.zeromq.org/master:zmq-setsockopt

get_write_buffer_limits()

Get the high- and low-water limits for write flow control. Return a tuple (low, high) where low and high are positive number of bytes.

Use set_write_buffer_limits to set the limits.

0.6

set_write_buffer_limits(high=None, low=None)

Set the high- and low-water limits for write flow control.

param high

high-water limit

type high

int or None

param low

low-water limit

type low

int or None

These two values control when to call the protocol's ~ZmqProtocol.pause_writing and ~ZmqProtocol.resume_writing() methods. If specified, the low-water limit must be less than or equal to the high-water limit. Neither value can be negative.

The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to a implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes ~ZmqProtocol.pause_writing to be called whenever the buffer becomes non-empty. Setting low to zero causes ~ZmqProtocol.resume_writing to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.

Use get_write_buffer_limits to get the limits.

get_write_buffer_size()

Return the current size of the write buffer.

pause_reading()

Pause the receiving end.

No data will be passed to the protocol's ZmqProtocol.msg_received method until ZmqTransport.resume_reading is called.

ZmqTransport.resume_reading method.

resume_reading()

Resume the receiving end.

Data received will once again be passed to the protocol's ZmqProtocol.msg_received method.

ZmqTransport.pause_reading method.

bind(endpoint)

Bind transpot to endpoint. See http://api.zeromq.org/master:zmq-bind for details.

This method is a coroutine <coroutine>.

param endpoint

a string in format transport://address as ZeroMQ requires.

return

bound endpoint, unwinding wildcards if needed.

rtype

str

raise OSError

on error from ZeroMQ layer

raise TypeError

if endpoint is not a str

unbind(endpoint)

Unbind transpot from endpoint.

This method is a coroutine <coroutine>.

param endpoint

a string in format transport://address as ZeroMQ requires.

return

None

raise OSError

on error from ZeroMQ layer

raise TypeError

if endpoint is not a str

bindings()

Return immutable set of endpoints <endpoint> bound to transport.

Note

Returned endpoints include only ones that has been bound via ZmqTransport.bind or create_zmq_connection calls and do not include bindings that have been done on zmq_sock before create_zmq_connection call.

connect(endpoint)

Connect transpot to endpoint. See http://api.zeromq.org/master:zmq-connect for details.

This method is a coroutine <coroutine>.

param str endpoint

a string in format transport://address as ZeroMQ requires.

For tcp connections the endpoint should specify IPv4 or IPv6 address, not DNS name. Use yield from get_event_loop().getaddrinfo(host, port) for translating DNS into IP address.

return

endpoint

rtype

str

raise ValueError

if the endpoint is a tcp DNS address.

raise OSError

on error from ZeroMQ layer

raise TypeError

if endpoint is not a str

disconnect(endpoint)

Disconnect transpot from endpoint.

This method is a coroutine <coroutine>.

param endpoint

a string in format transport://address as ZeroMQ requires.

return

None

raise OSError

on error from ZeroMQ layer

raise TypeError

if endpoint is not a str

connections()

Return immutable set of endpoints <endpoint> connected to transport.

Note

Returned endpoints include only ones that has been connected via ZmqTransport.connect or create_zmq_connection calls and do not include connections that have been done to zmq_sock before create_zmq_connection call.

subscribe(value)

Establish a new message filter on SUB transport.

Newly created SUB transports filters out all incoming messages, therefore you should call this method to establish an initial message filter.

An empty (b'') value subscribes to all incoming messages. A non-empty value subscribes to all messages beginning with the specified prefix. Multiple filters may be attached to a single SUB transport, in which case a message shall be accepted if it matches at least one filter.

param bytes value

a filter value to add to SUB filters.

raise NotImplementedError

the transport is not SUB.

raise TypeError

when value is not bytes.

Warning

Unlike to ZeroMQ socket level the call first check for value in ZmqTransport.subscriptions and does nothing if the transport already has been subscribed to the value.

unsubscribe(value)

Remove an existing message filter on a SUB transport.

The filter specified must match an existing filter previously established with the ZmqTransport.subscribe.

If the transport has several instances of the same filter attached the .unsubscribe() removes only one instance, leaving the rest in place and functional (if you use ZmqTransport.subscribe to adding new filters that never happens, see difference between aiozmq and ZeroMQ raw sockets <aiozmq-transport-subscribe-warning> for details).

param bytes value

a filter value to add to SUB filters.

raise NotImplementedError

the transport is not SUB.

raise TypeError

when value is not bytes.

subscriptions()

Return immutable set of subscriptions (set of bytes) subscribed on transport.

Note

Returned subscriptions include only ones that has been subscribed via ZmqTransport.subscribe call and do not include subscribtions that have been done to zmq_sock before create_zmq_connection call.

raise NotImplementedError

the transport is not SUB.

enable_monitor(events=None)

Enables socket events to be reported for this socket. Socket events are passed to the protocol's ZmqProtocol.event_received method.

The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.

This method is a coroutine.

param events

a bitmask of socket events to watch for. If no value is specified then all events will monitored (i.e. zmq.EVENT_ALL). For list of available events please see: http://api.zeromq.org/4-0:zmq-socket-monitor

raise NotImplementedError

if libzmq or pyzmq versions do not support socket monitoring.

0.7

disable_monitor()

Stop the socket event monitor.

This method is a coroutine.

0.7

ZmqProtocol

Protocol for ZeroMQ connections. Derives from asyncio.BaseProtocol.

connection_made(transport)

Called when a connection is made.

param ZmqTransport transport

representing the pipe connection. To receive data, wait for ~ZmqProtocol.msg_received calls. When the connection is closed, ~ZmqProtocol.connection_lost is called.

connection_lost(exc)

Called when the connection is lost or closed.

param exc

an exception object or None (the latter meaning the connection was aborted or closed).

type exc

instance of Exception or derived class

pause_writing()

Called when the transport's buffer goes over the high-water mark.

Pause and resume calls are paired --~ZmqProtocol.pause_writing is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually ~ZmqProtocol.resume_writing is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, ~ZmqProtocol.pause_writing is not called -- it must go strictly over. Conversely, ~ZmqProtocol.resume_writing is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

Note

This is the only Protocol callback that is not called through asyncio.AbstractEventLoop.call_soon -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until ~ZmqProtocol.pause_writing is called).

resume_writing()

Called when the transport's buffer drains below the low-water mark.

See ~ZmqProtocol.pause_writing for details.

msg_received(data)

Called when some ZeroMQ message is received.

param list data

the multipart list of bytes with at least one item.

event_received(event)

Called when a ZeroMQ socket event is received.

This method is only called when a socket monitor is enabled.

param event

a SocketEvent namedtuple containing 3 items: event, value, and endpoint.

type event

namedtuple

0.7

Exception policy

Every call to zmq.Socket method can raise zmq.ZMQError exception. But all methods of ZmqEventLoop and ZmqTransport translate ZMQError into OSError (or descendat) with errno and strerror borrowed from underlying ZMQError values.

The reason for translation is that Python 3.3 implements 3151 --- Reworking the OS and IO Exception Hierarchy which gets rid of exceptions zoo and uses OSError and descendants for all exceptions generated by system function calls.

aiozmq implements the same pattern. Internally it looks like:

try:
    return self._zmq_sock.getsockopt(option)
except zmq.ZMQError as exc:
    raise OSError(exc.errno, exc.strerror)

Also public methods of aiozmq will never raise InterruptedError (aka EINTR), they process interruption internally.

Getting aiozmq version

version

a text version of the library:

'0.1.0 , Python 3.3.2+ (default, Feb 28 2014, 00:52:16) \n[GCC 4.8.1]'

version_info

a named tuple with version information, useful for comparison:

VersionInfo(major=0, minor=1, micro=0, releaselevel='alpha', serial=0)

The Python itself uses the same schema (sys.version_info).

Installing ZeroMQ event loop

0.5

aiozmq works with any asyncio event loop, it doesn't require dedicated event loop policy.

To use ZeroMQ layer you may install proper event loop first.

The recommended way is to setup global event loop policy:

import asyncio
import aiozmq

asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy())

That installs ZmqEventLoopPolicy globally. After installing you can get event loop instance from main thread by asyncio.get_event_loop call:

loop = asyncio.get_event_loop()

If you need to execute event loop in your own (not main) thread you have to set it up first:

import threading

def thread_func():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop()

    loop.run_forever()

thread = threading.Thread(target=thread_func)
thread.start()

ZmqEventLoopPolicy

0.5

aiozmq works with any asyncio event loop, it doesn't require dedicated event loop policy.

ZeroMQ policy implementation for accessing the event loop.

In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop.

ZmqEventLoopPolicy implements an asyncio.AbstractEventLoopPolicy interface.

Create policy for ZeroMQ event loops.

Note

policy should be installed, see install-aiozmq-policy.

get_event_loop()

Get the event loop.

If current thread is the main thread and there are no registered event loop for current thread then the call creates new event loop and registers it.

return

Return an instance of ZmqEventLoop.

raise RuntimeError

if there is no registered event loop for current thread.

new_event_loop()

Create a new event loop.

You must call ZmqEventLoopPolicy.set_event_loop to make this the current event loop.

set_event_loop(loop)

Set the event loop.

As a side effect, if a child watcher was set before, then calling .set_event_loop() from the main thread will call asyncio.AbstractChildWatcher.attach_loop on the child watcher.

param loop

an asyncio.AbstractEventLoop instance or None

raise TypeError

if loop is not instance of asyncio.AbstractEventLoop

get_child_watcher()

Get the child watcher

If not yet set, a asyncio.SafeChildWatcher object is automatically created.

return

Return an instance of asyncio.AbstractChildWatcher.

set_child_watcher(watcher)

Set the child watcher.

param watcher

an asyncio.AbstractChildWatcher instance or None

raise TypeError

if watcher is not instance of asyncio.AbstractChildWatcher

ZmqEventLoop

0.5

aiozmq works with any asyncio event loop, it doesn't require dedicated event loop object.

Event loop with ZeroMQ support.

Follows asyncio.AbstractEventLoop specification and has ~ZmqEventLoop.create_zmq_connection method for ZeroMQ sockets layer.

param zmq.Context zmq_context

explicit context to use for ZeroMQ socket creation inside ZmqEventLoop.create_zmq_connection calls. aiozmq shares global context returned by zmq.Context.instance call if zmq_context parameter is None.

create_zmq_connection(protocol_factory, zmq_type, *, bind=None, connect=None, zmq_sock=None)

Create a ZeroMQ connection.

If you don't use bind or connect params you can do it later by ZmqTransport.bind and ZmqTransport.connect calls.

param callable protocol_factory

a factory that instantiates ~ZmqProtocol object.

param int zmq_type

a type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR*, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)

param bind

endpoints specification.

Every endpoint generates call to ZmqTransport.bind for accepting connections from specified endpoint.

Other side should use connect parameter to connect to this transport.

type bind

str or iterable of strings

param connect

endpoints specification.

Every endpoint generates call to ZmqTransport.connect for connecting transport to specified endpoint.

Other side should use bind parameter to wait for incoming connections.

type connect

str or iterable of strings

param zmq.Socket zmq_sock

a preexisting zmq socket that will be passed to returned transport.

return

a pair of (transport, protocol) where transport supports ~ZmqTransport interface.

rtype

tuple