fix: enable plugin hooks for active_agent cron jobs by dispatching through PipelineScheduler#7649
fix: enable plugin hooks for active_agent cron jobs by dispatching through PipelineScheduler#7649kaixinyujue wants to merge 4 commits intoAstrBotDevs:masterfrom
Conversation
…rough PipelineScheduler Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks (on_llm_response, on_decorating_result, etc.) to not be triggered, and emotion tags were not parsed by plugins like meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
_dispatch_to_pipeline, consider guarding againstself._event_queuenot being initialized (e.g., ifstart()hasn’t been called yet) and failing fast with a clear log or exception instead of relying on an implicit attribute error. - The role resolution logic in
_dispatch_to_pipeline(admins_id,origin == 'api') looks duplicated from other parts of the system; if there is an existing helper or central place for this, reusing it would reduce the chance of role/permission drift between cron and non-cron flows.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `_dispatch_to_pipeline`, consider guarding against `self._event_queue` not being initialized (e.g., if `start()` hasn’t been called yet) and failing fast with a clear log or exception instead of relying on an implicit attribute error.
- The role resolution logic in `_dispatch_to_pipeline` (`admins_id`, `origin == 'api'`) looks duplicated from other parts of the system; if there is an existing helper or central place for this, reusing it would reduce the chance of role/permission drift between cron and non-cron flows.
## Individual Comments
### Comment 1
<location path="astrbot/core/cron/manager.py" line_range="398-403" />
<code_context>
logger.warning("Cron job agent got no response")
return
+ async def _dispatch_to_pipeline(
+ self,
+ *,
+ message: str,
+ session_str: str,
+ extras: dict,
+ ) -> None:
+ """将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。"""
</code_context>
<issue_to_address>
**suggestion:** Align parameter naming/typing with the ability to accept `MessageSession` objects.
The parameter is typed and named as a `str`, but the implementation also accepts `MessageSession` (`isinstance(session_str, MessageSession)`). Please rename it (e.g. `session` or `session_or_str`) and change the annotation to a union type so the signature matches actual usage and is clear to callers and type checkers.
Suggested implementation:
```python
await self._dispatch_to_pipeline(
message=note,
session=session_str,
extras=extras,
)
logger.warning("Cron job agent got no response")
return
```
```python
async def _dispatch_to_pipeline(
self,
*,
message: str,
session: "MessageSession | str",
extras: dict,
) -> None:
"""将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。"""
from astrbot.core.cron.events import CronMessageEvent
from astrbot.core.platform.message_session import MessageSession
try:
session = (
session
if isinstance(session, MessageSession)
else MessageSession.from_str(session)
)
except Exception as e:
```
1. If your project is not yet on Python 3.10+ or not using `from __future__ import annotations`, adjust the type annotation to `session: "MessageSession" | str` or use `Union["MessageSession", str]` and ensure `Union` is imported from `typing`.
2. If there are other call sites of `_dispatch_to_pipeline` in this file or elsewhere, update them to use the renamed keyword argument `session=` instead of `session_str=`.
</issue_to_address>
### Comment 2
<location path="astrbot/core/cron/manager.py" line_range="440-441" />
<code_context>
+
+ # 将事件放入事件队列,由 EventBus 调度到 PipelineScheduler
+ await self._event_queue.put(cron_event)
+ logger.debug(
+ f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline."
+ )
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid `AttributeError` when logging if `extras` is `None`.
Above you already guard against `extras` being `None` when building `cron_payload`. Here the log line calls `extras.get(...)` unconditionally, which will raise if `extras` is `None`. Please either use a local `safe_extras = extras or {}` (and reuse it for both `cron_payload` and this log), or add the same `if extras else {}` pattern here.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| async def _dispatch_to_pipeline( | ||
| self, | ||
| *, | ||
| message: str, | ||
| session_str: str, | ||
| extras: dict, |
There was a problem hiding this comment.
suggestion: Align parameter naming/typing with the ability to accept MessageSession objects.
The parameter is typed and named as a str, but the implementation also accepts MessageSession (isinstance(session_str, MessageSession)). Please rename it (e.g. session or session_or_str) and change the annotation to a union type so the signature matches actual usage and is clear to callers and type checkers.
Suggested implementation:
await self._dispatch_to_pipeline(
message=note,
session=session_str,
extras=extras,
)
logger.warning("Cron job agent got no response")
return async def _dispatch_to_pipeline(
self,
*,
message: str,
session: "MessageSession | str",
extras: dict,
) -> None:
"""将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。"""
from astrbot.core.cron.events import CronMessageEvent
from astrbot.core.platform.message_session import MessageSession
try:
session = (
session
if isinstance(session, MessageSession)
else MessageSession.from_str(session)
)
except Exception as e:- If your project is not yet on Python 3.10+ or not using
from __future__ import annotations, adjust the type annotation tosession: "MessageSession" | stror useUnion["MessageSession", str]and ensureUnionis imported fromtyping. - If there are other call sites of
_dispatch_to_pipelinein this file or elsewhere, update them to use the renamed keyword argumentsession=instead ofsession_str=.
| logger.debug( | ||
| f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline." |
There was a problem hiding this comment.
issue (bug_risk): Avoid AttributeError when logging if extras is None.
Above you already guard against extras being None when building cron_payload. Here the log line calls extras.get(...) unconditionally, which will raise if extras is None. Please either use a local safe_extras = extras or {} (and reuse it for both cron_payload and this log), or add the same if extras else {} pattern here.
There was a problem hiding this comment.
Code Review
This pull request refactors the cron job execution flow to use the system's event pipeline instead of a direct agent call. Key changes include updating CronMessageEvent to use dynamic platform names, initializing an event queue in CronJobManager, and introducing _dispatch_to_pipeline to handle event routing. Review feedback suggests optimizing the new method by removing redundant local imports, improving type hinting, and ensuring that the pipeline stages correctly handle cron-specific logic like system prompts and history persistence. Additionally, it was recommended to remove the now-obsolete _woke_main_agent method.
| async def _dispatch_to_pipeline( | ||
| self, | ||
| *, | ||
| message: str, | ||
| session_str: str, | ||
| extras: dict, | ||
| ) -> None: | ||
| """将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。""" | ||
| from astrbot.core.cron.events import CronMessageEvent | ||
| from astrbot.core.platform.message_session import MessageSession | ||
|
|
||
| try: | ||
| session = ( | ||
| session_str | ||
| if isinstance(session_str, MessageSession) | ||
| else MessageSession.from_str(session_str) | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Invalid session for cron job: {e}") | ||
| return | ||
|
|
||
| cron_event = CronMessageEvent( | ||
| context=self.ctx, | ||
| session=session, | ||
| message=message, | ||
| extras=extras or {}, | ||
| message_type=session.message_type, | ||
| ) | ||
|
|
||
| # 判断用户角色 | ||
| umo = cron_event.unified_msg_origin | ||
| cfg = self.ctx.get_config(umo=umo) | ||
| cron_payload = extras.get("cron_payload", {}) if extras else {} | ||
| sender_id = cron_payload.get("sender_id") | ||
| admin_ids = cfg.get("admins_id", []) | ||
| if admin_ids: | ||
| cron_event.role = "admin" if sender_id in admin_ids else "member" | ||
| if cron_payload.get("origin", "tool") == "api": | ||
| cron_event.role = "admin" | ||
|
|
||
| # 将事件放入事件队列,由 EventBus 调度到 PipelineScheduler | ||
| await self._event_queue.put(cron_event) | ||
| logger.debug( | ||
| f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline." | ||
| ) |
There was a problem hiding this comment.
此方法有以下改进建议:
- 冗余导入:
CronMessageEvent和MessageSession已经在文件顶部导入,无需在此处进行局部导入。 - 参数命名与类型提示:建议将
session_str更名为session,并明确类型提示为str | MessageSession。 - 逻辑缺失风险:原有的
_woke_main_agent方法包含特定的系统提示词(PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT) and 历史记录持久化逻辑。通过管道分发后,请确保 Pipeline 中的 Stage(如MainAgentStage)能够识别CronMessageEvent并应用这些特定的逻辑,否则可能会导致 Agent 失去 cron 任务的上下文或历史记录无法正确保存。 - 清理死代码:
_woke_main_agent方法现在已不再被内部调用,建议在确认逻辑已成功迁移到 Pipeline 后将其删除。
async def _dispatch_to_pipeline(
self,
*,
message: str,
session: str | MessageSession,
extras: dict[str, Any],
) -> None:
"""将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。"""
try:
session_obj = (
session
if isinstance(session, MessageSession)
else MessageSession.from_str(session)
)
except Exception as e:
logger.error(f"Invalid session for cron job: {e}")
return
cron_event = CronMessageEvent(
context=self.ctx,
session=session_obj,
message=message,
extras=extras or {},
message_type=session_obj.message_type,
)
# 判断用户角色
umo = cron_event.unified_msg_origin
cfg = self.ctx.get_config(umo=umo)
cron_payload = extras.get("cron_payload", {})
sender_id = cron_payload.get("sender_id")
admin_ids = cfg.get("admins_id", [])
if admin_ids:
cron_event.role = "admin" if sender_id in admin_ids else "member"
if cron_payload.get("origin", "tool") == "api":
cron_event.role = "admin"
# 将事件放入事件队列,由 EventBus 调度到 PipelineScheduler
await self._event_queue.put(cron_event)
logger.debug(
f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline."
)There was a problem hiding this comment.
gemini好强哇(。^▽^)
…rough PipelineScheduler Fixes AstrBotDevs#7419 Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks (on_llm_response, on_decorating_result, etc.) to not be triggered, and emotion tags were not parsed by plugins like meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake
|
为什么我按照gemini的建议,在_dispatch_to_pipeline中把原有_woke_main_agent的逻辑补上之后,QQ机器人又没法正常转译定时任务的表情包了┭┮﹏┭┮ |
|
我知道问题在哪儿了,不是没法转译,是QQ机器人消息发重了,第二条含_woke_main_agent原始逻辑的消息没有转译 |
…rough PipelineScheduler Fixes AstrBotDevs#7419 Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks to not be triggered, and emotion tags were not parsed by plugins meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake - core/cron/manager.py: Remove legacy _woke_main_agent logic as it is no longer needed after pipeline integration - core/cron/manager.py: Preserve history saving functionality via PipelineScheduler's automatic _save_to_history() call The _woke_main_agent approach has been fully replaced. History persistence is now handled automatically by PipelineScheduler after agent execution, eliminating the need for explicit persist_agent_history() calls.
针对gemini-code-assist所提出的改进建议进行了修改
替代方案的完整性分析1. 系统提示词处理
2. 历史记录持久化
3. 事件上下文完整性
4. 验证结果
结论
|
|
感谢你对该问题的深入分析和详细的逻辑验证。既然你已经确认 关于你提到的 目前的修改逻辑清晰,且通过将任务分发至事件队列,成功解决了插件钩子无法触发的问题,这正是该 PR 的核心目标。请继续保持对定时任务在不同平台下表现的观察。 |
faf411f to
0068960
Compare
关联 Issue
Fixes #7419
Modifications / 改动点
此 PR 修复了
active_agent类型的定时任务(cron job)绕过PipelineScheduler标准处理流程的问题,导致插件钩子(plugin hooks)无法被触发。问题背景:
在之前的实现中,
active_agent类型的定时任务直接调用了_woke_main_agent()方法,使 Agent 在隔离环境中执行,未经过标准的PipelineScheduler管道。这导致以下问题:on_llm_response、on_decorating_result等钩子失效)&&happy&&&&sad&&)无法被meme_manager等插件解析和渲染为表情图片影响范围:
此问题影响了所有依赖插件钩子的定时任务场景,特别是需要表情包转译、消息装饰等功能的插件无法在定时触发时正常工作。
修改文件:
core/cron/events.pyPlatformMetadata.name从硬编码的"cron"改为session.platform_nameunified_msg_origin与原始平台格式一致,使插件能够正确查找配置和执行权限检查core/cron/manager.pystart()方法中初始化事件队列:self._event_queue: Queue = ctx.get_event_queue()_run_active_agent_job()中将直接调用_woke_main_agent()替换为_dispatch_to_pipeline()_dispatch_to_pipeline()方法:CronMessageEventunified_msg_origin查询配置以确定用户角色PipelineScheduler进行完整的管道处理核心变更逻辑:
Screenshots or Test Results / 运行截图或测试结果
如图所示,修复后的定时任务可以正常触发表情包了:



验证项:
&&happy&&)被meme_manager插件正确解析为表情图片Checklist / 检查清单
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Route active_agent cron jobs through the standard pipeline so they participate in normal event handling and role resolution.
Bug Fixes:
Enhancements: