Skip to content

fix: prevent MQTT reconnection storm from stale interrupt callbacks#87

Merged
eman merged 3 commits into
mainfrom
fix/mqtt-reconnection-storm
May 18, 2026
Merged

fix: prevent MQTT reconnection storm from stale interrupt callbacks#87
eman merged 3 commits into
mainfrom
fix/mqtt-reconnection-storm

Conversation

@eman
Copy link
Copy Markdown
Owner

@eman eman commented May 18, 2026

Problem

Running the Home Assistant integration against a real device produces a thundering-herd of simultaneous reconnection attempts visible in the logs as dozens of lines like:

Triggering quick reconnection...
Attempting active reconnection...
Recreating MQTT connection...

all firing within milliseconds of each other. Reconnection attempt counters jump non-monotonically (e.g. 1 → 2 → 5 → 3 → …), confirming that multiple independent _reconnect_with_backoff loops are running concurrently. Each loop tears down the connection the previous one just established, producing an endless cycle.

Root causes

Bug 1 – stale interruption events fire after a resume clears _reconnect_task

The AWS SDK fires on_connection_interrupted from background threads. These callbacks call run_coroutine_threadsafe, which queues a _start_reconnect_task coroutine into the event loop.

When the connection recovers, on_connection_resumed cancels the backoff task and sets _reconnect_task = None. Any _start_reconnect_task coroutines that were already queued but hadn't yet executed now run, see _reconnect_task is None, pass the existing guard, and spawn a new backoff loop against an already-healthy connection.

Fix: both on_connection_interrupted and _start_reconnect_task now call is_connected_func() and return immediately if the client is connected.

Bug 2 – closing the old connection inside _active_reconnect triggers a competing backoff loop

_active_reconnect calls connection_manager.close() on the old connection. The SDK fires _on_connection_interrupted_internal from a background thread for that closure, which queues another _start_reconnect_task coroutine. That coroutine fires after the new connection is up and the existing backoff task has been cancelled — tearing down the brand-new connection immediately.

Fix: a boolean flag _actively_reconnecting is set to True for the duration of _active_reconnect and _deep_reconnect (cleared in finally so it is always reset). _on_connection_interrupted_internal skips the reconnection-handler delegation while the flag is set. Both reconnect methods also return immediately if called while the flag is already True, making them safe against concurrent invocations.

Changes

File Change
mqtt/reconnection.py Add is_connected_func() guard in on_connection_interrupted and _start_reconnect_task
mqtt/client.py Add _actively_reconnecting flag; set/clear in _active_reconnect and _deep_reconnect; check in _on_connection_interrupted_internal
tests/test_mqtt_reconnection_storm.py 14 new tests covering both bugs (all pass)

Tests

14 passed in 0.50s

Two race conditions caused a thundering-herd loop of simultaneous
reconnection attempts (visible as dozens of 'Triggering quick
reconnection...' lines within milliseconds in the logs).

Bug 1 – stale interruption events fire after a resume clears
_reconnect_task.

The AWS SDK fires on_connection_interrupted from background threads via
run_coroutine_threadsafe.  When on_connection_resumed cancels the backoff
task and sets _reconnect_task = None, queued _start_reconnect_task
coroutines that haven't yet executed see _reconnect_task=None and pass
the existing task-existence guard, spawning a new _reconnect_with_backoff
loop against an already-healthy connection.

Fix: both on_connection_interrupted and _start_reconnect_task now call
is_connected_func() and bail out immediately if the client is connected.

Bug 2 – closing the old connection inside _active_reconnect triggers a
competing backoff loop.

_active_reconnect calls connection_manager.close() on the old connection.
The SDK fires _on_connection_interrupted_internal from a background thread,
which schedules a _start_reconnect_task coroutine.  That coroutine fires
after the new connection succeeds and the existing task has been cancelled,
tearing down the brand-new connection immediately.

Fix: a boolean flag _actively_reconnecting is set to True for the
duration of _active_reconnect and _deep_reconnect (cleared in a finally
block so it is always reset).  _on_connection_interrupted_internal checks
the flag and skips the reconnection-handler delegation while it is set.
Both methods also return immediately if called while the flag is already
True, making them safe against concurrent invocations.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes an MQTT reconnection storm caused by two distinct races: (1) stale interrupt callbacks queued via run_coroutine_threadsafe that fired after the connection had already been restored, and (2) the intentional teardown of the old connection inside _active_reconnect / _deep_reconnect triggering a competing backoff loop via the SDK's interrupt callback.

Changes:

  • Add is_connected_func() guard in MqttReconnectionHandler.on_connection_interrupted and _start_reconnect_task so stale interrupt events become no-ops when the client is healthy.
  • Add _actively_reconnecting re-entrancy flag in NavienMqttClient, set/cleared (in finally) around _active_reconnect and _deep_reconnect, and checked in _on_connection_interrupted_internal to suppress the spurious delegation.
  • Add 14 unit tests covering both bugs and edge cases (success, exception paths, re-entrancy, concurrent interrupts).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
src/nwp500/mqtt/reconnection.py Adds is_connected_func() checks in on_connection_interrupted and _start_reconnect_task to ignore stale interrupt events after recovery.
src/nwp500/mqtt/client.py Introduces _actively_reconnecting flag; sets/clears around _active_reconnect/_deep_reconnect and uses it to suppress forwarding in _on_connection_interrupted_internal; also guards both reconnect methods from concurrent invocation.
tests/test_mqtt_reconnection_storm.py New tests verifying both fixes, including stale-interrupt-after-resume, multi-interrupt deduplication, flag set/clear on success and on exception, and re-entrancy safety.

eman and others added 2 commits May 18, 2026 13:02
- Wrap long comment in _active_reconnect (E501)
- Sort/format import block in test file (I001, fixed by ruff --fix)
- Remove quoted type annotation in _make_mqtt_client (UP037, fixed by ruff --fix)
- Shorten docstrings and assert messages over 80 chars (E501)
- Rename test_on_connection_interrupted_internal_skips_handler_when_flag_set
  to test_interrupted_internal_skips_handler_when_flag_set (E501 w/ noqa)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@eman eman merged commit 231624f into main May 18, 2026
7 checks passed
@eman eman deleted the fix/mqtt-reconnection-storm branch May 18, 2026 20:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants