Skip to content

Commit

Permalink
Merge pull request #50 from claws/add_events
Browse files Browse the repository at this point in the history
Add socket event monitor capability
  • Loading branch information
asvetlov committed Jul 1, 2015
2 parents 89892a0 + 5b6ae3c commit 4f48376
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 7 deletions.
89 changes: 87 additions & 2 deletions aiozmq/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
import asyncio.events
import errno
import re
import struct
import sys
import threading
import weakref
import zmq

from collections import deque, Iterable
from collections import deque, Iterable, namedtuple
from ipaddress import ip_address

from .interface import ZmqTransport
from .interface import ZmqTransport, ZmqProtocol
from .log import logger
from .selector import ZmqSelector
from .util import _EndpointsSet
Expand All @@ -25,6 +26,9 @@
__all__ = ['ZmqEventLoop', 'ZmqEventLoopPolicy', 'create_zmq_connection']


SocketEvent = namedtuple('SocketEvent', 'event value endpoint')


@asyncio.coroutine
def create_zmq_connection(protocol_factory, zmq_type, *,
bind=None, connect=None, zmq_sock=None, loop=None):
Expand Down Expand Up @@ -193,6 +197,42 @@ def create_zmq_connection(self, protocol_factory, zmq_type, *,
raise


class _ZmqEventProtocol(ZmqProtocol):
"""This protocol is used internally by aiozmq to receive messages
from a socket event monitor socket. This protocol decodes each event
message into a namedtuple and then passes them through to the
protocol running the socket that is being monitored via the
ZmqProtocol.event_received method.
This design simplifies the API visible to the developer at the cost
of adding some internal complexity - a hidden protocol that transfers
events from the monitor protocol to the monitored socket's protocol.
"""

def __init__(self, loop, main_protocol):
self._protocol = main_protocol
self.wait_ready = asyncio.Future(loop=loop)
self.wait_closed = asyncio.Future(loop=loop)

def connection_made(self, transport):
self.transport = transport
self.wait_ready.set_result(True)

def connection_lost(self, exc):
self.wait_closed.set_result(exc)

def msg_received(self, data):
if len(data) != 2 or len(data[0]) != 6:
raise RuntimeError(
"Invalid event message format: {}".format(data))
event, value = struct.unpack("=hi", data[0])
endpoint = data[1].decode()
self.event_received(SocketEvent(event, value, endpoint))

def event_received(self, evt):
self._protocol.event_received(evt)


class _BaseTransport(ZmqTransport):

_TCP_RE = re.compile('^tcp://(.+):(\d+)|\*$')
Expand Down Expand Up @@ -221,6 +261,7 @@ def __init__(self, loop, zmq_type, zmq_sock, protocol):
self._subscriptions = set()
self._paused = False
self._conn_lost = 0
self._monitor = None

def __repr__(self):
info = ['ZmqTransport',
Expand Down Expand Up @@ -492,6 +533,42 @@ def subscriptions(self):
raise NotImplementedError("Not supported ZMQ socket type")
return _EndpointsSet(self._subscriptions)

@asyncio.coroutine
def enable_monitor(self, events=None):

# The standard approach of binding and then connecting does not
# work in this specific case. The event loop does not properly
# detect messages on the inproc transport which means that event
# messages get missed.
# pyzmq's 'get_monitor_socket' method can't be used because this
# performs the actions in the wrong order for use with an event
# loop.
# For more information on this issue see:
# https://github.com/mkoppanen/php-zmq/issues/130

if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))

if self._monitor is None:
addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
events = events or zmq.EVENT_ALL
_, self._monitor = yield from create_zmq_connection(
lambda: _ZmqEventProtocol(self._loop, self._protocol),
zmq.PAIR, connect=addr, loop=self._loop)
# bind must come after connect
self._zmq_sock.monitor(addr, events)
yield from self._monitor.wait_ready

def disable_monitor(self):
if self._monitor:
self._zmq_sock.disable_monitor()
self._monitor.transport.close()
self._monitor = None


class _ZmqTransportImpl(_BaseTransport):

Expand Down Expand Up @@ -558,6 +635,8 @@ def close(self):
if self._closing:
return
self._closing = True
if self._monitor:
self.disable_monitor()
if not self._paused:
self._loop.remove_reader(self._zmq_sock)
if not self._buffer:
Expand All @@ -567,6 +646,8 @@ def close(self):
def _force_close(self, exc):
if self._conn_lost:
return
if self._monitor:
self.disable_monitor()
if self._buffer:
self._buffer.clear()
self._buffer_size = 0
Expand Down Expand Up @@ -684,6 +765,8 @@ def close(self):
if self._closing:
return
self._closing = True
if self._monitor:
self.disable_monitor()
if not self._buffer:
self._conn_lost += 1
if not self._paused:
Expand All @@ -693,6 +776,8 @@ def close(self):
def _force_close(self, exc):
if self._conn_lost:
return
if self._monitor:
self.disable_monitor()
if self._buffer:
self._buffer.clear()
self._buffer_size = 0
Expand Down
39 changes: 39 additions & 0 deletions aiozmq/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@

import asyncio
from asyncio import BaseProtocol, BaseTransport


__all__ = ['ZmqTransport', 'ZmqProtocol']


