Skip to content

Commit

Permalink
Merge e308a7e into 5e97e11
Browse files Browse the repository at this point in the history
  • Loading branch information
claws committed Jun 29, 2015
2 parents 5e97e11 + e308a7e commit ec66381
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 1 deletion.
82 changes: 81 additions & 1 deletion aiozmq/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from collections import deque, Iterable
from ipaddress import ip_address

from .interface import ZmqTransport
from zmq.utils.monitor import parse_monitor_message

from .interface import ZmqTransport, ZmqProtocol
from .log import logger
from .selector import ZmqSelector
from .util import _EndpointsSet
Expand Down Expand Up @@ -193,6 +195,39 @@ def create_zmq_connection(self, protocol_factory, zmq_type, *,
raise


class _ZmqEventProtocol(ZmqProtocol):
"""This protocol is used internally by aiozmq to receive messages
from a socket event monitor socket. This protocol decodes each event
message into a useful structure and then passes them through to the
protocol running the socket that is being monitored via the
ZmqProtocol.event_received method.
This design simplifies the API visible to the developer at the cost
of adding some internal complexity - a hidden protocol that transfers
events from the monitor protocol to the monitored socket's protocol.
"""

def __init__(self, loop, main_protocol):
self._protocol = main_protocol
self.wait_ready = asyncio.Future(loop=loop)
self.wait_closed = asyncio.Future(loop=loop)

def connection_made(self, transport):
self.transport = transport
self.wait_ready.set_result(True)

def connection_lost(self, exc):
self.wait_closed.set_result(exc)

def msg_received(self, data):
evt = parse_monitor_message(data)
evt.update({'endpoint': evt['endpoint'].decode()})
self.event_received(evt)

def event_received(self, evt):
self._protocol.event_received(evt)


class _BaseTransport(ZmqTransport):

_TCP_RE = re.compile('^tcp://(.+):(\d+)|\*$')
Expand Down Expand Up @@ -221,6 +256,7 @@ def __init__(self, loop, zmq_type, zmq_sock, protocol):
self._subscriptions = set()
self._paused = False
self._conn_lost = 0
self._monitor = None

def __repr__(self):
info = ['ZmqTransport',
Expand Down Expand Up @@ -492,6 +528,42 @@ def subscriptions(self):
raise NotImplementedError("Not supported ZMQ socket type")
return _EndpointsSet(self._subscriptions)

@asyncio.coroutine
def enable_monitor(self, events=None):

# The standard approach of binding and then connecting does not
# work in this specific case. The event loop does not properly
# detect messages on the inproc transport which means that event
# messages get missed.
# pyzmq's 'get_monitor_socket' method can't be used because this
# performs the actions in the wrong order for use with an event
# loop.
# For more information on this issue see:
# https://github.com/mkoppanen/php-zmq/issues/130

if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))

if self._monitor is None:
addr = "inproc://monitor.s-{}".format(self._zmq_sock.FD)
events = events or zmq.EVENT_ALL
_, self._monitor = yield from create_zmq_connection(
lambda: _ZmqEventProtocol(self._loop, self._protocol),
zmq.PAIR, connect=addr, loop=self._loop)
# bind must come after connect
self._zmq_sock.monitor(addr, events)
yield from self._monitor.wait_ready

def disable_monitor(self):
if self._monitor:
self._zmq_sock.disable_monitor()
self._monitor.transport.close()
self._monitor = None


class _ZmqTransportImpl(_BaseTransport):

Expand Down Expand Up @@ -558,6 +630,8 @@ def close(self):
if self._closing:
return
self._closing = True
if self._monitor:
self.disable_monitor()
if not self._paused:
self._loop.remove_reader(self._zmq_sock)
if not self._buffer:
Expand All @@ -567,6 +641,8 @@ def close(self):
def _force_close(self, exc):
if self._conn_lost:
return
if self._monitor:
self.disable_monitor()
if self._buffer:
self._buffer.clear()
self._buffer_size = 0
Expand Down Expand Up @@ -684,6 +760,8 @@ def close(self):
if self._closing:
return
self._closing = True
if self._monitor:
self.disable_monitor()
if not self._buffer:
self._conn_lost += 1
if not self._paused:
Expand All @@ -693,6 +771,8 @@ def close(self):
def _force_close(self, exc):
if self._conn_lost:
return
if self._monitor:
self.disable_monitor()
if self._buffer:
self._buffer.clear()
self._buffer_size = 0
Expand Down
39 changes: 39 additions & 0 deletions aiozmq/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@

import asyncio
from asyncio import BaseProtocol, BaseTransport


