Skip to content

Commit

Permalink
transport max message size handling (#1219)
Browse files Browse the repository at this point in the history
* implement client side payload exceed max size; improve max size exceeded handling
* polish logging
* bump version; update changelog
  • Loading branch information
oberstet committed Jul 7, 2019
1 parent 5515be9 commit f0d15f0
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 61 deletions.
2 changes: 1 addition & 1 deletion autobahn/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
#
###############################################################################

__version__ = u'19.6.2'
__version__ = u'19.7.1'
41 changes: 41 additions & 0 deletions autobahn/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

from __future__ import absolute_import

from autobahn.util import public

__all__ = (
'PayloadExceededError',
)


@public
class PayloadExceededError(RuntimeError):
"""
Exception raised when the serialized and framed (eg WebSocket/RawSocket) WAMP payload
exceeds the transport message size limit.
"""
156 changes: 115 additions & 41 deletions autobahn/twisted/rawsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from __future__ import absolute_import

import math
import txaio

from twisted.internet.protocol import Factory
Expand All @@ -37,6 +38,7 @@
from autobahn.twisted.util import peer2str, transport_channel_id
from autobahn.util import _LazyHexFormatter
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost
from autobahn.exception import PayloadExceededError

__all__ = (
'WampRawSocketServerProtocol',
Expand All @@ -52,8 +54,18 @@ class WampRawSocketProtocol(Int32StringReceiver):
"""
log = txaio.make_logger()

def __init__(self):
# set the RawSocket maximum message size by default
self._max_message_size = 2**24

def lengthLimitExceeded(self, length):
# override hook in Int32StringReceiver base class that is fired when a message is (to be) received
# that is larger than what we agreed to handle (by negotiation in the RawSocket opening handshake)
emsg = 'RawSocket connection: length of received message exceeded (message was {} bytes, but current maximum is {} bytes)'.format(length, self.MAX_LENGTH)
raise PayloadExceededError(emsg)

def connectionMade(self):
self.log.debug("WampRawSocketProtocol: connection made")
self.log.debug('{klass}.connectionMade()', klass=self.__class__.__name__)

# the peer we are connected to
#
Expand Down Expand Up @@ -86,7 +98,7 @@ def connectionMade(self):
#
self._handshake_bytes = b''

# Client requested maximum length of serialized messages.
# Peer requested to _receive_ this maximum length of serialized messages - hence we must not send larger msgs!
#
self._max_len_send = None

Expand All @@ -96,43 +108,59 @@ def _on_handshake_complete(self):
self._session.onOpen(self)
except Exception as e:
# Exceptions raised in onOpen are fatal ..
self.log.warn("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({err})", err=e)
self.log.warn("{klass}._on_handshake_complete(): ApplicationSession constructor / onOpen raised ({err})",
klass=self.__class__.__name__, err=e)
self.abort()
else:
self.log.debug("ApplicationSession started.")

def connectionLost(self, reason):
self.log.debug("WampRawSocketProtocol: connection lost: reason = '{reason}'", reason=reason)
self.log.debug('{klass}.connectionLost(reason="{reason}"', klass=self.__class__.__name__, reason=reason)
txaio.resolve(self.is_closed, self)
try:
wasClean = isinstance(reason.value, ConnectionDone)
if self._session:
self._session.onClose(wasClean)
except Exception as e:
# silently ignore exceptions raised here ..
self.log.warn("WampRawSocketProtocol: ApplicationSession.onClose raised ({err})", err=e)
self.log.warn('{klass}.connectionLost(): ApplicationSession.onClose raised "{err}"',
klass=self.__class__.__name__, err=e)
self._session = None

def stringReceived(self, payload):
self.log.trace("WampRawSocketProtocol: RX octets: {octets}", octets=_LazyHexFormatter(payload))
self.log.trace('{klass}.stringReceived(): RX {octets} octets',
klass=self.__class__.__name__, octets=_LazyHexFormatter(payload))
try:
for msg in self._serializer.unserialize(payload):
self.log.trace("WampRawSocketProtocol: RX WAMP message: {msg}", msg=msg)
self.log.trace("{klass}.stringReceived: RX WAMP message: {msg}",
klass=self.__class__.__name__, msg=msg)
self._session.onMessage(msg)

except CancelledError as e:
self.log.warn("{klass}.stringReceived: WAMP CancelledError - connection will continue\n{err}",
self.log.debug("{klass}.stringReceived: WAMP CancelledError - connection will continue!\n{err}",
klass=self.__class__.__name__,
err=e)

except ProtocolError as e:
self.log.warn("{klass}.stringReceived: WAMP ProtocolError - aborting connection!\n{err}",
klass=self.__class__.__name__,
err=e)
self.abort()

except PayloadExceededError as e:
self.log.warn("{klass}.stringReceived: WAMP PayloadExceededError - aborting connection!\n{err}",
klass=self.__class__.__name__,
err=e)
self.abort()

except ProtocolError as e:
self.log.warn("{klass}.stringReceived: WAMP ProtocolError - aborting connection\n{err}",
except SerializationError as e:
self.log.warn("{klass}.stringReceived: WAMP SerializationError - aborting connection!\n{err}",
klass=self.__class__.__name__,
err=e)
self.abort()

except Exception as e:
self.log.warn("{klass}.stringReceived: WAMP Exception - aborting connection\n{err}",
self.log.warn("{klass}.stringReceived: WAMP Exception - aborting connection!\n{err}",
klass=self.__class__.__name__,
err=e)
self.abort()
Expand All @@ -142,15 +170,24 @@ def send(self, msg):
Implements :func:`autobahn.wamp.interfaces.ITransport.send`
"""
if self.isOpen():
self.log.trace("WampRawSocketProtocol (serializer={serializer}): TX WAMP message: {msg}", msg=msg, serializer=self._serializer)
self.log.trace('{klass}.send() (serializer={serializer}): TX WAMP message: "{msg}"',
klass=self.__class__.__name__, msg=msg, serializer=self._serializer)
try:
payload, _ = self._serializer.serialize(msg)
except Exception as e:
except SerializationError as e:
# all exceptions raised from above should be serialization errors ..
raise SerializationError("WampRawSocketProtocol: unable to serialize WAMP application payload ({0})".format(e))
else:
self.sendString(payload)
self.log.trace("WampRawSocketProtocol: TX octets: {octets}", octets=_LazyHexFormatter(payload))
payload_len = len(payload)
if 0 < self._max_len_send < payload_len:
emsg = u'tried to send RawSocket message with size {} exceeding payload limit of {} octets'.format(
payload_len, self._max_len_send)
self.log.warn(emsg)
raise PayloadExceededError(emsg)
else:
self.sendString(payload)
self.log.trace('{klass}.send(): TX {octets} octets',
klass=self.__class__.__name__, octets=_LazyHexFormatter(payload))
else:
raise TransportLost()

Expand Down Expand Up @@ -204,13 +241,13 @@ def dataReceived(self, data):
if len(self._handshake_bytes) == 4:

self.log.debug(
"WampRawSocketProtocol: opening handshake received - {octets}",
"WampRawSocketServerProtocol: opening handshake received - {octets}",
octets=_LazyHexFormatter(self._handshake_bytes),
)

if ord(self._handshake_bytes[0:1]) != 0x7f:
self.log.warn(
"WampRawSocketProtocol: invalid magic byte (octet 1) in"
"WampRawSocketServerProtocol: invalid magic byte (octet 1) in"
" opening handshake: was 0x{magic}, but expected 0x7f",
magic=_LazyHexFormatter(self._handshake_bytes[0]),
)
Expand All @@ -220,7 +257,7 @@ def dataReceived(self, data):
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1:2]) >> 4))
self.log.debug(
"WampRawSocketProtocol: client requests us to send out most {max_bytes} bytes per message",
"WampRawSocketServerProtocol: client requests us to send out most {max_bytes} bytes per message",
max_bytes=self._max_len_send,
)

