Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
stream: discard any pending incoming stanzas when the stream is stopped
Browse files Browse the repository at this point in the history
This fixes an ugly corner case and the potential for stream management
(SM) counters to go out of sync. If stanzas are in the incoming queue when
the broker task of the StanzaStream gets terminated, they are put back into
the incoming queue.

However, if the stream was actually covered by stream management, this
will cause incorrect counters in the following edge case:

1. Stanzas arrive and are put in the incoming queue
2. The broker is terminated (because the underlying TCP stream gets
   killed, for instance, and on_closing of the xmlstream emits, which
   causes stop() to be called).
3. The incoming queue now has >0 elements, and the SM counter is set to
   the last element processed, *not including the incoming queue*.
4. The stream is re-established and SM is resumed. Based on the previous
   counter value, the server will now re-send all stanzas still in the
   incoming queue.
5. The broker task starts processing inbound stanzas, starting with
   those still in the incoming queue from the initial termination of the
   stream.

By definition, those have *also* been re-sent by the server, so we'll now
see, process and count those stanzas twice, causing incorrect stream
management counters.

This is not a common case during normal operations; typically, when an
SM stream gets interrupted, it is due to the network going down or
some such, which gives the broker task plenty of time to clear its
inbound queue before it gets terminated.
  • Loading branch information
horazont committed Dec 14, 2021
1 parent fff425f commit 7d6a099
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
39 changes: 37 additions & 2 deletions aioxmpp/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,7 @@ def flush_incoming(self):
It is legal (but pretty useless) to call this method while the stream
is :attr:`running`.
"""
self._logger.debug("flushing incoming queue")
while True:
try:
stanza_obj = self._incoming_queue.get_nowait()
Expand Down Expand Up @@ -1972,6 +1973,19 @@ async def close(self):
if self.sm_enabled:
self.stop_sm()

def _drain_incoming(self):
    """
    Drain the incoming queue **without** processing any contents.

    Items are removed from the incoming queue one at a time and
    discarded until the queue reports that it is empty. Nothing is
    returned.
    """
    self._logger.debug("draining incoming queue")
    while True:
        # this cannot loop forever because we do not yield control
        # and the queue cannot be filled across threads.
        try:
            self._incoming_queue.get_nowait()
        except asyncio.QueueEmpty:
            break

async def _run(self, xmlstream):
self._xmlstream = xmlstream
self._update_xmlstream_limits()
Expand Down Expand Up @@ -2010,10 +2024,31 @@ async def _run(self, xmlstream):
# caught by the calls to get()
self._logger.debug("task terminating, rescuing stanzas and "
"clearing handlers")
if incoming_fut.done() and not incoming_fut.exception():
self._incoming_queue.putleft_nowait(incoming_fut.result())
# Drain the incoming queue:
# 1. Either we have an SM-resumable stream and we'll SM-resume it,
# in which case we can reply to stanzas in here; however, then
# we'd get them re-transmitted anyway.
# 2. Or the stream is not SM-resumed, in which case it would be
# invalid to reply to anything in the incoming queue or process
# it in any way, as it may refer to old, stale state.
# Imagine an incremental roster update slipping in here and
# getting processed after a reconnect. Terrible. Let's drain
# this queue immediately.
if incoming_fut.done():
# discard
try:
incoming_fut.result()
except BaseException: # noqa
# we truly do not care, because we are going to drain the
# queue anyway. if anything is fatally wrong with the
# queue, it'll reraise there. if anything is fatally
# wrong with the event loop, it'll hit us elsewhere
# eventually. if it was successful, good, let's drop the
# stanza because we want to drain right now.
pass
else:
incoming_fut.cancel()
self._drain_incoming()

if active_fut.done() and not active_fut.exception():
self._active_queue.putleft_nowait(active_fut.result())
Expand Down
22 changes: 3 additions & 19 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ def test_unregister_presence_callback(self):

self.assertFalse(cb.mock_calls)

def test_rescue_unprocessed_incoming_stanza_on_stop(self):
def test_discard_unprocessed_incoming_stanza_on_stop(self):
pres = make_test_presence()

self.stream.start(self.xmlstream)
Expand All @@ -1581,26 +1581,10 @@ def test_rescue_unprocessed_incoming_stanza_on_stop(self):
self.stream.recv_stanza(pres)
self.stream.stop()

self.assertEqual(
(pres, None),
run_coroutine(self.stream._incoming_queue.get())
)

def test_unprocessed_incoming_stanza_does_not_get_lost_after_stop(self):
pres = make_test_presence()

self.stream.start(self.xmlstream)

run_coroutine(asyncio.sleep(0))

self.stream.stop()

self.stream.recv_stanza(pres)

self.assertEqual(
(pres, None),
run_coroutine(self.stream._incoming_queue.get())
)
with self.assertRaises(asyncio.QueueEmpty):
self.stream._incoming_queue.get_nowait()

def test_fail_on_unknown_stanza_class(self):
caught_exc = None
Expand Down

0 comments on commit 7d6a099

Please sign in to comment.