feat: Add "typing" ("对方正在输入...") state control for weixin_oc plateform#6977
feat: Add "typing" ("对方正在输入...") state control for weixin_oc plateform#6977Soulter merged 7 commits intoAstrBotDevs:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a 'typing' (对方正在输入...) state control feature for the Weixin OC platform. It establishes a complete typing lifecycle within AstrBot, ensuring that typing indicators are displayed and managed correctly during LLM processing. The implementation includes robust mechanisms for acquiring and refreshing typing tickets, maintaining the typing state through keepalive signals, and handling concurrent requests and cleanup gracefully, all while providing best-effort error handling. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- The per-user
_typing_statesdict currently only grows and is never pruned; consider adding a strategy to expire or remove entries for inactive users to avoid unbounded memory growth in long-lived processes. - The typing lifecycle logic in
WeixinOCAdapter(ticket refresh, keepalive, cancel, ownership) has become fairly complex; extracting some of this into smaller, focused helpers or a dedicated manager class would improve readability and reduce the chance of subtle concurrency bugs.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The per-user `_typing_states` dict currently only grows and is never pruned; consider adding a strategy to expire or remove entries for inactive users to avoid unbounded memory growth in long-lived processes.
- The typing lifecycle logic in `WeixinOCAdapter` (ticket refresh, keepalive, cancel, ownership) has become fairly complex; extracting some of this into smaller, focused helpers or a dedicated manager class would improve readability and reduce the chance of subtle concurrency bugs.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py" line_range="423" />
<code_context>
+ async def _cleanup_typing_tasks(self) -> None:
+ tasks: list[asyncio.Task] = []
+ cancels: list[tuple[str, str]] = []
+ for user_id, state in self._typing_states.items():
+ if state.ticket and (
+ state.owners
</code_context>
<issue_to_address>
**issue (bug_risk):** Iterating directly over `_typing_states` can raise `RuntimeError` if the dict is mutated concurrently.
Because `_get_typing_state` can mutate `_typing_states` while `_cleanup_typing_tasks` is running (e.g., during overlapping shutdown and traffic), this loop can hit `RuntimeError: dictionary changed size during iteration`. To avoid that, iterate over a snapshot like `for user_id, state in list(self._typing_states.items()):` so concurrent mutations don’t break the cleanup pass.
</issue_to_address>
### Comment 2
<location path="astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py" line_range="148-150" />
<code_context>
follow_up_consumed_marked = False
follow_up_activated = False
+ typing_requested = False
try:
streaming_response = self.streaming_response
if (enable_streaming := event.get_extra("enable_streaming")) is not None:
</code_context>
<issue_to_address>
**suggestion:** The broad exception handler around `send_typing` drops error details such as type/traceback, making debugging harder.
Catching `Exception` here is fine for resilience, but logging only `%s` discards the exception type and traceback that are needed for debugging. Prefer `logger.exception(...)` or `logger.warning(..., exc_info=True)` so the stack trace is preserved while still allowing the pipeline to continue.
</issue_to_address>
### Comment 3
<location path="astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py" line_range="324" />
<code_context>
+ if state.cancel_task is current_task:
+ state.cancel_task = None
+
+ async def start_typing(self, user_id: str, owner_id: str) -> None:
+ state = self._get_typing_state(user_id)
+ cancel_task: asyncio.Task | None = None
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared task-cancellation and owner-handling logic into small helper methods to make the typing state machine easier to follow and maintain.
You can reduce a chunk of the cognitive load here by factoring out the repeated task-cancel/await patterns and the owner handling logic into small helpers. That keeps the state machine as-is but makes individual methods much easier to read and reason about.
### 1. Centralize task cancel + await logic
`start_typing`, `stop_typing`, and `_cleanup_typing_tasks` all repeat:
```python
if task is not None and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning(...)
```
Pull this into a single helper:
```python
async def _cancel_task_safely(
self,
task: asyncio.Task | None,
*,
log_msg: str | None = None,
) -> None:
if task is None or task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
if log_msg is not None:
logger.warning("%s: %s", log_msg, e)
```
Then use it in e.g. `start_typing` and `stop_typing`:
```python
# start_typing
async def start_typing(self, user_id: str, owner_id: str) -> None:
state = self._get_typing_state(user_id)
cancel_task: asyncio.Task | None = None
async with state.lock:
...
if state.cancel_task is not None and not state.cancel_task.done():
cancel_task = state.cancel_task
state.cancel_task = None
cancel_task.cancel()
...
await self._cancel_task_safely(
cancel_task,
log_msg=f"weixin_oc({self.meta().id}): typing cancel_task wait failed",
)
```
```python
# stop_typing
if task is not None:
await self._cancel_task_safely(
task,
log_msg=f"weixin_oc({self.meta().id}): typing keepalive stop failed for {user_id}",
)
```
```python
# _cleanup_typing_tasks
for task in tasks:
await self._cancel_task_safely(
task,
log_msg=f"weixin_oc({self.meta().id}): typing cleanup failed",
)
```
This removes duplicated branching and error handling without changing behavior.
### 2. Encapsulate owner/remove-and-should-stop logic
`stop_typing` currently has nested checks on `owners` which encode “remove owner; if any owners remain, do nothing; otherwise stop”:
```python
async def stop_typing(self, user_id: str, owner_id: str) -> None:
state = self._typing_states.get(user_id)
if state is None:
return
task: asyncio.Task | None = None
async with state.lock:
if owner_id in state.owners:
state.owners.remove(owner_id)
elif state.owners:
return
else:
return
if state.owners:
return
task = state.keepalive_task
state.keepalive_task = None
...
```
You can hide this branching behind a small helper that makes intent explicit:
```python
def _remove_owner_and_should_stop(
self,
state: TypingSessionState,
owner_id: str,
) -> bool:
if owner_id not in state.owners:
return False
state.owners.remove(owner_id)
# return True iff this was the last owner
return not state.owners
```
Then `stop_typing` simplifies to:
```python
async def stop_typing(self, user_id: str, owner_id: str) -> None:
state = self._typing_states.get(user_id)
if state is None:
return
task: asyncio.Task | None = None
async with state.lock:
if not self._remove_owner_and_should_stop(state, owner_id):
return
task = state.keepalive_task
state.keepalive_task = None
await self._cancel_task_safely(
task,
log_msg=f"weixin_oc({self.meta().id}): typing keepalive stop failed for {user_id}",
)
async with state.lock:
if state.owners:
return
ticket = state.ticket
if ticket and (state.cancel_task is None or state.cancel_task.done()):
state.cancel_task = asyncio.create_task(
self._delayed_cancel_typing(user_id, ticket)
)
```
This keeps all semantics intact (including “ignore stop for unknown owner if others exist”) but reduces branching and clarifies the state machine (“should we stop?” is a single boolean).
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Code Review
This pull request introduces comprehensive typing indicator support for the Weixin OC platform. It adds send_typing and stop_typing methods to the core event system, implements robust state management, ticket handling, and keepalive mechanisms within the Weixin OC adapter, and integrates new API calls for typing indicators. The process stage now correctly manages the lifecycle of typing indicators, including error handling and conditional logic for streaming platforms. Extensive unit tests have been added to validate this new functionality. Feedback suggests adding logging for silently ignored exceptions in stop_typing for better debuggability and simplifying the stop_typing logic for improved readability.
根据 Issue #6974 , 为个人微信渠道增加了"对方正在输入中..."状态控制的实现.
Modifications / 改动点
改动文件
astrbot/core/platform/astr_message_event.pyastrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.pyastrbot/core/platform/sources/weixin_oc/weixin_oc_client.pyastrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.pyastrbot/core/platform/sources/weixin_oc/weixin_oc_event.pytests/unit/test_astr_message_event.pytests/unit/test_internal_agent_sub_stage.pytests/unit/test_weixin_oc_typing.py实现的功能
为 AstrBot 补充了对称的 typing 生命周期,新增
stop_typing()默认钩子。在 LLM 内部处理阶段接入 best-effort typing 启停:请求前尝试
send_typing(),非流式平台在处理结束后尝试stop_typing(),异常只记日志,不影响正常回复。为
weixin_oc增加完整的 typing 状态支持,包括:getconfig获取 typing ticketsendtyping发送开始/取消 typing 状态context_token刷新和复用 ticket增加了对应单元测试,覆盖生命周期、并发竞态、ticket 刷新、异常恢复和清理流程。
This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
运行截图
测试步骤
运行定向单测,确认 typing 生命周期、并发 owner、ticket 刷新、异常恢复与 cleanup 行为正常:
uv run pytest tests/unit/test_astr_message_event.py tests/unit/test_internal_agent_sub_stage.py tests/unit/test_weixin_oc_typing.py -qChecklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add non-breaking typing state lifecycle support and best-effort integration across the core message pipeline and the personal WeChat (weixin_oc) platform.
New Features:
Enhancements:
Tests: