feat: add two-phase startup lifecycle (#6942)
Conversation
Allow the dashboard to become available before plugin bootstrap completes and surface runtime readiness and failure states to API callers. Guard plugin-facing endpoints until runtime is ready and clean up provider and plugin runtime state safely across bootstrap failures, retries, stop, and restart flows.
Continue terminating remaining providers and disable MCP servers even if one provider terminate hook fails. Also add InitialLoader failure-path coverage and extract guarded plugin routes into a shared constant for easier review and maintenance.
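The two-phase split described above can be sketched in a few lines of asyncio code; all names here are illustrative stand-ins, not the PR's actual API:

```python
import asyncio


class TwoPhaseLifecycle:
    """Toy illustration: fast core init, deferred runtime bootstrap."""

    def __init__(self) -> None:
        self.runtime_ready = False
        self.runtime_error: BaseException | None = None

    async def initialize_core(self) -> None:
        # Phase 1: cheap setup only; the dashboard can start serving after this.
        await asyncio.sleep(0)

    async def bootstrap_runtime(self) -> None:
        # Phase 2: plugin/provider bootstrap; failures are recorded, not fatal.
        try:
            await asyncio.sleep(0)  # stand-in for plugin/provider initialization
            self.runtime_ready = True
        except Exception as exc:
            self.runtime_error = exc


async def main() -> bool:
    lc = TwoPhaseLifecycle()
    await lc.initialize_core()                   # dashboard becomes available here
    asyncio.create_task(lc.bootstrap_runtime())
    await asyncio.sleep(0.01)                    # let bootstrap finish in the background
    return lc.runtime_ready


assert asyncio.run(main())
```

API callers would then consult `runtime_ready` / `runtime_error` (via something like the runtime-status endpoint this PR guards) to decide whether plugin-facing requests can be served.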
Summary of Changes (Gemini Code Assist): This pull request significantly refactors the application's startup sequence to introduce a more robust and user-friendly two-phase lifecycle. By separating core initialization from the full runtime bootstrap, the dashboard can now launch more quickly, providing early access to essential functionality. The changes also integrate comprehensive state tracking, allowing the system to clearly communicate its readiness and any bootstrap failures, which is crucial for diagnosing and recovering from issues without requiring a complete system restart.
Hey - I've found 4 issues, and left some high level feedback:
- The runtime-guard logic in `AstrBotDashboard.auth_middleware` is repeated in three branches (API key, allowed endpoints, JWT); consider extracting a single helper that enforces the `RUNTIME_BYPASS_ENDPOINT_PREFIXES` / `RUNTIME_FAILED_RECOVERY_ENDPOINT_PREFIXES` rules and the `is_runtime_request_ready` check to reduce duplication and the chance of diverging behavior.
- The sets/tuples that drive runtime guarding and bypassing (e.g., `RUNTIME_BYPASS_ENDPOINT_PREFIXES`, `RUNTIME_FAILED_RECOVERY_ENDPOINT_PREFIXES`, `RUNTIME_GUARDED_PLUGIN_PATHS`) are hard-coded and must be kept in sync with the actual routes; consider deriving these from route definitions or centralizing their declaration to avoid missing guards when adding new endpoints.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The runtime-guard logic in `AstrBotDashboard.auth_middleware` is repeated in three branches (API key, allowed endpoints, JWT); consider extracting a single helper that enforces the `RUNTIME_BYPASS_ENDPOINT_PREFIXES` / `RUNTIME_FAILED_RECOVERY_ENDPOINT_PREFIXES` rules and `is_runtime_request_ready` check to reduce duplication and the chance of diverging behavior.
- The sets/tuples that drive runtime guarding and bypassing (e.g., `RUNTIME_BYPASS_ENDPOINT_PREFIXES`, `RUNTIME_FAILED_RECOVERY_ENDPOINT_PREFIXES`, `RUNTIME_GUARDED_PLUGIN_PATHS`) are hard-coded and must be kept in sync with the actual routes; consider deriving these from route definitions or centralizing their declaration to avoid missing guards when adding new endpoints.
## Individual Comments
### Comment 1
<location path="astrbot/core/core_lifecycle.py" line_range="325-326" />
<code_context>
+ return tasks_to_wait
+
+ def _clear_runtime_artifacts(self) -> None:
+ if self.metadata_update_task is not None and not self.metadata_update_task.done():
+ self.metadata_update_task.cancel()
+ self.metadata_update_task = None
+ self.runtime_request_ready = False
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `metadata_update_task` is cancelled but never awaited, which can raise unhandled warnings.
Because `metadata_update_task` is created via `asyncio.create_task`, cancelling it here without ever awaiting it can lead to `Task was destroyed but it is pending!`-style warnings and hide errors in `update_llm_metadata()`. Please either make sure it is included in the set of tasks awaited in `stop`/`restart`, or explicitly await it here (e.g. `await asyncio.gather(self.metadata_update_task, return_exceptions=True)`) before setting the reference to `None`.
Suggested implementation:
```python
def _cancel_current_tasks(self) -> list[asyncio.Task]:
tasks_to_wait: list[asyncio.Task] = []
# Cancel all current tasks and add them to the wait list
for task in self.curr_tasks:
task.cancel()
if isinstance(task, asyncio.Task):
tasks_to_wait.append(task)
self.curr_tasks = []
# Also cancel the metadata update task if it is still running so that
# it can be awaited by the caller together with other tasks.
if getattr(self, "metadata_update_task", None) is not None:
if isinstance(self.metadata_update_task, asyncio.Task) and not self.metadata_update_task.done():
self.metadata_update_task.cancel()
tasks_to_wait.append(self.metadata_update_task)
return tasks_to_wait
```
```python
def _clear_runtime_artifacts(self) -> None:
# Reset runtime wait and lifecycle state
self._runtime_wait_interrupted = False
self._set_lifecycle_state(LifecycleState.CREATED)
# Clear metadata update bookkeeping flags. The task itself is cancelled
# and awaited via `_cancel_current_tasks` / the caller that awaits its
# returned tasks, to avoid un-awaited cancelled task warnings.
if hasattr(self, "metadata_update_task"):
self.metadata_update_task = None
if hasattr(self, "runtime_request_ready"):
self.runtime_request_ready = False
```
1. Ensure any caller of `_cancel_current_tasks()` (e.g. `stop`/`restart`) *awaits* all returned tasks, typically via `await asyncio.gather(*tasks_to_wait, return_exceptions=True)`, so that exceptions in `metadata_update_task` are surfaced and no "Task was destroyed but it is pending!" warnings occur.
2. If `metadata_update_task` and `runtime_request_ready` are initialized elsewhere (e.g. in `__init__`), no further changes are needed. Otherwise, add their initialization to the constructor to avoid attribute errors.
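The fix can also be seen in isolation with a runnable sketch of the cancel-then-await pattern; `update_metadata` below is just a stand-in for the real `update_llm_metadata()` coroutine:

```python
import asyncio


async def update_metadata() -> None:
    # Stand-in for update_llm_metadata(): a long-running background job.
    await asyncio.sleep(3600)


async def main() -> list:
    task = asyncio.create_task(update_metadata())
    await asyncio.sleep(0)  # let the task actually start
    task.cancel()
    # Awaiting the cancelled task consumes the CancelledError, so no
    # "Task was destroyed but it is pending!" warning can be emitted later.
    return await asyncio.gather(task, return_exceptions=True)


results = asyncio.run(main())
assert isinstance(results[0], asyncio.CancelledError)
```

With `return_exceptions=True`, the `CancelledError` is returned as a result instead of propagating, which is exactly the "explicitly await it here" option the comment suggests.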
</issue_to_address>
### Comment 2
<location path="astrbot/core/core_lifecycle.py" line_range="58" />
<code_context>
+ RUNTIME_READY = "runtime_ready"
+
+
class AstrBotCoreLifecycle:
"""AstrBot 核心生命周期管理类, 负责管理 AstrBot 的启动、停止、重启等操作.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new lifecycle handling by deriving flags from `LifecycleState`, basing readiness on a single bootstrap task, and centralizing shutdown logic to reduce cognitive load and duplication.
You can keep the new lifecycle semantics but cut a lot of cognitive load by tightening a few pieces. Three concrete refactors would give most of the benefit the other reviewer is asking for, without losing functionality.
---
### 1. Collapse flags into the lifecycle state
Right now you maintain:
- `lifecycle_state`
- `core_initialized`
- `runtime_failed`
- `runtime_ready`
- `runtime_request_ready`
- events + interruption flag
You already treat `LifecycleState` as canonical; you can derive most booleans from it and keep only the “request-ready” bit as an orthogonal flag.
**Suggested direction:**
- Make `core_initialized`, `runtime_failed`, `runtime_ready` computed properties.
- Keep only `lifecycle_state` and `runtime_request_ready` as stored flags.
```python
class AstrBotCoreLifecycle:
def __init__(...):
...
self._lifecycle_state = LifecycleState.CREATED
self.runtime_request_ready = False
...
@property
def lifecycle_state(self) -> LifecycleState:
return self._lifecycle_state
@property
def core_initialized(self) -> bool:
return self._lifecycle_state is not LifecycleState.CREATED
@property
def runtime_ready(self) -> bool:
return self._lifecycle_state is LifecycleState.RUNTIME_READY
@property
def runtime_failed(self) -> bool:
return self._lifecycle_state is LifecycleState.RUNTIME_FAILED
def _set_lifecycle_state(self, state: LifecycleState) -> None:
self._lifecycle_state = state
```
Then you can trim the internal updates in multiple places, e.g.:
```python
def _clear_runtime_failure_for_retry(self) -> None:
if self.lifecycle_state is LifecycleState.RUNTIME_FAILED:
self._set_lifecycle_state(LifecycleState.CORE_READY)
```
No need to manually keep booleans in sync in `_set_lifecycle_state`, which reduces mental overhead and chances of drift.
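The derived-property approach can be checked with a stripped-down toy class (only the state enum and two of the properties, everything else omitted):

```python
from enum import Enum


class LifecycleState(str, Enum):
    CREATED = "created"
    CORE_READY = "core_ready"
    RUNTIME_FAILED = "runtime_failed"
    RUNTIME_READY = "runtime_ready"


class ToyLifecycle:
    def __init__(self) -> None:
        self._lifecycle_state = LifecycleState.CREATED

    def _set_lifecycle_state(self, state: LifecycleState) -> None:
        self._lifecycle_state = state

    @property
    def core_initialized(self) -> bool:
        return self._lifecycle_state is not LifecycleState.CREATED

    @property
    def runtime_ready(self) -> bool:
        return self._lifecycle_state is LifecycleState.RUNTIME_READY


lc = ToyLifecycle()
assert not lc.core_initialized and not lc.runtime_ready
lc._set_lifecycle_state(LifecycleState.RUNTIME_READY)
assert lc.core_initialized and lc.runtime_ready  # booleans can never drift
```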
---
### 2. Simplify `_wait_for_runtime_ready` by driving everything off the bootstrap task
The current `_wait_for_runtime_ready` races:
- `runtime_bootstrap_task`
- `runtime_ready_event`
- `runtime_failed_event`
and manually cancels/cleans up the others. This is correct but hard to follow.
You can keep the external semantics and still simplify by making the bootstrap task the single source of truth and letting it set a result (or error). Events then just notify about completion, instead of being raced against the task.
**Example refactor sketch:**
1. In `bootstrap_runtime`, store the task and set events based on its outcome:
```python
async def bootstrap_runtime(self) -> None:
if not self.core_initialized:
raise RuntimeError("initialize_core must be called before bootstrap_runtime")
if self.runtime_ready:
return
self._clear_runtime_failure_for_retry()
self.runtime_bootstrap_error = None
self.runtime_ready_event.clear()
self.runtime_failed_event.clear()
self._runtime_wait_interrupted = False
async def _do_bootstrap() -> None:
try:
plugin_manager, provider_manager, kb_manager, platform_manager = (
self._require_runtime_bootstrap_components()
)
await plugin_manager.reload()
await provider_manager.initialize()
await kb_manager.initialize()
self.pipeline_scheduler_mapping = await self.load_pipeline_scheduler()
...
await platform_manager.initialize()
self.metadata_update_task = asyncio.create_task(update_llm_metadata())
self._set_lifecycle_state(LifecycleState.RUNTIME_READY)
self.runtime_ready_event.set()
except asyncio.CancelledError:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_failed_event.set()
raise
except BaseException as exc:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
self.runtime_bootstrap_error = exc
self.runtime_failed_event.set()
raise
# single bootstrap task that owns the lifecycle
self.runtime_bootstrap_task = asyncio.create_task(_do_bootstrap(), name="runtime_bootstrap")
```
2. Then `_wait_for_runtime_ready` can just await the task, with a small bit of interruption logic:
```python
async def _wait_for_runtime_ready(self) -> bool:
if self.runtime_ready:
return True
if self._runtime_wait_interrupted:
return False
task = self.runtime_bootstrap_task
if task is None:
raise RuntimeError("runtime bootstrap task was not scheduled before start")
try:
await task
except asyncio.CancelledError:
# interruption path
return False
except Exception:
# error already stored in runtime_bootstrap_error and state updated
return False
# state is authoritative here
return self.runtime_ready
```
You still expose `runtime_ready_event`/`runtime_failed_event` for external consumers if needed, but internally you no longer race three things; you just wait for the task and respect the lifecycle state.
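The "single source of truth" point is easy to demonstrate on its own: awaiting the bootstrap task surfaces the stored error directly, with no event race. The class and error message below are invented for the demo:

```python
import asyncio


class ToyBootstrap:
    def __init__(self) -> None:
        self.error: BaseException | None = None

    async def _do_bootstrap(self) -> None:
        try:
            raise RuntimeError("provider init failed")  # simulated failure
        except BaseException as exc:
            self.error = exc  # the task records its own outcome
            raise


async def main() -> str:
    boot = ToyBootstrap()
    task = asyncio.create_task(boot._do_bootstrap())
    try:
        await task  # the task itself is the only thing we wait on
    except Exception:
        pass  # outcome already recorded by the task
    return str(boot.error)


assert asyncio.run(main()) == "provider init failed"
```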
---
### 3. Consolidate duplicated shutdown logic in `stop` and `restart`
`stop()` and `restart()` share a lot of shutdown steps: interrupt waiters, cancel current tasks, handle the bootstrap task, terminate managers, set `dashboard_shutdown_event`, reset state.
This can be factored into a single helper so you only have to reason about the shutdown sequence in one place.
**Example consolidation:**
```python
async def _shutdown_runtime(self, *, for_restart: bool) -> None:
self.runtime_request_ready = False
self._interrupt_runtime_bootstrap_waiters()
tasks_to_wait = self._cancel_current_tasks()
runtime_bootstrap_tasks = self._collect_runtime_bootstrap_task()
tasks_to_wait.extend(runtime_bootstrap_tasks)
for task in runtime_bootstrap_tasks:
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"任务 {task.get_name()} 发生错误: {e}")
if self.cron_manager:
await self.cron_manager.shutdown()
if self.plugin_manager and self.plugin_manager.context:
for plugin in self.plugin_manager.context.get_all_stars():
try:
await self.plugin_manager._terminate_plugin(plugin)
except Exception as e:
logger.warning(traceback.format_exc())
logger.warning(
f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
)
if self.provider_manager:
await self.provider_manager.terminate()
if self.platform_manager:
await self.platform_manager.terminate()
if self.kb_manager:
await self.kb_manager.terminate()
if self.dashboard_shutdown_event:
self.dashboard_shutdown_event.set()
self._clear_runtime_artifacts()
self._set_lifecycle_state(LifecycleState.CREATED)
self._reset_runtime_bootstrap_state()
for task in tasks_to_wait:
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"任务 {task.get_name()} 发生错误: {e}")
async def stop(self) -> None:
if self.temp_dir_cleaner:
await self.temp_dir_cleaner.stop()
await self._shutdown_runtime(for_restart=False)
async def restart(self) -> None:
await self._shutdown_runtime(for_restart=True)
if self.astrbot_updator is None:
return
threading.Thread(
target=self.astrbot_updator._reboot,
name="restart",
daemon=True,
).start()
```
This keeps all the new lifecycle behavior (including bootstrap task handling) but reduces duplication and makes future changes to shutdown semantics much less error-prone.
---
These three refactors together would:
- Make `LifecycleState` the single source of truth for state,
- Turn runtime readiness into “just” the outcome of a single bootstrap task,
- Centralize shutdown behavior.
That aligns well with the reviewer’s concerns while preserving your new split core/runtime model and failure handling.
</issue_to_address>
### Comment 3
<location path="astrbot/dashboard/server.py" line_range="134" />
<code_context>
"/api/stat/start-time",
"/api/backup/download",
)
+ RUNTIME_BYPASS_ENDPOINT_PREFIXES = (
+ "/api/auth/login",
+ "/api/file",
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the new runtime guard and plugin routing logic into shared helpers and derived prefix constants to remove duplication and clarify responsibilities without changing behavior.
You can reduce the new complexity without changing behavior by:
### 1. Deduplicating the runtime guard logic
The runtime guard condition is now repeated three times in `auth_middleware` and once in `guarded_srv_plug_route`. Extract it into a small helper and call it from each code path:
```python
def _maybe_runtime_guard(
self, path: str, *, include_failure_details: bool = True
) -> ResponseReturnValue | None:
if self._should_bypass_runtime_guard(path):
return None
if self._should_allow_failed_runtime_recovery(path):
return None
if not is_runtime_request_ready(self.core_lifecycle):
return runtime_loading_response(
self.core_lifecycle,
include_failure_details=include_failure_details,
)
return None
```
Then use it in `auth_middleware`:
```python
if request.path.startswith("/api/v1"):
...
await self.db.touch_api_key(api_key.key_id)
guard_resp = self._maybe_runtime_guard(
request.path,
include_failure_details=False,
)
if guard_resp is not None:
return guard_resp
return None
if any(request.path.startswith(p) for p in self.ALLOWED_ENDPOINT_PREFIXES):
guard_resp = self._maybe_runtime_guard(
request.path,
include_failure_details=False,
)
if guard_resp is not None:
return guard_resp
return None
...
g.username = payload["username"]
guard_resp = self._maybe_runtime_guard(request.path)
if guard_resp is not None:
return guard_resp
```
And reuse it in `guarded_srv_plug_route` instead of inlining another runtime check:
```python
async def guarded_srv_plug_route(
self, subpath: str, *args, **kwargs
) -> ResponseReturnValue:
guard_resp = self._maybe_runtime_guard(request.path)
if guard_resp is not None:
return guard_resp
return await self.srv_plug_route(subpath, *args, **kwargs)
```
This makes it obvious there is a single place that decides whether to block a request due to runtime readiness.
### 2. Making `srv_plug_route` purely routing
Right now `guarded_srv_plug_route` handles runtime readiness and `srv_plug_route` mixes lazy initialization with routing. You can keep `guarded_srv_plug_route` but make `srv_plug_route` clearly “just routing”:
```python
def _get_plugin_handler(self, subpath: str, method: str) -> Callable | None:
handler = self._plugin_route_map.get((f"/{subpath}", method))
if handler:
return handler
# Lazy init remains but isolated here
self._init_plugin_route_index()
return self._plugin_route_map.get((f"/{subpath}", method))
async def srv_plug_route(self, subpath: str, *args, **kwargs):
handler = self._get_plugin_handler(subpath, request.method)
if not handler:
return jsonify(Response().error("未找到该路由").to_json())
try:
return await handler(*args, **kwargs)
except Exception:
logger.exception("插件 Web API 执行异常")
return jsonify(Response().error("插件 Web API 执行异常").to_json())
```
That keeps `guarded_srv_plug_route` clearly as “the guard” and `srv_plug_route` clearly as “the router”.
### 3. Reducing drift between prefix lists
`ALLOWED_ENDPOINT_PREFIXES` and `RUNTIME_BYPASS_ENDPOINT_PREFIXES` currently overlap and can drift. You can keep the current behavior but derive one from the other to express intent more clearly:
```python
ALLOWED_ENDPOINT_PREFIXES = (
"/api/auth/login",
"/api/file",
"/api/platform/webhook",
"/api/stat/start-time",
"/api/backup/download",
)
_RUNTIME_EXTRA_BYPASS_PREFIXES = (
"/api/stat/version",
"/api/stat/runtime-status",
"/api/stat/restart-core",
"/api/stat/changelog",
"/api/stat/changelog/list",
"/api/stat/first-notice",
)
RUNTIME_BYPASS_ENDPOINT_PREFIXES = (
*ALLOWED_ENDPOINT_PREFIXES,
*_RUNTIME_EXTRA_BYPASS_PREFIXES,
)
RUNTIME_FAILED_RECOVERY_ENDPOINT_PREFIXES = (
"/api/config/",
"/api/plugin/reload-failed",
"/api/plugin/uninstall-failed",
"/api/plugin/source/get-failed-plugins",
)
```
This keeps all current behavior but makes the relationship between “allowed” and “runtime bypass” explicit and easier to maintain.
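A quick self-contained check (prefix tuples shortened to a couple of entries) that deriving one tuple from the other keeps every allowed endpoint inside the bypass set:

```python
ALLOWED_ENDPOINT_PREFIXES = (
    "/api/auth/login",
    "/api/file",
)
_RUNTIME_EXTRA_BYPASS_PREFIXES = (
    "/api/stat/version",
)
RUNTIME_BYPASS_ENDPOINT_PREFIXES = (
    *ALLOWED_ENDPOINT_PREFIXES,
    *_RUNTIME_EXTRA_BYPASS_PREFIXES,
)


def should_bypass(path: str) -> bool:
    # Same prefix-matching rule the middleware applies to request.path.
    return any(path.startswith(p) for p in RUNTIME_BYPASS_ENDPOINT_PREFIXES)


assert all(should_bypass(p) for p in ALLOWED_ENDPOINT_PREFIXES)
assert should_bypass("/api/stat/version")
assert not should_bypass("/api/plugin/install")
```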
</issue_to_address>
### Comment 4
<location path="astrbot/dashboard/routes/live_chat.py" line_range="165" />
<code_context>
await websocket.close(1008, "Invalid token")
return
+ if not is_runtime_request_ready(self.core_lifecycle):
+ await websocket.close(
+ 1013,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting a single helper to perform the runtime-ready check and websocket close so you can call it once per loop iteration instead of repeating inline guard blocks throughout the methods.
You can keep the new runtime-guard behavior but reduce duplication and loop noise by extracting a small helper and using it consistently.
### 1. Centralize the runtime-ready check + close logic
```python
# inside the same class (e.g. LiveChatRoute or similar)
async def _ensure_runtime_ready(self, websocket) -> bool:
if is_runtime_request_ready(self.core_lifecycle):
return True
await websocket.close(
1013,
get_runtime_guard_message(self.core_lifecycle),
)
return False
```
This removes duplication of:
```python
if not is_runtime_request_ready(self.core_lifecycle):
await websocket.close(
1013,
get_runtime_guard_message(self.core_lifecycle),
)
return / break
```
across `_unified_ws_loop`, `_handle_chat_message`, and `_process_audio`.
### 2. Use the helper once per loop iteration (not before and after)
Example for `_unified_ws_loop`:
```python
# after auth, before session creation
if not await self._ensure_runtime_ready(websocket):
return
# ...
try:
while True:
if not await self._ensure_runtime_ready(websocket):
return
message = await websocket.receive_json()
# no second check here; you still re-check on the next iteration
ct = force_ct or message.get("ct", "live")
...
```
For back-queue loops (`_handle_chat_message`, `_process_audio`), check once per iteration and after long waits, but avoid the before/after double-check:
```python
while True:
if not await self._ensure_runtime_ready(websocket):
break
if session.should_interrupt:
...
break
try:
result = await asyncio.wait_for(back_queue.get(), timeout=1)
except asyncio.TimeoutError:
continue
# optionally keep just this *post-wait* check if you want to react
# promptly after long blocking waits, but still only once per iteration:
if not await self._ensure_runtime_ready(websocket):
break
if not result:
continue
...
```
This preserves the semantics of reacting to runtime shutdown, but:
- eliminates repeated inline `if not is_runtime_request_ready` blocks,
- centralizes the close behavior (status code + message),
- and makes the main loop bodies easier to follow.
</issue_to_address>
Code Review
This pull request refactors the AstrBotCoreLifecycle into a two-phase initialization process: a fast initialize_core phase and a deferred bootstrap_runtime phase. This change introduces a new LifecycleState Enum to manage application readiness and provides robust error handling and cleanup mechanisms for runtime bootstrap failures, enabling safe retries. Dashboard routes are updated with runtime readiness guards, returning 503 errors if the runtime is not ready or has failed, while allowing specific recovery endpoints to bypass these checks. The ProviderManager and Context classes are enhanced with state clearing methods to support the new lifecycle management, and comprehensive unit tests have been added to validate these changes.
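The 503 behavior the review summarizes could look roughly like the following; the function name and payload shape here are assumptions for illustration, not the PR's actual code:

```python
from http import HTTPStatus


def runtime_loading_response(runtime_failed: bool) -> tuple[dict, int]:
    # Hypothetical sketch of a runtime-guard response body; real payloads differ.
    message = (
        "runtime failed to bootstrap; check runtime status and retry recovery"
        if runtime_failed
        else "runtime is still starting, please retry shortly"
    )
    return {"status": "error", "message": message}, HTTPStatus.SERVICE_UNAVAILABLE


body, code = runtime_loading_response(runtime_failed=False)
assert code == 503 and body["status"] == "error"
```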
@sourcery-ai review
Hey - I've found 2 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/core_lifecycle.py" line_range="49" />
<code_context>
from .event_bus import EventBus
+class LifecycleState(str, Enum):
+ """Minimal lifecycle contract for split initialization."""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the lifecycle management by making `lifecycle_state` and the bootstrap task the single sources of truth and consolidating repeated shutdown logic.
You can reduce the added complexity without changing behavior by consolidating lifecycle state and centralizing runtime bootstrap/shutdown logic.
### 1. Use `lifecycle_state` as single source of truth
Right now you have overlapping flags:
- `lifecycle_state`
- `core_initialized`, `runtime_ready`, `runtime_failed`
- `runtime_ready_event`, `runtime_failed_event`
- `runtime_request_ready`, `_runtime_wait_interrupted`, `runtime_bootstrap_error`
You can keep the enum and events, but drop the extra booleans and replace them with small properties. This keeps behavior but removes divergence risk:
```python
class AstrBotCoreLifecycle:
def __init__(...):
...
self.lifecycle_state = LifecycleState.CREATED
self.runtime_bootstrap_task: asyncio.Task[None] | None = None
self.runtime_bootstrap_error: BaseException | None = None
self.runtime_ready_event = asyncio.Event()
self.runtime_failed_event = asyncio.Event()
self._runtime_wait_interrupted = False
self.runtime_request_ready = False
@property
def core_initialized(self) -> bool:
return self.lifecycle_state is not LifecycleState.CREATED
@property
def runtime_ready(self) -> bool:
return self.lifecycle_state is LifecycleState.RUNTIME_READY
@property
def runtime_failed(self) -> bool:
return self.lifecycle_state is LifecycleState.RUNTIME_FAILED
def _set_lifecycle_state(self, state: LifecycleState) -> None:
self.lifecycle_state = state
# events remain as external signaling only
if state is LifecycleState.RUNTIME_READY:
self.runtime_ready_event.set()
self.runtime_failed_event.clear()
elif state is LifecycleState.RUNTIME_FAILED:
self.runtime_failed_event.set()
self.runtime_ready_event.clear()
else:
self.runtime_ready_event.clear()
self.runtime_failed_event.clear()
```
This lets you remove the assignments to `core_initialized`, `runtime_ready`, `runtime_failed` inside `_set_lifecycle_state`, and all direct writes to those booleans elsewhere.
`runtime_request_ready` can also be derived in most places as:
```python
@property
def can_serve_requests(self) -> bool:
    return (
        self.lifecycle_state is LifecycleState.RUNTIME_READY
        and not self._runtime_wait_interrupted
    )
```
and only kept as a flag if you really need the separate “request gate.”
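Pieced together as a standalone sketch (the `LifecycleState` member names and the trimmed class here are illustrative stand-ins, not the real `AstrBotCoreLifecycle`), the property-based consolidation behaves like this:

```python
import asyncio
from enum import Enum


class LifecycleState(str, Enum):
    # Member names assumed from the diff; adjust to the real enum.
    CREATED = "created"
    CORE_READY = "core_ready"
    RUNTIME_READY = "runtime_ready"
    RUNTIME_FAILED = "runtime_failed"


class Lifecycle:
    def __init__(self) -> None:
        self.lifecycle_state = LifecycleState.CREATED
        self.runtime_ready_event = asyncio.Event()
        self.runtime_failed_event = asyncio.Event()

    @property
    def core_initialized(self) -> bool:
        return self.lifecycle_state is not LifecycleState.CREATED

    @property
    def runtime_ready(self) -> bool:
        return self.lifecycle_state is LifecycleState.RUNTIME_READY

    @property
    def runtime_failed(self) -> bool:
        return self.lifecycle_state is LifecycleState.RUNTIME_FAILED

    def _set_lifecycle_state(self, state: LifecycleState) -> None:
        # Single write point: flags are derived, events only mirror the state.
        self.lifecycle_state = state
        if state is LifecycleState.RUNTIME_READY:
            self.runtime_ready_event.set()
            self.runtime_failed_event.clear()
        elif state is LifecycleState.RUNTIME_FAILED:
            self.runtime_failed_event.set()
            self.runtime_ready_event.clear()
        else:
            self.runtime_ready_event.clear()
            self.runtime_failed_event.clear()


lc = Lifecycle()
assert not lc.core_initialized and not lc.runtime_ready
lc._set_lifecycle_state(LifecycleState.RUNTIME_READY)
assert lc.core_initialized and lc.runtime_ready and not lc.runtime_failed
lc._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
assert lc.runtime_failed and not lc.runtime_ready
```

Because the booleans are now read-only views of `lifecycle_state`, they cannot drift out of sync with it.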
### 2. Simplify `_wait_for_runtime_ready` around the task
`_wait_for_runtime_ready` currently orchestrates:
- `runtime_bootstrap_task`
- `runtime_ready_event`
- `runtime_failed_event`
- `_runtime_wait_interrupted`
- `runtime_bootstrap_error`
You can treat `runtime_bootstrap_task` as the single completion source and use events only for external observers, making `_wait_for_runtime_ready` much easier to follow:
```python
async def _wait_for_runtime_ready(self) -> bool:
    # Fast path
    if self.runtime_ready:
        return True
    if self.runtime_failed or self._runtime_wait_interrupted:
        return False
    task = self.runtime_bootstrap_task
    if task is None:
        raise RuntimeError("runtime bootstrap task was not scheduled before start")
    try:
        await task  # authoritative result
    except asyncio.CancelledError:
        # interrupted by stop/restart
        self._runtime_wait_interrupted = True
        return False
    except BaseException as exc:
        self.runtime_bootstrap_error = exc
        self._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
        return False
    # task completed successfully
    return self.runtime_ready
```
With the events driven from `_set_lifecycle_state` (snippet above), external code that wants to wait without calling `start()` can just use:
```python
await self.runtime_ready_event.wait()
```
This removes the `asyncio.wait({task, ready_wait, failed_wait})` dance while preserving all semantics: `start()` still blocks until runtime is ready or failed, and observers can use events.
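A minimal, self-contained illustration of that split (the `bootstrap`/`observer` coroutine names are placeholders): `start()`-style code awaits the task as the authoritative result, while external observers only touch the event:

```python
import asyncio


async def main() -> list[str]:
    order: list[str] = []
    ready_event = asyncio.Event()

    async def bootstrap() -> None:
        # Stand-in for the real runtime bootstrap work.
        await asyncio.sleep(0.01)
        order.append("bootstrap done")
        ready_event.set()  # in the real code, driven by _set_lifecycle_state

    async def observer() -> None:
        # External code waits on the event without touching the task.
        await ready_event.wait()
        order.append("observer woke")

    task = asyncio.create_task(bootstrap())
    obs = asyncio.create_task(observer())
    await task  # start() treats the task itself as the completion source
    await obs
    order.append("start returned")
    return order


result = asyncio.run(main())
assert result == ["bootstrap done", "observer woke", "start returned"]
```

The task carries success/failure/cancellation semantics for free, so the event never has to encode more than "ready happened".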
### 3. Collapse `_require_*` helpers into phase guards
Many helpers are thin wrappers with near-identical error messages:
- `_require_scheduler_dependencies`
- `_require_config_manager`
- `_require_plugin_manager`
- `_require_platform_manager`
- `_require_runtime_bootstrap_components`
- `_require_runtime_started_components`
You can reduce noise by having a couple of phase-level guards and then using direct attributes:
```python
def _require_core_ready(self) -> None:
    if not self.core_initialized:
        raise RuntimeError("initialize_core must complete before this operation")

def _require_runtime_ready(self) -> None:
    if self.lifecycle_state is not LifecycleState.RUNTIME_READY:
        raise RuntimeError("Runtime must be ready before this operation")
```
Then, for example:
```python
async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]:
    self._require_core_ready()
    assert self.astrbot_config_mgr is not None
    assert self.plugin_manager is not None
    mapping = {}
    for conf_id, ab_config in self.astrbot_config_mgr.confs.items():
        scheduler = PipelineScheduler(
            PipelineContext(ab_config, self.plugin_manager, conf_id),
        )
        await scheduler.initialize()
        mapping[conf_id] = scheduler
    return mapping
```
This keeps type safety (via `assert`) but avoids the proliferation of `_require_*` variants.
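A toy version of the two-guard approach (class and method names here are illustrative, not from the codebase) shows the intended failure mode:

```python
class Guarded:
    """Toy stand-in for the lifecycle object; names are illustrative."""

    def __init__(self, core_initialized: bool = False) -> None:
        self.core_initialized = core_initialized

    def _require_core_ready(self) -> None:
        if not self.core_initialized:
            raise RuntimeError("initialize_core must complete before this operation")

    def load(self) -> str:
        # One phase-level guard instead of a per-dependency _require_* helper.
        self._require_core_ready()
        return "loaded"


try:
    Guarded().load()
except RuntimeError as exc:
    caught = str(exc)

assert caught == "initialize_core must complete before this operation"
assert Guarded(core_initialized=True).load() == "loaded"
```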
### 4. Centralize runtime shutdown used by both `stop` and `restart`
`stop()` and `restart()` now share a long, similar sequence:
- `_interrupt_runtime_bootstrap_waiters`
- `_cancel_current_tasks`
- collect/await metadata & bootstrap tasks
- terminate managers
- set `dashboard_shutdown_event`
- clear artifacts
- reset lifecycle state
You can move this into a single internal method and call it from both places:
```python
async def _shutdown_runtime(self, for_restart: bool = False) -> None:
    self.runtime_request_ready = False
    self._interrupt_runtime_bootstrap_waiters()
    tasks_to_wait = self._cancel_current_tasks()
    await self._await_tasks(self._collect_metadata_update_task())
    bootstrap_tasks = self._collect_runtime_bootstrap_task()
    await self._await_tasks(bootstrap_tasks)
    tasks_to_wait.extend(bootstrap_tasks)
    if self.cron_manager:
        await self.cron_manager.shutdown()
    if self.plugin_manager and self.plugin_manager.context:
        for plugin in self.plugin_manager.context.get_all_stars():
            try:
                await self.plugin_manager._terminate_plugin(plugin)
            except Exception as e:
                logger.warning(traceback.format_exc())
                logger.warning(
                    # zh: "Plugin {plugin.name} was not terminated cleanly ({e});
                    # this may cause resource leaks."
                    f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
                )
    if self.provider_manager:
        await self.provider_manager.terminate()
    if self.platform_manager:
        await self.platform_manager.terminate()
    if self.kb_manager:
        await self.kb_manager.terminate()
    if self.dashboard_shutdown_event:
        self.dashboard_shutdown_event.set()
    await self._await_tasks(tasks_to_wait)
    self._clear_runtime_artifacts()
    # For restart we might want to keep CORE_READY; for a full stop you
    # can reset to CREATED, depending on your desired semantics.
    self._set_lifecycle_state(LifecycleState.CREATED)
    self._reset_runtime_bootstrap_state()
```
Then:
```python
async def stop(self) -> None:
    if self.temp_dir_cleaner:
        await self.temp_dir_cleaner.stop()
    await self._shutdown_runtime(for_restart=False)

async def restart(self) -> None:
    await self._shutdown_runtime(for_restart=True)
    if self.astrbot_updator is None:
        return
    threading.Thread(
        target=self.astrbot_updator._reboot,
        name="restart",
        daemon=True,
    ).start()
```
This keeps all the new behavior but makes future changes to shutdown semantics much safer and easier to follow.
</issue_to_address>
### Comment 2
<location path="astrbot/dashboard/routes/live_chat.py" line_range="145" />
<code_context>
"""Unified Chat WebSocket 处理器(支持 ct=live/chat)"""
await self._unified_ws_loop(force_ct=None)
+ async def _ensure_runtime_ready(self) -> bool:
+ if is_runtime_request_ready(self.core_lifecycle):
+ return True
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating the runtime readiness checks into dedicated helpers (for WebSocket receive and queue reads) so each loop has a single, centralized guard instead of repeated `_ensure_runtime_ready()` calls.
You can reduce the added complexity by encapsulating the runtime guard instead of sprinkling `_ensure_runtime_ready()` across all loops (including back‑to‑back calls).
### 1. Avoid back‑to‑back runtime checks around `receive_json`
Instead of:
```python
while True:
    if not await self._ensure_runtime_ready():
        return
    message = await websocket.receive_json()
    if not await self._ensure_runtime_ready():
        return
    ct = force_ct or message.get("ct", "live")
    ...
```
wrap the receive in a helper that includes the guard:
```python
async def _recv_ws_json_guarded(self) -> dict | None:
    if not await self._ensure_runtime_ready():
        return None
    try:
        return await websocket.receive_json()
    except Exception:
        # keep whatever existing error handling you have
        raise
```
Then the loop becomes easier to follow and avoids duplicate checks:
```python
while True:
    message = await self._recv_ws_json_guarded()
    if message is None:
        return  # runtime closed the ws in _ensure_runtime_ready
    ct = force_ct or message.get("ct", "live")
    if ct == "chat":
        await self._handle_chat_message(live_session, message)
    else:
        await self._handle_message(live_session, message)
```
### 2. Encapsulate runtime guard with `back_queue.get()`
You’re repeating the pattern: guard → `wait_for(back_queue.get())` → guard again.
For `_handle_chat_message`:
```python
while True:
    if not await self._ensure_runtime_ready():
        break
    if session.should_interrupt:
        session.should_interrupt = False
        break
    try:
        result = await asyncio.wait_for(back_queue.get(), timeout=1)
    except asyncio.TimeoutError:
        continue
    if not await self._ensure_runtime_ready():
        break
    ...
```
You can fold the guard into a helper that reads from the queue:
```python
async def _guarded_queue_get(
    self,
    queue: asyncio.Queue,
    timeout: float,
) -> Any | None:
    if not await self._ensure_runtime_ready():
        return None
    try:
        item = await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return "TIMEOUT"
    if not await self._ensure_runtime_ready():
        return None
    return item
```
Then in both `_handle_chat_message` and `_process_audio`:
```python
while True:
    if not await self._ensure_runtime_ready():
        break
    if session.should_interrupt:
        session.should_interrupt = False
        break
    result = await self._guarded_queue_get(back_queue, timeout=1)
    if result is None:
        break  # runtime not ready anymore
    if result == "TIMEOUT":
        continue
    # existing processing
    if not result:
        continue
    ...
```
And similarly in `_process_audio`:
```python
while True:
    if not await self._ensure_runtime_ready():
        break
    if session.should_interrupt:
        ...
        break
    result = await self._guarded_queue_get(back_queue, timeout=0.5)
    if result is None:
        break
    if result == "TIMEOUT":
        continue
    if not result:
        continue
    ...
```
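As a standalone sketch (the runtime guard is stubbed with a plain flag), the helper can also use a unique sentinel object instead of the `"TIMEOUT"` string, so a timeout can never be confused with a queue item that happens to equal `"TIMEOUT"` — a variation on the snippet above, not the PR's actual code:

```python
import asyncio

TIMEOUT = object()  # unique sentinel: cannot collide with real queue items


class Demo:
    def __init__(self) -> None:
        self.runtime_ok = True  # stand-in for the real _ensure_runtime_ready()

    async def _ensure_runtime_ready(self) -> bool:
        return self.runtime_ok

    async def _guarded_queue_get(self, queue: asyncio.Queue, timeout: float):
        # Guard -> timed get -> guard again, as in the review suggestion.
        if not await self._ensure_runtime_ready():
            return None
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return TIMEOUT
        if not await self._ensure_runtime_ready():
            return None
        return item


async def main() -> list:
    demo = Demo()
    q: asyncio.Queue = asyncio.Queue()
    seen = []

    seen.append(await demo._guarded_queue_get(q, timeout=0.01))  # times out
    await q.put("payload")
    seen.append(await demo._guarded_queue_get(q, timeout=0.01))  # real item
    demo.runtime_ok = False
    seen.append(await demo._guarded_queue_get(q, timeout=0.01))  # guard fails
    return seen


results = asyncio.run(main())
```

Callers then distinguish the three outcomes with identity checks (`is TIMEOUT`, `is None`) rather than string equality.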
### 3. Keep a single guard at the top of loops where possible
For loops that don’t need to guard both before and after the blocking call, you can rely on a single guard per iteration:
```python
while True:
    if not await self._ensure_runtime_ready():
        break
    # rest of the loop (interrupt check, queue read, processing...)
```
This keeps the runtime concern in one predictable place per loop, rather than interleaving it with business logic, while preserving your early‑bail behavior when the runtime goes down.
</issue_to_address>
Summary
Test Plan
- `uv run ruff check astrbot/core/core_lifecycle.py astrbot/core/initial_loader.py astrbot/core/provider/manager.py astrbot/core/star/context.py astrbot/core/star/star_manager.py astrbot/dashboard/routes/live_chat.py astrbot/dashboard/routes/open_api.py astrbot/dashboard/routes/plugin.py astrbot/dashboard/routes/route.py astrbot/dashboard/routes/stat.py astrbot/dashboard/server.py tests/test_dashboard.py tests/unit/test_core_lifecycle.py tests/unit/test_initial_loader.py`
- `uv run pytest tests/unit/test_core_lifecycle.py tests/unit/test_initial_loader.py tests/test_dashboard.py -v`
- `uv run ruff check astrbot/_internal/runtime/orchestrator.py`
- `uv run pytest tests/unit/test_internal/test_architecture_compliance.py -q tests/unit/test_internal/test_orchestrator.py -q tests/unit/test_internal_runtime.py -q tests/unit/test_runtime_status_star.py -q`
- `uv run pytest tests/test_docs.py -q`

Notes
- `dev` branch.