
fix: MQTT connection flapping + comprehensive bug fixes #86

Merged: eman merged 8 commits into main from fix/mqtt-connection-flapping on May 17, 2026

eman commented May 16, 2026

Summary

Comprehensive bug fix PR addressing the MQTT connection flapping issue plus 9 additional bugs found during a full code audit.

MQTT Connection Flapping (Original Fix)

Problem

MQTT connections flap endlessly: connect → disconnect after 1-7s → reconnect → repeat. All entities become unavailable in Home Assistant.

Root Cause

When _active_reconnect() creates a new MqttConnection, the old connection is never closed. The old AWS IoT SDK connection's built-in auto-reconnect eventually succeeds, creating two active connections with the same client ID. AWS IoT kicks one off → on_connection_interrupted fires → another reconnection → infinite loop.

Fix

  • Add MqttConnection.close() method that unconditionally tears down the SDK connection regardless of _connected state
  • _active_reconnect(): close old connection before creating replacement
  • _deep_reconnect(): use close() unconditionally instead of checking is_connected
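
The difference between `disconnect()` and the new `close()` can be sketched as follows. This is a minimal illustration, not the library's actual code: `FakeSdkConnection` is a hypothetical stand-in for the AWS IoT SDK connection (whose `disconnect()` returns a `concurrent.futures.Future`), and the attribute names inside this `MqttConnection` are assumptions.

```python
import asyncio
import concurrent.futures


class FakeSdkConnection:
    """Hypothetical stand-in for the SDK connection object."""

    def __init__(self) -> None:
        self.disconnect_calls = 0

    def disconnect(self) -> concurrent.futures.Future:
        self.disconnect_calls += 1
        future: concurrent.futures.Future = concurrent.futures.Future()
        future.set_result(None)  # pretend the broker acknowledged immediately
        return future


class MqttConnection:
    """Simplified sketch; the real class carries much more state."""

    def __init__(self, sdk_connection: FakeSdkConnection) -> None:
        self._connection = sdk_connection
        self._connected = True

    async def disconnect(self) -> None:
        # Skips teardown once an interruption has cleared _connected.
        if not self._connected or self._connection is None:
            return
        await asyncio.wrap_future(self._connection.disconnect())
        self._connected = False

    async def close(self) -> None:
        # Always tears down the SDK connection, whatever _connected says,
        # so the old connection cannot auto-reconnect behind our back.
        connection, self._connection = self._connection, None
        self._connected = False
        if connection is not None:
            await asyncio.wrap_future(connection.disconnect())


async def demo_close() -> tuple:
    sdk_a, sdk_b = FakeSdkConnection(), FakeSdkConnection()
    via_disconnect, via_close = MqttConnection(sdk_a), MqttConnection(sdk_b)
    # Simulate on_connection_interrupted having flipped the flag on both.
    via_disconnect._connected = False
    via_close._connected = False
    await via_disconnect.disconnect()
    await via_close.close()
    return sdk_a.disconnect_calls, sdk_b.disconnect_calls
```

In this sketch, `disconnect()` never reaches the SDK after an interruption, while `close()` does; that gap is what left the old connection alive during reconnection.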

Additional Bug Fixes (Code Audit)

Thread Safety

  • ensure_device_info_cached: Use loop.call_soon_threadsafe() for Future resolution from AWS SDK callback thread (was calling future.set_result() directly from non-asyncio thread — race condition / potential crash)
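
A minimal sketch of the thread-safe pattern, assuming a plain `threading.Thread` as a stand-in for the AWS SDK callback thread. The function names and the payload are hypothetical. The `done()` check lives inside the scheduled callback so that the check and `set_result()` both run on the event loop thread.

```python
import asyncio
import threading


def resolve_from_thread(loop, future, value) -> None:
    """Runs on a non-asyncio thread (stand-in for the SDK callback thread)."""

    def _resolve() -> None:
        # Executed by the event loop, so the done() check and set_result()
        # cannot race with a timeout or cancellation on the loop side.
        if not future.done():
            future.set_result(value)

    loop.call_soon_threadsafe(_resolve)


async def wait_for_device_info() -> dict:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    worker = threading.Thread(
        target=resolve_from_thread,
        args=(loop, future, {"model": "example"}),
    )
    worker.start()
    try:
        return await asyncio.wait_for(future, timeout=5)
    finally:
        worker.join()
```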

MQTT Reconnection

  • Config validation: Clamp deep_reconnect_threshold to minimum of 1 to prevent ZeroDivisionError in reconnection backoff modulo operation
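
The clamp can be sketched like this. `ReconnectConfig` and `is_deep_reconnect()` are hypothetical names used only for illustration; the PR states only that the threshold feeds a modulo operation in the backoff logic.

```python
from dataclasses import dataclass


@dataclass
class ReconnectConfig:
    deep_reconnect_threshold: int = 10

    def __post_init__(self) -> None:
        # Clamp so a later `attempt % threshold` can never divide by zero.
        self.deep_reconnect_threshold = max(1, self.deep_reconnect_threshold)


def is_deep_reconnect(attempt: int, config: ReconnectConfig) -> bool:
    """Hypothetical helper: every Nth attempt escalates to a deep reconnect."""
    return attempt % config.deep_reconnect_threshold == 0
```

With a configured value of 0, the clamped threshold becomes 1, so every attempt counts as a deep reconnect instead of raising `ZeroDivisionError`.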

Diagnostics

  • Counter bug: Increment total_reconnect_attempts on each connection drop (was always 0)
  • JSON serialization: Replace float('inf') default for shortest_session_seconds with None in to_dict() (was breaking JSON serialization)
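
Both diagnostics fixes can be illustrated with a small sketch. The `ConnectionMetrics` class and its `record_drop()` method are assumptions, not the module's real API. Note that Python's `json.dumps` emits the non-standard token `Infinity` for `float('inf')` by default and raises `ValueError` when called with `allow_nan=False`, so mapping the sentinel to `None` keeps the output valid JSON either way.

```python
import json
import math
from dataclasses import dataclass


@dataclass
class ConnectionMetrics:
    total_reconnect_attempts: int = 0
    shortest_session_seconds: float = math.inf  # sentinel: no session yet

    def record_drop(self, session_seconds: float) -> None:
        # Count every drop; previously the counter was never incremented.
        self.total_reconnect_attempts += 1
        self.shortest_session_seconds = min(
            self.shortest_session_seconds, session_seconds
        )

    def to_dict(self) -> dict:
        shortest = self.shortest_session_seconds
        return {
            "total_reconnect_attempts": self.total_reconnect_attempts,
            # Infinity is not valid JSON, so report "unknown" as null.
            "shortest_session_seconds": None if math.isinf(shortest) else shortest,
        }


metrics = ConnectionMetrics()
serialized = json.dumps(metrics.to_dict(), allow_nan=False)
```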

Events

  • Future binding: Use asyncio.get_running_loop().create_future() instead of bare asyncio.Future() in wait_for()
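
A sketch of a `wait_for()` built on `get_running_loop().create_future()`. The `EventEmitter` shown here is a simplified, hypothetical version; only the future-creation call is taken from the PR.

```python
import asyncio
from collections import defaultdict


class EventEmitter:
    def __init__(self) -> None:
        self._listeners = defaultdict(list)

    def emit(self, event: str, payload: object) -> None:
        for listener in list(self._listeners[event]):
            listener(payload)

    async def wait_for(self, event: str, timeout: float = 5.0) -> object:
        # create_future() binds the future to the loop that is running now.
        future = asyncio.get_running_loop().create_future()

        def _on_event(payload: object) -> None:
            if not future.done():
                future.set_result(payload)

        self._listeners[event].append(_on_event)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._listeners[event].remove(_on_event)


async def demo_wait_for() -> object:
    emitter = EventEmitter()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, emitter.emit, "status", {"ok": True})
    return await emitter.wait_for("status")
```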

Encoding

  • Unit-aware temp validation: build_reservation_entry() now uses correct defaults per unit system (35-65°C metric, 95-150°F US). Was hardcoded to Fahrenheit values, causing valid Celsius inputs to fail validation.
  • Malformed data handling: decode_reservation_hex() logs a warning on trailing bytes instead of silently discarding them
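
The two encoding fixes might look roughly like the sketch below. The temperature ranges come from this PR; everything else is an assumption made for illustration, including the `UnitSystem` enum, the function names, and especially the 6-byte entry size.

```python
import logging
from enum import Enum

logger = logging.getLogger(__name__)

ENTRY_SIZE = 6  # assumed bytes per reservation entry, for illustration only


class UnitSystem(Enum):
    METRIC = "metric"
    US = "us"


# Default bounds per unit system, as listed in the PR description.
DEFAULT_TEMP_RANGE = {
    UnitSystem.METRIC: (35.0, 65.0),  # degrees Celsius
    UnitSystem.US: (95.0, 150.0),  # degrees Fahrenheit
}


def validate_reservation_temp(temp: float, unit_system: UnitSystem) -> float:
    low, high = DEFAULT_TEMP_RANGE[unit_system]
    if not low <= temp <= high:
        raise ValueError(
            f"temperature {temp} outside {low}-{high} for {unit_system.value}"
        )
    return temp


def decode_reservation_entries(hex_string: str) -> list:
    data = bytes.fromhex(hex_string)
    trailing = len(data) % ENTRY_SIZE
    if trailing:
        # Surface malformed input instead of silently dropping it.
        logger.warning("Ignoring %d trailing byte(s) in reservation data", trailing)
    usable = len(data) - trailing
    return [data[i : i + ENTRY_SIZE] for i in range(0, usable, ENTRY_SIZE)]
```

Here 50 is accepted in metric mode but rejected in US mode, which mirrors the original bug in reverse: with Fahrenheit-only bounds, a valid 50°C setpoint failed validation.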

Periodic Requests

  • Unknown request type: Log error and stop task on unknown PeriodicRequestType instead of silently doing nothing
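
A sketch of the log-and-stop behavior. The enum members, the `sent` list, and the `max_iterations` bound are hypothetical and exist only to keep the example finite and observable.

```python
import asyncio
import logging
from enum import Enum

logger = logging.getLogger(__name__)


class PeriodicRequestType(Enum):
    # Member names are assumptions made for this example.
    DEVICE_INFO = "device_info"
    DEVICE_STATUS = "device_status"


async def periodic_loop(
    request_type: object,
    interval: float,
    sent: list,
    max_iterations: int = 3,
) -> None:
    for _ in range(max_iterations):
        if request_type is PeriodicRequestType.DEVICE_INFO:
            sent.append("info")
        elif request_type is PeriodicRequestType.DEVICE_STATUS:
            sent.append("status")
        else:
            # An unknown type can never succeed, so log it and stop the task
            # rather than looping forever without sending anything.
            logger.error("Unknown periodic request type: %r", request_type)
            break
        await asyncio.sleep(interval)
```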

Cache

  • Memory leak: Purge expired entries from device_info_cache during get_all_cached() instead of only filtering results
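
The purge-on-read behavior can be sketched as below. This `DeviceInfoCache` is a simplified, synchronous stand-in with an injectable clock; the real cache's constructor and method signatures may differ.

```python
import time


class DeviceInfoCache:
    def __init__(self, ttl_seconds: float, clock=time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # device_id -> (stored_at, info)

    def set(self, device_id: str, info: dict) -> None:
        self._entries[device_id] = (self._clock(), info)

    def get_all_cached(self) -> dict:
        now = self._clock()
        expired = [
            device_id
            for device_id, (stored_at, _) in self._entries.items()
            if now - stored_at >= self._ttl
        ]
        # Delete expired entries rather than merely hiding them from the
        # result, so the underlying dict cannot grow without bound.
        for device_id in expired:
            del self._entries[device_id]
        return {device_id: info for device_id, (_, info) in self._entries.items()}

    def __len__(self) -> int:
        return len(self._entries)


fake_now = [0.0]
cache = DeviceInfoCache(ttl_seconds=10, clock=lambda: fake_now[0])
cache.set("a", {"n": 1})
fake_now[0] = 8.0
cache.set("b", {"n": 2})
fake_now[0] = 12.0
```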

Testing

  • All 486 tests pass (463 existing + 23 new)
  • New test files: test_mqtt_reconnection.py, test_bug_fixes.py
  • Verified MQTT stability in live Home Assistant docker container

eman and others added 2 commits May 16, 2026 14:11
When _active_reconnect() creates a new MqttConnection, the old connection
was never disconnected. The old SDK connection's built-in auto-reconnect
would eventually succeed, creating two active connections with the same
client ID. AWS IoT only allows one connection per client ID, so the broker
kicks one off, triggering on_connection_interrupted and starting another
reconnection — causing an infinite connect/disconnect loop.

Changes:
- Add MqttConnection.close() method that unconditionally tears down the
  underlying SDK connection regardless of _connected state (unlike
  disconnect() which skips when _connected is False after interruption)
- _active_reconnect(): close old connection before creating replacement
- _deep_reconnect(): use close() unconditionally instead of checking
  is_connected before calling disconnect()
- Add tests for close() method behavior

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Thread safety:
- ensure_device_info_cached: use loop.call_soon_threadsafe for Future
  resolution from AWS SDK callback thread (prevents race/crash)

MQTT reconnection:
- Clamp deep_reconnect_threshold to minimum of 1 in config validation
  to prevent ZeroDivisionError in reconnection backoff logic

Diagnostics:
- Increment total_reconnect_attempts counter on each connection drop
  (was always 0 despite reconnections occurring)
- Replace float('inf') default for shortest_session_seconds with None
  in to_dict() to prevent JSON serialization errors

Events:
- Use asyncio.get_running_loop().create_future() instead of bare
  asyncio.Future() in wait_for() for proper loop binding

Encoding:
- Make build_reservation_entry temperature validation unit-aware:
  defaults are now 35-65°C in metric mode, 95-150°F in US mode
  (was hardcoded to Fahrenheit, breaking Celsius users)
- Log warning on malformed reservation hex data with trailing bytes
  instead of silently dropping partial entries

Periodic requests:
- Log error and break on unknown PeriodicRequestType instead of
  silently doing nothing

Cache:
- Purge expired entries from device_info_cache during get_all_cached()
  instead of only filtering them from results (prevents memory leak)

All 486 tests pass (23 new tests added).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
eman changed the title from "fix: close old MQTT connection before reconnect to prevent flapping" to "fix: MQTT connection flapping + comprehensive bug fixes" on May 16, 2026
eman and others added 2 commits May 16, 2026 14:28
- Remove unused imports from test_bug_fixes.py (ruff F401)
- Convert test_mqtt_reconnection.py from asyncio.get_event_loop()
  to @pytest.mark.asyncio (as of 3.14, asyncio.get_event_loop() raises
  RuntimeError when no current event loop exists)
- Fix import sorting in test_mqtt_reconnection.py (ruff I001)
- Shorten docstring line to fit 80-char limit (ruff E501)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Copilot AI left a comment

Pull request overview

This PR addresses MQTT connection flapping in the MQTT client by ensuring old AWS IoT SDK connections are torn down before creating replacement connections, and bundles several additional audit-driven bug fixes across diagnostics, events, encoding, periodic requests, and caching.

Changes:

  • Add an unconditional MqttConnection.close() teardown and use it during active/deep reconnect flows to prevent competing connections with the same client ID.
  • Fix several correctness issues found during audit (thread-safe future resolution from SDK thread, JSON-safe diagnostics serialization, reconnection config validation, event-loop-bound futures).
  • Add new tests covering reconnection cleanup and the additional bug fixes.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| tests/test_mqtt_reconnection.py | New tests for MqttConnection.close() and reconnection cleanup behavior. |
| tests/test_bug_fixes.py | New tests covering config clamping, diagnostics metrics serialization, event wait_for(), reservation encoding/decoding, and cache purge behavior. |
| src/nwp500/mqtt/utils.py | Clamp deep_reconnect_threshold to a minimum of 1 during config initialization. |
| src/nwp500/mqtt/periodic.py | Log and stop the periodic loop on unknown PeriodicRequestType. |
| src/nwp500/mqtt/diagnostics.py | Make metrics JSON-friendly (inf → None) and increment reconnect attempts on drops. |
| src/nwp500/mqtt/connection.py | Introduce close() that disconnects regardless of _connected state and clears internal state to prevent SDK auto-reconnect interference. |
| src/nwp500/mqtt/client.py | Close old connection manager before rebuilding during reconnection; make device-info caching future resolution thread-safe. |
| src/nwp500/events.py | Bind wait_for() futures to the running event loop via get_running_loop().create_future(). |
| src/nwp500/encoding.py | Add warning on trailing bytes in reservation hex decode; make reservation temp validation unit-system aware. |
| src/nwp500/device_info_cache.py | Purge expired entries during get_all_cached() to prevent cache growth. |

Comment thread src/nwp500/mqtt/client.py Outdated
Comment thread tests/test_mqtt_reconnection.py
Comment thread src/nwp500/encoding.py
eman and others added 4 commits May 16, 2026 14:39
- client.py on_feature: move future.done() check inside call_soon_threadsafe
  callback to eliminate race between done-check and set_result across
  the SDK thread / event loop thread boundary
- tests/test_mqtt_reconnection.py: replace asyncio.Future with
  concurrent.futures.Future to match what the AWS IoT SDK actually
  returns, exercising the wrap_future() conversion path in close()
- encoding.py build_reservation_entry: read get_unit_system() once into
  a local variable instead of calling it twice

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
eman merged commit d6b2910 into main on May 17, 2026
10 checks passed
eman deleted the fix/mqtt-connection-flapping branch on May 17, 2026 00:13