Progressive Lark reply via NyxID relay edit-message (#352) #374
Codecov Report
```diff
@@           Coverage Diff           @@
##              dev    #374   +/-   ##
=======================================
  Coverage   69.91%  69.91%
=======================================
  Files        1172    1172
  Lines       83456   83504    +48
  Branches    10969   10976     +7
=======================================
+ Hits        58346   58385    +39
- Misses      20898   20905     +7
- Partials     4212    4214     +2
```
Flags with carried forward coverage won't be shown.
... and 4 files with indirect coverage changes
Drives streaming edit-in-place of an LLM reply so Lark users see a placeholder within ~1s and incremental updates instead of a 5-30s silent wait. Built on top of the NyxID relay edit endpoint shipped in ChronoAIProject/NyxID#480 / #483.

This redesign keeps the reply token inside the conversation actor (per PR #368's security boundary): the inbox runtime only accumulates deltas and signals them to the actor; the actor is the sole caller of the outbound port, holding both the reply token and the placeholder platform_message_id in in-memory runtime state.

## Flow

Inbox runtime (async LLM, no outbound port access):

- LLM stream delta -> `TurnStreamingReplySink.OnDeltaAsync` (throttle 750ms) -> `actor.HandleEventAsync(LlmReplyStreamChunkEvent)`
- LLM stream ends -> `TurnStreamingReplySink.FinalizeAsync` (bypass throttle) -> `actor.HandleEventAsync(LlmReplyStreamChunkEvent)` final flush -> `actor.HandleEventAsync(LlmReplyReadyEvent)`

`ConversationGAgent` (single-threaded turn, owns reply token + streaming state):

`HandleLlmReplyStreamChunkAsync`:
- resolve runtimeContext with the reply token (same path as `HandleLlmReplyReadyAsync`)
- read per-correlation streaming state (PlatformMessageId, Disabled, EditCount)
- if first chunk: `runner.RunStreamChunkAsync(chunk, null PMID)` -> placeholder send
- if subsequent: `runner.RunStreamChunkAsync(chunk, PMID)` -> edit
- on failure: mark state.Disabled and drop further chunks for this turn

`HandleLlmReplyReadyAsync`:
- if streaming state is healthy and the LLM completed: force a final update if the final text differs from the last flushed, then persist `ConversationTurnCompletedEvent` directly (no re-send)
- otherwise: existing `RunLlmReplyAsync` fallback path
- cleanup streaming state + reply token in both paths

## Architectural rules respected

- Reply token never leaves the actor. The inbox runtime can't call the outbound port; it can only signal via `EventEnvelope`. The actor's `_nyxRelayReplyTokens` dict remains the single authoritative store.
- No middle-layer ID map. Sink per-turn state (throttle timestamp, last emitted text) lives in instance fields on a per-invocation sink; the actor's `_nyxRelayStreamingStates` dict is actor-owned runtime state, same lifecycle as `_nyxRelayReplyTokens`.
- No generic actor query/reply. Chunk dispatch is fire-and-(awaited) order-preserving signaling; no back-channel read.
- New proto event `LlmReplyStreamChunkEvent` is a runtime-only signal documented as never-persist; `HandleEventAsync` dispatches to the handler without `PersistDomainEventAsync`, matching the existing event flow.
- Strong-typed event evolution: a new event instead of overloading `LlmReplyReadyEvent` with a boolean, keeping semantics single-purpose.

## Degradation policy (v1)

Any placeholder or update failure permanently disables streaming for the turn and falls back to the legacy single-shot reply path. A partial stale placeholder may remain visible to the user; the final send arrives as a second message. Transient-failure retry is tracked separately in #371 as backlog.

## Feature flag

`NyxIdRelayOptions.StreamingRepliesEnabled` defaults to true; `StreamingFlushIntervalMs` defaults to 750ms per the issue spec.

## Verification

- `dotnet build aevatar.slnx`
- `dotnet test test/Aevatar.GAgents.ChannelRuntime.Tests` (298 passed, +17 new)
- `dotnet test test/Aevatar.AI.Tests` (469 passed, +5 new)
- `dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests` (108 passed, +6 new)
- `bash tools/ci/architecture_guards.sh`
- `bash tools/ci/test_stability_guards.sh`

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
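The throttle-then-finalize behavior of the sink can be sketched language-agnostically. This is a hypothetical Python illustration, not the shipped C# `TurnStreamingReplySink`: `signal_actor` stands in for `actor.HandleEventAsync(LlmReplyStreamChunkEvent)`, and the injectable `clock` mirrors the TimeProvider-driven design mentioned in the merge commit.

```python
import time

class TurnStreamingReplySink:
    """Illustrative sketch of the throttled chunk dispatcher (names assumed)."""

    def __init__(self, signal_actor, flush_interval_s=0.75, clock=time.monotonic):
        self._signal_actor = signal_actor
        self._flush_interval_s = flush_interval_s
        self._clock = clock               # injectable for tests, like a TimeProvider
        self._buffer = []                 # deltas accumulated since the last flush
        self._last_flush_at = None        # None => the first delta flushes immediately
        self._last_emitted_text = ""      # full text signaled to the actor so far

    def on_delta(self, delta):
        if not delta:
            return
        self._buffer.append(delta)
        now = self._clock()
        if self._last_flush_at is None or now - self._last_flush_at >= self._flush_interval_s:
            self._flush(now)

    def finalize(self):
        # Bypass the throttle: push whatever is still pending at stream end.
        if self._buffer:
            self._flush(self._clock())

    def _flush(self, now):
        self._last_emitted_text += "".join(self._buffer)
        self._buffer.clear()
        self._last_flush_at = now
        self._signal_actor(self._last_emitted_text)
```

With a 750ms interval, the first delta goes out immediately (it becomes the placeholder send), intermediate deltas coalesce into at most one edit per window, and `finalize` guarantees the final text is never lost to the throttle.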
e9116b1 to e5c3324
…treaming-reply # Conflicts: # test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs
…treaming-reply # Conflicts: # agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs # agents/Aevatar.GAgents.ChannelRuntime/ChannelLlmReplyInboxRuntime.cs
…treaming-reply Resolve conflicts in ChannelLlmReplyInboxRuntime by combining streaming reply sink (TimeProvider-driven TurnStreamingReplySink) with dev's bot-owner LLM config resolution (INyxIdRelayScopeResolver + IUserConfigQueryPort feeding BuildEffectiveMetadataAsync). Tests on both sides retained; ambiguous NyxIdRelayOptions references in streaming tests qualified to the Channel.NyxIdRelay type.
```csharp
{
    Logger.LogInformation(
        "Streaming chunk delivery failed; disabling streaming for turn. correlation={CorrelationId}, code={Code}, editUnsupported={EditUnsupported}",
        evt.CorrelationId,
```
[P1] Once streaming is marked `Disabled` here, the final `LlmReplyReady` falls back to `RunLlmReplyAsync` and sends a complete reply again. But as soon as the first chunk has successfully gone through `/reply`, the NyxID reply token is already consumed, and NyxID's `/reply/update` explicitly requires a JTI that has already been consumed by `/reply`. So after any edit failure, final-flush failure, or terminal LLM failure, the "single-shot compensating reply" will try to reuse the consumed token, which will almost certainly 401; the user only ever sees the stale partial, never the final answer. Suggest redesigning the failure degradation: e.g. keep attempting the final edit, treat the partial as the terminal failure state, or introduce an explicit second-send/API-key compensation path.
Fixed at 0d3a91a.

Streaming state now has two failure branches, with the semantics determined by whether the reply token has been consumed:

- `Disabled` = failure before the first send (the runtime context has no token, or the first `/reply` itself fails). The token is still usable, so letting `LlmReplyReady` fall back to `RunLlmReplyAsync` is safe (preserves the old behavior).
- `SuppressInterim` = the first chunk already went through `/reply` successfully (`PlatformMessageId` is non-empty) and a later `/reply/update` failed. The JTI is dead, so we never fall back to `RunLlmReplyAsync`. Subsequent interim chunks are dropped, but `TryCompleteStreamedReplyAsync` still attempts the final edit (which goes through `/reply/update` anyway).

If the final edit also fails, the new code still does not fall back to `/reply` — following your "treat the partial as the terminal failure state": it persists a `ConversationTurnCompletedEvent` with the text of the last successful flush, the chain stops, and the token is cleaned up. The user sees a stale partial, but there is no 401 storm.

Three new regression tests cover the three branches:

- `HandleLlmReplyStreamChunkAsync_InterimEditFailureAfterTokenConsumed_SuppressesSubsequentChunksWithoutDisablingFinalEdit`
- `HandleLlmReplyReadyAsync_WhenTokenAlreadyConsumedAndInterimEditFailed_RetriesFinalEditInsteadOfReusingToken`
- `HandleLlmReplyReadyAsync_WhenTokenConsumedAndFinalEditAlsoFails_PersistsLastFlushedPartialAsTerminalWithoutReusingToken`

On the "second-send / API-key compensation path" — not added for now: the NyxID relay side has no second-send protocol, and bypassing the token to send directly with the bot API key would punch through the actor boundary (the token is actor-owned runtime state), which is too large a change. Honestly treating the partial as terminal is more robust right now than a fake fallback. If an independent second-send contract is defined later, it can be wired in then.
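The two failure branches and the terminal decision in `LlmReplyReady` can be condensed into a small decision table. A hypothetical Python sketch (state names follow the comment above; the action labels and `final_edit` callable are illustrative, not the real C# API):

```python
from enum import Enum

class StreamState(Enum):
    HEALTHY = "healthy"
    DISABLED = "disabled"              # failed before /reply consumed the token
    SUPPRESS_INTERIM = "suppress"      # token consumed; interim edits suppressed

def classify_failure(platform_message_id):
    """Which failure mode applies depends only on whether the first /reply
    already succeeded (i.e. PlatformMessageId is set, so the JTI is consumed)."""
    return StreamState.DISABLED if not platform_message_id else StreamState.SUPPRESS_INTERIM

def on_reply_ready(state, platform_message_id, final_edit):
    """Terminal action for LlmReplyReady under the two failure modes.
    `final_edit` stands in for the /reply/update final edit; returns a label."""
    if state is StreamState.DISABLED:
        # Token never consumed: the legacy single-shot /reply fallback is safe.
        return "fallback_run_llm_reply"
    # HEALTHY or SUPPRESS_INTERIM: the token is dead, only /reply/update is legal.
    if final_edit(platform_message_id):
        return "final_edit_ok"
    # Final edit also failed: persist the last flushed partial as terminal,
    # never reissue /reply with the dead JTI.
    return "persist_partial_as_terminal"
```

The key invariant is that `Disabled` can only ever be entered while `PlatformMessageId` is empty, so "fallback is safe" and "token is fresh" are the same condition.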
```csharp
if (string.IsNullOrEmpty(chunk.DeltaContent))
    continue;

output.Append(chunk.DeltaContent);
```
[P2] The current streaming sink is only invoked after `ChatStreamAsync` yields `DeltaContent`, so the first Lark message is not sent when the LLM call starts — it waits for the model's first delta. This moves "first visible" forward from full-reply completion to the first delta, but it cannot guarantee the ≤1s placeholder stated in the issue: if model cold start, routing, tool calls, or the provider's first token are slow, the user still waits in silence. If the goal is a reliable ≤1s, a fixed placeholder must be sent before starting the LLM and then overwritten by the deltas.
Fixed at 0d3a91a.

In `NyxIdConversationReplyGenerator.GenerateReplyAsync`, `streamingSink.OnDeltaAsync(placeholder, ct)` now runs once before entering `ChatStreamAsync`. The first Lark message therefore goes out before the LLM produces any delta: time-to-first-visible equals the Lark outbound RTT, independent of model cold start, routing, or tool-call preamble. The first real delta then overwrites the placeholder via edit-in-place.

The placeholder is controlled by `NyxIdRelayOptions.StreamingPlaceholderText`, defaulting to `"…"` (short, language-neutral, minimal emoji rendering variance). Setting it to empty/whitespace turns it off — reverting to your original "wait for the first delta before sending" semantics, at the cost of losing the ≤1s guarantee on cold-start paths.

Two failure couplings to note:

- Placeholder send fails → `PlatformMessageId` is empty → the `Disabled` branch from P1 applies → `LlmReplyReady` can safely fall back to `RunLlmReplyAsync`.
- Placeholder succeeds but the LLM then throws / times out / returns an empty reply → `FinalizeAsync` or the final edit in `HandleLlmReplyReadyAsync` overwrites the placeholder with an error-classification message or the final text (via `/reply/update`, never reusing the dead token).

Three new tests:

- `GenerateReplyAsync_WithStreamingSinkAndPlaceholderConfigured_EmitsPlaceholderBeforeFirstDelta`
- `GenerateReplyAsync_WithStreamingSinkButEmptyPlaceholderOption_SkipsPlaceholderEmit`
- `GenerateReplyAsync_WithoutStreamingSink_SkipsPlaceholderEmit`
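The placeholder-first ordering can be sketched as follows. This is an illustrative Python async flow, not the shipped C# generator: `chat_stream` stands in for `ChatStreamAsync`, `streaming_sink` for `IStreamingReplySink`, and in the real sink the placeholder is replaced by subsequent edits rather than appended.

```python
async def generate_reply(chat_stream, streaming_sink=None, placeholder_text="…"):
    """Hypothetical sketch of the placeholder-before-first-delta flow."""
    if streaming_sink is not None and placeholder_text.strip():
        # Emit the fixed placeholder BEFORE awaiting the model, so
        # time-to-first-visible does not depend on model cold start,
        # routing, or tool-call preamble.
        await streaming_sink.on_delta(placeholder_text)
    parts = []
    async for delta in chat_stream:
        if not delta:
            continue
        parts.append(delta)
        if streaming_sink is not None:
            # The first real delta overwrites the placeholder edit-in-place.
            await streaming_sink.on_delta(delta)
    if streaming_sink is not None:
        await streaming_sink.finalize()   # bypasses the throttle at stream end
    return "".join(parts)
```

An empty/whitespace `placeholder_text` skips the early emit, matching the opt-out semantics described above.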
…eview)

P1: Once the first streaming chunk consumed the NyxID /reply token, a subsequent interim /reply/update failure marked the turn Disabled, and LlmReplyReady then fell through to RunLlmReplyAsync, which issued a fresh /reply with the already-consumed JTI (401). The user was left staring at a stale partial. The fix splits the in-memory streaming state into two failure modes: Disabled (pre-send, reply token still fresh, fallback to /reply is safe) vs SuppressInterim (post-send, token dead, skip interim edits but still try the final edit). If the final edit also fails, the actor now persists the last flushed partial as the terminal completion rather than spinning on a guaranteed-401 fallback.

P2: The streaming sink only fired on the first non-empty LLM delta, so cold start, router handoff, or a tool call before the first token pushed the time-to-first-visible above the ≤1s target. Introduce an opt-in StreamingPlaceholderText (default "…") that the reply generator emits as the very first streaming chunk, before awaiting ChatStreamAsync. The first real delta overwrites the placeholder via edit-in-place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes #352.

Consumes the new `POST /api/v1/channel-relay/reply/update` endpoint shipped in ChronoAIProject/NyxID#480 (merged as ChronoAIProject/NyxID#483) so Lark bot replies render progressively instead of leaving the user in a 5–30s silent wait.

Redesigned after the PR #368 merge: PR #368 moved the reply token from `OutboundDeliveryContext` (serializable) into `ConversationGAgent._nyxRelayReplyTokens` (actor-owned, in-memory). This invalidated the earlier design where the inbox runtime called the outbound port directly, because the inbox runtime is a stream subscriber and has no access to the actor-scoped token. The current design flips the responsibility: the inbox runtime only signals deltas, and the actor is the sole caller of the outbound port.

## Flow

- Inbox runtime (async LLM, no outbound port access): LLM stream deltas go through the throttled `TurnStreamingReplySink` and are signaled to the actor as `LlmReplyStreamChunkEvent`s; stream end bypasses the throttle and is followed by `LlmReplyReadyEvent`.
- `ConversationGAgent` (single-threaded turn, owns reply token + streaming state): the first chunk sends the placeholder, subsequent chunks edit it in place, and `HandleLlmReplyReadyAsync` forces a final update or falls back.

## Architectural rules respected (CLAUDE.md)

- The inbox runtime signals only via `EventEnvelope`s. The actor's `_nyxRelayReplyTokens` stays the only authoritative store.
- No middle-layer entity/run/session → state map: `_nyxRelayStreamingStates` is actor-owned runtime state (same lifecycle as `_nyxRelayReplyTokens`).
- `LlmReplyStreamChunkEvent` is a new event instead of overloading `LlmReplyReadyEvent` with a bool — keeps each event's semantics single-purpose.
- `LlmReplyStreamChunkEvent` is documented as never-persist; `HandleEventAsync` dispatches to the handler without `PersistDomainEventAsync`.

## Key changes

- `EmitResult.platform_message_id` — surfaces `om_xxx` from the relay `/reply` response so the actor can thread it into subsequent edits.
- `LlmReplyStreamChunkEvent` — runtime-only event from the inbox runtime to the actor.
- `NyxIdApiClient.UpdateChannelRelayReplyAsync` + text wrapper, with distinct detection of `501 edit_unsupported` via `NyxIdChannelRelayReplyResult.EditUnsupported`.
- `NyxIdRelayOutboundPort.UpdateAsync` mirroring the `SendAsync` shape (takes `replyToken` explicitly, matching the PR #368 "Fix NyxID relay callback authentication" contract).
- `IConversationTurnRunner.RunStreamChunkAsync` + `ConversationStreamChunkResult` — a unified "send or update based on current `PlatformMessageId`" primitive, only invoked by the actor.
- `IStreamingReplySink` + `TurnStreamingReplySink` — throttled chunk event dispatcher, per-invocation state, no outbound port dependency.
- `ChannelLlmReplyInboxRuntime` — resolves the actor reference once, builds the sink, passes it to the generator; finalizes on stream end.
- `ConversationGAgent` — new `_nyxRelayStreamingStates` dict, `HandleLlmReplyStreamChunkAsync` handler, streaming short-circuit in `HandleLlmReplyReadyAsync`, combined cleanup in `RemoveNyxRelayReplyToken`.

## Degradation policy (v1)

Any placeholder or update failure permanently disables streaming for the turn and falls back to the legacy single-shot reply path. A partial stale placeholder may remain visible to the user; the final send arrives as a second message. Transient-failure retry classification is tracked separately in #371 as backlog.

## Feature flag

- `NyxIdRelayOptions.StreamingRepliesEnabled = true` (default ON)
- `NyxIdRelayOptions.StreamingFlushIntervalMs = 750` (per issue spec)

## Test plan

- `dotnet build aevatar.slnx` clean
- `dotnet test test/Aevatar.GAgents.ChannelRuntime.Tests` — 298 passed (+17 new: sink throttle/dispatch, inbox runtime event selection, outbound port `UpdateAsync` + `SendAsync` `platform_message_id` surfacing)
- `dotnet test test/Aevatar.AI.Tests` — 469 passed (+5 new: `UpdateChannelRelayReplyAsync` happy / 501 / generic error / empty text)
- `dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests` — 108 passed (+6 new: actor chunk handler first/subsequent, disable on failure, reply-ready short-circuit vs fallback)
- `bash tools/ci/architecture_guards.sh`
- `bash tools/ci/test_stability_guards.sh`

## Follow-ups

- #371 — retry classification: transient NyxID errors (`RateLimited`, 5xx, some `Conflict`) should retry a bounded number of times before disabling, rather than disabling on the first update failure.

🤖 Generated with Claude Code
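The bounded-retry follow-up tracked in #371 amounts to a small classification function. A hypothetical Python sketch — the error-class names and the retry budget are illustrative assumptions, not shipped behavior:

```python
# Hypothetical sketch of #371: classify relay update failures as retryable
# vs terminal, with a bounded per-turn retry budget. Names are assumptions.
RETRYABLE = {"RateLimited", "ServerError", "Conflict"}   # transient classes
MAX_UPDATE_RETRIES = 2                                   # illustrative budget

def next_action(error_code, retries_so_far):
    """Return what the actor would do after a failed /reply/update."""
    if error_code in RETRYABLE and retries_so_far < MAX_UPDATE_RETRIES:
        return "retry_update"
    # Non-transient (e.g. edit_unsupported) or budget exhausted:
    # suppress further interim edits for this turn, per the degradation policy.
    return "suppress_interim"
```

The point of the bound is that a persistently failing edit degrades to the existing suppress path instead of retrying forever against a dead message.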