Expand All @@ -230,20 +267,24 @@ def dataReceived(self, data):
if ser_id in self.factory._serializers:
self._serializer = self.factory._serializers[ser_id]
self.log.debug(
"WampRawSocketProtocol: client wants to use serializer '{serializer}'",
"WampRawSocketServerProtocol: client wants to use serializer '{serializer}'",
serializer=ser_id,
)
else:
self.log.warn(
"WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {serializer}, and we have {serializers}",
"WampRawSocketServerProtocol: opening handshake - no suitable serializer found (client requested {serializer}, and we have {serializers}",
serializer=ser_id,
serializers=self.factory._serializers.keys(),
)
self.abort()

# we request the peer to send message of maximum length 2**reply_max_len_exp
# we request the client to send message of maximum length 2**reply_max_len_exp
#
reply_max_len_exp = 24
reply_max_len_exp = int(math.ceil(math.log(self._max_message_size, 2)))

# this is an instance attribute on the Twisted base class for maximum size
# of _received_ messages
self.MAX_LENGTH = 2**reply_max_len_exp

# send out handshake reply
#
Expand All @@ -258,7 +299,7 @@ def dataReceived(self, data):
self._on_handshake_complete()

self.log.debug(
"WampRawSocketProtocol: opening handshake completed: {serializer}",
"WampRawSocketServerProtocol: opening handshake completed: {serializer}",
serializer=self._serializer,
)

Expand Down Expand Up @@ -289,11 +330,14 @@ def connectionMade(self):
WampRawSocketProtocol.connectionMade(self)
self._serializer = self.factory._serializer

# we request the peer to send message of maximum length 2**reply_max_len_exp
#
request_max_len_exp = 24
# we request the peer to send messages of maximum length 2**reply_max_len_exp
request_max_len_exp = int(math.ceil(math.log(self._max_message_size, 2)))

# send out handshake reply
# this is an instance attribute on the Twisted base class for maximum size
# of _received_ messages
self.MAX_LENGTH = 2**request_max_len_exp

# send out handshake request
#
request_octet2 = bytes(bytearray([
((request_max_len_exp - 9) << 4) | self._serializer.RAWSOCKET_SERIALIZER_ID]))
Expand All @@ -312,22 +356,22 @@ def dataReceived(self, data):
if len(self._handshake_bytes) == 4:

