From 4f270d5dcab03944860719f019f2ac6a434522b3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?=
Date: Mon, 3 Oct 2022 12:44:38 +0200
Subject: [PATCH] Fix hard infinite loop on malformed stream management
 response

If the server replies to a ``<resume/>`` request with a non-conformant
``<failed/>`` response, e.g. one without a proper error condition, the
`aioxmpp.node.Client` would get into a hard busy loop until resource
(memory) exhaustion eventually killed it off.

We fix this by discarding the SM state if we get a stream error during
the attempt to resume SM. There may be edge cases where this is
overzealous, but those can be fixed once they occur (I can't find any
off the top of my head).

Fixes #382.
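As a review aid, the failure mode and the fix reduce to roughly the
sketch below. This is not part of the patch and not aioxmpp's real API;
``SketchClient``, ``negotiate`` and this ``StreamError`` are
illustrative stand-ins for the real client loop in ``aioxmpp.node``:

    import asyncio

    class StreamError(Exception):
        """Stand-in for aioxmpp.errors.StreamError."""

    class SketchClient:
        def __init__(self, connect, negotiate):
            self._connect = connect      # coroutine opening an XML stream
            self._negotiate = negotiate  # coroutine negotiating features
            self.sm_state = None         # XEP-0198 resumption state, if any

        async def _attempt(self):
            stream = await self._connect()
            try:
                # A malformed <failed/> reply to <resume/> surfaces as a
                # StreamError during feature negotiation.
                self.sm_state = await self._negotiate(
                    stream, resume=self.sm_state)
            except StreamError:
                # The fix: discard the SM state before the next attempt.
                # Without this, every retry re-sends the same doomed
                # <resume/>, fails identically and reconnects at once,
                # which is the hard loop this patch removes.
                self.sm_state = None
                raise

        async def run(self, backoff=0.05):
            # Simplified retry loop; the real client has richer backoff
            # and failure handling.
            while True:
                try:
                    await self._attempt()
                except StreamError:
                    await asyncio.sleep(backoff)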
---
 aioxmpp/node.py         |   9 ++
 aioxmpp/protocol.py     |   5 +
 docs/api/changelog.rst  |   9 ++
 tests/test_highlevel.py | 262 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 285 insertions(+)

diff --git a/aioxmpp/node.py b/aioxmpp/node.py
index e41f0f08..c5d4c8d3 100644
--- a/aioxmpp/node.py
+++ b/aioxmpp/node.py
@@ -976,6 +976,15 @@ async def _main_impl(self):
             # cancelled, this means a clean shutdown is requested
             await self.stream.close()
             raise
+        except errors.StreamError as exc:
+            self.logger.error("failed to negotiate stream features",
+                              exc_info=True)
+            if self.stream.sm_enabled:
+                self.logger.warning(
+                    "discarding SM state because of error during negotiation",
+                )
+                self.stream.stop_sm()
+            raise
         finally:
             self.logger.info("stopping stream")
             self.stream.stop()
diff --git a/aioxmpp/protocol.py b/aioxmpp/protocol.py
index fcd63bcd..f3534853 100644
--- a/aioxmpp/protocol.py
+++ b/aioxmpp/protocol.py
@@ -443,6 +443,11 @@ def _rx_exception(self, exc):
                     xso.tag_to_str((exc.ev_args[0], exc.ev_args[1]))
                 )) from None
         else:
+            self._logger.warning(
+                "unhandled exception during parsing: %r "
+                "(see full traceback later, this will kill the stream)",
+                exc,
+            )
             context = exc.__context__ or exc.__cause__
             raise exc from context
 
diff --git a/docs/api/changelog.rst b/docs/api/changelog.rst
index a7f5d188..876dec95 100644
--- a/docs/api/changelog.rst
+++ b/docs/api/changelog.rst
@@ -5,6 +5,15 @@ Changelog
 
 .. _api-changelog-0.13:
 
+Version 0.13.2
+==============
+
+* Fix hard loop on malformed server stream management response.
+
+  This was reported on
+  `GitHub as issue #382 <https://github.com/horazont/aioxmpp/issues/382>`_
+  by `@jahrome <https://github.com/jahrome>`_.
+
 Version 0.13.1
 ==============
 
diff --git a/tests/test_highlevel.py b/tests/test_highlevel.py
index ffc20c6d..fb2b301b 100644
--- a/tests/test_highlevel.py
+++ b/tests/test_highlevel.py
@@ -26,6 +26,7 @@
 """
 
 import asyncio
+import contextlib
 import unittest
 
 from datetime import timedelta
@@ -33,6 +34,7 @@
 import aioxmpp.structs
 
 from aioxmpp.testutils import (
+    CoroutineMock,
     TransportMock,
     run_coroutine,
     run_coroutine_with_peer
@@ -593,3 +595,263 @@ def fake_iq_constructor(*args, **kwargs):
         self.assertTrue(failure_fut.done())
         self.assertIsInstance(failure_fut.exception(), ConnectionError)
         self.assertIn("timeout", str(failure_fut.exception()))
+
+    def test_malformed_sm_failed_does_not_cause_loop(self):
+        import aioxmpp.protocol
+        import aioxmpp.stream
+        import aioxmpp.node
+
+        version = (1, 0)
+
+        async def mk_pair():
+            fut = asyncio.Future()
+            p = aioxmpp.protocol.XMLStream(
+                to=TEST_PEER,
+                sorted_attributes=True,
+                features_future=fut,
+            )
+            t = TransportMock(self, p)
+            await t.run_test(
+                [
+                    TransportMock.Write(
+                        STREAM_HEADER,
+                        response=[
+                            TransportMock.Receive(
+                                PEER_STREAM_HEADER_TEMPLATE.format(
+                                    minor=version[1],
+                                    major=version[0]).encode("utf-8")),
+                            TransportMock.Receive(
+                                b"<stream:features>"
+                                b"<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>"
+                                b"<sm xmlns='urn:xmpp:sm:3'/>"
+                                b"</stream:features>"
+                            )
+                        ]
+                    ),
+                ],
+                partial=True
+            )
+            features = await fut
+            return t, p, features
+
+        t, p, features = run_coroutine(mk_pair())
+
+        client = aioxmpp.node.Client(
+            local_jid=TEST_FROM,
+            security_layer=aioxmpp.make_security_layer(
+                password_provider="foobar2342",
+            )._replace(tls_required=False),
+            max_initial_attempts=None,
+        )
+        client.backoff_start = timedelta(seconds=0.05)
+
+        id_counter = 0
+
+        def autoset_id_impl(st):
+            nonlocal id_counter
+            if getattr(st, "id_", None) is None:
+                st.id_ = str(id_counter)
+                id_counter += 1
+
+        with contextlib.ExitStack() as stack:
+            connect_xmlstream = stack.enter_context(
+                unittest.mock.patch("aioxmpp.node.connect_xmlstream",
+                                    new=CoroutineMock())
+            )
+            connect_xmlstream.return_value = (None, p, features)
+
+            autoset_id = stack.enter_context(unittest.mock.patch(
+                "aioxmpp.stanza.StanzaBase.autoset_id",
+                autospec=True,
+            ))
+            autoset_id.side_effect = autoset_id_impl
+
+            done_future = asyncio.Future()
+            client.on_stream_established.connect(
+                done_future,
+                client.on_stream_established.AUTO_FUTURE,
+            )
+            client.on_failure.connect(
+                done_future,
+                client.on_failure.AUTO_FUTURE,
+            )
+            client.start()
+
+            run_coroutine_with_peer(
+                done_future,
+                t.run_test(
+                    [
+                        TransportMock.Write(
+                            b'<iq id="0" type="set">'
+                            b'<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>'
+                            b'</iq>',
+                            response=[
+                                TransportMock.Receive(
+                                    b'<iq id="0" type="result">'
+                                    b'<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
+                                    b'<jid>foo@bar.example/fnord</jid>'
+                                    b'</bind>'
+                                    b'</iq>'
+                                )
+                            ]
+                        ),
+                        TransportMock.Write(
+                            b'<enable xmlns="urn:xmpp:sm:3" resume="true"/>',
+                            response=[
+                                TransportMock.Receive(
+                                    b'<enabled id="fnord" resume="true" xmlns="urn:xmpp:sm:3"/>'
+                                )
+                            ]
+                        ),
+                    ],
+                    partial=True
+                )
+            )
+
+            done_future = asyncio.Future()
+            client.on_stream_suspended.connect(
+                done_future,
+                client.on_stream_suspended.AUTO_FUTURE,
+            )
+            client.on_failure.connect(
+                done_future,
+                client.on_failure.AUTO_FUTURE,
+            )
+
+            # XXX: we are using try/except here instead of
+            # self.assertRaises, because assertRaises calls
+            # traceback.clear_frames, which, for some reason not clear
+            # to me, in Python 3.9 and earlier causes the _main() (from
+            # client._main_task) to be killed.
+            #
+            # This makes no sense, because _main_task is strongly
+            # referenced from client, and we can even later print
+            # client._main_task. It also doesn't go through __del__ of
+            # the task wrapped in ensure_future, nor does it call
+            # close(), so this is really weird.
+            #
+            # I suspect some bug in clear_frames itself in Python 3.9
+            # and earlier, but given that we're at py 3.11 at this
+            # point and the use of clear_frames is hopefully rather
+            # exotic, I'm not inclined to debug further.
+            #
+            # Finding this out was a weekend **not** well-spent. At
+            # least I learnt that `traceback.print_stack` reveals
+            # useful details even in coroutines.
+            try:
+                run_coroutine_with_peer(
+                    done_future,
+                    t.run_test(
+                        [],
+                        stimulus=[
+                            TransportMock.LoseConnection(
+                                ConnectionError("ohno"),
+                            )
+                        ],
+                        partial=True,
+                    )
+                )
+            except ConnectionError:
+                pass
+
+            t, p, features = run_coroutine(mk_pair())
+            connect_xmlstream.return_value = (None, p, features)
+
+            done_future = asyncio.Future()
+            client.on_stream_destroyed.connect(
+                done_future,
+                client.on_stream_destroyed.AUTO_FUTURE,
+            )
+            client.on_failure.connect(
+                done_future,
+                client.on_failure.AUTO_FUTURE,
+            )
+
+            run_coroutine(
+                t.run_test(
+                    [
+                        TransportMock.Write(
+                            b'<resume counter="0" previd="fnord"'
+                            b' xmlns="urn:xmpp:sm:3"/>',
+                        ),
+                    ],
+                    partial=True
+                )
+            )
+
+            # we have to delay the next attempt in order to re-mock
+            # stuff, because the thing won't back off in this specific
+            # condition
+            connect_xmlstream.side_effect = ConnectionError()
+
+            run_coroutine(
+                t.run_test(
+                    [
+                        TransportMock.Write(
+                            b'<stream:error>'
+                            b'<internal-server-error'
+                            b' xmlns="urn:ietf:params:xml:ns:xmpp-streams"/>'
+                            b'<text xmlns="urn:ietf:params:xml:ns:xmpp-streams">'
+                            b'Internal error while parsing XML. '
+                            b'Client logs have more details.'
+                            b'</text>'
+                            b'</stream:error>'
+                        ),
+                        TransportMock.WriteEof(),
+                        TransportMock.Close(),
+                    ],
+                    stimulus=[
+                        TransportMock.Receive(
+                            b'<failed xmlns="urn:xmpp:sm:3">'
+                            b'<error type="cancel">'
+                            b"<item-not-found xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
+                            b"<text xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'>"
+                            b"Unknown session"
+                            b"</text>"
+                            b"</error>"
+                            b"</failed>"
+                        )
+                    ],
+                    partial=True,
+                ),
+            )
+
+            t, p, features = run_coroutine(mk_pair())
+            connect_xmlstream.return_value = (None, p, features)
+            connect_xmlstream.side_effect = None
+
+            done_future = asyncio.Future()
+            client.on_stream_established.connect(
+                done_future,
+                client.on_stream_established.AUTO_FUTURE,
+            )
+            client.on_failure.connect(
+                done_future,
+                client.on_failure.AUTO_FUTURE,
+            )
+
+            run_coroutine_with_peer(
+                done_future,
+                t.run_test(
+                    [
+                        TransportMock.Write(
+                            b'<iq id="1" type="set">'
+                            b'<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
+                            b'<resource>fnord</resource>'
+                            b'</bind>'
+                            b'</iq>',
+                            response=[
+                                TransportMock.Receive(
+                                    b'<iq id="1" type="result">'
+                                    b'<bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
+                                    b'<jid>foo@bar.example/fnord</jid>'
+                                    b'</bind>'
+                                    b'</iq>'
+                                )
+                            ]
+                        ),
+                        TransportMock.Write(
+                            b'<enable xmlns="urn:xmpp:sm:3" resume="true"/>',
+                            response=[
+                                TransportMock.Receive(
+                                    b'<enabled id="fnord" resume="true" xmlns="urn:xmpp:sm:3"/>'
+                                )
+                            ]
+                        ),
+                    ],
+                    partial=True
+                )
+            )
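Editorial aside, not part of the patch: the ``_rx_exception`` hunk above
re-raises with ``raise exc from context``, where ``context`` is whatever
chaining the exception already carries. This pins ``__cause__``
explicitly instead of letting the re-raise site rewrite the chain. A
tiny self-contained demonstration of the idiom, with made-up names:

    def parse():
        try:
            int("not a number")          # original low-level failure
        except ValueError as cause:
            raise RuntimeError("parse failed") from cause

    def rx_exception(exc):
        # Preserve whichever chaining the exception already has.
        context = exc.__context__ or exc.__cause__
        raise exc from context

    try:
        parse()
    except RuntimeError as exc:
        try:
            rx_exception(exc)
        except RuntimeError as reraised:
            # The original ValueError is still attached as the cause.
            assert isinstance(reraised.__cause__, ValueError)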