Implement two-tier MQTT reconnection strategy with unlimited retries#28
Implement two-tier MQTT reconnection strategy with unlimited retries#28
Conversation
- Remove duplicate 'Connection interrupted' logging - Change max_reconnect_attempts default from 10 to -1 (unlimited) - Implement two-tier reconnection strategy: * Quick reconnection (attempts 1-9): Fast reconnection for transient issues * Deep reconnection (every 10th attempt): Full rebuild with token refresh - Add deep_reconnect_threshold config parameter (default: 10) - Add _deep_reconnect() method that: * Cleans up old connection * Refreshes AWS credentials via token refresh API * Falls back to full re-authentication if needed * Recreates connection manager from scratch * Re-establishes all subscriptions - Add resubscribe_all() to MqttSubscriptionManager for subscription recovery - Fix reconnection to never give up permanently - Improve resilience to extended network outages and credential expiration Fixes connection recovery after AWS_ERROR_MQTT_UNEXPECTED_HANGUP errors. Previously, the client would give up after 10 attempts and never reconnect. Now it continues indefinitely with intelligent recovery strategies.
There was a problem hiding this comment.
Pull Request Overview
This PR implements a robust two-tier MQTT reconnection strategy that prevents permanent connection failures after network interruptions. The key improvement is changing from a limited retry approach (10 attempts) to unlimited retries with intelligent escalation between quick and deep reconnection methods.
Key Changes:
- Changed default
max_reconnect_attemptsfrom 10 to -1 (unlimited retries) - Implemented two-tier reconnection: quick reconnects (1-9) for transient issues, deep reconnects (every 10th attempt) for complex problems like credential expiration
- Added subscription recovery mechanism via
resubscribe_all()method to restore all subscriptions after deep reconnection
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/nwp500/mqtt_utils.py |
Added deep_reconnect_threshold config parameter and changed max_reconnect_attempts default to -1 |
src/nwp500/mqtt_reconnection.py |
Implemented two-tier reconnection logic with support for unlimited retries and deep reconnection triggers |
src/nwp500/mqtt_subscriptions.py |
Added resubscribe_all() method to restore all subscriptions after connection rebuild |
src/nwp500/mqtt_client.py |
Added _deep_reconnect() method for full connection rebuild and removed duplicate connection interrupted logging |
Use a set instead of list to track failed subscriptions, preventing duplicate entries when a topic has multiple handlers. This ensures the failure count in warning messages is accurate. Co-authored-by: GitHub Copilot <noreply@github.com>
Fixed Copilot AI FeedbackGood catch on the subscription failure tracking! Issue: When a topic had multiple handlers, it would be added to Fix: Changed to use a Example:
Commit: 5ef8708 |
Instead of accessing private _user_id and _password attributes directly, add proper public API to NavienAuthClient: - Add has_stored_credentials property to check if credentials are available - Add re_authenticate() method to perform fresh sign-in with stored credentials - Update mqtt_client to use new public API methods This improves encapsulation and provides a cleaner interface for credential management and re-authentication scenarios. Co-authored-by: GitHub Copilot <noreply@github.com>
Fixed Copilot AI Encapsulation FeedbackGood catch on accessing private attributes directly! Issue: The deep reconnection logic was accessing Solution: Added proper public API to New Public API
Usage in MQTT ClientBefore (accessing private attributes): if self._auth_client._user_id and self._auth_client._password:
await self._auth_client.sign_in(
self._auth_client._user_id,
self._auth_client._password,
)After (using public API): if self._auth_client.has_stored_credentials:
await self._auth_client.re_authenticate()This improves code maintainability and follows proper encapsulation practices while providing a cleaner, more intuitive API for credential management. Commit: 59e9dec |
Replace generic 'except Exception' handlers with specific exception types to improve error handling and debugging: - Import AuthenticationError and TokenRefreshError from auth module - Schedule coroutine: catch RuntimeError (event loop closed) - Connection callbacks: catch TypeError, AttributeError - Active reconnection: catch AwsCrtError, AuthenticationError, RuntimeError - Deep reconnection cleanup: catch AwsCrtError, RuntimeError - Token refresh: catch TokenRefreshError, ValueError, AuthenticationError - Deep reconnection: catch AwsCrtError, AuthenticationError, RuntimeError, ValueError - Connect: catch AwsCrtError, AuthenticationError, RuntimeError, ValueError - Disconnect: catch AwsCrtError, RuntimeError - Message processing: catch AttributeError, KeyError, TypeError - Publish: catch AwsCrtError specifically, add exception chaining This makes error handling more explicit and prevents masking unexpected exceptions that should propagate. Improves debuggability by catching only expected exception types.
Improved Exception HandlingReplaced all catch-all ChangesBefore: 10 generic After: Specific exception types for each context:
Benefits✅ Better debugging: Unexpected exceptions now propagate instead of being hidden Commit: 2807e1a |
Systematically replaced generic 'except Exception' handlers throughout the MQTT codebase with specific exception types: **mqtt_reconnection.py:** - Import AwsCrtError - Quick/deep reconnect: catch AwsCrtError, RuntimeError, ValueError - Reconnection loop: catch AwsCrtError, RuntimeError - Event emission: catch TypeError, RuntimeError **mqtt_connection.py:** - Import AwsCrtError - Connect: catch AwsCrtError, RuntimeError, ValueError - Disconnect: catch AwsCrtError, RuntimeError **mqtt_subscriptions.py:** - Import AwsCrtError - Message handlers: catch TypeError, AttributeError, KeyError - Message processing: catch AttributeError, KeyError, TypeError - Subscribe/unsubscribe: catch AwsCrtError, RuntimeError - Resubscribe: catch AwsCrtError, RuntimeError - Status parsing: catch TypeError, AttributeError (KeyError already caught) - State change detection: catch TypeError, AttributeError, RuntimeError - Feature parsing: catch TypeError, AttributeError (KeyError already caught) - Energy usage: catch TypeError, ValueError, AttributeError **mqtt_periodic.py:** - Periodic requests: catch AwsCrtError, RuntimeError with specific handling for clean session cancellation **events.py:** - Event handlers: catch TypeError, RuntimeError, AttributeError **Copilot instructions:** - Added exception handling guidelines with common exception types - Emphasize catching only expected exceptions This improves debuggability by allowing unexpected exceptions to propagate instead of being silently caught. Makes error handling explicit and type-safe.
Complete Exception Handling OverhaulFixed all catch-all exception handlers throughout the MQTT codebase. Files Fixed
Exception Type MappingAll
Updated DocumentationAdded exception handling guidelines to
Benefits✅ Debugging: Unexpected exceptions now propagate with full stack traces Total handlers fixed: 25 catch-all exception handlers → specific exception types Commit: 5f24c5b |
Restore catch-all exception handler in EventEmitter.emit() because this is a legitimate use case where resilience is required: - User-provided callbacks can raise any exception type - One bad callback should not break other callbacks - Event emitter pattern requires fault isolation - Added detailed comment explaining why catch-all is appropriate here This is one of the rare cases where 'except Exception' is the correct approach, as we need to protect the event system from arbitrary user code. Fixes test_event_error_handling test that validates this behavior.
src/nwp500/mqtt_subscriptions.py
Outdated
| f"Failed to re-subscribe to " | ||
| f"'{redact_topic(topic)}': {e}" | ||
| ) | ||
| topic_failed = True |
There was a problem hiding this comment.
The topic_failed flag doesn't prevent subsequent handlers from attempting to subscribe to the same failed topic. Consider breaking out of the loop after the first failure or logging once per topic rather than once per handler.
| topic_failed = True | |
| topic_failed = True | |
| break |
src/nwp500/mqtt_client.py
Outdated
| isinstance(e, AwsCrtError) | ||
| and e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION" | ||
| ): | ||
| if e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION": |
There was a problem hiding this comment.
The AwsCrtError.name attribute may not always exist or could be None. Add a check using hasattr(e, 'name') or handle AttributeError to prevent potential crashes when checking the error name.
| if e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION": | |
| if getattr(e, "name", None) == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION": |
src/nwp500/mqtt_client.py
Outdated
| if ( | ||
| self._auth_client.current_tokens | ||
| and self._auth_client.current_tokens.refresh_token | ||
| ): | ||
| await self._auth_client.refresh_token( | ||
| self._auth_client.current_tokens.refresh_token | ||
| ) |
There was a problem hiding this comment.
[nitpick] This nested condition could be simplified for better readability. Consider storing self._auth_client.current_tokens in a local variable or restructuring the check to make the logic clearer.
| if ( | |
| self._auth_client.current_tokens | |
| and self._auth_client.current_tokens.refresh_token | |
| ): | |
| await self._auth_client.refresh_token( | |
| self._auth_client.current_tokens.refresh_token | |
| ) | |
| tokens = self._auth_client.current_tokens | |
| if tokens and tokens.refresh_token: | |
| await self._auth_client.refresh_token(tokens.refresh_token) |
src/nwp500/mqtt_reconnection.py
Outdated
| use_deep_reconnect = ( | ||
| self._deep_reconnect_func is not None | ||
| and self._reconnect_attempts | ||
| >= self.config.deep_reconnect_threshold | ||
| and self._reconnect_attempts | ||
| % self.config.deep_reconnect_threshold | ||
| == 0 |
There was a problem hiding this comment.
[nitpick] The multi-line boolean expression could be more readable with intermediate variables. Consider extracting is_at_threshold and is_threshold_multiple for clarity.
| use_deep_reconnect = ( | |
| self._deep_reconnect_func is not None | |
| and self._reconnect_attempts | |
| >= self.config.deep_reconnect_threshold | |
| and self._reconnect_attempts | |
| % self.config.deep_reconnect_threshold | |
| == 0 | |
| is_at_threshold = self._reconnect_attempts >= self.config.deep_reconnect_threshold | |
| is_threshold_multiple = ( | |
| self._reconnect_attempts % self.config.deep_reconnect_threshold == 0 | |
| ) | |
| use_deep_reconnect = ( | |
| self._deep_reconnect_func is not None | |
| and is_at_threshold | |
| and is_threshold_multiple |
When a topic subscription fails, subsequent handlers for that same topic will also fail for the same reason (network/connection issue). Changes: - Break out of handler loop immediately after first subscription failure - Avoids redundant error logs for the same topic - More efficient error handling during reconnection - Cleaner logs with one error per failed topic instead of one per handler Co-authored-by: GitHub Copilot <noreply@github.com>
Optimized Subscription RecoveryFixed inefficiency in IssueWhen a topic subscription failed, the code continued trying to subscribe remaining handlers for the same topic. Since they all fail for the same reason (connection/network issue), this created redundant error logs. FixBreak out of the handler loop after the first subscription failure for a topic: Before: for handler in handlers:
try:
await self.subscribe(topic, handler, qos)
except (AwsCrtError, RuntimeError) as e:
_logger.error(...)
topic_failed = True
# Continues to next handler even after failureAfter: for handler in handlers:
try:
await self.subscribe(topic, handler, qos)
except (AwsCrtError, RuntimeError) as e:
_logger.error(...)
failed_subscriptions.add(topic)
break # Skip remaining handlers for this topicBenefits✅ Cleaner logs: One error per failed topic instead of one per handler Commit: 7 commits total now |
The AwsCrtError.name attribute may not always exist or could be None. Added hasattr() checks before accessing e.name to prevent AttributeError. Changes: - mqtt_client.py: Added hasattr(e, 'name') check before comparing name - mqtt_periodic.py: Added hasattr(e, 'name') check in exception handling This prevents potential crashes when the name attribute is missing or None, making error handling more robust. Co-authored-by: GitHub Copilot <noreply@github.com>
Added Safety Checks for Exception AttributesFixed potential AttributeError when accessing IssueThe code assumed FixAdded Before: except AwsCrtError as e:
if e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION":
# Handle itAfter: except AwsCrtError as e:
if hasattr(e, "name") and e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION":
# Handle itFiles Updated
Benefits✅ Crash prevention: No AttributeError if name attribute is missing Commit: 4fb0bf0 |
Extracted self._auth_client.current_tokens to a local variable to avoid
nested attribute access and improve readability.
Before:
if (self._auth_client.current_tokens
and self._auth_client.current_tokens.refresh_token):
After:
current_tokens = self._auth_client.current_tokens
if current_tokens and current_tokens.refresh_token:
This makes the code clearer and easier to understand.
Co-authored-by: GitHub Copilot <noreply@github.com>
…variables Extracted complex boolean expression into clear, self-documenting variables: - has_deep_reconnect: Check if deep reconnect function exists - is_at_threshold: Check if attempts reached threshold - is_threshold_multiple: Check if at threshold interval (10, 20, 30...) This makes the logic much clearer and easier to understand at a glance. Co-authored-by: GitHub Copilot <noreply@github.com>
Problem
The MQTT client was experiencing permanent connection failures after network interruptions (AWS_ERROR_MQTT_UNEXPECTED_HANGUP). The client would:
Solution
Implemented a robust two-tier reconnection strategy that never gives up:
1. Quick Reconnection (attempts 1-9)
2. Deep Reconnection (every 10th attempt: 10, 20, 30...)
Changes
Core Improvements
max_reconnect_attemptsdefault from 10 to -1resubscribe_all()method restores all subscriptions after deep reconnectionModified Files
src/nwp500/mqtt_client.py(+102 lines)_deep_reconnect()method for full rebuildsrc/nwp500/mqtt_reconnection.py(+72 lines, -23 lines)src/nwp500/mqtt_subscriptions.py(+60 lines)resubscribe_all()method to restore subscriptionssrc/nwp500/mqtt_utils.py(+8 lines, -1 line)deep_reconnect_thresholdconfig parameter (default: 10)max_reconnect_attemptsdefault to -1 (unlimited)Testing
✅ All linting checks pass
✅ Type checking passes (mypy)
✅ Existing tests pass
✅ No breaking changes to public API
Behavior
Before:
After:
Migration
No breaking changes. Existing code continues to work with improved resilience.
Users can customize behavior via
MqttConnectionConfig:Closes: Connection recovery issue after AWS_ERROR_MQTT_UNEXPECTED_HANGUP