Skip to content

Commit

Permalink
Merge branch 'flake8-fixes' into event-retention
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl committed Nov 30, 2016
2 parents 1fb4bda + 8da1e90 commit c41518a
Show file tree
Hide file tree
Showing 28 changed files with 2,300 additions and 324 deletions.
37 changes: 37 additions & 0 deletions crossbar/__main__.py
@@ -0,0 +1,37 @@
#####################################################################################
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Unless a separate license agreement exists between you and Crossbar.io GmbH (e.g.
# you have purchased a commercial license), the license terms below apply.
#
# Should you enter into a separate license agreement after having received a copy of
# this software, then the terms of such license agreement replace the terms below at
# the time at which such license agreement becomes effective.
#
# In case a separate license agreement ends, and such agreement ends without being
# replaced by another separate license agreement, the license terms below apply
# from the time at which said agreement ends.
#
# LICENSE TERMS
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License, version 3, as published by the
# Free Software Foundation. This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Affero General Public License Version 3 for more details.
#
# You should have received a copy of the GNU Affero General Public license along
# with this program. If not, see <http://www.gnu.org/licenses/agpl-3.0.en.html>.
#
#####################################################################################

if __name__ == '__main__':
from pkg_resources import load_entry_point
import sys

sys.exit(
load_entry_point('crossbar', 'console_scripts', 'crossbar')()
)
17 changes: 15 additions & 2 deletions crossbar/_log_categories.py
Expand Up @@ -70,10 +70,23 @@
"MQ100": "Got packet from '{client_id}': {packet!r}",
"MQ101": "Sent packet to '{client_id}': {packet!r}",
"MQ200": "Successful connection from '{client_id}'",
"MQ201": "Received a QoS 0 Publish from '{client_id}'",
"MQ202": "Received a QoS 1 Publish from '{client_id}'",
"MQ203": "Received a QoS 2 Publish from '{client_id}'",
"MQ204": "Received a Disconnect from '{client_id}', closing connection",
"MQ303": "Got a non-allowed QoS value in the publish queue, dropping it.",
"MQ400": "MQTT client '{client_id}' timed out after recieving no full packets for {seconds}",
"MQ401": "Protocol violation from '{client_id}', terminating connection: {error}",
"MQ402": "Got a packet ('{packet_id}') from '{client_id} that is invalid for a server, terminating connection",
"MQ500": "Error handling a Subscribe from '{client_id}', dropping connection",
"MQ402": "Got a packet ('{packet_id}') from '{client_id}' that is invalid for a server, terminating connection",
"MQ403": "Got a Publish packet from '{client_id}' that has both QoS bits set, terminating connection",
"MQ500": "Error handling a Connect, dropping connection",
"MQ501": "Error handling a Subscribe from '{client_id}', dropping connection",
"MQ502": "Error handling an Unsubscribe from '{client_id}', dropping connection",
"MQ503": "Error handling a QoS 0 Publish from '{client_id}', dropping connection",
"MQ504": "Error handling a QoS 1 Publish from '{client_id}', dropping connection",
"MQ505": "Error handling a QoS 2 Publish from '{client_id}', dropping connection",


}


Expand Down
118 changes: 117 additions & 1 deletion crossbar/adapter/mqtt/_events.py
Expand Up @@ -47,6 +47,22 @@ class Failure(object):
reason = attr.ib(default=None)


@attr.s
class Disconnect(object):
def serialise(self):
"""
Assemble this into an on-wire message.
"""
return build_header(14, (False, False, False, False), 0)

@classmethod
def deserialise(cls, flags, data):
if flags != (False, False, False, False):
raise ParseFailure(cls, "Bad flags")

return cls()


@attr.s
class PingRESP(object):
def serialise(self):
Expand Down Expand Up @@ -160,6 +176,105 @@ def deserialise(cls, flags, data):
return cls(packet_identifier=packet_identifier, topics=topics)


@attr.s
class PubCOMP(object):
packet_identifier = attr.ib(validator=instance_of(int))

def serialise(self):
"""
Assemble this into an on-wire message.
"""
payload = self._make_payload()
header = build_header(7, (False, False, False, False), len(payload))
return header + payload

def _make_payload(self):
"""
Build the payload from its constituent parts.
"""
b = []
b.append(pack('uint:16', self.packet_identifier).bytes)
return b"".join(b)

@classmethod
def deserialise(cls, flags, data):
"""
Disassemble from an on-wire message.
"""
if flags != (False, False, False, False):
raise ParseFailure(cls, "Bad flags")

