Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the streaming functionality of the WeCom AI bot by implementing a 0.5-second throttling mechanism for text-based streaming responses. The change aims to provide a more controlled and consistent flow of information, preventing overly rapid or fragmented updates, while preserving the immediate delivery of non-text elements and control signals.
Related Documentation

1 document(s) may need updating based on files changed in this PR: AstrBotTeam's Space, changes for pr4697. View Suggested Changes:

```diff
@@ -1448,7 +1448,61 @@
 ---
-### 14. Other optimizations
+### 14. WeCom AI bot streaming response interval throttling (PR #6610)
+
+#### Feature description
+[PR #6610](https://github.com/AstrBotDevs/AstrBot/pull/6610) adds a 0.5-second throttling mechanism to streaming responses on the WeCom AI bot platform, matching WeCom's requirement that a text fragment be sent once every 0.5 seconds, and reducing unnecessary duplicate pushes.
+
+#### Core improvements
+
+##### New streaming interval constant
+A new class constant `STREAM_FLUSH_INTERVAL = 0.5` on `WecomAIBotMessageEvent` controls the update interval (in seconds) for streaming responses.
+
+##### Long-connection streaming
+In long-connection mode (scenarios using `long_connection_sender`), streaming responses are now pushed at 0.5-second intervals:
+
+- A `last_stream_update_time` variable tracks the time of the last push
+- `asyncio.get_running_loop().time()` provides the current time
+- Incremental content is sent only when more than `STREAM_FLUSH_INTERVAL` has elapsed since the last push
+- Non-text components (components other than At and Plain) are sent immediately, bypassing the throttle
+
+##### Queue-based streaming
+In ordinary streaming mode (scenarios using the message queue), responses are likewise pushed at 0.5-second intervals:
+
+- A new helper, `enqueue_stream_plain()`, puts text fragments onto the message queue
+- Accumulated text (`increment_plain`) is enqueued only once the interval has elapsed
+- Non-text components (message components other than At and Plain) are sent immediately, bypassing the throttle
+- On a `break` event, any remaining buffered text is pushed before the separator is sent
+
+#### Key implementation details
+
+##### Time tracking
+High-precision monotonic time (`asyncio.get_running_loop().time()`) keeps the throttle accurate and avoids anomalies caused by system clock adjustments.
+
+##### Text extraction
+A new `_extract_plain_text_from_chain()` method extracts the plain-text content of a message chain in one place, keeping accumulation and push logic consistent.
+
+##### Non-text component handling
+Non-text components (images, files, voice messages, and so on) are sent immediately, unaffected by the 0.5-second throttle, preserving the real-time delivery of multimedia messages.
+
+##### Break event handling
+On a `break` event, the system first pushes any accumulated text and then sends the separator, preserving message order without losing content.
+
+#### Platform compatibility
+This optimization specifically targets the WeCom AI bot platform and follows its technical requirements for streaming responses:
+
+- WeCom requires streamed messages to push a text fragment every 0.5 seconds
+- Overly frequent pushes, which can cause duplicate messages or rate limiting, are avoided
+- Streamed replies seen in WeCom become smoother and spec-compliant
+
+#### User benefits
+- **Spec compliance**: follows the WeCom AI bot platform's streaming-response specification, avoiding policy violations
+- **Fewer duplicate pushes**: interval throttling cuts unnecessary message pushes and lowers system load
+- **Better user experience**: streamed replies arrive at regular intervals, making reading smoother
+
+---
+
+### 15. Other optimizations
 - Enhanced JWT handling and error handling, improving system security and stability
 - UI detail polish, improving user experience
 - Improved logging and exception handling for easier troubleshooting
```
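The queue-based flow described in the suggested documentation above can be sketched roughly as follows. `enqueue_stream_plain` borrows its name from the PR's helper, but its body, the message shapes, and every other name here are assumptions for illustration, not AstrBot code:

```python
import asyncio


async def stream_to_queue(chunks, queue: asyncio.Queue, interval: float = 0.5):
    """Enqueue accumulated text at most once per `interval` seconds,
    then flush any remainder before the final break/separator event."""
    loop = asyncio.get_running_loop()
    increment_plain = ""
    last = loop.time()

    async def enqueue_stream_plain(text: str):
        # Stand-in for the PR's helper that wraps text for the message queue.
        await queue.put({"msgtype": "stream", "content": text})

    async for text in chunks:
        increment_plain += text
        now = loop.time()
        if increment_plain and now - last >= interval:
            await enqueue_stream_plain(increment_plain)
            increment_plain = ""
            last = now
    if increment_plain:  # push remaining buffered text first...
        await enqueue_stream_plain(increment_plain)
    await queue.put({"msgtype": "break"})  # ...then send the separator
```

The final two steps mirror the documented `break` handling: leftover text goes out before the separator so nothing is lost.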
Code Review
This pull request introduces a 0.5s interval for streaming responses in the Wecom AI bot, which is a good enhancement. However, I've identified a couple of logic issues in the new throttling implementation. One could lead to sending empty content periodically in long_connection mode, and another, more critical issue, could cause delayed flushing of buffered text in the other streaming mode. I've provided specific suggestions to address these points to make the throttling mechanism more robust and correct.
```python
if chunk_text:
    increment_plain += chunk_text
    final_data += chunk_text
    now = asyncio.get_running_loop().time()
    if now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
        await enqueue_stream_plain(increment_plain)
        last_stream_update_time = now
```
There is a logic issue here. The time-based flush for streaming is nested inside the `if chunk_text:` block. This means that if text is buffered in `increment_plain` and the generator then yields non-text chunks, the buffered text won't be flushed every 0.5 seconds as intended. It will only be flushed when a new text chunk arrives, which can lead to delayed delivery of text. The time check should be moved outside the `if chunk_text:` block to ensure timely flushing, and should also check that `increment_plain` has content before sending.
```diff
 if chunk_text:
     increment_plain += chunk_text
     final_data += chunk_text
-    now = asyncio.get_running_loop().time()
-    if now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
-        await enqueue_stream_plain(increment_plain)
-        last_stream_update_time = now
+now = asyncio.get_running_loop().time()
+if increment_plain and now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
+    await enqueue_stream_plain(increment_plain)
+    last_stream_update_time = now
```
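The effect of this fix, checking the clock on every iteration rather than only when a text chunk arrives, can be illustrated with a self-contained sketch. The `("text", …)` / `("other", …)` pairs are a stand-in for AstrBot's message chains, and every name here is hypothetical:

```python
import asyncio


async def drain_mixed_stream(chunks, send_text, send_other, interval=0.5):
    """Non-text payloads are sent immediately; text is buffered and
    flushed at most once per `interval`, even while only non-text
    chunks are arriving."""
    buffer = ""
    loop = asyncio.get_running_loop()
    last_flush = loop.time()
    async for kind, payload in chunks:
        if kind == "text":
            buffer += payload
        else:
            await send_other(payload)  # multimedia bypasses the throttle
        # The time check sits OUTSIDE the text branch, so buffered text
        # still drains while non-text chunks stream in, and the
        # `buffer and` guard prevents empty flushes.
        now = loop.time()
        if buffer and now - last_flush >= interval:
            await send_text(buffer)
            buffer = ""
            last_flush = now
    if buffer:
        await send_text(buffer)
```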
```python
"finish": False,
"content": increment_plain,
now = asyncio.get_running_loop().time()
if now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
```
The current implementation might send empty content periodically if the generator yields non-text chunks for a while, since `increment_plain` would be empty. To avoid sending empty updates and improve efficiency, it's better to also check that `increment_plain` has content before sending.
```diff
-if now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
+if increment_plain and now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
```
Hey - I've found 1 issue, and left some high level feedback:

- Initializing `last_stream_update_time` to `0.0` means the first text chunk may be held for up to 0.5s; consider initializing it to the current loop time or forcing an immediate first flush to keep the initial response snappy.
- The interval-throttling logic using `STREAM_FLUSH_INTERVAL` is duplicated in both long-connection and queue-based streaming paths; consider extracting a small helper to centralize this behavior and reduce the chance of future divergence.
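The second point, extracting a shared helper, could look something like the class below. `IntervalThrottle` and its methods are invented for illustration; nothing here exists in AstrBot:

```python
import time


class IntervalThrottle:
    """Centralizes the "has the flush interval elapsed?" decision so both
    streaming paths can share one implementation."""

    def __init__(self, interval: float = 0.5, clock=time.monotonic):
        self.interval = interval
        self._clock = clock  # injectable clock keeps this testable
        self._last = clock()  # init to "now", not 0.0, per the feedback

    def ready(self) -> bool:
        """Return True (and reset the timer) once `interval` has elapsed."""
        now = self._clock()
        if now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

Injecting `clock` avoids real sleeps in tests; in production the default monotonic clock (or `asyncio.get_running_loop().time()`) would be used at each call site, e.g. `if increment_plain and throttle.ready(): ...`.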
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Initializing `last_stream_update_time` to `0.0` means the first text chunk may be held for up to 0.5s; consider initializing it to the current loop time or forcing an immediate first flush to keep the initial response snappy.
- The interval-throttling logic using `STREAM_FLUSH_INTERVAL` is duplicated in both long-connection and queue-based streaming paths; consider extracting a small helper to centralize this behavior and reduce the chance of future divergence.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py" line_range="256-265" />
<code_context>
chunk_text = self._extract_plain_text_from_chain(chain)
if chunk_text:
increment_plain += chunk_text
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `final_data` no longer reflects non-text components, changing its semantics.
Previously `final_data` came from `_send(chain, ...)`, which likely included all components in the chain. Now it only accumulates `chunk_text` from Plain components, so non-text segments (images/files/etc.) are dropped from the final value. If any downstream code expects `final_data` to represent the complete reply, this is a behavioral change that could break callers. Either preserve `_send`’s full output in `final_data` or clearly define and document that `final_data` is now text-only and ensure callers are updated accordingly.
Suggested implementation:
```python
# Plain-text-only accumulator used for streaming updates.
increment_plain = ""
# Accumulator for the full reply (including non-text components) to preserve
# the original `final_data` semantics.
full_reply_chains = []
last_stream_update_time = 0.0
async for chain in generator:
    # Track the full chain so `final_data` can still represent the complete reply.
    full_reply_chains.append(chain)
    if self.webhook_client:
        await self.webhook_client.send_message_chain(
    chunk_text = self._extract_plain_text_from_chain(chain)
    if chunk_text:
        increment_plain += chunk_text
        now = asyncio.get_running_loop().time()
```
To fully implement the semantics described in your review comment, you should also:
1. Ensure that `final_data` is derived from `full_reply_chains` (or an equivalent full-chain accumulator) rather than from `increment_plain`. Concretely, update the code after this loop where `final_data` is currently set to a text-only value (likely something involving `increment_plain`) to instead construct `final_data` from `full_reply_chains` (e.g., by concatenating chains, or by passing them through the original `_send(chain, ...)` logic).
2. If any callers now rely on the text-only result, expose that explicitly as a separate value (for example, `final_plain_text`) rather than overloading `final_data` to mean “text-only”.
3. Update any documentation or type hints for `final_data` in this file (and related modules) to confirm that it continues to represent the complete reply, including non-text components.
</issue_to_address>
```diff
 chunk_text = self._extract_plain_text_from_chain(chain)
 if chunk_text:
     increment_plain += chunk_text
-    await self.long_connection_sender(
-        req_id,
-        {
-            "msgtype": "stream",
-            "stream": {
-                "id": stream_id,
-                "finish": False,
-                "content": increment_plain,
+    now = asyncio.get_running_loop().time()
+    if now - last_stream_update_time >= self.STREAM_FLUSH_INTERVAL:
+        await self.long_connection_sender(
+            req_id,
+            {
+                "msgtype": "stream",
+                "stream": {
```
suggestion (bug_risk): `final_data` no longer reflects non-text components, changing its semantics.

Previously `final_data` came from `_send(chain, ...)`, which likely included all components in the chain. Now it only accumulates `chunk_text` from Plain components, so non-text segments (images/files/etc.) are dropped from the final value. If any downstream code expects `final_data` to represent the complete reply, this is a behavioral change that could break callers. Either preserve `_send`'s full output in `final_data` or clearly define and document that `final_data` is now text-only and ensure callers are updated accordingly.
Suggested implementation:

```python
# Plain-text-only accumulator used for streaming updates.
increment_plain = ""
# Accumulator for the full reply (including non-text components) to preserve
# the original `final_data` semantics.
full_reply_chains = []
last_stream_update_time = 0.0
async for chain in generator:
    # Track the full chain so `final_data` can still represent the complete reply.
    full_reply_chains.append(chain)
    if self.webhook_client:
        await self.webhook_client.send_message_chain(
    chunk_text = self._extract_plain_text_from_chain(chain)
    if chunk_text:
        increment_plain += chunk_text
        now = asyncio.get_running_loop().time()
```

To fully implement the semantics described in your review comment, you should also:
1. Ensure that `final_data` is derived from `full_reply_chains` (or an equivalent full-chain accumulator) rather than from `increment_plain`. Concretely, update the code after this loop where `final_data` is currently set to a text-only value (likely something involving `increment_plain`) to instead construct `final_data` from `full_reply_chains` (e.g., by concatenating chains, or by passing them through the original `_send(chain, ...)` logic).
2. If any callers now rely on the text-only result, expose that explicitly as a separate value (for example, `final_plain_text`) rather than overloading `final_data` to mean "text-only".
3. Update any documentation or type hints for `final_data` in this file (and related modules) to confirm that it continues to represent the complete reply, including non-text components.
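One way to realize the two-accumulator idea from the steps above, using hypothetical stand-ins for AstrBot's component classes:

```python
# Minimal stand-ins for message-chain components; the real classes
# (Plain, Image, etc.) live in AstrBot and look different.
class Plain:
    def __init__(self, text: str):
        self.text = text


class Image:
    def __init__(self, url: str):
        self.url = url


def accumulate(chains):
    """Keep two accumulators so text-only streaming does not erase the
    non-text parts of the final reply."""
    full_reply = []   # complete reply: every component, in order
    plain_text = ""   # text-only view, used for stream updates
    for chain in chains:
        full_reply.extend(chain)
        plain_text += "".join(c.text for c in chain if isinstance(c, Plain))
    return full_reply, plain_text
```

Returning both values keeps the complete-reply semantics of `final_data` while still exposing the text-only accumulator (`final_plain_text` in the review's naming) to callers that want it.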
…ed via add throttling (AstrBotDevs#6610)

* fix(wecom-ai): add 0.5s interval for streaming responses
* fix(wecom-ai): correct event type checking and add spacing in WecomAIBotMessageEvent