diff --git a/autobahn/twisted/rawsocket.py b/autobahn/twisted/rawsocket.py index 56458d087..962181132 100644 --- a/autobahn/twisted/rawsocket.py +++ b/autobahn/twisted/rawsocket.py @@ -28,7 +28,6 @@ import binascii -from twisted.python import log from twisted.internet.protocol import Factory from twisted.protocols.basic import Int32StringReceiver from twisted.internet.error import ConnectionDone @@ -36,6 +35,8 @@ from autobahn.twisted.util import peer2str from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost +import txaio + __all__ = ( 'WampRawSocketServerProtocol', 'WampRawSocketClientProtocol', @@ -48,10 +49,11 @@ class WampRawSocketProtocol(Int32StringReceiver): """ Base class for Twisted-based WAMP-over-RawSocket protocols. """ + log = txaio.make_logger() def connectionMade(self): if self.factory.debug: - log.msg("WampRawSocketProtocol: connection made") + self.log.debug("WampRawSocketProtocol: connection made") # the peer we are connected to # @@ -92,42 +94,42 @@ def _on_handshake_complete(self): except Exception as e: # Exceptions raised in onOpen are fatal .. if self.factory.debug: - log.msg("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e)) + self.log.info("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e)) self.abort() else: if self.factory.debug: - log.msg("ApplicationSession started.") + self.log.info("ApplicationSession started.") def connectionLost(self, reason): if self.factory.debug: - log.msg("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason)) + self.log.info("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason)) try: wasClean = isinstance(reason.value, ConnectionDone) self._session.onClose(wasClean) except Exception as e: # silently ignore exceptions raised here .. if self.factory.debug: - log.msg("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e)) + self.log.info("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e)) self._session = None def stringReceived(self, payload): if self.factory.debug: - log.msg("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload))) + self.log.info("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload))) try: for msg in self._serializer.unserialize(payload): if self.factory.debug: - log.msg("WampRawSocketProtocol: RX WAMP message: {0}".format(msg)) + self.log.info("WampRawSocketProtocol: RX WAMP message: {0}".format(msg)) self._session.onMessage(msg) except ProtocolError as e: - log.msg(str(e)) + self.log.info(str(e)) if self.factory.debug: - log.msg("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e)) + self.log.info("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e)) self.abort() except Exception as e: if self.factory.debug: - log.msg("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e)) + self.log.info("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e)) self.abort() def send(self, msg): @@ -136,7 +138,7 @@ def send(self, msg): """ if self.isOpen(): if self.factory.debug: - log.msg("WampRawSocketProtocol: TX WAMP message: {0}".format(msg)) + self.log.info("WampRawSocketProtocol: TX WAMP message: {0}".format(msg)) try: payload, _ = self._serializer.serialize(msg) except Exception as e: @@ -145,7 +147,7 @@ def send(self, msg): else: self.sendString(payload) if self.factory.debug: - log.msg("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload))) + self.log.info("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload))) else: raise TransportLost() @@ -194,18 +196,18 @@ def dataReceived(self, data): if len(self._handshake_bytes) == 4: if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes))) + self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes))) if ord(self._handshake_bytes[0]) != 0x7f: if self.factory.debug: - log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0]))) + self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0]))) self.abort() # peer requests us to send messages of maximum length 2**max_len_exp # self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4)) if self.factory.debug: - log.msg("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send)) + self.log.info("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send)) # client wants to speak this serialization format # @@ -213,10 +215,10 @@ def dataReceived(self, data): if ser_id in self.factory._serializers: self._serializer = self.factory._serializers[ser_id] if self.factory.debug: - log.msg("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id)) + self.log.info("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id)) else: if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys())) + self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys())) self.abort() # we request the peer to send message of maximum length 2**reply_max_len_exp @@ -235,7 +237,7 @@ def dataReceived(self, data): self._on_handshake_complete() if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer) + self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer) # consume any remaining data received already .. # @@ -275,25 +277,25 @@ def dataReceived(self, data): if len(self._handshake_bytes) == 4: if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes))) + self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes))) if ord(self._handshake_bytes[0]) != 0x7f: if self.factory.debug: - log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0]))) + self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0]))) self.abort() # peer requests us to send messages of maximum length 2**max_len_exp # self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4)) if self.factory.debug: - log.msg("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send)) + self.log.info("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send)) # client wants to speak this serialization format # ser_id = ord(self._handshake_bytes[1]) & 0x0F if ser_id != self._serializer.RAWSOCKET_SERIALIZER_ID: if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID)) + self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID)) self.abort() self._handshake_complete = True @@ -301,7 +303,7 @@ def dataReceived(self, data): self._on_handshake_complete() if self.factory.debug: - log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer) + self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer) # consume any remaining data received already .. # diff --git a/autobahn/twisted/wamp.py b/autobahn/twisted/wamp.py index bcf4126f1..e47cc2706 100644 --- a/autobahn/twisted/wamp.py +++ b/autobahn/twisted/wamp.py @@ -30,7 +30,6 @@ import six -from twisted.python import log from twisted.internet.defer import inlineCallbacks from autobahn.wamp import protocol @@ -67,11 +66,9 @@ def onUserError(self, e, msg): """ Override of wamp.ApplicationSession """ - # see docs; will print currently-active exception to the logs, - # which is just what we want. - log.err(e) + self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e))) # also log the framework-provided error-message - log.err(msg) + self.log.error(msg) class ApplicationSessionFactory(protocol.ApplicationSessionFactory): @@ -168,7 +165,10 @@ def run(self, make, start_reactor=True): isSecure, host, port, resource, path, params = parseWsUrl(self.url) - txaio.start_logging(level='debug') + if self.debug or self.debug_wamp or self.debug_app: + txaio.start_logging(level='debug') + else: + txaio.start_logging(level='info') # factory for use ApplicationSession def create(): @@ -178,7 +178,7 @@ def create(): except Exception as e: if start_reactor: # the app component could not be created .. fatal - log.err(str(e)) + self.log.error(str(e)) reactor.stop() else: # if we didn't start the reactor, it's up to the @@ -526,7 +526,7 @@ def _fire_signal(self, name, *args, **kwargs): yield handler(*args, **kwargs) except Exception as e: # FIXME - log.msg("Warning: exception in signal handler swallowed", e) + self.log.info("Warning: exception in signal handler swallowed", e) if service: diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index a97f17699..186784fdf 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -520,8 +520,8 @@ def onUserError(self, e, msg): :param msg: an informative message from the library. It is suggested you log this immediately after the exception. """ - traceback.print_exc() - print(msg) + self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e))) + self.log.error(msg) def _swallow_error(self, fail, msg): ''' @@ -535,7 +535,6 @@ def _swallow_error(self, fail, msg): chain for a Deferred/coroutine that will make it out to user code. ''' - # print("_swallow_error", typ, exc, tb) try: self.onUserError(fail.value, msg) except: @@ -546,7 +545,6 @@ def onMessage(self, msg): """ Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage` """ - print("BLAM", id(self.log), type(self.log)) self.log.debug("onMessage: {message}", session_id=self._session_id, message=msg) self.log.trace("onMessage: {message}", session_id=self._session_id, message=msg) if self._session_id is None: @@ -814,11 +812,7 @@ def error(err): pass formatted_tb = None if self.traceback_app: - # if asked to marshal the traceback within the WAMP error message, extract it - # noinspection PyCallingNonCallable - tb = StringIO() - err.printTraceback(file=tb) - formatted_tb = tb.getvalue().splitlines() + formatted_tb = txaio.failure_format_traceback(err) del self._invocations[msg.request] @@ -972,7 +966,7 @@ def onLeave(self, details): Implements :func:`autobahn.wamp.interfaces.ISession.onLeave` """ if details.reason.startswith('wamp.error.'): - print('{error}: {message}'.format(error=details.reason, message=details.message)) + self.log.error('{reason}: {message}', reason=details.reason, message=details.message) if self._transport: self.disconnect() # do we ever call onLeave with a valid transport? diff --git a/autobahn/wamp/test/test_protocol.py b/autobahn/wamp/test/test_protocol.py index 9cd081afc..8d4dd6823 100644 --- a/autobahn/wamp/test/test_protocol.py +++ b/autobahn/wamp/test/test_protocol.py @@ -32,7 +32,6 @@ from twisted.internet.defer import inlineCallbacks, Deferred, returnValue from twisted.internet.defer import succeed, DeferredList - from twisted.python import log from twisted.trial import unittest from six import PY3 @@ -463,37 +462,28 @@ def test_publish_callback_exception(self): error_instance = RuntimeError("we have a problem") got_err_d = Deferred() - def observer(kw): - if kw['isError'] and 'failure' in kw: - fail = kw['failure'] - fail.trap(RuntimeError) - if error_instance == fail.value: - got_err_d.callback(True) - log.addObserver(observer) + def observer(e, msg): + if error_instance == e: + got_err_d.callback(True) + handler.onUserError = observer def boom(): raise error_instance - try: - sub = yield handler.subscribe(boom, u'com.myapp.topic1') - - # MockTransport gives us the ack reply and then we do our - # own event message - publish = yield handler.publish( - u'com.myapp.topic1', - options=types.PublishOptions(acknowledge=True, exclude_me=False), - ) - msg = message.Event(sub.id, publish.id) - handler.onMessage(msg) + sub = yield handler.subscribe(boom, u'com.myapp.topic1') - # we know it worked if our observer worked and did - # .callback on our Deferred above. - self.assertTrue(got_err_d.called) - # ...otherwise trial will fail the test anyway - self.flushLoggedErrors() + # MockTransport gives us the ack reply and then we do our + # own event message + publish = yield handler.publish( + u'com.myapp.topic1', + options=types.PublishOptions(acknowledge=True, exclude_me=False), + ) + msg = message.Event(sub.id, publish.id) + handler.onMessage(msg) - finally: - log.removeObserver(observer) + # we know it worked if our observer worked and did + # .callback on our Deferred above. + self.assertTrue(got_err_d.called) @inlineCallbacks def test_unsubscribe(self): @@ -598,6 +588,10 @@ def test_invoke_user_raises(self): handler = ApplicationSession() handler.traceback_app = True MockTransport(handler) + errors = [] + def log_error(e, msg): + errors.append((e, msg)) + handler.onUserError = log_error name_error = NameError('foo') @@ -618,9 +612,8 @@ def bing(): # also, we should have logged the real NameError to # Twisted. - errs = self.flushLoggedErrors() - self.assertEqual(1, len(errs)) - self.assertEqual(name_error, errs[0].value) + self.assertEqual(1, len(errors)) + self.assertEqual(name_error, errors[0][0].value) @inlineCallbacks def test_invoke_progressive_result(self): @@ -674,6 +667,10 @@ def bing(arg, details=None, key=None): got_progress = Deferred() progress_error = NameError('foo') + logged_errors = [] + def got_error(e, msg): + logged_errors.append((e, msg)) + handler.onUserError = got_error def progress(arg, something=None): self.assertEqual('nothing', something) @@ -693,15 +690,15 @@ def progress(arg, something=None): options=types.CallOptions(on_progress=progress), key='word', ) + self.assertEqual(42, res) # our progress handler raised an error, but not before # recording success. self.assertTrue(got_progress.called) self.assertEqual('life', got_progress.result) # make sure our progress-handler error was logged - errs = self.flushLoggedErrors() - self.assertEqual(1, len(errs)) - self.assertEqual(progress_error, errs[0].value) + self.assertEqual(1, len(logged_errors)) + self.assertEqual(progress_error, logged_errors[0][0]) @inlineCallbacks def test_invoke_progressive_result_no_args(self): diff --git a/autobahn/wamp/test/test_runner.py b/autobahn/wamp/test/test_runner.py index 31b2e3e07..c51bbebee 100644 --- a/autobahn/wamp/test/test_runner.py +++ b/autobahn/wamp/test/test_runner.py @@ -28,6 +28,7 @@ import os import unittest2 as unittest +import txaio if os.environ.get('USE_TWISTED', False): from mock import patch diff --git a/tox.ini b/tox.ini index d4697b532..2137f0c58 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,7 @@ deps = unittest2 coverage msgpack-python - ../txaio + git+https://github.com/meejah/txaio@issue8-squash ; twisted dependencies twtrunk: https://github.com/twisted/twisted/archive/trunk.zip