Skip to content

Commit

Permalink
Code cleanup (#259)
Browse files Browse the repository at this point in the history
* Apply isort

* Code cleanup stomp protocol components

* Code cleanup
  • Loading branch information
spaceone authored and prologic committed Dec 19, 2018
1 parent 507c217 commit 4be8e00
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 48 deletions.
2 changes: 1 addition & 1 deletion circuits/core/values.py
@@ -1,7 +1,7 @@
"""
This defines the Value object used by components and events.
"""
from ..six import python_2_unicode_compatible, string_types, PY2
from ..six import PY2, python_2_unicode_compatible, string_types
from .events import Event


Expand Down
18 changes: 9 additions & 9 deletions circuits/net/sockets.py
Expand Up @@ -10,16 +10,10 @@
EMFILE, ENFILE, ENOBUFS, ENOMEM, ENOTCONN, EPERM, EPIPE, EWOULDBLOCK,
)
from socket import (
AF_INET, AF_INET6, IPPROTO_IP, IPPROTO_TCP, SO_BROADCAST,
SO_REUSEADDR, SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, TCP_NODELAY,
error as SocketError, gaierror, getaddrinfo, getfqdn, gethostbyname,
gethostname, socket,
AF_INET, AF_INET6, IPPROTO_IP, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR,
SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError,
gaierror, getaddrinfo, getfqdn, gethostbyname, gethostname, socket,
)
try:
from socket import AF_UNIX
except ImportError:
AF_UNIX = None

from time import time

from _socket import socket as SocketType
Expand All @@ -34,6 +28,12 @@
ready, unreachable, write,
)

try:
from socket import AF_UNIX
except ImportError:
AF_UNIX = None


try:
from ssl import wrap_socket as ssl_socket
from ssl import CERT_NONE, PROTOCOL_SSLv23
Expand Down
4 changes: 3 additions & 1 deletion circuits/protocols/irc/__init__.py
Expand Up @@ -9,7 +9,9 @@
from .message import Message # noqa
from .numerics import * # noqa
from .protocol import IRC # noqa
from .utils import joinprefix, parsemsg, parseprefix, strip, irc_color_to_ansi # noqa
from .utils import ( # noqa
irc_color_to_ansi, joinprefix, parsemsg, parseprefix, strip,
)