__all__ = ['ZmqTransport', 'ZmqProtocol']


Expand Down Expand Up @@ -185,6 +188,33 @@ def subscriptions(self):
"""
raise NotImplementedError

@asyncio.coroutine
def enable_monitor(self, events=None):
"""Enables socket monitor events to be reported for this socket.
Socket event messages are sent to the ZmqProtocol's event_received
method.
This is a coroutine.
The socket event monitor capability requires libzmq >= 4 and
pyzmq >= 14.4.
events is a bitmask (e.g zmq.EVENT_CONNECTED) defining the events
to monitor. Default is all events (i.e. zmq.EVENT_ALL).
For list of available events please see:
http://api.zeromq.org/4-0:zmq-socket-monitor
Raise NotImplementedError if libzmq or pyzmq versions do no support
socket monitoring.
"""
raise NotImplementedError

def disable_monitor(self):
"""Stop the socket event monitor.
"""
raise NotImplementedError


class ZmqProtocol(BaseProtocol):
"""Interface for ZeroMQ protocol."""
Expand All @@ -194,3 +224,12 @@ def msg_received(self, data):
data is the multipart tuple of bytes with at least one item.
"""

def event_received(self, event):
"""Called when a ZeroMQ socket event is received.
This method is only called when a socket monitor is enabled.
:param event: A dict containing the event description keys `event`,
`value`, and `endpoint`.
"""
118 changes: 118 additions & 0 deletions examples/socket_event_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

'''
This example demonstrates how to use the ZMQ socket monitor to receive
socket events.
The socket event monitor capability requires libzmq >= 4 and pyzmq >= 14.4.
'''

import asyncio
import aiozmq
import zmq


ZMQ_EVENTS = {
getattr(zmq, name): name.replace('EVENT_', '').lower().replace('_', ' ')
for name in [i for i in dir(zmq) if i.startswith('EVENT_')]}


def event_description(event):
''' Return a human readable description of the event '''
return ZMQ_EVENTS.get(event, 'unknown')


class Protocol(aiozmq.ZmqProtocol):

def __init__(self):
self.wait_ready = asyncio.Future()
self.wait_done = asyncio.Future()
self.wait_closed = asyncio.Future()
self.count = 0

def connection_made(self, transport):
self.transport = transport
self.wait_ready.set_result(True)

def connection_lost(self, exc):
self.wait_closed.set_result(exc)

def msg_received(self, data):
# This protocol is used by both the Router and Dealer sockets in
# this example. Router sockets prefix messages with the identity
# of the sender and hence contain two frames in this simple test
# protocol.
if len(data) == 2:
identity, msg = data
assert msg == b'Hello'
self.transport.write([identity, b'World'])
else:
msg = data[0]
assert msg == b'World'
self.count += 1
if self.count >= 4:
self.wait_done.set_result(True)

def event_received(self, event):
event['description'] = event_description(event['event'])
print(event)


@asyncio.coroutine
def go():

st, sp = yield from aiozmq.create_zmq_connection(
Protocol, zmq.ROUTER, bind='tcp://127.0.0.1:*')
yield from sp.wait_ready
addr = list(st.bindings())[0]

ct, cp = yield from aiozmq.create_zmq_connection(
Protocol, zmq.DEALER, connect=addr)
yield from cp.wait_ready

# Enable the socket monitor on the client socket. Socket events
# are passed to the 'event_received' method on the client protocol.
yield from ct.enable_monitor()

# Trigger some socket events while also sending a message to the
# server. When the client protocol receives 4 response it will
# fire the wait_done future.
for i in range(4):
yield from asyncio.sleep(0.1)
yield from ct.disconnect(addr)
yield from asyncio.sleep(0.1)
yield from ct.connect(addr)
yield from asyncio.sleep(0.1)
ct.write([b'Hello'])

yield from cp.wait_done

# The socket monitor can be explicitly disabled if necessary.
# ct.disable_monitor()

# If a socket monitor is left enabled on a socket being closed,
# the socket monitor will be closed automatically.
ct.close()
yield from cp.wait_closed

st.close()
yield from sp.wait_closed


def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")


if __name__ == '__main__':
# import logging
# logging.basicConfig(level=logging.DEBUG)

if (zmq.zmq_version_info() < (4,) or
zmq.pyzmq_version_info() < (14, 4,)):
raise NotImplementedError(
"Socket monitor requires libzmq >= 4 and pyzmq >= 14.4, "
"have libzmq:{}, pyzmq:{}".format(
zmq.zmq_version(), zmq.pyzmq_version()))

main()
Loading

0 comments on commit ec66381

Please sign in to comment.