Skip to content

Commit

Permalink
more fixes and moving to txaio logging
Browse files Browse the repository at this point in the history
  • Loading branch information
meejah committed Sep 16, 2015
1 parent 3a0bdae commit e09df6d
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 75 deletions.
50 changes: 26 additions & 24 deletions autobahn/twisted/rawsocket.py
Expand Up @@ -28,14 +28,15 @@

import binascii

from twisted.python import log
from twisted.internet.protocol import Factory
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.error import ConnectionDone

from autobahn.twisted.util import peer2str
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost

import txaio

__all__ = (
'WampRawSocketServerProtocol',
'WampRawSocketClientProtocol',
Expand All @@ -48,10 +49,11 @@ class WampRawSocketProtocol(Int32StringReceiver):
"""
Base class for Twisted-based WAMP-over-RawSocket protocols.
"""
log = txaio.make_logger()

def connectionMade(self):
if self.factory.debug:
log.msg("WampRawSocketProtocol: connection made")
self.log.debug("WampRawSocketProtocol: connection made")

# the peer we are connected to
#
Expand Down Expand Up @@ -92,42 +94,42 @@ def _on_handshake_complete(self):
except Exception as e:
# Exceptions raised in onOpen are fatal ..
if self.factory.debug:
log.msg("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e))
self.log.info("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e))
self.abort()
else:
if self.factory.debug:
log.msg("ApplicationSession started.")
self.log.info("ApplicationSession started.")

def connectionLost(self, reason):
if self.factory.debug:
log.msg("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason))
self.log.info("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason))
try:
wasClean = isinstance(reason.value, ConnectionDone)
self._session.onClose(wasClean)
except Exception as e:
# silently ignore exceptions raised here ..
if self.factory.debug:
log.msg("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e))
self.log.info("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e))
self._session = None

def stringReceived(self, payload):
if self.factory.debug:
log.msg("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload)))
self.log.info("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload)))
try:
for msg in self._serializer.unserialize(payload):
if self.factory.debug:
log.msg("WampRawSocketProtocol: RX WAMP message: {0}".format(msg))
self.log.info("WampRawSocketProtocol: RX WAMP message: {0}".format(msg))
self._session.onMessage(msg)

except ProtocolError as e:
log.msg(str(e))
self.log.info(str(e))
if self.factory.debug:
log.msg("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e))
self.log.info("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e))
self.abort()

except Exception as e:
if self.factory.debug:
log.msg("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e))
self.log.info("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e))
self.abort()

def send(self, msg):
Expand All @@ -136,7 +138,7 @@ def send(self, msg):
"""
if self.isOpen():
if self.factory.debug:
log.msg("WampRawSocketProtocol: TX WAMP message: {0}".format(msg))
self.log.info("WampRawSocketProtocol: TX WAMP message: {0}".format(msg))
try:
payload, _ = self._serializer.serialize(msg)
except Exception as e:
Expand All @@ -145,7 +147,7 @@ def send(self, msg):
else:
self.sendString(payload)
if self.factory.debug:
log.msg("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload)))
self.log.info("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload)))
else:
raise TransportLost()

Expand Down Expand Up @@ -194,29 +196,29 @@ def dataReceived(self, data):
if len(self._handshake_bytes) == 4:

if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))

if ord(self._handshake_bytes[0]) != 0x7f:
if self.factory.debug:
log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.abort()

# peer requests us to send messages of maximum length 2**max_len_exp
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4))
if self.factory.debug:
log.msg("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send))
self.log.info("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send))

# client wants to speak this serialization format
#
ser_id = ord(self._handshake_bytes[1]) & 0x0F
if ser_id in self.factory._serializers:
self._serializer = self.factory._serializers[ser_id]
if self.factory.debug:
log.msg("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id))
self.log.info("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id))
else:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys()))
self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys()))
self.abort()

# we request the peer to send message of maximum length 2**reply_max_len_exp
Expand All @@ -235,7 +237,7 @@ def dataReceived(self, data):
self._on_handshake_complete()

if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer)
self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer)

# consume any remaining data received already ..
#
Expand Down Expand Up @@ -275,33 +277,33 @@ def dataReceived(self, data):
if len(self._handshake_bytes) == 4:

if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))

if ord(self._handshake_bytes[0]) != 0x7f:
if self.factory.debug:
log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.abort()

# peer requests us to send messages of maximum length 2**max_len_exp
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4))
if self.factory.debug:
log.msg("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send))
self.log.info("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send))

# client wants to speak this serialization format
#
ser_id = ord(self._handshake_bytes[1]) & 0x0F
if ser_id != self._serializer.RAWSOCKET_SERIALIZER_ID:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID))
self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID))
self.abort()

self._handshake_complete = True

self._on_handshake_complete()

if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer)
self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer)

# consume any remaining data received already ..
#
Expand Down
16 changes: 8 additions & 8 deletions autobahn/twisted/wamp.py
Expand Up @@ -30,7 +30,6 @@

import six

from twisted.python import log
from twisted.internet.defer import inlineCallbacks

from autobahn.wamp import protocol
Expand Down Expand Up @@ -67,11 +66,9 @@ def onUserError(self, e, msg):
"""
Override of wamp.ApplicationSession
"""
# see docs; will print currently-active exception to the logs,
# which is just what we want.
log.err(e)
self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e)))
# also log the framework-provided error-message
log.err(msg)
self.log.error(msg)


class ApplicationSessionFactory(protocol.ApplicationSessionFactory):
Expand Down Expand Up @@ -168,7 +165,10 @@ def run(self, make, start_reactor=True):

isSecure, host, port, resource, path, params = parseWsUrl(self.url)

txaio.start_logging(level='debug')
if self.debug or self.debug_wamp or self.debug_app:
txaio.start_logging(level='debug')
else:
txaio.start_logging(level='info')

# factory for use ApplicationSession
def create():
Expand All @@ -178,7 +178,7 @@ def create():
except Exception as e:
if start_reactor:
# the app component could not be created .. fatal
log.err(str(e))
self.log.error(str(e))
reactor.stop()
else:
# if we didn't start the reactor, it's up to the
Expand Down Expand Up @@ -526,7 +526,7 @@ def _fire_signal(self, name, *args, **kwargs):
yield handler(*args, **kwargs)
except Exception as e:
# FIXME
log.msg("Warning: exception in signal handler swallowed", e)
self.log.info("Warning: exception in signal handler swallowed", e)


if service:
Expand Down
14 changes: 4 additions & 10 deletions autobahn/wamp/protocol.py
Expand Up @@ -520,8 +520,8 @@ def onUserError(self, e, msg):
:param msg: an informative message from the library. It is
suggested you log this immediately after the exception.
"""
traceback.print_exc()
print(msg)
self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e)))
self.log.error(msg)

def _swallow_error(self, fail, msg):
'''
Expand All @@ -535,7 +535,6 @@ def _swallow_error(self, fail, msg):
chain for a Deferred/coroutine that will make it out to user
code.
'''
# print("_swallow_error", typ, exc, tb)
try:
self.onUserError(fail.value, msg)
except:
Expand All @@ -546,7 +545,6 @@ def onMessage(self, msg):
"""
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage`
"""
print("BLAM", id(self.log), type(self.log))
self.log.debug("onMessage: {message}", session_id=self._session_id, message=msg)
self.log.trace("onMessage: {message}", session_id=self._session_id, message=msg)
if self._session_id is None:
Expand Down Expand Up @@ -814,11 +812,7 @@ def error(err):
pass
formatted_tb = None
if self.traceback_app:
# if asked to marshal the traceback within the WAMP error message, extract it
# noinspection PyCallingNonCallable
tb = StringIO()
err.printTraceback(file=tb)
formatted_tb = tb.getvalue().splitlines()
formatted_tb = txaio.failure_format_traceback(err)

del self._invocations[msg.request]

Expand Down Expand Up @@ -972,7 +966,7 @@ def onLeave(self, details):
Implements :func:`autobahn.wamp.interfaces.ISession.onLeave`
"""
if details.reason.startswith('wamp.error.'):
print('{error}: {message}'.format(error=details.reason, message=details.message))
self.log.error('{reason}: {message}', reason=details.reason, message=details.message)
if self._transport:
self.disconnect()
# do we ever call onLeave with a valid transport?
Expand Down
61 changes: 29 additions & 32 deletions autobahn/wamp/test/test_protocol.py
Expand Up @@ -32,7 +32,6 @@

from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
from twisted.internet.defer import succeed, DeferredList
from twisted.python import log
from twisted.trial import unittest
from six import PY3

Expand Down Expand Up @@ -463,37 +462,28 @@ def test_publish_callback_exception(self):
error_instance = RuntimeError("we have a problem")
got_err_d = Deferred()

def observer(kw):
if kw['isError'] and 'failure' in kw:
fail = kw['failure']
fail.trap(RuntimeError)
if error_instance == fail.value:
got_err_d.callback(True)
log.addObserver(observer)
def observer(e, msg):
if error_instance == e:
got_err_d.callback(True)
handler.onUserError = observer

def boom():
raise error_instance

try:
sub = yield handler.subscribe(boom, u'com.myapp.topic1')

# MockTransport gives us the ack reply and then we do our
# own event message
publish = yield handler.publish(
u'com.myapp.topic1',
options=types.PublishOptions(acknowledge=True, exclude_me=False),
)
msg = message.Event(sub.id, publish.id)
handler.onMessage(msg)
sub = yield handler.subscribe(boom, u'com.myapp.topic1')

# we know it worked if our observer worked and did
# .callback on our Deferred above.
self.assertTrue(got_err_d.called)
# ...otherwise trial will fail the test anyway
self.flushLoggedErrors()
# MockTransport gives us the ack reply and then we do our
# own event message
publish = yield handler.publish(
u'com.myapp.topic1',
options=types.PublishOptions(acknowledge=True, exclude_me=False),
)
msg = message.Event(sub.id, publish.id)
handler.onMessage(msg)

finally:
log.removeObserver(observer)
# we know it worked if our observer worked and did
# .callback on our Deferred above.
self.assertTrue(got_err_d.called)

@inlineCallbacks
def test_unsubscribe(self):
Expand Down Expand Up @@ -598,6 +588,10 @@ def test_invoke_user_raises(self):
handler = ApplicationSession()
handler.traceback_app = True
MockTransport(handler)
errors = []
def log_error(e, msg):
errors.append((e, msg))
handler.onUserError = log_error

name_error = NameError('foo')

Expand All @@ -618,9 +612,8 @@ def bing():

# also, we should have logged the real NameError to
# Twisted.
errs = self.flushLoggedErrors()
self.assertEqual(1, len(errs))
self.assertEqual(name_error, errs[0].value)
self.assertEqual(1, len(errors))
self.assertEqual(name_error, errors[0][0].value)

@inlineCallbacks
def test_invoke_progressive_result(self):
Expand Down Expand Up @@ -674,6 +667,10 @@ def bing(arg, details=None, key=None):

got_progress = Deferred()
progress_error = NameError('foo')
logged_errors = []
def got_error(e, msg):
logged_errors.append((e, msg))
handler.onUserError = got_error

def progress(arg, something=None):
self.assertEqual('nothing', something)
Expand All @@ -693,15 +690,15 @@ def progress(arg, something=None):
options=types.CallOptions(on_progress=progress),
key='word',
)

self.assertEqual(42, res)
# our progress handler raised an error, but not before
# recording success.
self.assertTrue(got_progress.called)
self.assertEqual('life', got_progress.result)
# make sure our progress-handler error was logged
errs = self.flushLoggedErrors()
self.assertEqual(1, len(errs))
self.assertEqual(progress_error, errs[0].value)
self.assertEqual(1, len(logged_errors))
self.assertEqual(progress_error, logged_errors[0][0])

@inlineCallbacks
def test_invoke_progressive_result_no_args(self):
Expand Down

0 comments on commit e09df6d

Please sign in to comment.