Conversation
Scaffold proto schemas for ConversationGAgent state and ChannelUserBindingGAgent state + commands/events, per issue #258 Runtime package deliverables. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- ConversationGAgent owns per-conversation state keyed by canonical_key and enforces authoritative dedup + atomic turn commit per RFC §5.2b. - ChannelUserBindingGAgent splits out user credential/preference binding from legacy ChannelUserGAgent; conversation state stays conversation-scoped. - IConversationTurnRunner seam lets the grain defer bot turn execution to runtime without leaking pipeline or adapter details into the actor. - IShardLeaderGrain defines the lease/fencing contract used by gateway adapters (Discord, WeChat gateway). Implementation deferred per issue. Adds Foundation.Abstractions / Foundation.Core project refs and M.E.DependencyInjection.Abstractions / Logging.Abstractions packages. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…sion - ChannelPipeline composes IChannelMiddleware via ASP.NET-Core-style builder. - ConversationResolverMiddleware / LoggingMiddleware / TracingMiddleware cover RFC §5.7 standard middleware set; Tracing attaches mandatory dimensions per §6.1 around channel.pipeline.invoke span. - DurableInboxSubscriber provides OnNextAsync semantics per RFC §5.8 + §9.5.2 (return→commit, throw→redeliver, bounded Channel<T> 1000/Wait/500ms timeout). - ChannelDiagnostics centralises the Aevatar.Channel ActivitySource + span names + tag keys so middleware, grains, and adapters stay on one schema. - AddChannelRuntime() DI extension registers middlewares + default runner. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cover the issue #258 acceptance criteria: - ConversationGAgent dedup + atomic commit, including TOCTOU-style redeliver collapse, sliding-window cap, continue-command duplicate rejection, and failed-turn event emission. - MiddlewarePipeline ordering, short-circuit, and exception propagation. - DurableInboxSubscriber OnNext semantics: success-commit, throw-redeliver, buffer-full timeout triggers redelivery. - TracingMiddleware emits channel.pipeline.invoke span with mandatory §6.1 dimensions (activity_id, canonical_key, bot_instance_id, channel_id, retry_count, auth_principal) + sets Error status when downstream throws. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
5ee4196 to
8f1f804
Compare
eanzhao
left a comment
There was a problem hiding this comment.
Left three blocking comments on durable inbox delivery semantics, transient continue retry semantics, and the new JSON-backed user-binding state.
Reviewer flagged three correctness issues; fix each at the source. 1. DurableInboxSubscriber: give every enqueued activity its own TCS so OnNextAsync only returns after the pipeline actually finishes. The observer's return → commit / throw → redeliver contract now holds even with the bounded working buffer (previously, OnNextAsync returned after enqueue, so Orleans could commit the offset before the pipeline ran or surface failures through the worker task instead of the observer task). 2. ConversationGAgent: only append to processed_command_ids on terminal (NotRetryable) continue failures. Retriable failures (retry_after_ms oneof) stay retriable so callers can re-dispatch the same logical command id after the back-off — previously the retry came back as DuplicateCommand. 3. ChannelUserPreferences: introduce a typed proto message and replace the opaque preferences_json string across state/command/event contracts. Internal actor state stays proto-typed per CLAUDE.md; downstream projections consume typed fields instead of parsing blobs. Tests updated: - Rewrite OnNextAsync buffer-full test to fire-and-forget the back-pressured sends (they now block on pipeline completion), add coverage for throw-path surfacing through the completion handshake and for dispose-time redelivery signalling. - Add continue-command retriable-vs-permanent failure semantics tests that pin down the new processed_command_ids rules. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a27381cbb0
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (result.RetryAfter is { } retry) | ||
| failed.RetryAfterMs = (long)retry.TotalMilliseconds; | ||
| else | ||
| failed.NotRetryable = new Google.Protobuf.WellKnownTypes.Empty(); |
There was a problem hiding this comment.
Preserve transient retries when no retry_after is provided
This branch marks every failure without RetryAfter as not_retryable, regardless of FailureKind. That turns ConversationTurnResult.TransientFailure(...) (whose default retryAfter is null) into a terminal failure, and ApplyContinueFailed then consumes the command_id, so subsequent redispatches are rejected as duplicates instead of retrying. This breaks transient recovery paths whenever a runner reports a transient error but omits an explicit backoff value.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 3056a7e. Retry policy now derives from FailureKind, not from whether RetryAfter was supplied: only PermanentAdapterError writes NotRetryable; every other kind stays retriable with retry_after_ms = supplied value (or 0 when omitted). Added HandleContinueCommandAsync_TransientFailureWithoutRetryAfter_StaysRetriable to pin down the default-backoff case.
| // Block until the worker has actually finished (successful → commit; throw → redeliver). | ||
| await pending.Completion.Task.ConfigureAwait(false); |
There was a problem hiding this comment.
Fail fast when OnNextAsync runs before Start
OnNextAsync always waits on pending.Completion, but that completion is only signaled by the background worker started via Start(). If a caller wires OnNextAsync as the stream handler (as documented) and forgets to call Start, the first delivery hangs indefinitely instead of throwing, which stalls durable-inbox consumption and checkpoint progress. The method should either auto-start the worker or throw immediately when _worker is not running.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 3056a7e. Extracted EnsureWorkerStarted() (thread-safe CompareExchange singleton) and called it from both Start() and OnNextAsync(). First delivery auto-starts the worker so wiring OnNextAsync directly as a stream handler no longer hangs. Added OnNextAsync_AutoStartsWorker_WhenStartWasNotCalled to cover this path.
Codecov Report✅ All modified and coverable lines are covered by tests. @@ Coverage Diff @@
## dev #279 +/- ##
==========================================
- Coverage 68.59% 68.58% -0.01%
==========================================
Files 1110 1110
Lines 78074 78074
Branches 10221 10221
==========================================
- Hits 53554 53548 -6
- Misses 20610 20614 +4
- Partials 3910 3912 +2
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
Two additional issues surfaced after the earlier round: - P1: ConversationGAgent derived retry policy from whether result.RetryAfter was supplied, not from result.FailureKind. A TransientFailure without an explicit backoff was therefore written as NotRetryable and consumed the command id. AssignRetryPolicy now only sets NotRetryable for PermanentAdapterError; every other kind stays retriable (retry_after_ms = 0 when the runner omits a backoff). - P2: DurableInboxSubscriber.OnNextAsync blocked on pending.Completion which only signals once the worker loop has started. Wiring OnNextAsync directly as a stream handler without calling Start() hung forever. Start is now a thin wrapper over EnsureWorkerStarted (thread-safe via CompareExchange) and OnNextAsync auto-starts the worker on first delivery. Tests added: - HandleContinueCommandAsync_TransientFailureWithoutRetryAfter_StaysRetriable. - OnNextAsync_AutoStartsWorker_WhenStartWasNotCalled. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes #258 once merged through the channel-abstractions → dev chain.
Summary
Implements
Aevatar.GAgents.Channel.Runtimepackage on top of the abstractionsmerged in #272 (issue #257).
ConversationGAgent(GAgentBase<ConversationGAgentState>) — per-conversationsingle-activation actor keyed by
ConversationReference.CanonicalKey.Authoritative dedup on
ProcessedMessageIds/ProcessedCommandIdswith asliding-window cap (10 000). Turn runner seam (
IConversationTurnRunner)keeps bot invocation out of the grain.
ChannelUserBindingGAgent— user credential + preferences split out of thelegacy
ChannelUserGAgentper RFC §5.2b.IShardLeaderGrain— abstraction only; Discord gateway implementationdeferred per issue scope.
ChannelPipeline+MiddlewarePipelineBuildercompose
IChannelMiddlewareinstances (ASP.NET-Core-style). Threestandard middlewares shipped:
ConversationResolverMiddlewareLoggingMiddlewareTracingMiddlewareDurableInboxSubscriberbridgesChatActivitystreaminto the pipeline with RFC §5.8 / §9.5.2 semantics:
return→commit,throw→redeliver, boundedChannel<T>(1000 /Wait/ 500 ms).ChannelDiagnosticscentralises theAevatar.ChannelActivitySource, RFC §6.1 span names, and mandatory tag keys.AddChannelRuntime()extension.conversation_state.proto,channel_user_binding.protodefinestate + command/event contracts per CLAUDE.md proto-first rule.
Issue #258 acceptance checklist
Aevatar.GAgents.Channel.Runtime.csproj独立 build 通过OnNextAsync语义单测通过 (exercised viatransport-agnostic
IStreamanalog inDurableInboxSubscriberTests—success-commit, throw-redeliver, buffer-full-timeout)
ConversationGAgent的 dedup + atomic commit 测试通过 (TOCTOU scenariocovered in
ConversationGAgentDedupTests)ChannelTracingSmokeTests; avoids pullingOpenTelemetry.Exporter.InMemoryinto the test project)
Test plan
dotnet build aevatar.slnx --nologo— 0 errors.dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests/Aevatar.GAgents.Channel.Protocol.Tests.csproj— 28 passed, 0 failed.bash tools/ci/channel_mega_interface_guard.sh— ok.bash tools/ci/architecture_guards.sh— passed.bash tools/ci/test_stability_guards.sh— passed (no new polling waits).Deviations / scope notes
(
DurableInboxSubscriber.OnNextAsync/OnNextInlineAsync) rather than byspinning up Orleans streams — Orleans-specific wiring belongs in host-layer
packages, not
Channel.Runtime(matches "no Orleans package in Runtime"constraint). The semantics tested are the ones the observer contract must
honor regardless of provider.
IShardLeaderGrainis intentionally abstraction-only.PersistDomainEventAsync→CommittedStateEventPublisherflow inherited fromGAgentBase. No new projector is introduced — readmodels come in follow-upPRs for
ChatHistory/UserMemoryconsumers.Dependencies
Depends on #272 (issue #257 abstractions). PR is stacked on that base branch.
🤖 Generated with Claude Code