Expand Down Expand Up @@ -185,6 +188,33 @@ def subscriptions(self):
"""
raise NotImplementedError

@asyncio.coroutine
def enable_monitor(self, events=None):
"""Enables socket events to be reported for this socket.
Socket events are passed to the protocol's ZmqProtocol's
event_received method.
This method is a coroutine.
The socket event monitor capability requires libzmq >= 4 and
pyzmq >= 14.4.
events is a bitmask (e.g zmq.EVENT_CONNECTED) defining the events
to monitor. Default is all events (i.e. zmq.EVENT_ALL).
For list of available events please see:
http://api.zeromq.org/4-0:zmq-socket-monitor
Raise NotImplementedError if libzmq or pyzmq versions do not support
socket monitoring.
"""
raise NotImplementedError

def disable_monitor(self):
"""Stop the socket event monitor.
"""
raise NotImplementedError


class ZmqProtocol(BaseProtocol):
"""Interface for ZeroMQ protocol."""
Expand All @@ -194,3 +224,12 @@ def msg_received(self, data):
data is the multipart tuple of bytes with at least one item.
"""

def event_received(self, event):
"""Called when a ZeroMQ socket event is received.
This method is only called when a socket monitor is enabled.
:param event: A namedtuple containing 3 items `event`, `value`, and
`endpoint`.
"""
31 changes: 31 additions & 0 deletions docs/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,28 @@ ZmqTransport

:raise NotImplementedError: the transport is not *SUB*.

.. method:: enable_monitor(events=None):

Enables socket events to be reported for this socket. Socket events are
passed to the protocol's :meth:`ZmqProtocol.event_received` method.

The socket event monitor capability requires ``libzmq >= 4`` and
``pyzmq >= 14.4``.

This method is a coroutine.

:param events: a bitmask of socket events to watch for. If no value is
specified then all events will monitored (i.e. zmq.EVENT_ALL). For list
of available events please see:
http://api.zeromq.org/4-0:zmq-socket-monitor

:raise NotImplementedError: if libzmq or pyzmq versions do not support
socket monitoring.

.. method:: disable_monitor():

Stop the socket event monitor.



ZmqProtocol
Expand Down Expand Up @@ -418,6 +440,15 @@ ZmqProtocol

:param list data: the multipart list of bytes with at least one item.

.. method:: event_received(event)

Called when a ZeroMQ socket event is received.

This method is only called when a socket monitor is enabled.

:param event: a SocketEvent namedtuple containing 3 items:
`event`, `value`, and `endpoint`.
:type event: :class:`namedtuple`

Exception policy
----------------
Expand Down
8 changes: 8 additions & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,11 @@ Use dynamic RPC lookup
----------------------

.. literalinclude:: ../examples/rpc_dynamic.py


.. _aiozmq-examples-socket-event-monitor:

Socket event monitor
--------------------

.. literalinclude:: ../examples/socket_event_monitor.py
120 changes: 120 additions & 0 deletions examples/socket_event_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@

'''
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
'''

import asyncio
import aiozmq
import zmq


ZMQ_EVENTS = {
getattr(zmq, name): name.replace('EVENT_', '').lower().replace('_', ' ')
for name in [i for i in dir(zmq) if i.startswith('EVENT_')]}


def event_description(event):
''' Return a human readable description of the event '''
return ZMQ_EVENTS.get(event, 'unknown')


class Protocol(aiozmq.ZmqProtocol):

def __init__(self):
self.wait_ready = asyncio.Future()
self.wait_done = asyncio.Future()
self.wait_closed = asyncio.Future()
self.count = 0

def connection_made(self, transport):
self.transport = transport
self.wait_ready.set_result(True)

def connection_lost(self, exc):
self.wait_closed.set_result(exc)

def msg_received(self, data):
# This protocol is used by both the Router and Dealer sockets in
# this example. Router sockets prefix messages with the identity
# of the sender and hence contain two frames in this simple test
# protocol.
if len(data) == 2:
identity, msg = data
assert msg == b'Hello'
self.transport.write([identity, b'World'])
else:
msg = data[0]
assert msg == b'World'
self.count += 1
if self.count >= 4:
self.wait_done.set_result(True)

def event_received(self, event):
print(
'event:{}, value:{}, endpoint:{}, description:{}'.format(
event.event, event.value, event.endpoint,
event_description(event.event)))


@asyncio.coroutine
def go():

st, sp = yield from aiozmq.create_zmq_connection(
Protocol, zmq.ROUTER, bind='tcp://127.0.0.1:*')
yield from sp.wait_ready
addr = list(st.bindings())[0]

ct, cp = yield from aiozmq.create_zmq_connection(
Protocol, zmq.DEALER, connect=addr)
yield from cp.wait_ready

# Enable the socket monitor on the client socket. Socket events
# are passed to the 'event_received' method on the client protocol.
yield from ct.enable_monitor()

# Trigger some socket events while also sending a message to the
# server. When the client protocol receives 4 response it will
# fire the wait_done future.
for i in range(4):
yield from asyncio.sleep(0.1)
yield from ct.disconnect(addr)
yield from asyncio.sleep(0.1)
yield from ct.connect(addr)
yield from asyncio.sleep(0.1)
ct.write([b'Hello'])

yield from cp.wait_done

# The socket monitor can be explicitly disabled if necessary.
# ct.disable_monitor()

# If a socket monitor is left enabled on a socket being closed,
# the socket monitor will be closed automatically.
ct.close()
yield from cp.wait_closed

st.close()
yield from sp.wait_closed


def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")


if __name__ == '__main__':
# import logging
# logging.basicConfig(level=logging.DEBUG)

if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))

main()

0 comments on commit 4f48376

Please sign in to comment.