self.log.debug(
"WampRawSocketProtocol: opening handshake received - {handshake}",
"WampRawSocketClientProtocol: opening handshake received - {handshake}",
handshake=_LazyHexFormatter(self._handshake_bytes),
)

if ord(self._handshake_bytes[0:1]) != 0x7f:
self.log.debug(
"WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{magic}, but expected 0x7f",
"WampRawSocketClientProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{magic}, but expected 0x7f",
magic=_LazyHexFormatter(self._handshake_bytes[0]),
)
self.abort()

# peer requests us to send messages of maximum length 2**max_len_exp
# peer requests us to _send_ messages of maximum length 2**max_len_exp
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1:2]) >> 4))
self.log.debug(
"WampRawSocketProtocol: server requests us to send out most {max} bytes per message",
"WampRawSocketClientProtocol: server requests us to send out most {max} bytes per message",
max=self._max_len_send,
)

Expand All @@ -336,7 +380,7 @@ def dataReceived(self, data):
ser_id = ord(self._handshake_bytes[1:2]) & 0x0F
if ser_id != self._serializer.RAWSOCKET_SERIALIZER_ID:
self.log.error(
"WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {serializer}, and we requested {serializers})",
"WampRawSocketClientProtocol: opening handshake - no suitable serializer found (server replied {serializer}, and we requested {serializers})",
serializer=ser_id,
serializers=self._serializer.RAWSOCKET_SERIALIZER_ID,
)
Expand All @@ -347,7 +391,7 @@ def dataReceived(self, data):
self._on_handshake_complete()

self.log.debug(
"WampRawSocketProtocol: opening handshake completed (using serializer {serializer})",
"WampRawSocketClientProtocol: opening handshake completed (using serializer {serializer})",
serializer=self._serializer,
)

Expand All @@ -368,6 +412,42 @@ class WampRawSocketFactory(Factory):
"""
Base class for Twisted-based WAMP-over-RawSocket factories.
"""
log = txaio.make_logger()

def __init__(self, factory):
"""
:param factory: A callable that produces instances that implement
:class:`autobahn.wamp.interfaces.ITransportHandler`
:type factory: callable
"""
if callable(factory):
self._factory = factory
else:
self._factory = lambda: factory

# RawSocket max payload size is 16M (https://wamp-proto.org/_static/gen/wamp_latest_ietf.html#handshake)
self._max_message_size = 2**24

def resetProtocolOptions(self):
self._max_message_size = 2**24

def setProtocolOptions(self, maxMessagePayloadSize=None):
self.log.debug('{klass}.setProtocolOptions(maxMessagePayloadSize={maxMessagePayloadSize})',
klass=self.__class__.__name__, maxMessagePayloadSize=maxMessagePayloadSize)
assert maxMessagePayloadSize is None or (type(maxMessagePayloadSize) == int and maxMessagePayloadSize >= 512 and maxMessagePayloadSize <= 2**24)
if maxMessagePayloadSize is not None and maxMessagePayloadSize != self._max_message_size:
self._max_message_size = maxMessagePayloadSize

def buildProtocol(self, addr):
self.log.debug('{klass}.buildProtocol(addr={addr})', klass=self.__class__.__name__, addr=addr)
p = self.protocol()
p.factory = self
p.MAX_LENGTH = self._max_message_size
p._max_message_size = self._max_message_size
self.log.debug('{klass}.buildProtocol() -> proto={proto}, max_message_size={max_message_size}, MAX_LENGTH={MAX_LENGTH}',
klass=self.__class__.__name__, proto=p, max_message_size=p._max_message_size, MAX_LENGTH=p.MAX_LENGTH)
return p


@public
Expand All @@ -390,10 +470,7 @@ def __init__(self, factory, serializers=None):
:type serializers: list of objects implementing
:class:`autobahn.wamp.interfaces.ISerializer`
"""
if callable(factory):
self._factory = factory
else:
self._factory = lambda: factory
WampRawSocketFactory.__init__(self, factory)

if serializers is None:
serializers = []
Expand Down Expand Up @@ -458,10 +535,7 @@ def __init__(self, factory, serializer=None):
this list: CBOR, MessagePack, UBJSON, JSON).
:type serializer: object implementing :class:`autobahn.wamp.interfaces.ISerializer`
"""
if callable(factory):
self._factory = factory
else:
self._factory = lambda: factory
WampRawSocketFactory.__init__(self, factory)

if serializer is None:

Expand Down
6 changes: 6 additions & 0 deletions autobahn/wamp/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ class ApplicationError(Error):
The application payload could not be serialized.
"""

PAYLOAD_SIZE_EXCEEDED = u"wamp.error.payload_size_exceeded"
"""
The application payload could not be transported becuase the serialized/framed payload
exceeds the transport limits.
"""

NO_SUCH_PROCEDURE = u"wamp.error.no_such_procedure"
"""
A Dealer could not perform a call, since not procedure is currently registered
Expand Down

0 comments on commit f0d15f0

Please sign in to comment.