Fix Telegram polling recovery after repeated network errors #7465
Shujakuinkuraudo wants to merge 1 commit into AstrBotDevs:master from
Conversation
Hey - I've found 2 issues, and left some high level feedback:
- The polling recovery behavior uses hardcoded `_polling_recovery_threshold` and `_polling_failure_window` values; consider wiring these to the existing config so they can be tuned without code changes.
- `_recreate_application` does not check `_terminating` before rebuilding the application, so a terminate request that sets `_polling_recovery_requested` could race with the polling loop and result in a fresh client being created during shutdown; consider short-circuiting the rebuild when `_terminating` is true.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The polling recovery behavior uses hardcoded `_polling_recovery_threshold` and `_polling_failure_window` values; consider wiring these to the existing config so they can be tuned without code changes.
- `_recreate_application` does not check `_terminating` before rebuilding the application, so a terminate request that sets `_polling_recovery_requested` could race with the polling loop and result in a fresh client being created during shutdown; consider short-circuiting the rebuild when `_terminating` is true.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/telegram/tg_adapter.py" line_range="84-90" />
<code_context>
-
self.scheduler = AsyncIOScheduler()
self._terminating = False
+ self._loop: asyncio.AbstractEventLoop | None = None
+ self._polling_recovery_requested = asyncio.Event()
+ self._consecutive_polling_failures = 0
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Guard against rebuilding the application while termination is in progress
`_recreate_application` doesn’t check `_terminating`. If `_polling_recovery_requested` is set just before or during `terminate()`, the main loop can still call `_recreate_application()`, recreating the app and restarting polling while shutdown is in progress. An early `if self._terminating: return` in `_recreate_application` would avoid this race and unnecessary work.
Suggested implementation:
```python
def _recreate_application(self) -> None:
# Avoid recreating the application while termination is in progress.
# This guards against a race where `_polling_recovery_requested` is set
# just before or during `terminate()`, which could otherwise restart
# polling while shutdown is underway.
if self._terminating:
return
self._build_application()
```
If the actual signature or body of `_recreate_application` differs (for example, it is `async def _recreate_application(...)` or has additional logic before `self._build_application()`), adjust the `SEARCH` section to match the existing function header and first body line, and insert the `if self._terminating: return` guard at the very beginning of the function body.
</issue_to_address>
### Comment 2
<location path="tests/test_telegram_adapter.py" line_range="229-236" />
<code_context>
+
+
+@pytest.mark.asyncio
+async def test_telegram_polling_error_requests_rebuild_after_threshold():
+ TelegramPlatformAdapter = _load_telegram_adapter()
+ adapter = TelegramPlatformAdapter(
+ make_platform_config("telegram"),
+ {},
+ asyncio.Queue(),
+ )
+ adapter._loop = asyncio.get_running_loop()
+
+ assert not adapter._polling_recovery_requested.is_set()
+
+ for _ in range(adapter._polling_recovery_threshold):
+ adapter._on_polling_error(Exception("proxy disconnected"))
+
+ await asyncio.sleep(0)
</code_context>
<issue_to_address>
**suggestion (testing):** Use the mocked `NetworkError` type instead of bare `Exception` to keep the test aligned with the implementation.
The recovery logic only handles `telegram.error.NetworkError`, but this test currently passes a plain `Exception` and relies on mocks redefining `NetworkError = Exception`. To avoid this brittle coupling, please construct the error using the mocked `NetworkError` type (e.g. via `module_globals["NetworkError"]` or `create_mock_telegram_modules()["telegram"].error.NetworkError`) so the test clearly depends on `NetworkError` specifically.
```suggestion
adapter._loop = asyncio.get_running_loop()
module_globals = TelegramPlatformAdapter.__init__.__globals__
NetworkError = module_globals["NetworkError"]
assert not adapter._polling_recovery_requested.is_set()
for _ in range(adapter._polling_recovery_threshold):
adapter._on_polling_error(NetworkError("proxy disconnected"))
await asyncio.sleep(0)
```
</issue_to_address>
Code Review
This pull request implements a polling recovery mechanism for the Telegram adapter to handle repeated network errors by rebuilding the application instance. It introduces logic to track consecutive failures within a time window and refactors the application lifecycle management into dedicated methods. Feedback suggests making the recovery threshold and failure window configurable to allow for environment-specific tuning. Additionally, the shutdown sequence should be adjusted to ensure that final API calls, such as deleting commands, are performed before the application's HTTP session is terminated.
```python
self._polling_recovery_threshold = 3
self._polling_failure_window = 60.0
```
Consider making the polling recovery threshold and failure window configurable via the platform configuration, similar to `telegram_polling_restart_delay`. This allows users to tune the recovery behavior based on their specific network environment and stability requirements.
Suggested change:
```python
self._polling_recovery_threshold = self.config.get("telegram_polling_recovery_threshold", 3)
self._polling_failure_window = float(self.config.get("telegram_polling_failure_window", 60.0))
```
```python
async def _shutdown_application(
    self,
    *,
    delete_commands: bool,
) -> None:
    updater = self.application.updater
    if updater is not None:
        with suppress(Exception):
            await updater.stop()

    with suppress(Exception):
        await self.application.stop()

    if delete_commands and self.enable_command_register:
        with suppress(Exception):
            await self.client.delete_my_commands()

    shutdown = getattr(self.application, "shutdown", None)
    if shutdown is not None:
        with suppress(Exception):
            await shutdown()
```
The current shutdown sequence calls `self.application.stop()` before `self.client.delete_my_commands()`. In python-telegram-bot v20+, `Application.stop()` shuts down the bot's internal HTTP session. This will cause subsequent API calls like `delete_my_commands()` to fail because the underlying connection is closed. It is recommended to perform any final API calls after stopping the updater but before stopping the application entirely.
```python
async def _shutdown_application(
    self,
    *,
    delete_commands: bool,
) -> None:
    updater = self.application.updater
    if updater is not None:
        with suppress(Exception):
            await updater.stop()

    if delete_commands and self.enable_command_register:
        with suppress(Exception):
            await self.client.delete_my_commands()

    with suppress(Exception):
        await self.application.stop()

    shutdown = getattr(self.application, "shutdown", None)
    if shutdown is not None:
        with suppress(Exception):
            await shutdown()
```
Addressed the bot review feedback in
Re-ran:
Updated the PR with the additional recovery fix validated in deployment. This addresses a follow-up failure mode where rebuilding the Telegram application could itself time out during
Re-ran:
b21f377 to 583ce5c
Closing this PR and reopening it as a fresh PR to keep the history clean after squashing the work into a single commit.
Summary

NetworkErrors

Testing

`.venv/bin/python -m ruff check astrbot/core/platform/sources/telegram/tg_adapter.py tests/test_telegram_adapter.py tests/fixtures/mocks/telegram.py`
`.venv/bin/python -m pytest -q tests/test_telegram_adapter.py`

Summary by Sourcery
Improve Telegram adapter resilience by rebuilding the application and HTTP client after repeated polling network errors while preserving command scheduling.
Bug Fixes:
Enhancements:
Tests:
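The recovery flow discussed in this review, where the polling loop waits for a rebuild request but skips it once termination has begun, can be sketched as a minimal standalone model. This is an illustrative sketch, not the adapter's real code: `MiniAdapter`, `run_once`, `rebuild_count`, and `demo` are hypothetical names, and the rebuild of a real python-telegram-bot `Application` is stubbed out as a counter.

```python
import asyncio


class MiniAdapter:
    """Models the polling loop: wait for a recovery request, then rebuild,
    unless termination has already begun."""

    def __init__(self) -> None:
        self._terminating = False
        self._polling_recovery_requested = asyncio.Event()
        self.rebuild_count = 0

    def _recreate_application(self) -> None:
        # Guard suggested in the review: never rebuild mid-shutdown.
        if self._terminating:
            return
        self.rebuild_count += 1  # stands in for building a fresh Application

    async def run_once(self) -> None:
        await self._polling_recovery_requested.wait()
        self._polling_recovery_requested.clear()
        self._recreate_application()


async def demo() -> tuple[int, int]:
    adapter = MiniAdapter()
    # A recovery request while running triggers a rebuild.
    adapter._polling_recovery_requested.set()
    await adapter.run_once()
    rebuilt = adapter.rebuild_count
    # The same request during termination is ignored.
    adapter._terminating = True
    adapter._polling_recovery_requested.set()
    await adapter.run_once()
    return rebuilt, adapter.rebuild_count
```

Running `asyncio.run(demo())` shows the rebuild happens exactly once: the second request is short-circuited by the `_terminating` guard.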