packet_identifier = data.read('uint:16')

return cls(packet_identifier)


@attr.s
class PubREL(object):
packet_identifier = attr.ib(validator=instance_of(int))

def serialise(self):
"""
Assemble this into an on-wire message.
"""
payload = self._make_payload()
header = build_header(6, (False, False, True, False), len(payload))
return header + payload

def _make_payload(self):
"""
Build the payload from its constituent parts.
"""
b = []
b.append(pack('uint:16', self.packet_identifier).bytes)
return b"".join(b)

@classmethod
def deserialise(cls, flags, data):
"""
Disassemble from an on-wire message.
"""
if flags != (False, False, True, False):
raise ParseFailure(cls, "Bad flags")

packet_identifier = data.read('uint:16')

return cls(packet_identifier)


@attr.s
class PubREC(object):
packet_identifier = attr.ib(validator=instance_of(int))

def serialise(self):
"""
Assemble this into an on-wire message.
"""
payload = self._make_payload()
header = build_header(5, (False, False, False, False), len(payload))
return header + payload

def _make_payload(self):
"""
Build the payload from its constituent parts.
"""
b = []
b.append(pack('uint:16', self.packet_identifier).bytes)
return b"".join(b)

@classmethod
def deserialise(cls, flags, data):
"""
Disassemble from an on-wire message.
"""
if flags != (False, False, False, False):
raise ParseFailure(cls, "Bad flags")

packet_identifier = data.read('uint:16')

return cls(packet_identifier)


@attr.s
class PubACK(object):
packet_identifier = attr.ib(validator=instance_of(int))
Expand Down Expand Up @@ -199,8 +314,9 @@ class Publish(object):
qos_level = attr.ib(validator=instance_of(int))
retain = attr.ib(validator=instance_of(bool))
topic_name = attr.ib(validator=instance_of(unicode))
packet_identifier = attr.ib(validator=optional(instance_of(int)))
payload = attr.ib(validator=instance_of(bytes))
packet_identifier = attr.ib(validator=optional(instance_of(int)),
default=None)

def serialise(self):
"""
Expand Down
22 changes: 17 additions & 5 deletions crossbar/adapter/mqtt/protocol.py
Expand Up @@ -36,17 +36,13 @@
Subscribe, SubACK,
Unsubscribe, UnsubACK,
Publish, PubACK,
PubREC, PubREL, PubCOMP,
PingREQ, PingRESP,
)

import bitstring

__all__ = [
"Connect", "ConnACK",
"Subscribe", "SubACK",
"Unsubscribe", "UnsubACK",
"Publish", "PubACK",
"PingREQ", "PingRESP",
"MQTTParser",
]

Expand All @@ -56,6 +52,7 @@ class _NeedMoreData(Exception):
We need more data before we can get the bytes length.
"""


# State machine events
WAITING_FOR_NEW_PACKET = 0
COLLECTING_REST_OF_PACKET = 1
Expand All @@ -65,6 +62,9 @@ class _NeedMoreData(Exception):
P_CONNACK = 2
P_PUBLISH = 3
P_PUBACK = 4
P_PUBREC = 5
P_PUBREL = 6
P_PUBCOMP = 7
P_SUBSCRIBE = 8
P_SUBACK = 9
P_UNSUBSCRIBE = 10
Expand All @@ -75,9 +75,13 @@ class _NeedMoreData(Exception):
server_packet_handlers = {
P_CONNECT: Connect,
P_PUBLISH: Publish,
P_PUBACK: PubACK,
P_SUBSCRIBE: Subscribe,
P_UNSUBSCRIBE: Unsubscribe,
P_PINGREQ: PingREQ,
P_PUBREL: PubREL,
P_PUBREC: PubREC,
P_PUBCOMP: PubCOMP,
}

client_packet_handlers = {
Expand All @@ -87,6 +91,9 @@ class _NeedMoreData(Exception):
P_SUBACK: SubACK,
P_UNSUBACK: UnsubACK,
P_PINGRESP: PingRESP,
P_PUBREC: PubREC,
P_PUBREL: PubREL,
P_PUBCOMP: PubCOMP,
}


Expand Down Expand Up @@ -221,3 +228,8 @@ def data_received(self, data):
self._packet_count += 1
else:
return events


class MQTTClientParser(MQTTParser):
_first_pkt = P_CONNACK
_packet_handlers = client_packet_handlers

0 comments on commit c41518a

Please sign in to comment.