feat: add durable service-run registry for refresh-safe Studio Observe (#429) #430
Resolves issue #429.

Introduces an implementation-agnostic, durable service-run registry that all three invoke paths (Workflow / Static / Scripting) write to before returning the accepted receipt. /api/scopes/{scopeId}/services/{serviceId}/runs no longer depends on the workflow-specific IWorkflowRunBindingReader. /runs/{runId} and /audit resolve through the unified registry first, then fan out to implementation-specific detail readers (workflow audit only when the run is workflow-kind).

The registry is owned by ServiceRunGAgent (one actor per run, business-named per the "an actor is a business entity" principle). It emits ServiceRunRegisteredEvent and ServiceRunStatusUpdatedEvent, and materializes ServiceRunCurrentStateReadModel via a current-state projector. The dispatcher calls IServiceRunRegistrationPort before invoking the target, so the durable record is committed before the receipt is returned.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
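The ordering described above (commit the durable record, invoke the target, then return the receipt) can be sketched as follows. This is an illustrative Python sketch rather than the repository's C# code: `InMemoryRegistry`, `RunRecord`, and `dispatch` are hypothetical stand-ins for the registration port and the dispatcher, and the field names are assumptions.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RunRecord:
    scope_id: str
    service_id: str
    run_id: str
    kind: str  # "workflow" | "static" | "scripting"
    status: str = "accepted"


@dataclass
class InMemoryRegistry:
    """Hypothetical stand-in for the durable registry behind the registration port."""
    records: dict = field(default_factory=dict)

    def register(self, record: RunRecord) -> None:
        key = (record.scope_id, record.service_id, record.run_id)
        self.records[key] = record

    def list_runs(self, scope_id: str, service_id: str) -> list:
        return [r for r in self.records.values()
                if r.scope_id == scope_id and r.service_id == service_id]


def dispatch(registry, record, invoke_target):
    # 1. Commit the durable record first, whatever the implementation kind.
    registry.register(record)
    # 2. Only then hand the request to the target implementation.
    invoke_target(record)
    # 3. Return the accepted receipt; the run is already listable.
    return {"run_id": record.run_id, "status": record.status}


calls = []
registry = InMemoryRegistry()
receipt = dispatch(
    registry,
    RunRecord("scope-a", "svc-1", "run-1", kind="static"),
    # Records whether the run was already registered when the target was invoked.
    invoke_target=lambda rec: calls.append(
        (rec.scope_id, rec.service_id, rec.run_id) in registry.records),
)
```

Because registration happens before the target is invoked, a page refresh that lists runs right after the receipt arrives would already see the record in this model.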
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 060b9c4f62
    var snapshots = await serviceRunQueryPort.ListAsync(
        new ServiceRunQuery(scopeId, serviceId, Math.Clamp(take <= 0 ? 50 : take, 1, 200)),
        ct);
Register stream invokes before relying solely on run registry
The runs list/get endpoints now read exclusively from IServiceRunQueryPort, but stream invocation paths in HandleInvokeStreamAsync do not write to that registry (the only writer is DefaultServiceInvocationDispatcher.RegisterRunAsync). As a result, runs started through /invoke/*:stream can execute normally yet never appear in /runs after refresh, which regresses Studio Observe’s refresh-safe behavior for streamed workflows/chats.
    scopeId,
    serviceId,
    snapshot.RunId,
    snapshot.ActorId,
Keep run summary ActorId aligned with workflow control APIs
BuildScopeRunSummaryFromRegistryAsync now returns snapshot.ActorId (the service-run registry actor) in ScopeServiceRunSummaryHttpResponse.ActorId. However, resume/signal/stop resolution still matches actorId against workflow binding actor IDs, so clients that round-trip the returned ActorId into control requests will get SERVICE_RUN_NOT_FOUND. This breaks compatibility for callers using run-summary ActorId as the control handle.
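Confirmed, this is a valid issue. ScopeServiceRunSummaryHttpResponse.ActorId previously denoted the workflow run actor, and the ActorId in control requests is still matched against the workflow binding actor. Once this field is switched to the service-run registry actor, older clients that pass the summary's ActorId back to resume/signal/stop will no longer find a match. I suggest keeping ActorId as the controllable target/workflow actor and exposing the registry actor through a separate field, so that the semantics of a single field do not drift.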
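To make the field-semantics point concrete, here is a small Python sketch (not the repository's C#). `RegistrySnapshot`, `build_summary`, `resolve_control_target`, and the actor id strings are all hypothetical; the only behavior taken from the thread is that control requests match the supplied actor id against workflow binding actor ids and fail with SERVICE_RUN_NOT_FOUND otherwise.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RegistrySnapshot:
    run_id: str
    registry_actor_id: str  # internal registry actor, not a control handle
    target_actor_id: str    # controllable workflow/target actor


def build_summary(snapshot: RegistrySnapshot) -> dict:
    # Keep "actorId" pointing at the controllable actor so that clients
    # can round-trip it into resume/signal/stop requests.
    return {"runId": snapshot.run_id, "actorId": snapshot.target_actor_id}


def resolve_control_target(actor_id: str, workflow_binding_actor_ids: set) -> str:
    # Control resolution only knows about workflow binding actors.
    if actor_id not in workflow_binding_actor_ids:
        raise LookupError("SERVICE_RUN_NOT_FOUND")
    return actor_id


snapshot = RegistrySnapshot(
    run_id="run-1",
    registry_actor_id="service-run:run-1",  # made-up id format
    target_actor_id="workflow-run:abc",     # made-up id format
)
bindings = {"workflow-run:abc"}
summary = build_summary(snapshot)
resolved = resolve_control_target(summary["actorId"], bindings)
```

Passing `snapshot.registry_actor_id` to `resolve_control_target` instead would raise `LookupError`, which is the compatibility break the review describes.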
eanzhao left a comment
Found two issues that affect the correctness of the service-run registry; both are flagged inline.
        await _dispatchPort.DispatchAsync(runActorId, envelope, ct);
    }

    private static string BuildRunActorId(string runId) => $"service-run:{runId}";
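[P1] The registry actor address cannot be derived from runId alone. The runId currently comes from command_id, and callers can reuse the same command id across different scopes/services. Two such runs then land on the same ServiceRunGAgent, and because the actor decides idempotency by RunId only, the registration from the second scope/service returns immediately without committing its own record. The projection has the same cross-service/cross-tenant overwrite risk, since ServiceRunCurrentStateReadModel.Id = record.RunId. I suggest tightening the registry identity and the document id to scope/service/run (or generating a truly globally unique run id and returning it in the receipt); idempotent retries should also validate immutable fields such as scope/service/target.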
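The collision is easy to reproduce in a toy model. The Python sketch below (not the repository's C#) compares a run-id-only key with a key scoped to scope/service/run. The `service-run:` prefix follows the snippet above; the scoped layout and the first-writer-wins store are assumptions made for illustration.

```python
def run_only_key(scope_id: str, service_id: str, run_id: str) -> str:
    # Mirrors the original BuildRunActorId: scope and service are ignored.
    return f"service-run:{run_id}"


def scoped_key(scope_id: str, service_id: str, run_id: str) -> str:
    # Hypothetical scoped layout; assumes the ids themselves contain no ":".
    return f"service-run:{scope_id}:{service_id}:{run_id}"


def register_all(key_fn, runs):
    store = {}
    for scope_id, service_id, run_id in runs:
        key = key_fn(scope_id, service_id, run_id)
        # First writer wins, like an idempotency check keyed on the actor id.
        store.setdefault(key, (scope_id, service_id, run_id))
    return store


# Two tenants reuse the same command id.
runs = [("tenant-a", "svc", "cmd-1"), ("tenant-b", "svc", "cmd-1")]

collided = register_all(run_only_key, runs)
separated = register_all(scoped_key, runs)
```

With the run-id-only key, `collided` holds a single record and tenant-b's registration is silently dropped; with the scoped key, both runs are kept.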
        resolution.Deployments,
        workflowRunBindingReader,
        take,
    var snapshots = await serviceRunQueryPort.ListAsync(
[P1] The run list is now read only from IServiceRunQueryPort, but the :stream invoke paths do not write to this registry. The workflow/static/scripting branches of both /invoke/chat:stream and /services/{serviceId}/invoke/{endpointId}:stream bypass DefaultServiceInvocationDispatcher and never call IServiceRunRegistrationPort. If the client refreshes after receiving RunStarted, the registry readmodel still has no record of the run, so /runs comes back empty or /runs/{runId} returns 404. The streaming paths need to register with the same registry, using the actual run id, at the point where the stream is accepted / the run is started.
Addresses three P1 review items from eanzhao + codex through four changes:
1. Registry actor and document keys are now scoped to {scopeId}:{serviceId}:{runId}
via the new ServiceRunIds helper. Reusing the same command_id across different
scope/service no longer collides on a single ServiceRunGAgent or overwrites
another tenant's readmodel entry.
2. ServiceRunGAgent's idempotent re-register check now also validates scope_id,
service_id, and target_actor_id against the existing record, so a foreign
re-registration can no longer be silently accepted.
3. ScopeServiceRunSummaryHttpResponse.ActorId now returns the controllable
target actor (workflow run actor for workflow runs) so existing
resume/signal/stop callers that round-trip the field continue to resolve;
the registry actor is internal infrastructure.
4. HandleInvokeStreamAsync (and its default-chat forwarder) now register the
service run via IServiceRunRegistrationPort before delegating to workflow /
static / scripting stream handlers, so refresh after a streamed invoke
returns the run from /runs.
Adds ServiceRunRegistrationAdapter unit tests, composite-key collision tests
on the projector, and scope/service/target mismatch tests on the actor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
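The idempotent re-register check from item 2 can be sketched in Python as follows (not the repository's C#). `RunActor` and its return values are hypothetical; only the validated fields (scope_id, service_id, target_actor_id) come from the commit message.

```python
class RunActor:
    """Hypothetical per-run actor that accepts one registration."""

    IMMUTABLE_FIELDS = ("scope_id", "service_id", "target_actor_id")

    def __init__(self):
        self.record = None
        self.events = []

    def register(self, scope_id, service_id, run_id, target_actor_id):
        incoming = {
            "scope_id": scope_id,
            "service_id": service_id,
            "run_id": run_id,
            "target_actor_id": target_actor_id,
        }
        if self.record is None:
            # First registration: commit the record and emit one event.
            self.record = incoming
            self.events.append(("registered", run_id))
            return "registered"
        mismatched = [name for name in self.IMMUTABLE_FIELDS
                      if incoming[name] != self.record[name]]
        if mismatched:
            # A retry must describe the same run; anything else is rejected.
            raise ValueError("immutable fields differ: " + ", ".join(mismatched))
        # A genuine retry is a no-op and emits no second event.
        return "duplicate"


actor = RunActor()
first = actor.register("scope-a", "svc-1", "run-1", "actor-1")
retry = actor.register("scope-a", "svc-1", "run-1", "actor-1")
```

A later call such as `actor.register("scope-b", "svc-1", "run-1", "actor-1")` raises `ValueError` in this sketch instead of returning silently.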
eanzhao left a comment
After re-reviewing the fix, the stream registry item is still not fully closed; I have added an inline comment.
        string serviceId,
        CancellationToken ct)
    {
        var commandId = string.IsNullOrWhiteSpace(invocationRequest.CommandId)
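[P1] This change wires the stream path into the registry, but the commandId generated here is not passed on to the actual execution chain, so the RunId recorded in the registry is still not guaranteed to equal the run id the client receives. The workflow branch generates its own receipt.CommandId in the command pipeline of WorkflowCapabilityEndpoints.HandleChat. In the static branch, GAgentDraftRunCommand.CommandId is also null; the interaction pipeline generates it and then emits it as RunStarted.RunId in OnAccepted. The scripting branch generates yet another runId inside HandleScriptingServiceChatStreamAsync. As a result, after a refresh the client may still get a 404 when it calls /runs/{runId} with the RunStarted.RunId from the SSE stream. I suggest driving registration and execution with the same id: either pass the id generated here explicitly into each branch as the command/run id, or register with the registry using the id actually sent to the client, in onAccepted for workflow/static and after the runId is generated for scripting.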
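The second option, registering with the id the pipeline actually produced, can be sketched in Python like this (not the repository's C#). `run_stream`, the hook parameters, and the frame shape are hypothetical; the sketch only assumes that the pipeline may generate its own id when the caller supplies none.

```python
import uuid


def run_stream(command_id, on_accepted, emit):
    # The pipeline owns id generation when the caller supplies none.
    run_id = command_id or uuid.uuid4().hex
    # Register with the id that will actually reach the client...
    on_accepted(run_id)
    # ...and only then emit the first frame carrying that same id.
    emit({"type": "RunStarted", "runId": run_id})


events = []
run_stream(
    None,
    on_accepted=lambda run_id: events.append(("register", run_id)),
    emit=lambda frame: events.append(("emit", frame["runId"])),
)
```

Because the hook receives the pipeline's own id, the registered run id and the `RunStarted` run id cannot diverge here, and registration is ordered before the frame is written.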
eanzhao left a comment
An additional finding from verification: the existing stream integration tests currently fail.
        [FromServices] ServiceInvocationResolutionService resolutionService,
        [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer,
        [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort,
        [FromServices] ICommandInteractionService<WorkflowChatRunRequest, WorkflowChatRunAcceptedReceipt, WorkflowChatRunStartError, WorkflowRunEventEnvelope, WorkflowProjectionCompletionStatus> chatRunService,
[P1] This new endpoint dependency was not propagated to the existing stream integration test host. Currently, dotnet test test/Aevatar.GAgentService.Integration.Tests/Aevatar.GAgentService.Integration.Tests.csproj --no-restore --nologo --filter "FullyQualifiedName~ScopeInvokeStreamEndpoint|FullyQualifiedName~InvokeStreamEndpoint" has 6 failing cases, all with No service for type 'IServiceRunRegistrationPort' has been registered; the success paths turn into 500s, and the error paths no longer return the expected JSON. The test host needs to register a recording/null IServiceRunRegistrationPort, ideally with an added assertion that the run id used for stream registration matches the run id returned to the client over SSE.
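A recording test double of the kind suggested here might look like the Python sketch below (not the repository's C# test host). `FakeHost`, `RecordingRegistrationPort`, and `handle_stream` are hypothetical; the error text imitates the message quoted in the review.

```python
class RecordingRegistrationPort:
    """Test double that records every registration it receives."""

    def __init__(self):
        self.calls = []

    def register(self, run_id, target_actor_id):
        self.calls.append({"run_id": run_id, "target_actor_id": target_actor_id})


class FakeHost:
    """Tiny stand-in for a service container."""

    def __init__(self):
        self._services = {}

    def add(self, name, instance):
        self._services[name] = instance

    def resolve(self, name):
        if name not in self._services:
            raise LookupError(f"No service for type '{name}' has been registered")
        return self._services[name]


def handle_stream(host, run_id):
    # The endpoint depends on the port, so the host must provide one.
    port = host.resolve("IServiceRunRegistrationPort")
    port.register(run_id=run_id, target_actor_id="actor-1")
    return {"type": "RunStarted", "runId": run_id}


host = FakeHost()
recorder = RecordingRegistrationPort()
host.add("IServiceRunRegistrationPort", recorder)
frame = handle_stream(host, "run-42")
```

A test can then assert that `recorder.calls[0]["run_id"]` equals `frame["runId"]`; calling `handle_stream` on a host without the port reproduces the missing-service failure.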
dev added HandleInvokeMemberStreamAsync (PR #427 member-first APIs), which forwards to HandleInvokeStreamAsync. Pass IServiceRunRegistrationPort through so the new member-stream entry point also registers a service-run record before delegating, matching the other stream entry points.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two P1 review items from PR #430:
1. Stream invoke paths previously registered the run upfront with a freshly generated commandId, but the workflow / static / scripting pipelines each produced their own run id downstream. The id the SSE RunStarted frame carried did not match the id stored in the registry, so /runs/{runId} would return 404 after refresh. Each stream branch now registers via IServiceRunRegistrationPort at the moment the actual run id is established:
- Workflow: passes an onAcceptedHook into WorkflowCapabilityEndpoints.HandleChat and registers using receipt.ActorId / receipt.CommandId before the SSE writer emits run-context.
- Static: registers inside HandleStaticGAgentChatStreamAsync's existing OnAcceptedAsync, using receipt.CommandId (the same value that becomes RunStarted.RunId).
- Scripting: registers inside HandleScriptingServiceChatStreamAsync once the local runId is generated, before RunRuntimeAsync.
The shared RegisterStreamServiceRunAsync helper now takes runId / commandId / correlationId / targetActorId explicitly so each branch supplies its authentic values.
2. The integration test host did not register IServiceRunRegistrationPort, so seven stream-endpoint test cases failed with "No service for type 'IServiceRunRegistrationPort' has been registered". Added a recording port + a fake IServiceRunQueryPort to the test host, and bridged the existing FakeWorkflowRunBindingReader fixtures into the query port (with a deployment resolver that fills RevisionId/DeploymentId from FakeServiceLifecycleQueryPort.Deployments) so existing tests keep working without per-test fixture changes.
Stream-helper reflection tests now pass through small typed wrappers so future signature changes only require updating two helper definitions instead of every call site.
Adds an assertion to the workflow stream test verifying the registry RunId / CommandId / TargetActorId match what the workflow pipeline returns.
Removes three obsolete assertions that checked the workflow binding reader was queried (the run-list/get endpoints now go through IServiceRunQueryPort).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
- Adds the ServiceRunGAgent actor (one per run), ServiceRunCurrentStateReadModel materialized by a current-state projector, plus the IServiceRunQueryPort / IServiceRunRegistrationPort / IServiceRunCurrentStateProjectionPort abstractions.
- DefaultServiceInvocationDispatcher now registers the run via the new port before returning the accepted receipt; receipts still carry stable command_id / correlation_id / target_actor_id / new run_id.
- Replaces IWorkflowRunBindingReader in the run directory endpoints (/api/scopes/{scopeId}/services/{serviceId}/runs, /runs/{runId}, /audit, plus default-service forwarders) with the unified IServiceRunQueryPort. /audit first resolves the run via the registry, then fans out to the workflow report only when the implementation kind is Workflow.
- Extends ScopeServiceRunSummaryHttpResponse with ImplementationKind, Status, CommandId, CorrelationId, EndpointId, TargetActorId, CreatedAt so Studio Observe can rely on registry truth across all three kinds.

Resolves #429.

Architecture notes
- ServiceRunGAgent is one actor per run (business entity = a run; aligns with the "an actor is a business entity" principle).
- ServiceRunCurrentStateProjector consumes EventEnvelope<CommittedStateEventPublished> via the state-mirror pattern (TryUnpackState<ServiceRunState>) and upserts to the gagent-service-runs index keyed by runId. No middle-layer state, no replay/priming on the query path.
- IServiceRunRegistrationPort.RegisterAsync → IActorRuntime.CreateAsync<ServiceRunGAgent>(actorId="service-run:{runId}") → EnsureProjectionAsync → dispatch RegisterServiceRunRequested envelope → actor commits ServiceRunRegisteredEvent → projector materializes readmodel.
- target_actor_id in the registry record stores the implementation-specific actor (workflow run actor for workflow; service primary actor for static/scripting), keeping the registry independent of how each kind executes.

Out of scope (per issue)
- Accepted (issue allows: registry is the run directory, not the live state machine)

Test plan
- dotnet build aevatar.slnx --nologo: clean
- dotnet test test/Aevatar.GAgentService.Tests/Aevatar.GAgentService.Tests.csproj: 343/343 pass (10 new tests across actor, projector, dispatcher integration)
- dotnet test test/Aevatar.Workflow.Host.Api.Tests/Aevatar.Workflow.Host.Api.Tests.csproj: 325/325 pass
- bash tools/ci/architecture_guards.sh: all checks pass (route mapping / current-state projection / closed-world workflow / etc.)
- bash tools/ci/test_stability_guards.sh: pass (no new polling)
- bash tools/ci/solution_split_test_guards.sh: pass across all eight solution filters
- playground_asset_drift_guard.sh still passes (skipped locally: environmental, requires pnpm/vite)

🤖 Generated with Claude Code
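The /audit resolution order from the Summary (registry first, workflow report only for workflow-kind runs) can be sketched in Python as follows (not the repository's C#). `get_audit`, the dictionary shapes, and the way the workflow report is looked up by target actor id are all assumptions made for illustration.

```python
def get_audit(registry, workflow_reports, scope_id, service_id, run_id):
    # Step 1: the unified registry decides whether the run exists at all.
    record = registry.get((scope_id, service_id, run_id))
    if record is None:
        return 404, None
    body = {"run": record}
    # Step 2: fan out to the workflow-specific reader only for workflow runs.
    if record["kind"] == "workflow":
        body["workflowReport"] = workflow_reports.get(record["target_actor_id"])
    return 200, body


registry = {
    ("scope-a", "svc-1", "run-wf"): {"kind": "workflow", "target_actor_id": "wf-actor"},
    ("scope-a", "svc-1", "run-st"): {"kind": "static", "target_actor_id": "svc-actor"},
}
workflow_reports = {"wf-actor": {"steps": 3}}

wf_status, wf_body = get_audit(registry, workflow_reports, "scope-a", "svc-1", "run-wf")
st_status, st_body = get_audit(registry, workflow_reports, "scope-a", "svc-1", "run-st")
missing_status, missing_body = get_audit(registry, workflow_reports, "scope-a", "svc-1", "nope")
```

Static and scripting runs resolve from the registry alone in this model, so they no longer depend on a workflow-specific reader being able to find them.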