sourceJoin = joinprefix
sourceSplit = parseprefix
Expand Down
1 change: 1 addition & 0 deletions circuits/protocols/stomp/__init__.py
Expand Up @@ -5,4 +5,5 @@
"""
from .client import StompClient

__all__ = ('StompClient',)
# pylama:skip=1
20 changes: 12 additions & 8 deletions circuits/protocols/stomp/client.py
Expand Up @@ -5,8 +5,15 @@
import ssl
import time
import traceback

from circuits import BaseComponent, Timer
from circuits.core.handlers import handler
from circuits.protocols.stomp.events import (
client_heartbeat, connected, connection_failed,
disconnected, heartbeat_timeout, message, on_stomp_error, server_heartbeat,
)
from circuits.protocols.stomp.transport import EnhancedStompFrameTransport

try:
from stompest.config import StompConfig
from stompest.protocol import StompSpec, StompSession
Expand All @@ -16,10 +23,6 @@
except ImportError:
raise ImportError("No stomp support available. Is stompest installed?")

from circuits.protocols.stomp.events import (connect, connected, on_stomp_error, disconnect, disconnected,
connection_failed, server_heartbeat, client_heartbeat, send,
message, subscribe, unsubscribe)
from circuits.protocols.stomp.transport import EnhancedStompFrameTransport

StompSpec.DEFAULT_VERSION = '1.2'
ACK_CLIENT_INDIVIDUAL = StompSpec.ACK_CLIENT_INDIVIDUAL
Expand All @@ -33,6 +36,7 @@
class StompClient(BaseComponent):
""" Send and Receive messages from a STOMP queue """
channel = "stomp"

def init(self, host, port, username=None, password=None,
connect_timeout=3, connected_timeout=3,
version=StompSpec.VERSION_1_2, accept_versions=["1.0", "1.1", "1.2"],
Expand Down Expand Up @@ -162,7 +166,7 @@ def connect(self, event, host=None, *args, **kwargs):
self.start_heartbeats()
return "success"

except StompConnectionError as err:
except StompConnectionError:
LOG.debug(traceback.format_exc())
self.fire(connection_failed(self._stomp_server))
event.success = False
Expand All @@ -174,7 +178,7 @@ def check_server_heartbeat(self, event):
now = time.time()
last = self._client.lastReceived or 0
if last:
elapsed = now-last
elapsed = now - last
else:
elapsed = -1
LOG.debug("Last received data %d seconds ago", elapsed)
Expand All @@ -192,7 +196,7 @@ def send_heartbeat(self, event):
LOG.debug("Sending heartbeat")
try:
self._client.beat()
except StompConnectionError as err:
except StompConnectionError:
event.success = False
self.fire(disconnected())

Expand All @@ -205,7 +209,7 @@ def generate_events(self, event):
frame = self._client.receiveFrame()
LOG.debug("Recieved frame %s", frame)
self.fire(message(frame))
except StompConnectionError as err:
except StompConnectionError:
self.fire(disconnected())

@handler("send")
Expand Down
5 changes: 3 additions & 2 deletions circuits/protocols/stomp/events.py
Expand Up @@ -2,6 +2,7 @@
""" Circuits events for STOMP Client """

import logging

from circuits import Event

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -80,8 +81,8 @@ def __init__(self, frame, err):
headers = frame.headers if frame else {}
body = frame.body if frame else None
super(on_stomp_error, self).__init__(headers=headers,
message=body,
error=err)
message=body,
error=err)
self.frame = frame


Expand Down
5 changes: 3 additions & 2 deletions circuits/protocols/stomp/transport.py
@@ -1,8 +1,9 @@
""" stompest StompFrameTransport allowing for ssl.wrap_socket """

import logging
import ssl
import socket
import ssl

try:
from stompest.sync.transport import StompFrameTransport
from stompest.error import StompConnectionError
Expand All @@ -28,7 +29,7 @@ def match_hostname(cert, hostname):
try:
ssl.match_hostname(cert, hostname)
return
except AttributeError as err:
except AttributeError:
# We don't have the backported python 3 ssl module, do a simplified check
for sub in cert.get('subject', ()):
for key, value in sub:
Expand Down
6 changes: 5 additions & 1 deletion examples/async_worker_webpage_download.py
@@ -1,7 +1,9 @@
from time import sleep
from circuits import Component, Debugger, Event, Timer, Worker, task

import requests

from circuits import Component, Debugger, Event, Timer, Worker, task


def download_web_page(url):
print('Downloading {}'.format(url))
Expand All @@ -11,6 +13,7 @@ def download_web_page(url):
# You would probably process web page for data before sending back
return response.text[:200]


class App(Component):

def init(self, *args, **kwargs):
Expand All @@ -34,6 +37,7 @@ def task_success(self, function_called, function_result):
func, url_called = function_called
print('url {} gave {}'.format(url_called, function_result))


if __name__ == '__main__':
app = App()
Debugger().register(app)
Expand Down
1 change: 1 addition & 0 deletions examples/factorial_multiple.py
Expand Up @@ -40,6 +40,7 @@ def task_success(self, function_called, factorial_result):
if argument == 14:
self.stop()


if __name__ == '__main__':
app = App()
Debugger().register(app)
Expand Down
20 changes: 11 additions & 9 deletions examples/stompclient.py
Expand Up @@ -9,18 +9,20 @@

import logging
import ssl
from circuits import Component, Timer, Event
from circuits.core.handlers import handler
from circuits.protocols.stomp.client import StompClient, ACK_AUTO
from circuits.protocols.stomp.events import subscribe, send, connect

from circuits import Component, Event, Timer
from circuits.protocols.stomp.client import ACK_AUTO, StompClient
from circuits.protocols.stomp.events import connect, send, subscribe

LOG = logging.getLogger(__name__)


class QueueHandler(Component):

def __init__(self, queue, host=None, *args, **kwargs):
super(QueueHandler, self).__init__(*args, **kwargs)
self.queue = queue
self.host=host
self.host = host

def registered(self, event, component, parent):
if component.parent is self:
Expand All @@ -35,10 +37,10 @@ def connected(self):
def subscribe_success(self, event, *args, **kwargs):
""" Subscribed to message destination """
# Let's fire off some messages
self.fire(send(headers = None,
self.fire(send(headers=None,
body="Hello World",
destination=self.queue))
self.fire(send(headers = None,
self.fire(send(headers=None,
body="Hello Again World",
destination=self.queue))

Expand Down Expand Up @@ -67,7 +69,6 @@ def reconnect(self):
self.fire(connect(host=self.host))



def main():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
Expand All @@ -80,7 +81,7 @@ def main():
uri = "orangutan.rmq.cloudamqp.com"
port = 61614
login = "xxxyyy"
passcode="somepassword"
passcode = "somepassword"
host = "xxxyyy"
queue = "test1"

Expand All @@ -95,5 +96,6 @@ def main():

qr.run()


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion tests/io/test_notify.py
@@ -1,7 +1,8 @@
#!/usr/bin/env python
import pytest
import os

import pytest

from circuits import Component, handler

try:
Expand Down
4 changes: 2 additions & 2 deletions tests/protocols/test_irc.py
Expand Up @@ -6,8 +6,8 @@
from circuits.net.events import read, write
from circuits.protocols.irc import (
AWAY, INVITE, IRC, JOIN, KICK, MODE, NAMES, NICK, NOTICE, PART, PASS, PONG,
PRIVMSG, QUIT, TOPIC, USER, WHOIS, joinprefix, parsemsg, parseprefix,
strip, irc_color_to_ansi
PRIVMSG, QUIT, TOPIC, USER, WHOIS, irc_color_to_ansi, joinprefix, parsemsg,
parseprefix, strip,
)
from circuits.six import b, u

Expand Down
29 changes: 17 additions & 12 deletions tests/protocols/test_stomp.py
Expand Up @@ -4,11 +4,13 @@

import os
import ssl
import pytest

from circuits import Component, handler
from circuits.protocols.stomp.events import subscribe, connect, send, disconnect
import pytest

from circuits import Component
from circuits.protocols.stomp.events import (
connect, disconnect, send, subscribe,
)

try:
from circuits.protocols.stomp.client import StompClient, ACK_AUTO
Expand All @@ -31,16 +33,16 @@

# Tests can only run if a STOMP server is available
needstomp = pytest.mark.skipif(not(all((URI, LOGIN, PASSCODE, HOST, QUEUE))),
reason="No STOMP Server Configured")
reason="No STOMP Server Configured")
needproxy = pytest.mark.skipif(not(PROXY_HOST and PROXY_PORT),
reason="No HTTP Proxy Configured")
reason="No HTTP Proxy Configured")


class App(Component):
def __init__(self, queue, host=None, *args, **kwargs):
super(App, self).__init__(*args, **kwargs)
self.queue = QUEUE
self.host=HOST
self.host = HOST
self.received = []

def connected(self):
Expand All @@ -60,7 +62,7 @@ def disconnected(self, *args, **kwargs):

@needstomp
@pytest.mark.parametrize("context",
[ssl.create_default_context(), None])
[ssl.create_default_context(), None])
def test_stomp_ssl(manager, watcher, tmpdir, context):
""" test ssl connection """
port = 61614
Expand Down Expand Up @@ -97,6 +99,7 @@ def test_stomp_ssl(manager, watcher, tmpdir, context):
app.unregister()
watcher.wait("unregistered")


@needstomp
def test_stomp_no_ssl(manager, watcher, tmpdir):
""" Test plain tcp connection """
Expand All @@ -116,8 +119,8 @@ def test_stomp_no_ssl(manager, watcher, tmpdir):
watcher.wait("subscribe_success")

client.fire(send(headers=None,
body=TEST_MESSAGE,
destination=QUEUE))
body=TEST_MESSAGE,
destination=QUEUE))
watcher.wait("message_success")
client.fire(disconnect())
received = app.received[0].decode()
Expand All @@ -127,10 +130,11 @@ def test_stomp_no_ssl(manager, watcher, tmpdir):
app.unregister()
watcher.wait("unregistered")


@needstomp
@needproxy
@pytest.mark.parametrize("context",
[ssl.create_default_context(), None])
[ssl.create_default_context(), None])
def test_stomp_proxy_ssl(manager, watcher, tmpdir, context):
""" test ssl connection through http proxy"""
port = 61614
Expand Down Expand Up @@ -169,6 +173,7 @@ def test_stomp_proxy_ssl(manager, watcher, tmpdir, context):
app.unregister()
watcher.wait("unregistered")


@needstomp
@needproxy
def test_stomp_proxy_no_ssl(manager, watcher, tmpdir):
Expand All @@ -191,8 +196,8 @@ def test_stomp_proxy_no_ssl(manager, watcher, tmpdir):
watcher.wait("subscribe_success")

client.fire(send(headers=None,
body=TEST_MESSAGE,
destination=QUEUE))
body=TEST_MESSAGE,
destination=QUEUE))
watcher.wait("message_success")
client.fire(disconnect())
received = app.received[0].decode()
Expand Down

0 comments on commit 4be8e00

Please sign in to comment.