diff --git a/circuits/protocols/stomp/__init__.py b/circuits/protocols/stomp/__init__.py new file mode 100644 index 000000000..952a9c17f --- /dev/null +++ b/circuits/protocols/stomp/__init__.py @@ -0,0 +1,8 @@ +"""STOMP Protocol + +This package contains a component implementing the STOMP Client protocol. +This can be used with products such as ActiveMQ, RabbitMQ, etc +""" +from .client import StompClient + +# pylama:skip=1 diff --git a/circuits/protocols/stomp/client.py b/circuits/protocols/stomp/client.py new file mode 100755 index 000000000..9ff714f27 --- /dev/null +++ b/circuits/protocols/stomp/client.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- +""" Circuits component for handling Stomp Connection """ + +import logging +import ssl +import time +import traceback +from circuits import BaseComponent, Timer +from circuits.core.handlers import handler +try: + from stompest.config import StompConfig + from stompest.protocol import StompSpec, StompSession + from stompest.sync import Stomp + from stompest.error import StompConnectionError, StompError + from stompest.sync.client import LOG_CATEGORY +except ImportError: + raise ImportError("No stomp support available. Is stompest installed?") + +from circuits.protocols.stomp.events import (connect, connected, on_stomp_error, disconnect, disconnected, + connection_failed, server_heartbeat, client_heartbeat, send, + message, subscribe, unsubscribe) +from circuits.protocols.stomp.transport import EnhancedStompFrameTransport + +StompSpec.DEFAULT_VERSION = '1.2' +ACK_CLIENT_INDIVIDUAL = StompSpec.ACK_CLIENT_INDIVIDUAL +ACK_AUTO = StompSpec.ACK_AUTO +ACK_CLIENT = StompSpec.ACK_CLIENT +ACK_MODES = (ACK_CLIENT_INDIVIDUAL, ACK_AUTO, ACK_CLIENT) + +LOG = logging.getLogger(__name__) + + +class StompClient(BaseComponent): + """ Send and Receive messages from a STOMP queue """ + channel = "stomp" + def init(self, host, port, username=None, password=None, + connect_timeout=3, connected_timeout=3, + version=StompSpec.VERSION_1_2, accept_versions=["1.0", "1.1", "1.2"], + heartbeats=(0, 0), ssl_context=None, + use_ssl=True, + key_file=None, + cert_file=None, + ca_certs=None, + ssl_version=ssl.PROTOCOL_SSLv23, + key_file_password=None, + proxy_host=None, + proxy_port=None, + proxy_user=None, + proxy_password=None, + channel=channel): + """ Initialize StompClient. Called after __init__ """ + self.channel = channel + if proxy_host: + LOG.info("Connect to %s:%s through proxy %s:%d", host, port, proxy_host, proxy_port) + else: + LOG.info("Connect to %s:%s", host, port) + + if use_ssl and not ssl_context: + + ssl_params = dict(key_file=key_file, + cert_file=cert_file, + ca_certs=ca_certs, + ssl_version=ssl_version, + password=key_file_password) + LOG.info("Request to use old-style socket wrapper: %s", ssl_params) + ssl_context = ssl_params + + if use_ssl: + uri = "ssl://%s:%s" % (host, port) + else: + uri = "tcp://%s:%s" % (host, port) + + # Configure failover options so it only tries to connect once + self._stomp_server = "failover:(%s)?maxReconnectAttempts=1,startupMaxReconnectAttempts=1" % uri + + self._stomp_config = StompConfig(uri=self._stomp_server, sslContext=ssl_context, + version=version, + login=username, + passcode=password) + + self._heartbeats = heartbeats + self._accept_versions = accept_versions + self._connect_timeout = connect_timeout + self._connected_timeout = connected_timeout + Stomp._transportFactory = EnhancedStompFrameTransport + Stomp._transportFactory.proxy_host = proxy_host + Stomp._transportFactory.proxy_port = proxy_port + Stomp._transportFactory.proxy_user = proxy_user + Stomp._transportFactory.proxy_password = proxy_password + self._client = Stomp(self._stomp_config) + self._subscribed = {} + self.server_heartbeat = None + self.client_heartbeat = None + self.ALLOWANCE = 2 # multiplier for heartbeat timeouts + + @property + def connected(self): + if self._client.session: + return self._client.session.state == StompSession.CONNECTED + else: + return False + + @property + def subscribed(self): + return self._subscribed.keys() + + @property + def stomp_logger(self): + return LOG_CATEGORY + + @handler("disconnect") + def _disconnect(self, receipt=None): + if self.connected: + self._client.disconnect(receipt=receipt) + self._client.close(flush=True) + self.fire(disconnected(reconnect=False)) + self._subscribed = {} + return "disconnected" + + def start_heartbeats(self): + LOG.info("Client HB: %s Server HB: %s", self._client.clientHeartBeat, self._client.serverHeartBeat) + if self._client.clientHeartBeat: + if self.client_heartbeat: + # Timer already exists, just reset it + self.client_heartbeat.reset() + else: + LOG.info("Client will send heartbeats to server") + # Send heartbeats at 80% of agreed rate + self.client_heartbeat = Timer((self._client.clientHeartBeat / 1000.0) * 0.8, + client_heartbeat(), persist=True) + self.client_heartbeat.register(self) + else: + LOG.info("No Client heartbeats will be sent") + + if self._client.serverHeartBeat: + if self.server_heartbeat: + # Timer already exists, just reset it + self.server_heartbeat.reset() + else: + LOG.info("Requested heartbeats from server.") + # Allow a grace period on server heartbeats + self.server_heartbeat = Timer((self._client.serverHeartBeat / 1000.0) * self.ALLOWANCE, + server_heartbeat(), persist=True) + self.server_heartbeat.register(self) + else: + LOG.info("Expecting no heartbeats from Server") + + @handler("connect") + def connect(self, event, host=None, *args, **kwargs): + """ connect to Stomp server """ + LOG.info("Connect to Stomp...") + try: + self._client.connect(heartBeats=self._heartbeats, + host=host, + versions=self._accept_versions, + connectTimeout=self._connect_timeout, + connectedTimeout=self._connected_timeout) + LOG.info("State after Connection Attempt: %s", self._client.session.state) + if self.connected: + LOG.info("Connected to %s", self._stomp_server) + self.fire(connected()) + self.start_heartbeats() + return "success" + + except StompConnectionError as err: + LOG.debug(traceback.format_exc()) + self.fire(connection_failed(self._stomp_server)) + event.success = False + return "fail" + + @handler("server_heartbeat") + def check_server_heartbeat(self, event): + """ Confirm that heartbeat from server hasn't timed out """ + now = time.time() + last = self._client.lastReceived or 0 + if last: + elapsed = now-last + else: + elapsed = -1 + LOG.debug("Last received data %d seconds ago", elapsed) + if ((self._client.serverHeartBeat / 1000.0) * self.ALLOWANCE + last) < now: + LOG.error("Server heartbeat timeout. %d seconds since last heartbeat. Disconnecting.", elapsed) + event.success = False + self.fire(heartbeat_timeout()) + if self.connected: + self._client.disconnect() + # TODO: Try to auto-reconnect? + + @handler("client_heartbeat") + def send_heartbeat(self, event): + if self.connected: + LOG.debug("Sending heartbeat") + try: + self._client.beat() + except StompConnectionError as err: + event.success = False + self.fire(disconnected()) + + @handler("generate_events") + def generate_events(self, event): + if not self.connected: + return + try: + if self._client.canRead(1): + frame = self._client.receiveFrame() + LOG.debug("Recieved frame %s", frame) + self.fire(message(frame)) + except StompConnectionError as err: + self.fire(disconnected()) + + @handler("send") + def send(self, event, destination, body, headers=None, receipt=None): + LOG.debug("send()") + if not self.connected: + LOG.error("Can't send when Stomp is disconnected") + self.fire(on_stomp_error(None, Exception("Message send attempted with stomp disconnected"))) + event.success = False + return + try: + self._client.send(destination, body=body.encode('utf-8'), headers=headers, receipt=receipt) + LOG.debug("Message sent") + except StompConnectionError as err: + event.success = False + self.fire(disconnected()) + except StompError as err: + LOG.error("Error sending ack") + event.success = False + self.fire(on_stomp_error(None, err)) + + @handler("subscribe") + def _subscribe(self, event, destination, ack=ACK_CLIENT_INDIVIDUAL): + if ack not in ACK_MODES: + raise ValueError("Invalid client ack mode specified") + LOG.info("Subscribe to message destination %s", destination) + try: + # Set ID to match destination name for easy reference later + frame, token = self._client.subscribe(destination, + headers={StompSpec.ACK_HEADER: ack, + 'id': destination}) + self._subscribed[destination] = token + except StompConnectionError as err: + event.success = False + self.fire(disconnected()) + except StompError as err: + event.success = False + LOG.debug(traceback.format_exc()) + self.fire(on_stomp_error(None, err)) + + @handler("unsubscribe") + def _unsubscribe(self, event, destination): + if destination not in self._subscribed: + LOG.error("Unsubscribe Request Ignored. Not subscribed to %s", destination) + return + try: + token = self._subscribed.pop(destination) + frame = self._client.unsubscribe(token) + LOG.debug("Unsubscribed: %s", frame) + except StompConnectionError as err: + event.success = False + self.fire(disconnected()) + except StompError as err: + LOG.error("Error sending ack") + event.success = False + self.fire(on_stomp_error(frame, err)) + + @handler("message") + def on_message(self, event, headers, message): + LOG.info("Stomp message received") + + @handler("ack") + def ack_frame(self, event, frame): + LOG.debug("ack_frame()") + try: + self._client.ack(frame) + LOG.debug("Ack Sent") + except StompConnectionError as err: + LOG.error("Error sending ack") + event.success = False + self.fire(disconnected()) + except StompError as err: + LOG.error("Error sending ack") + event.success = False + self.fire(on_stomp_error(frame, err)) + + def get_subscription(self, frame): + """ Get subscription from frame """ + LOG.info(self._subscribed) + _, token = self._client.message(frame) + return self._subscribed[token] diff --git a/circuits/protocols/stomp/events.py b/circuits/protocols/stomp/events.py new file mode 100644 index 000000000..84e5595d9 --- /dev/null +++ b/circuits/protocols/stomp/events.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +""" Circuits events for STOMP Client """ + +import logging +from circuits import Event + +LOG = logging.getLogger(__name__) + + +class stomp_event(Event): + """A Circuits event with less verbose repr""" + success = True + + def _repr(self): + return "" + + def __repr__(self): + "x.__repr__() <==> repr(x)" + + if len(self.channels) > 1: + channels = repr(self.channels) + elif len(self.channels) == 1: + channels = str(self.channels[0]) + else: + channels = "" + + data = self._repr() + + return u"<%s[%s] (%s)>" % (self.name, channels, data) + + +class disconnected(stomp_event): + def __init__(self, reconnect=True, receipt=None): + super(disconnected, self).__init__(receipt=receipt) + self.reconnect = reconnect + + +class disconnect(stomp_event): + pass + + +class message(stomp_event): + def __init__(self, frame): + super(message, self).__init__(headers=frame.headers, + message=frame.body) + self.frame = frame + + +class send(stomp_event): + def __init__(self, headers, body, destination): + super(send, self).__init__(headers=headers, + body=body, + destination=destination) + + +class client_heartbeat(stomp_event): + pass + + +class server_heartbeat(stomp_event): + pass + + +class connect(stomp_event): + def __init__(self, subscribe=False, host=None): + super(connect, self).__init__(host=host) + self.subscribe = subscribe + + +class connected(stomp_event): + pass + + +class connection_failed(stomp_event): + reconnect = True + + +class on_stomp_error(stomp_event): + def __init__(self, frame, err): + headers = frame.headers if frame else {} + body = frame.body if frame else None + super(on_stomp_error, self).__init__(headers=headers, + message=body, + error=err) + self.frame = frame + + +class heartbeat_timeout(stomp_event): + pass + + +class subscribe(stomp_event): + def __init__(self, destination, **kwargs): + super(subscribe, self).__init__(destination=destination, **kwargs) + self.destination = destination + + +class unsubscribe(stomp_event): + def __init__(self, destination): + super(unsubscribe, self).__init__(destination=destination) + self.destination = destination + + +class ack(stomp_event): + def __init__(self, frame): + super(ack, self).__init__(frame=frame) + self.frame = frame diff --git a/circuits/protocols/stomp/transport.py b/circuits/protocols/stomp/transport.py new file mode 100644 index 000000000..e7f41d4da --- /dev/null +++ b/circuits/protocols/stomp/transport.py @@ -0,0 +1,88 @@ +""" stompest StompFrameTransport allowing for ssl.wrap_socket """ + +import logging +import ssl +import socket +try: + from stompest.sync.transport import StompFrameTransport + from stompest.error import StompConnectionError +except ImportError: + raise ImportError("No stomp support available. Is stompest installed?") + +LOG = logging.getLogger(__name__) + + +class EnhancedStompFrameTransport(StompFrameTransport): + """ add support for older ssl module and http proxy """ + + proxy_host = None + proxy_port = None + proxy_user = None + proxy_password = None + + @staticmethod + def match_hostname(cert, hostname): + """ Check that hostname matches cert """ + names = [] + # Python 3 has an ssl.match_hostname method, which does hostname validation. + try: + ssl.match_hostname(cert, hostname) + return + except AttributeError as err: + # We don't have the backported python 3 ssl module, do a simplified check + for sub in cert.get('subject', ()): + for key, value in sub: + if key == 'commonName': + names.append(value) + if value == hostname: + return + raise RuntimeError("{0} does not match the expected value in the certificate {1}".format(hostname, str(names))) + + def connect(self, timeout=None): + """ Allow older versions of ssl module, allow http proxy connections """ + LOG.debug("stomp_transport.connect()") + ssl_params = None + if isinstance(self.sslContext, dict): + # This is actually a dictionary of ssl parameters for wrapping the socket + ssl_params = self.sslContext + self.sslContext = None + + try: + if self.proxy_host: + try: + # Don't try to import this unless we need it + import socks + except ImportError: + raise ImportError("No http proxy support available. Is pysocks installed?") + + LOG.info("Connecting through proxy %s", self.proxy_host) + self._socket = socks.socksocket() + self._socket.set_proxy(socks.HTTP, self.proxy_host, self.proxy_port, True, + username=self.proxy_user, password=self.proxy_password) + else: + self._socket = socket.socket() + + self._socket.settimeout(timeout) + self._socket.connect((self.host, self.port)) + + if ssl_params: + # For cases where we don't have a modern SSLContext (so no SNI) + cert_required = ssl.CERT_REQUIRED if ssl_params["ca_certs"] else ssl.CERT_NONE + self._socket = ssl.wrap_socket( + self._socket, + keyfile=ssl_params['key_file'], + certfile=ssl_params['cert_file'], + cert_reqs=cert_required, + ca_certs=ssl_params['ca_certs'], + ssl_version=ssl_params['ssl_version']) + if cert_required: + LOG.info("Performing manual hostname check") + cert = self._socket.getpeercert() + self.match_hostname(cert, self.host) + + if self.sslContext: + self._socket = self.sslContext.wrap_socket(self._socket, server_hostname=self.host) + + except IOError as e: + raise StompConnectionError('Could not establish connection [%s]' % e) + self._parser.reset() diff --git a/examples/stompclient.py b/examples/stompclient.py new file mode 100644 index 000000000..da3971b4a --- /dev/null +++ b/examples/stompclient.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Example usage for StompClient component + +Requires a STOMP server to connect to. + +""" + +import logging +import ssl +from circuits import Component, Timer, Event +from circuits.core.handlers import handler +from circuits.protocols.stomp.client import StompClient, ACK_AUTO +from circuits.protocols.stomp.events import subscribe, send, connect + +LOG = logging.getLogger(__name__) + +class QueueHandler(Component): + def __init__(self, queue, host=None, *args, **kwargs): + super(QueueHandler, self).__init__(*args, **kwargs) + self.queue = queue + self.host=host + + def registered(self, event, component, parent): + if component.parent is self: + self.fire(Event.create("reconnect")) + + def connected(self): + """Client has connected to the STOMP server""" + LOG.info("STOMP connected.") + # Let's subscribe to the message destination + self.fire(subscribe(self.queue, ack=ACK_AUTO)) + + def subscribe_success(self, event, *args, **kwargs): + """ Subscribed to message destination """ + # Let's fire off some messages + self.fire(send(headers = None, + body="Hello World", + destination=self.queue)) + self.fire(send(headers = None, + body="Hello Again World", + destination=self.queue)) + + def heartbeat_timeout(self): + """Heartbeat timed out from the STOMP server""" + LOG.error("STOMP heartbeat timeout!") + # Set a timer to automatically reconnect + Timer(10, Event.create("Reconnect")).register(self) + + def on_stomp_error(self, headers, message, error): + """STOMP produced an error.""" + LOG.error('STOMP listener: Error:\n%s', message or error) + + def message(self, event, headers, message): + """STOMP produced a message.""" + LOG.info("Message Received: %s", message) + + def disconnected(self, event, *args, **kwargs): + # Wait a while then try to reconnect + LOG.info("We got disconnected, reconnect") + Timer(10, Event.create("reconnect")).register(self) + + def reconnect(self): + """Try (re)connect to the STOMP server""" + LOG.info("STOMP attempting to connect") + self.fire(connect(host=self.host)) + + + +def main(): + logging.basicConfig() + logging.getLogger().setLevel(logging.INFO) + # Configure and run + context = ssl.create_default_context() + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + # You can create an STOMP server to test for free at https://www.cloudamqp.com/ + uri = "orangutan.rmq.cloudamqp.com" + port = 61614 + login = "xxxyyy" + passcode="somepassword" + host = "xxxyyy" + queue = "test1" + + s = StompClient(uri, port, + username=login, + password=passcode, + heartbeats=(10000, 10000), + ssl_context=context) + + qr = QueueHandler(queue, host=host) + s.register(qr) + + qr.run() + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 36661e6ae..94267d307 100755 --- a/setup.py +++ b/setup.py @@ -86,4 +86,5 @@ def read_file(filename): setup_requires=[ "setuptools_scm" ], + extras_require={"stomp": ["stompest>=2.3.0", "pysocks>=1.6.7"]}, ) diff --git a/tests/protocols/test_stomp.py b/tests/protocols/test_stomp.py new file mode 100644 index 000000000..68e03a545 --- /dev/null +++ b/tests/protocols/test_stomp.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +""" Tests for StompClient component """ +from __future__ import print_function + +import os +import ssl +import pytest + +from circuits import Component, handler +from circuits.protocols.stomp.events import subscribe, connect, send, disconnect + + +try: + from circuits.protocols.stomp.client import StompClient, ACK_AUTO +except ImportError: + pytest.importorskip("stompest") + + +URI = os.environ.get("TEST_STOMP_URI", "") +LOGIN = os.environ.get("TEST_STOMP_LOGIN", "") +PASSCODE = os.environ.get("TEST_STOMP_PASSWORD", "") +HOST = os.environ.get("TEST_STOMP_HOST", "") +QUEUE = os.environ.get("TEST_STOMP_QUEUE", "") +TEST_MESSAGE = u"Test Message" +PROXY_HOST = os.environ.get("TEST_STOMP_PROXY_HOST", "") +try: + PROXY_PORT = int(os.environ.get("TEST_STOMP_PROXY_PORT", 0)) +except ValueError: + PROXY_PORT = None + + +# Tests can only run if a STOMP server is available +needstomp = pytest.mark.skipif(not(all((URI, LOGIN, PASSCODE, HOST, QUEUE))), + reason="No STOMP Server Configured") +needproxy = pytest.mark.skipif(not(PROXY_HOST and PROXY_PORT), + reason="No HTTP Proxy Configured") + + +class App(Component): + def __init__(self, queue, host=None, *args, **kwargs): + super(App, self).__init__(*args, **kwargs) + self.queue = QUEUE + self.host=HOST + self.received = [] + + def connected(self): + self.fire(subscribe(self.queue, ack=ACK_AUTO)) + print("connected") + + def message(self, event, headers, message): + self.received.append(message) + print("received") + + def subscribe_success(self, *args, **kwargs): + print("subscribed") + + def disconnected(self, *args, **kwargs): + print("disconnected") + + +@needstomp +@pytest.mark.parametrize("context", + [ssl.create_default_context(), None]) +def test_stomp_ssl(manager, watcher, tmpdir, context): + """ test ssl connection """ + port = 61614 + + if context: + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + else: + # Run like older Python version w/out ssl context + pass + + app = App(QUEUE).register(manager) + client = StompClient(URI, port, + username=LOGIN, + password=PASSCODE, + ssl_context=context).register(app) + + watcher.wait("registered") + client.fire(connect(host=HOST)) + watcher.wait("connected") + + client.fire(subscribe(QUEUE, ack=ACK_AUTO)) + watcher.wait("subscribe_success") + + client.fire(send(headers=None, + body=TEST_MESSAGE, + destination=QUEUE)) + watcher.wait("message_success") + client.fire(disconnect()) + received = app.received[0].decode() + assert received == TEST_MESSAGE + + client.unregister() + app.unregister() + watcher.wait("unregistered") + +@needstomp +def test_stomp_no_ssl(manager, watcher, tmpdir): + """ Test plain tcp connection """ + port = 61613 + + app = App(QUEUE).register(manager) + client = StompClient(URI, port, + username=LOGIN, + password=PASSCODE, + use_ssl=False).register(app) + + watcher.wait("registered") + client.fire(connect(host=HOST)) + app.wait("connected") + + client.fire(subscribe(QUEUE, ack=ACK_AUTO)) + watcher.wait("subscribe_success") + + client.fire(send(headers=None, + body=TEST_MESSAGE, + destination=QUEUE)) + watcher.wait("message_success") + client.fire(disconnect()) + received = app.received[0].decode() + assert received == TEST_MESSAGE + + client.unregister() + app.unregister() + watcher.wait("unregistered") + +@needstomp +@needproxy +@pytest.mark.parametrize("context", + [ssl.create_default_context(), None]) +def test_stomp_proxy_ssl(manager, watcher, tmpdir, context): + """ test ssl connection through http proxy""" + port = 61614 + + if context: + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + else: + # Run like older Python version w/out ssl context + pass + + app = App(QUEUE).register(manager) + client = StompClient(URI, port, + username=LOGIN, + password=PASSCODE, + proxy_host=PROXY_HOST, + proxy_port=PROXY_PORT, + ssl_context=context).register(app) + + watcher.wait("registered") + client.fire(connect(host=HOST)) + watcher.wait("connected") + + client.fire(subscribe(QUEUE, ack=ACK_AUTO)) + watcher.wait("subscribe_success") + + client.fire(send(headers=None, + body=TEST_MESSAGE, + destination=QUEUE)) + watcher.wait("message_success") + client.fire(disconnect()) + received = app.received[0].decode() + assert received == TEST_MESSAGE + + client.unregister() + app.unregister() + watcher.wait("unregistered") + +@needstomp +@needproxy +def test_stomp_proxy_no_ssl(manager, watcher, tmpdir): + """ Test plain tcp connection through http proxy""" + port = 61613 + + app = App(QUEUE).register(manager) + client = StompClient(URI, port, + username=LOGIN, + password=PASSCODE, + proxy_host=PROXY_HOST, + proxy_port=PROXY_PORT, + use_ssl=False).register(app) + + watcher.wait("registered") + client.fire(connect(host=HOST)) + app.wait("connected") + + client.fire(subscribe(QUEUE, ack=ACK_AUTO)) + watcher.wait("subscribe_success") + + client.fire(send(headers=None, + body=TEST_MESSAGE, + destination=QUEUE)) + watcher.wait("message_success") + client.fire(disconnect()) + received = app.received[0].decode() + assert received == TEST_MESSAGE + + client.unregister() + app.unregister() + watcher.wait("unregistered") diff --git a/tox.ini b/tox.ini index 311ca2420..56d96f164 100644 --- a/tox.ini +++ b/tox.ini @@ -7,10 +7,12 @@ addopts=-r fsxX --durations=10 --ignore=tmp [testenv] commands=py.test {posargs} +extras= stomp deps= pytest pytest-cov pytest-timeout +passenv=TEST_STOMP_* [testenv:docs] basepython=python