Refactor workflow execution lifecycle and align MessageId semantics#5

Closed
eanzhao wants to merge 1 commit into dev from feature/execution-actor

Conversation

@eanzhao (Contributor)

@eanzhao eanzhao commented Feb 19, 2026

Workflow Run Lifecycle (refactoring report)

Executive Summary

The goal of this refactoring is to separate the long-lived orchestration responsibility from the single-run execution responsibility, resolving the complexity caused by one WorkflowGAgent currently handling template management, run execution, and concurrency isolation all at once.

The target design:

  • WorkflowGAgent acts as the long-lived orchestration instance, owning the workflow definition, policies, and entry point.
  • WorkflowExecutionGAgent acts as the execution instance for a single run, with AgentId equal to RunId.
  • MessageId is kept as the AI message-chain identifier and is not merged with RunId.

The core benefits of this model: cleaner execution isolation, a unified query primary key, more robust concurrency, and compatibility with the existing CQRS projection pipeline.

1. Background and Refactoring Goals

1.1 Current Pain Points

  • A single orchestration instance carries multiple executions, blurring the boundary between run state and orchestration state.
  • RunId, ActorId, and MessageId are easy to conflate semantically, driving up discussion cost.
  • Under concurrent execution, step-reply correlation and troubleshooting rely on extra conventions, raising maintenance cost.

1.2 Refactoring Goals

  1. Each execution gets its own actor identity, with a clear execution boundary and lifecycle.
  2. RunId becomes the single execution primary key, unifying queries and tracing.
  3. MessageId keeps its message-chain role, so streaming and projection capabilities do not regress.

2. Target Architecture

2.1 Role Division

  • WorkflowGAgent (long-lived)
    • Stores the workflow definition and configuration.
    • Receives entry requests and selects/creates execution instances.
    • Carries no mutable execution state for any single run.
  • WorkflowExecutionGAgent (per-run)
    • Drives step progression, event publishing, and reply correlation for its run.
    • Is destroyed or recycled when the run ends.
```mermaid
flowchart TB
    workflowAgent["WorkflowGAgent (long-lived)"]
    executionA["WorkflowExecutionGAgent (runA)"]
    executionB["WorkflowExecutionGAgent (runB)"]
    runId["RunId = ExecutionActorId"]
    sessionKey["MessageId (message chain key)"]

    workflowAgent --> executionA
    workflowAgent --> executionB
    executionA --> runId
    executionA --> sessionKey
```

2.2 Identifier Responsibility Boundaries

| Identifier | Scope | Primary use | Conclusion |
| --- | --- | --- | --- |
| WorkflowGAgent.AgentId | Long-lived orchestration body | Entry and orchestration identity | Not equal to RunId |
| RunId | Single run | Execution primary key, query primary key | Equal to ExecutionActorId |
| WorkflowExecutionGAgent.AgentId | Single-run actor | Runtime isolation and thread identity | Same value as RunId |
| MessageId | AI message chain | Streaming-message aggregation, reply matching | Must not be removed |
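The boundary table above can be captured as executable invariants. Below is a minimal, language-agnostic sketch in Python (the actual codebase is C#; the class and field names here are illustrative, not the real types):

```python
from dataclasses import dataclass
import uuid

@dataclass(frozen=True)
class RunIdentifiers:
    """Illustrative bundle of the four identifiers and their invariants."""
    workflow_agent_id: str   # long-lived orchestration identity
    run_id: str              # single-run execution primary key
    execution_actor_id: str  # per-run actor identity
    message_id: str          # AI message-chain key (separate layer)

    def __post_init__(self) -> None:
        # RunId is the execution actor's identity by construction.
        assert self.run_id == self.execution_actor_id
        # The long-lived orchestration agent is never the run itself.
        assert self.workflow_agent_id != self.run_id
        # MessageId lives in its own namespace; it is never the run key.
        assert self.message_id != self.run_id

run_id = str(uuid.uuid4())
ids = RunIdentifiers(
    workflow_agent_id="workflow-main",
    run_id=run_id,
    execution_actor_id=run_id,
    message_id=f"{run_id}:step-1",
)
```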

3. Key Design: How an Execution Locates Its Long-Lived Workflow

Core principle: the execution instance performs no runtime search; the orchestration layer explicitly binds the owner at creation time.

Standard flow:

  1. The entry point first locates the long-lived WorkflowGAgent (reuse on an agentId hit, otherwise create and configure).
  2. Generate a RunId.
  3. Create the WorkflowExecutionGAgent, enforcing executionActorId = runId.
  4. Call BindWorkflowAgentId(workflowAgentId) to explicitly bind the owner.
  5. Call ConfigureWorkflow(workflowYaml, workflowName) to finish execution initialization.
  6. Establish the parent-child link: LinkAsync(workflowAgentId, executionActorId).
  7. During execution, callbacks to the orchestration instance go directly through SendToAsync(workflowAgentId, evt).

Design constraints:

  • Scanning for the owner by workflowName is forbidden.
  • workflowAgentId is immutable for the lifetime of the run.
  • If the owner is lost, fail fast and emit an explicit error event.

4. Call Chain and Parameter Assignment for a Single Request

```mermaid
sequenceDiagram
    participant client as Client
    participant api as HostApi
    participant app as WorkflowApplication
    participant resolver as RunActorResolver
    participant execution as WorkflowExecutionGAgent
    participant role as RoleGAgent
    participant projection as ProjectionPipeline

    client->>api: "/api/chat(prompt, workflow, agentId?)"
    api->>app: "ExecuteAsync(request)"
    app->>resolver: "ResolveOrCreate(workflowAgent)"
    app->>app: "Generate runId"
    app->>execution: "Create(actorId = runId)"
    app->>execution: "BindWorkflowAgentId(workflowAgentId)"
    app->>execution: "ConfigureWorkflow(workflowYaml, workflowName)"
    app->>resolver: "LinkAsync(workflowAgentId, executionActorId)"
    app->>execution: "Dispatch run events"
    execution->>role: "ChatRequestEvent(messageId = runId:stepId)"
    role-->>execution: "TextMessageStart/Content/End(messageId)"
    execution-->>projection: "Events(runId, messageId)"
    projection-->>api: "Run report + stream frames"
```

Parameter assignment table:

| Parameter | Source | Assignment rule | Consumer |
| --- | --- | --- | --- |
| prompt | Request body | ChatInput.Prompt passed through unchanged | ChatRequestEvent.Prompt |
| workflowName | Request body | Falls back to the default workflow when empty | Workflow loading/parsing |
| agentId | Request body | Locates the long-lived WorkflowGAgent | Actor resolver |
| runId | Server-generated | Generated once per run | Execution events, projection, queries |
| executionActorId | Server-derived | executionActorId = runId | Runtime, routing, thread |
| workflowAgentId | Server-resolved | Owner id, written at execution creation | Execution callbacks to the orchestration body |
| threadId | Server-derived | External output uses executionActorId | SSE/WS output |
| messageId | Server-generated | Entry: chat-{guid}; step: runId:stepId (:attempt reserved) | LLM reply correlation, message aggregation |

5. Why MessageId Cannot Be Removed

RunId and MessageId are not identifiers at the same layer: RunId governs the execution instance, MessageId governs the message-conversation chain.

Removing MessageId would directly break:

  1. Step reply matching
    • LLMCallModule relies on messageId to correlate _pending requests.
  2. Run attribution for AI events
    • AIChatMessageRunIdResolver derives the runId back from the messageId.
  3. Streaming-message aggregation
    • On the AGUI side, msg:{messageId} could no longer reliably merge start/content/end frames.
  4. Read-model observability
    • The message_id dimension of role replies and the timeline would be lost.

Conclusion: MessageId must be kept, but it is not the execution primary key.

6. Rollout Steps (suggested for the report)

Phase 1: Model landing

  • Add WorkflowExecutionGAgent with an owner-binding field (workflowAgentId).
  • Change run startup to "locate the workflow agent first, then create the execution agent".
  • Enforce executionActorId = runId.

Phase 2: Pipeline switchover

  • Execution events are now produced by WorkflowExecutionGAgent.
  • The outbound threadId switches uniformly to the execution actor id.
  • Audit pending/correlation keys and unify them to run scope.

Phase 3: Verification and closure

  • Concurrent-run load testing (multiple runs on the same WorkflowGAgent).
  • Reply-matching and streaming-aggregation verification (along the MessageId dimension).
  • Read-model query consistency verification (/api/runs/{runId}).

7. Risks and Mitigations

| Risk | Symptom | Mitigation |
| --- | --- | --- |
| Missing owner binding | Execution cannot call back into the workflow | Mandatory workflowAgentId validation at creation |
| Inconsistent run/thread identifiers | Confused thread display on the frontend | Uniform threadId = executionActorId |
| Inconsistent session format | Reply matching or aggregation failures | Fixed convention: chat-{guid} and runId:stepId (:attempt reserved) |
| Behavior drift during migration | Old and new pipelines produce different results | Feature flag + dual-pipeline comparison |

8. Acceptance Criteria

  1. Each run maps to exactly one WorkflowExecutionGAgent, with AgentId == RunId.
  2. No cross-talk when the same WorkflowGAgent runs multiple concurrent runs.
  3. SSE/WS works end to end, with stable RUN_STARTED/RUN_FINISHED markers.
  4. The query side reliably returns a complete report by runId.
  5. A regression test that removes MessageId fails explicitly (kept as a protective case).
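Criteria 1 and 2 lend themselves to protective tests. A sketch of their shape (a hypothetical harness; `run_workflow` and the event records are illustrative stand-ins, not the real test fixtures):

```python
import uuid

def run_workflow(workflow_agent_id: str, events: list) -> str:
    """Illustrative stand-in: one run = one execution actor, AgentId == RunId."""
    run_id = str(uuid.uuid4())
    execution_actor_id = run_id  # criterion 1 by construction
    events.append({"run_id": run_id, "actor_id": execution_actor_id,
                   "owner": workflow_agent_id})
    return run_id

events: list = []
run_a = run_workflow("workflow-main", events)
run_b = run_workflow("workflow-main", events)  # same long-lived agent, second run

# Criterion 1: AgentId == RunId for every execution.
assert all(e["run_id"] == e["actor_id"] for e in events)
# Criterion 2: two runs on the same WorkflowGAgent never share an identity.
assert run_a != run_b
assert {e["run_id"] for e in events} == {run_a, run_b}
```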

9. Code Anchors (for review)

  • Entry parsing: src/Aevatar.Host.Api/Endpoints/ChatEndpoints.cs
  • WS entry: src/Aevatar.Host.Api/Endpoints/ChatWebSocketRunCoordinator.cs
  • Orchestration entry: src/workflow/Aevatar.Workflow.Application/Runs/WorkflowChatRunApplicationService.cs
  • Orchestration-body resolution: src/workflow/Aevatar.Workflow.Application/Runs/WorkflowRunActorResolver.cs
  • Run orchestration: src/workflow/Aevatar.Workflow.Application/Orchestration/WorkflowExecutionRunOrchestrator.cs
  • Execution instance: src/workflow/Aevatar.Workflow.Core/WorkflowExecutionGAgent.cs
  • Workflow snapshot: src/workflow/Aevatar.Workflow.Core/WorkflowDefinitionSnapshot.cs
  • RunId generation: src/workflow/Aevatar.Workflow.Application/Runs/WorkflowChatRunApplicationService.cs
  • RunId injection: src/workflow/Aevatar.Workflow.Application/Runs/WorkflowChatRequestEnvelopeFactory.cs
  • RunId extraction: src/workflow/Aevatar.Workflow.Core/WorkflowGAgent.cs
  • MessageId convention: src/Aevatar.AI.Abstractions/ChatMessageKeys.cs
  • Session correlation: src/workflow/Aevatar.Workflow.Core/Modules/LLMCallModule.cs

Related docs: docs/IDENTIFIER_RELATIONSHIPS.md

Enforce per-run execution invariants (owner/YAML fail-fast, RunId == execution actor ID, and metadata run_id consistency) and propagate MessageId on workflow completion to keep streaming/projection correlation stable. Also isolate per-run stats from long-lived workflow actors, add compatibility warnings for direct WorkflowGAgent entry, sync identifier/lifecycle docs, and add a destroy-race memo for follow-up soft-destroy/grace-period design.
@eanzhao eanzhao closed this Feb 25, 2026
eanzhao added a commit that referenced this pull request Apr 27, 2026
P1 — workflow termination on publish failure (reviewer's critical finding):
`PublishFailureAsync` emits `StepCompletedEvent { Success = false }`,
which the kernel routes through `TryRetryAsync` → `TryOnErrorAsync` →
fail. With no retry/on_error policy on `publish_to_twitter`, a Twitter
401/403/429/5xx terminated the entire workflow run as failed. Add
`on_error: { strategy: skip, default_output: "twitter_publish_failed" }`
to the YAML so the run advances to `done` cleanly; the module already
surfaces categorized errors to Lark independently.

#2 — Twitter v2 native error shape: `ClassifyTwitterResponse` now
recognizes the third response shape NyxID can forward verbatim:
`{ "title": "...", "detail": "...", "errors": [...] }` (Twitter's
native problem-details for content-policy / duplicate-tweet rejections).
Falls through to `twitter_publish_rejected` with the Twitter `message`
text in the Lark surfacing so users read the actual rejection reason.
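The three response shapes named in this commit can be distinguished with a small classifier. A hedged Python sketch of the branching that `ClassifyTwitterResponse` performs (the category strings and shape handling are inferred from this commit message, not taken from the C# source):

```python
def classify_twitter_response(body: dict) -> tuple[str, str]:
    """Return (category, human-readable reason) for a non-2xx Twitter v2 body."""
    errors = body.get("errors")
    if isinstance(errors, list) and errors:
        # Shape: errors array, possibly alongside problem-details fields.
        message = errors[0].get("message") or body.get("detail") or "rejected"
        return ("twitter_publish_rejected", message)
    if body.get("title") or body.get("detail"):
        # Shape: RFC-7807-style problem details with title/detail only.
        return ("twitter_publish_rejected", body.get("detail") or body["title"])
    # Unexpected/empty shape: surface generically rather than guessing.
    return ("twitter_publish_failed", "unexpected response shape")

assert classify_twitter_response({"errors": [{"message": "duplicate content"}]}) \
    == ("twitter_publish_rejected", "duplicate content")
assert classify_twitter_response({"title": "Forbidden", "detail": "not permitted"}) \
    == ("twitter_publish_rejected", "not permitted")
assert classify_twitter_response({}) == ("twitter_publish_failed",
                                         "unexpected response shape")
```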

#1 — Duplicate tweet risk: documented in code comment that
`POST /2/tweets` has no server-side dedup; the social_media template
intentionally has no `retry` policy on this step, and `on_error: skip`
advances rather than retrying. Authors customizing the YAML must keep
this invariant.

#3 — Removed redundant `nyxClient!` null-forgiving (no-op cleanup).

#4 — Renamed `ChannelMetadataKeys.LarkProxySlug` →
`LarkOutboundProxySlug` (`channel.lark.outbound_proxy_slug`) to
disambiguate "Lark API surface" from "NyxID provider routing".

#5 — Added xml-doc on `TrySendLarkAsync` documenting the dual-scope
api-key dependency (key must carry both api-twitter AND api-lark-bot
entitlements) so future callers don't silently break Lark surfacing
when narrowing the key's scope.

#6 — Added `RequiredServiceSlugs` field to `SocialMediaTemplateSpec`
for parity with `DailyReportTemplateSpec`; `CreateSocialMediaAgentAsync`
now reads slugs from the spec instead of inlining the list.

Tests:
- 3 new `ClassifyTwitterResponse` tests for the Twitter native error
  shapes (errors-array, RFC-7807 title/detail-only, empty-object
  unexpected-shape).
- Existing social_media test now also asserts `strategy: skip` lands in
  the upserted YAML.
- 482 channel-runtime + 236 workflow.core tests pass; full solution
  builds with 0 errors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
eanzhao added a commit that referenced this pull request Apr 27, 2026
…elper

Resolves three followup architecture-review points on PR #451:

### Channel.Runtime drops AI / Workflow direct deps (review #2)

`Channel.Runtime`'s csproj used to pull in `Aevatar.AI.Abstractions` and
`Aevatar.Workflow.Application.Abstractions` because of two files that
straddled the channel/AI and channel/workflow boundary:

- `ChannelContextMiddleware` (an `ILLMCallMiddleware` impl) — moved to
  `Aevatar.GAgents.NyxidChat`, which is the only package that needs it
  and already references `AI.Abstractions`. NyxidChat SCE registers it
  for the LLM call pipeline; Channel.Runtime SCE no longer touches
  `ILLMCallMiddleware`.
- `ChannelCardActionRouting` (builds `WorkflowResumeCommand`) — moved
  to `Aevatar.GAgents.NyxidChat` for the same reason. Its sole consumer
  (`ChannelConversationTurnRunner`) lives there too.

`Channel.Runtime.csproj` now references only `Channel.Abstractions`,
`Foundation.Abstractions`/`Core`, and the `CQRS.Projection.*` slice —
matching the "channel-agnostic flow + projection infrastructure"
charter from the RFC. Tests (`ChannelCardActionRoutingTests`) get the
extra `using Aevatar.GAgents.NyxidChat;`.

### Extract Elasticsearch projection-store toggle helper (review #4)

The `ResolveElasticsearchEnabled` + `BuildElasticsearchOptions` helper
pair was duplicated three times (Channel.Runtime / Device / Scheduled
SCEs) with slightly different log strings and `Console.Error.WriteLine`
output. Centralized into
`Aevatar.CQRS.Projection.Providers.Elasticsearch.DependencyInjection.ElasticsearchProjectionConfiguration`
with two static helpers:

- `IsEnabled(IConfiguration?, ILogger?, string? storeName)` — explicit
  flag → endpoints presence → false; logs a structured warning via
  `ILogger` (when supplied) instead of `Console.Error.WriteLine`.
- `BindOptions(IConfiguration)` — typed binder for
  `ElasticsearchProjectionDocumentStoreOptions`.

All three SCEs now call into this helper; per-package warning text is
parameterized via `storeName`. Section path
(`Projection:Document:Providers:Elasticsearch`) is exposed as a const
so future call sites stay in sync.
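The resolution order described above — explicit flag, then endpoint presence, then false — is easy to sketch. An illustrative Python version (the real helper is the C# `ElasticsearchProjectionConfiguration`; the config keys here are assumptions modeled on the section path quoted above):

```python
SECTION = "Projection:Document:Providers:Elasticsearch"

def is_enabled(config: dict, store_name: str, warnings: list) -> bool:
    """Explicit flag -> endpoint presence -> False, mirroring IsEnabled."""
    section = config.get(SECTION, {})
    if "Enabled" in section:               # 1. explicit flag wins
        return bool(section["Enabled"])
    if section.get("Endpoints"):           # 2. infer from configured endpoints
        return True
    # 3. default off, with a per-store structured warning instead of stderr
    warnings.append(f"{store_name}: Elasticsearch projection store disabled")
    return False

warnings: list = []
assert is_enabled({SECTION: {"Enabled": False, "Endpoints": ["http://es:9200"]}},
                  "channel", warnings) is False
assert is_enabled({SECTION: {"Endpoints": ["http://es:9200"]}}, "device", warnings) is True
assert is_enabled({}, "scheduled", warnings) is False
assert warnings == ["scheduled: Elasticsearch projection store disabled"]
```

The point of the `store_name` parameter is the one made in the commit: the warning text is shared, but each SCE identifies itself, so the three call sites stay consistent without copy-pasting the helper.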

### Followup points acknowledged but deferred

- **Cross-package dep chain `NyxidChat → Authoring.Lark → Scheduled →
  Platform.Lark`** (review #1) — pre-existing arch debt that the split
  surfaced rather than introduced. Cleaner would be to invert via
  `IInboundFlowResolver` plug-ins so `ChannelConversationTurnRunner`
  doesn't reach into `AgentBuilderCardFlow` directly. Out of scope for
  the package split; tracking as a separate follow-up.
- **Tombstone compactor "central coordinator"** (review #3) —
  `Channel.Runtime` defines `ITombstoneCompactionTarget` but does not
  reference `Device` / `Scheduled` at the csproj level; per-package
  targets register themselves through DI. The plug-in pattern is
  intentional and keeps the DAG one-way.
- **`Scheduled` package name vs UserAgentCatalog content** (review #5)
  — `UserAgentCatalog` is the delivery-target registry that Scheduled
  agents read at execution time to route output, so co-locating it
  with `SkillRunnerGAgent` / `WorkflowAgentGAgent` is intentional.
  Renaming to `AgentCatalog` would split actors from their primary
  consumer; deferring.

473/473 ChannelRuntime.Tests pass; full slnx still only fails the same
two pre-existing Mainnet hosting `BindAsync on IStudioMemberService`
tests that reproduce on origin/dev.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
