Skip to content

refactor(adk): replace TurnLoop with push-based API#835

Merged
shentongmartin merged 53 commits intoalpha/09from
refactor/eager_receive
Mar 26, 2026
Merged

refactor(adk): replace TurnLoop with push-based API#835
shentongmartin merged 53 commits intoalpha/09from
refactor/eager_receive

Conversation

@shentongmartin
Copy link
Copy Markdown
Contributor

@shentongmartin shentongmartin commented Mar 5, 2026

Unified Cancel State Machine, Eager Receive, and Composite Agent Cancel Support

Problem

The base feat/agent_turn_loop branch introduced CancellableAgent with RunWithCancel/ResumeWithCancel and TurnLoop with NewTurnLoop + Run. Three issues remained:

  1. Cancel was interface-coupled and only worked for ChatModelAgent. CancellableAgent required each agent type to implement RunWithCancel/ResumeWithCancel individually. Composite agents (sequential, loop, parallel, supervisor, planexecute) didn't implement it, so cancel was unavailable in complex workflows.

  2. Safe-point cancels were sentinel errors that broke checkpoint/resume. CancelAfterChatModel and CancelAfterToolCalls propagated as regular errors. The compose layer couldn't distinguish them from real failures, so no checkpoint was saved — targeted resume after a safe-point cancel was impossible.

  3. TurnLoop event delivery was lazy and preemption had race windows. Events buffered internally before reaching the caller. The cancelSig-based cancel wrapper in cancel_wrapper.go used model/tool wrapping to intercept cancel, which was fragile and didn't compose well across agent boundaries.

Additionally, in compose graphs, parent and sub-graph task managers share a cancel channel — if a sub-graph consumed the cancel value first, the parent lost FromGraphInterrupt=true.

  1. TurnContext preempt/stop signals could be over-reported. TurnContext.Preempted / TurnContext.Stopped were closed after calling cancel, but that call's options might not have been included in the CancelError observed by OnAgentEvents (race between cancel calls and cancel finalization).

Solution

Replace CancellableAgent interface with WithCancel option

Cancel is now an AgentRunOption instead of a separate interface:

cancelOpt, agentCancelFunc := WithCancel()
iter, err := agent.Run(ctx, input, cancelOpt)
// later:
handle, _ := agentCancelFunc(WithAgentCancelMode(CancelAfterChatModel))
_ = handle.Wait()

A cancelContext state machine tracks execution state:

Running → Cancelling → CancelHandled  (cancel path)
Running → Done                         (normal completion)

The outer flowAgent owns the cancel lifecycle. Inner agents access the cancelContext via Go context (getCancelContext(ctx)) — filterCancelOption strips the cancel option from nested calls to prevent double-ownership. This replaces cancel_wrapper.go entirely.

Extend cancel to all composite agents

Three bugs prevented cancel from working with composite agents:

  1. flowAgent called markDone() immediately for the workflowAgent path, causing AgentCancelFunc to return ErrExecutionCompleted before the workflow finished.
  2. The first sub-agent's flowAgent wrapper called markDone() on completion, prematurely closing doneChan for subsequent sub-agents.
  3. Parallel sub-agents each overwrote a single graphInterruptFunc, so cancel only interrupted the last branch.

Fix: only the outermost flowAgent owns markDone(). Inner agents access cancelCtx via context but never call markDone(). Parallel agents use a slice of interrupt functions so all branches get interrupted.

Safe-point semantics: transition boundaries ARE safe for all cancel modes

A key behavior decision: workflow transition boundaries (between sub-agents / between loop iterations / transfer points / parallel pre-spawn) are safe for all cancel modes because no sub-agent work is in progress at a boundary. Any cancel mode fires unconditionally at transition boundaries — there is no reason to delay cancellation.

  • CancelAfterChatModel and CancelAfterToolCalls are honored at boundaries because no model or tool call is running.
  • CancelImmediate additionally has a grace period wrapper (1 s by default) that gives child agents time to checkpoint before abort.

Cancel at Workflow Transition Boundaries

Sequential, loop, and parallel workflows now check for cancel at each transition boundary. Since no sub-agent work is in progress at a boundary, all cancel modes (including CancelAfterChatModel and CancelAfterToolCalls) are honored — there's no reason to delay cancellation. CancelImmediate additionally has a grace period wrapper (1 s by default) that gives child agents time to interrupt/checkpoint before force-abort.

Safe-point cancels via compose.Interrupt

Safe-point cancels now emit compose.Interrupt with typed cancelSafePointInfo instead of a sentinel error. This makes compose save checkpoint data automatically, enabling Runner.ResumeWithParams after a safe-point cancel. The runner detects CancelError and populates InterruptContexts from the interrupt signal.

Eager receive pattern

Events from agent execution are consumed in real-time rather than buffered. The react graph has an explicit CancelCheck lambda node after ChatModel for deterministic safe-point handling. Tool streams are wrapped by cancelMonitoredToolHandler — on CancelImmediate, the stream terminates with ErrStreamCancelled (a concrete *StreamCancelledError registered with gob so it survives checkpoint serialization).

New UnboundedChan methods (TrySend, TakeAll, PushFront) support this: preemption recovers unprocessed items via PushFront, and TakeAll enables batch drain.

Permissive TurnLoop API

The TurnLoop API is refined so all methods work before Run is called: Push buffers items, Stop sets a flag so Run exits immediately, Wait blocks until Run completes. Preemption (WithPreempt()) is gate-scoped to the active turn.

Per-turn context override for tracing

GenInputResult.RunCtx allows each turn to override the execution context used by PrepareAgent, the agent run/resume, and OnAgentEvents. This enables a pushed item to attach trace metadata (e.g., trace/span IDs) to the exact agent execution that processes it. The override must be derived from the TurnLoop's run context to preserve cancellation semantics.

FromGraphInterrupt propagation

resolveInterruptCompletedTasks now propagates FromGraphInterrupt upward from sub-graph interrupt info, so the parent graph correctly identifies cancel-triggered interrupts.

Preempt acknowledgment channel (new in this update)

Push() now returns an acknowledgment channel when used with WithPreempt(). This allows callers to wait for the preempt signal to be acknowledged before proceeding:

ok, ack := loop.Push("urgent", WithPreempt(WithAgentCancelMode(CancelAfterToolCalls)))
if ok {
    <-ack  // Wait for cancel to be initiated
}

The acknowledgment channel is closed when:

  • The cancel operation has been initiated (for running loops)
  • Immediately (for loops that haven't started yet)

This eliminates race conditions between pushing urgent items and checking preempt status.

CancelHandle for async cancel operations (new in this update)

AgentCancelFunc returns a CancelHandle and a contributed bool:

handle, contributed := agentCancelFunc(WithAgentCancelMode(CancelAfterChatModel))
if contributed {
    // This call's options were included in the CancelError before it was finalized.
}
err := handle.Wait()  // Blocks until cancel completes

This allows the cancel operation to be started asynchronously while still providing a way to wait for completion and check the outcome (ErrCancelTimeout, ErrExecutionCompleted). The contributed bool is used by TurnLoop to provide strict semantics for turn-level signals.

Strict contributed semantics for TurnContext.Preempted/Stopped (new in this update)

TurnLoop now closes TurnContext.Preempted / TurnContext.Stopped only when the corresponding cancel call actually contributed to the CancelError for the current turn. This prevents a race where cancellation was requested but the cancel error was already created/finalized by another path.

Internally, createCancelError() and markCancelHandled() are synchronized under cancelMu via an atomic helper, so a concurrent cancel call deterministically reports contributed=true or false.

TurnLoop.Resume — resume a stopped loop from checkpoint (new in this update)

TurnLoop now has a Resume method that restarts execution from a previously saved checkpoint rather than from scratch:

exitState, _ := loop.Wait()  // after Stop or error
// later:
err = loop.Resume(ctx, exitState.CheckPointID, newItems)

When Resume is called, TurnLoop loads the checkpoint from Store, reconstructs the pendingResume payload, and on the first iteration resumes the interrupted agent turn (via Runner.ResumeWithParams) instead of running a new one. TurnLoopExitState now carries CheckPointID, CanceledItems, and UnhandledItems so callers have all the data needed for the next Resume without querying the store directly.

ExternalTurnState mode — decouple checkpoint from TurnLoop internals (new in this update)

Setting TurnLoopConfig.ExternalTurnState = true shifts ownership of turn-level checkpoint data from TurnLoop's internal Store to the caller:

Mode What TurnLoop owns What the caller owns
ExternalTurnState=false (default) Saves/loads agent checkpoint via Store Nothing
ExternalTurnState=true Tracks CheckPointID; leaves persistence to caller Load checkpoint, supply items via WithExternalResumeItems

In ExternalTurnState mode, Resume must be given the canceled and unhandled items explicitly:

loop.Resume(ctx, checkPointID, newItems,
    WithExternalResumeItems(canceledItems, unhandledItems))

This enables callers who manage their own persistence layer to drive TurnLoop resume without a separate Store.

GenResume callback — reconstruct the input queue on resume (new in this update)

TurnLoopConfig.GenResume is the resume-time counterpart to GenInput. It is called exactly once on the first iteration of a resumed loop to let callers reconcile items that were in flight when the loop stopped:

GenResume: func(ctx context.Context, loop *TurnLoop[T],
    canceledItems, unhandledItems, newItems []T,
) (*GenResumeResult[T], error) {
    merged := append(canceledItems, newItems...)
    return &GenResumeResult[T]{
        CheckPointID: existingCheckpointID,
        Consumed:     merged[:1],   // passed to PrepareAgent + agent resume
        Remaining:    merged[1:],   // re-queued for subsequent turns
    }, nil
}

GenResumeResult.Consumed items are passed to PrepareAgent and used to resume the agent turn. GenResumeResult.Remaining items are pushed back to the front of the queue. Items in neither list are dropped. This hook replaces GenInput entirely for the first resume iteration.

Stop escalation via AgentCancelOptions (new in this update)

Stop() now accepts AgentCancelOption varargs so callers can simultaneously signal loop exit and cancel the in-flight agent:

loop.Stop(WithAgentCancelMode(CancelImmediate))

turnLoopStopSig stores the cancel opts alongside a generation counter. The watchStopSignal goroutine fires agentCancelFunc each time the generation increases, making multiple escalating Stop calls work correctly without races:

loop.Stop(WithAgentCancelMode(CancelAfterChatModel))
// ... if still running ...
loop.Stop(WithAgentCancelMode(CancelImmediate))  // fires again with stronger mode

turnLoopStopSig refactor: single-use done → repeatable notify (new in this update)

Previously turnLoopStopSig used a single done chan struct{} that was closed on the first Stop call. This prevented multiple Stop calls from delivering different cancel options to the running agent.

The struct now separates two concerns:

  • Permanent stopped state: done is still closed exactly once (when TurnLoop's main loop exits), readable via isStopped().
  • Per-signal notification: a buffered-1 notify channel + generation counter allows signal() to be called multiple times without blocking and without losing the at-least-one-signal guarantee.

TurnLoop API simplification (new in this update)

Three changes simplify the TurnLoop public API by removing error returns that callers never meaningfully handle and splitting Resume into a failable preparation step and a non-failable execution step:

1. NewTurnLoop returns *TurnLoop[T] instead of (*TurnLoop[T], error)

The only validation NewTurnLoop performs is checking that GenInput and PrepareAgent are non-nil. These are programming errors (always known at compile/init time), not runtime failures, so a panic is more appropriate than an error return. This eliminates boilerplate if err != nil blocks at every construction site:

// before
loop, err := NewTurnLoop[string](config)
if err != nil { ... }

// after
loop := NewTurnLoop[string](config)  // panics if GenInput or PrepareAgent is nil

2. Run() returns nothing instead of error

Previously Run() returned an error when called on an already-running or already-finished loop. Since Run is a lifecycle method that starts the main loop asynchronously, duplicate calls are now silently treated as no-ops. Callers retrieve the final outcome via Wait(), which already returns TurnLoopExitState:

// before
if err := loop.Run(ctx); err != nil { ... }

// after
loop.Run(ctx)  // duplicate calls are no-op
exitState, err := loop.Wait()

3. Resume() split into PrepareResume() + Run()

Resume previously combined checkpoint loading/validation (which can fail) with starting the loop (which cannot meaningfully fail). These are now separate steps:

  • PrepareResume(ctx, checkPointID, newItems, ...opts) error — loads the checkpoint from Store, validates items, and stages the resume payload. Returns an error if the checkpoint cannot be loaded or validation fails.
  • Run(ctx) — starts the loop (works for both fresh starts and prepared resumes).
// before
err = loop.Resume(ctx, checkPointID, newItems)

// after
if err := loop.PrepareResume(ctx, checkPointID, newItems); err != nil {
    return err  // checkpoint load or validation failed
}
loop.Run(ctx)

This separation lets callers handle checkpoint errors before committing to the loop lifecycle, and keeps Run as a single uniform entry point for both fresh and resumed loops.

Key Insight

By making cancel an option (WithCancel) instead of an interface (CancellableAgent), the cancel lifecycle becomes orthogonal to agent implementation. A single flowAgent drives the state machine regardless of how many agents are nested — inner agents observe cancellation through context, and safe-point cancels become regular compose interrupts that the checkpoint system already knows how to persist. All cancel modes fire unconditionally at workflow transition boundaries (where no sub-agent work is in progress), and CancelImmediate adds a grace period so child agents can checkpoint before force-abort.

The addition of preempt acknowledgment channels, async CancelHandle waiting, and strict contributed semantics removes race windows in preemption/stop signalling while preserving precise CancelError attribution.

Summary

Problem Solution
Cancel requires implementing CancellableAgent interface per agent type WithCancel as AgentRunOption with cancelContext state machine
Composite agents (sequential, loop, parallel, supervisor, planexecute) don't support cancel Outer flowAgent owns lifecycle; inner agents via context; parallel agents use interrupt func slice
Safe-point cancel as sentinel error prevents checkpoint/resume compose.Interrupt with typed cancelSafePointInfo — checkpoint saved automatically
Safe-point semantics ambiguous at workflow boundaries Transition boundaries are safe for all cancel modes (no sub-agent work in progress); cancel fires unconditionally
Cancel not checked at workflow transition boundaries All cancel modes fire at sequential/loop/parallel transition boundaries unconditionally
CancelImmediate aborts child agents without allowing checkpoint Grace period wrapper gives children 1s to interrupt/checkpoint before force-abort
Lazy event buffering in TurnLoop Eager receive with explicit CancelCheck node; cancelMonitoredToolHandler for streams
Per-item trace context not linked to agent execution GenInputResult.RunCtx overrides per-turn ctx for PrepareAgent/run/OnAgentEvents
Preemption recovery and flexible channel ops UnboundedChan.TrySend/TakeAll/PushFront methods
TurnLoop methods invalid before Run Permissive API: Push buffers, Stop sets flag, Wait blocks until Run completes
Parent graph loses FromGraphInterrupt in shared cancel channel Propagate flag upward in resolveInterruptCompletedTasks
Stream errors during checkpoint serialization *StreamCancelledError gob-registered type; agentEventWrapper.GobEncode consumes unconsumed streams
Graph interrupt functions not idempotent cause double-fire panic Hold mutex across iteration in sendInterrupt and setGraphInterruptFunc
Race between preempt signal and cancel initiation Preempt acknowledgment channel returned from Push(WithPreempt())
Blocking cancel operations CancelHandle.Wait() enables async cancel and explicit outcome checking
TurnContext preempt/stop could be over-reported Strict contributed gating on Preempted/Stopped channel closing
No way to restart a TurnLoop from a saved checkpoint TurnLoop.Resume(...) loads checkpoint and resumes first turn
Turn-level checkpoint storage requires separate Store instance ExternalTurnState=true + WithExternalResumeItems decouples storage from TurnLoop
No hook to reconstruct input queue on resume GenResume callback + GenResumeResult.Consumed/Remaining
Stop() can only signal exit, not cancel the running agent Stop(WithAgentCancelMode(...)) + watchStopSignal per-generation firing
turnLoopStopSig done channel was single-use, prevented escalation Repeatable notify channel + generation counter separate permanent-stop from per-signal cancel
NewTurnLoop returns error for programming mistakes (nil callbacks) NewTurnLoop returns *TurnLoop[T] directly; panics on nil GenInput/PrepareAgent
Run() returns error on duplicate calls that callers must handle Run() returns nothing; duplicate calls are no-op; outcome via Wait()
Resume() mixes failable checkpoint loading with non-failable loop start PrepareResume() returns error for checkpoint/validation; Run() starts the loop

统一取消状态机、Eager Receive 与复合 Agent 取消支持

问题

基础分支 feat/agent_turn_loop 引入了 CancellableAgent(含 RunWithCancel/ResumeWithCancel)和 TurnLoop(NewTurnLoop + Run)。但仍存在三个问题:

  1. 取消与接口耦合,仅对 ChatModelAgent 有效。 CancellableAgent 要求每种 agent 类型单独实现 RunWithCancel/ResumeWithCancel。复合 agent(sequential、loop、parallel、supervisor、planexecute)未实现该接口,因此复杂工作流中无法使用取消。

  2. 安全点取消使用 sentinel error,破坏了 checkpoint/resume。 CancelAfterChatModelCancelAfterToolCalls 作为普通错误传播。compose 层无法将其与真实错误区分,因此不保存 checkpoint——安全点取消后无法定向恢复。

  3. TurnLoop 事件传递延迟,抢占存在竞态窗口。 事件先在内部缓冲后才到达调用方。cancel_wrapper.go 中基于 cancelSig 的取消包装通过 model/tool 拦截实现,脆弱且难以跨 agent 边界组合。

此外,compose 图中 parent 和 sub-graph 的 task manager 共享 cancel channel——若 sub-graph 先消费了 cancel 值,parent 会丢失 FromGraphInterrupt=true

解决方案

WithCancel option 替代 CancellableAgent 接口

取消现在是 AgentRunOption 而非独立接口:

cancelOpt, agentCancelFunc := WithCancel()
iter, err := agent.Run(ctx, input, cancelOpt)
// 之后:
agentCancelFunc(CancelAfterChatModel)

cancelContext 状态机跟踪执行状态:

Running → Cancelling → CancelHandled  (取消路径)
Running → Done                         (正常完成)

外层 flowAgent 拥有取消生命周期。内层 agent 通过 Go context 访问 cancelContextgetCancelContext(ctx))——filterCancelOption 从嵌套调用中剥离 cancel option 以防止双重持有。此方案完全替代了 cancel_wrapper.go

将取消扩展到所有复合 agent

三个 bug 阻止了取消在复合 agent 中工作:

  1. flowAgentworkflowAgent 路径立即调用 markDone(),导致 AgentCancelFunc 在工作流完成前就返回 ErrExecutionCompleted
  2. 第一个子 agent 的 flowAgent 包装在完成时调用 markDone(),过早关闭后续子 agent 的 doneChan
  3. 并行子 agent 各自覆写单一 graphInterruptFunc,导致取消仅中断最后一个分支。

修复:仅最外层 flowAgent 拥有 markDone() 生命周期。内层 agent 通过 context 访问 cancelCtx 但不调用 markDone()。并行 agent 使用 interrupt 函数切片确保所有分支均被中断。

安全点语义:过渡边界对所有取消模式均安全

一个关键行为选择:工作流过渡边界(子 agent 之间 / loop iteration 之间 / transfer 点 / parallel pre-spawn)对所有取消模式都是安全的,因为在边界处没有子 agent 工作正在进行。任何取消模式在过渡边界都会无条件触发——没有理由延迟取消。

  • CancelAfterChatModelCancelAfterToolCalls 在边界处会被执行,因为没有模型或工具调用正在运行。
  • CancelImmediate 额外包含宽限期包装器(默认 1 秒),在强制中止前给子 agent 留出 checkpoint 时间。

工作流过渡边界的取消检查

sequential、loop 和 parallel 工作流现在会在每个过渡边界检查取消。由于边界处没有子 agent 工作正在进行,所有取消模式(包括 CancelAfterChatModelCancelAfterToolCalls)都会被执行——没有理由延迟取消。CancelImmediate 额外包含宽限期包装器(默认 1 秒),在强制中止前给子 agent 留出中断/checkpoint 时间。

安全点取消通过 compose.Interrupt

安全点取消现在发出携带 typed cancelSafePointInfocompose.Interrupt,而非 sentinel error。compose 自动保存 checkpoint 数据,支持安全点取消后通过 Runner.ResumeWithParams 恢复。Runner 检测 CancelError 并从 interrupt signal 填充 InterruptContexts

Eager Receive 模式

Agent 执行的事件现在被实时消费而非缓冲。react graph 在 ChatModel 之后添加了显式 CancelCheck lambda 节点用于确定性安全点处理。工具流被 cancelMonitoredToolHandler 包装——CancelImmediate 时流以 ErrStreamCancelled(注册了 gob 的具体 *StreamCancelledError 类型)终止,确保序列化后仍可识别。

新增 UnboundedChan 方法(TrySendTakeAllPushFront)支持此模式:抢占通过 PushFront 恢复未处理项,TakeAll 支持批量消费。

宽松的 TurnLoop API

TurnLoop API 经过优化,所有方法在 Run 调用前均有效:Push 缓冲项、Stop 设置标志使 Run 立即退出、Wait 阻塞直到 Run 完成。抢占(WithPreempt())通过 gate 限定在当前 turn。

单次 turn 的上下文覆盖(用于链路追踪)

GenInputResult.RunCtx 允许每个 turn 覆盖执行上下文,作用范围包括 PrepareAgent、agent 的 run/resume 以及 OnAgentEvents。这让被 Push 的 item 可以把 trace/span 等元信息绑定到具体的 agent 执行上。该上下文必须从 TurnLoop 的 run ctx 派生,以保留取消语义。

FromGraphInterrupt 传播

resolveInterruptCompletedTasks 现在从子 graph interrupt info 向上传播 FromGraphInterrupt,确保 parent graph 正确识别取消触发的 interrupt。

抢占确认通道(本次更新新增)

Push() 现在在使用 WithPreempt() 时返回确认通道。这允许调用者在继续前等待抢占信号被确认:

ok, ack := loop.Push("urgent", WithPreempt(WithAgentCancelMode(CancelAfterToolCalls)))
if ok {
    <-ack  // 等待取消被发起
}

确认通道在以下情况关闭:

  • 取消操作已发起(对于运行中的循环)
  • 立即关闭(对于尚未启动的循环)

这消除了推送紧急项目和检查抢占状态之间的竞态条件。

用于异步取消操作的 CancelHandle(本次更新新增)

AgentCancelFunc 现在返回 CancelHandle 而非阻塞:

type CancelHandle interface {
    Wait() error
}

handle := agentCancelFunc(WithAgentCancelMode(CancelAfterChatModel))
err := handle.Wait()  // 阻塞直到取消完成

这允许取消操作异步启动,同时仍提供等待完成和检查结果的方式(ErrCancelTimeoutErrExecutionCompleted)。

TurnLoop.Resume — 从 checkpoint 恢复停止的循环(本次更新新入)

TurnLoop 新增 Resume 方法,可从之前保存的 checkpoint 重新启动执行,而非从头开始:

exitState, _ := loop.Wait()  // Stop 或错误后
// 稍后:
err = loop.Resume(ctx, exitState.CheckPointID, newItems)

Resume 调用时,TurnLoop 从 Store 加载 checkpoint,重建 pendingResume 负载,并在第一次迭代时通过 Runner.ResumeWithParams 恢复被中断的 agent turn,而非启动新的一次运行。TurnLoopExitState 现在携带 CheckPointIDCanceledItemsUnhandledItems,调用方无需直接查询 store 即可获得下次 Resume 所需的全部数据。

ExternalTurnState 模式 — 将 checkpoint 与 TurnLoop 内部解耦(本次更新新入)

TurnLoopConfig.ExternalTurnState = true 把 turn 级 checkpoint 数据的持有权从 TurnLoop 内部 Store 转移给调用方:

模式 TurnLoop 负责 调用方负责
ExternalTurnState=false(默认) 通过 Store 保存/加载 agent checkpoint
ExternalTurnState=true 跟踪 CheckPointID;持久化由调用方负责 加载 checkpoint,通过 WithExternalResumeItems 提供项

ExternalTurnState 模式下,Resume 必须显式提供已取消和未处理的项:

loop.Resume(ctx, checkPointID, newItems,
    WithExternalResumeItems(canceledItems, unhandledItems))

这使得自行管理持久化层的调用方无需独立的 Store 即可驱动 TurnLoop resume。

GenResume 回调 — resume 时重建输入队列(本次更新新入)

TurnLoopConfig.GenResumeGenInput 在 resume 时的对应回调。它在恢复循环的第一次迭代时恰好调用一次,让调用方调和循环停止时正在处理中的项:

GenResume: func(ctx context.Context, loop *TurnLoop[T],
    canceledItems, unhandledItems, newItems []T,
) (*GenResumeResult[T], error) {
    merged := append(canceledItems, newItems...)
    return &GenResumeResult[T]{
        CheckPointID: existingCheckpointID,
        Consumed:     merged[:1],
        Remaining:    merged[1:],
    }, nil
}

GenResumeResult.Consumed 项传入 PrepareAgent 并用于恢复 agent turn。GenResumeResult.Remaining 项重新入队头。不属于两者的项被丢弃。该回调在第一次 resume 迭代时完全替代 GenInput

Stop 升级通过 AgentCancelOptions(本次更新新入)

Stop() 现在接受 AgentCancelOption 可变参数,调用方可同时发出退出信号并取消运行中的 agent:

loop.Stop(WithAgentCancelMode(CancelImmediate))

turnLoopStopSig 将 cancel opts 和世代计数存储在一起。watchStopSignal goroutine 在每次世代增加时触发 agentCancelFunc,使多次递进式 Stop 调用在没有竞争的情况下正确工作。

turnLoopStopSig 重构:单次使用 done → 可重复 notify(本次更新新入)

此前 turnLoopStopSig 使用单一 done chan struct{},在第一次 Stop 调用时就被 close。这防止了多次 Stop 调用将不同的 cancel options 传递给运行中的 agent。

结构现在将两个关注点分离:

  • 永久停止状态done 仍尺好只关闭一次(在 TurnLoop 主循环退出时),通过 isStopped() 可读取。
  • 每次信号通知:容量为 1 的 notify channel 加世代计数将 signal() 能够多次调用而不阐塞,同时保证 at-least-one-signal 语义。

TurnLoop API 简化(本次更新新增)

三项变更简化了 TurnLoop 公共 API,移除了调用方无法有效处理的 error 返回值,并将 Resume 拆分为可失败的准备步骤和不可失败的执行步骤:

1. NewTurnLoop 返回 *TurnLoop[T] 而非 (*TurnLoop[T], error)

NewTurnLoop 唯一的校验是检查 GenInputPrepareAgent 是否为 nil。这属于编程错误(在编译/初始化时即可确定),而非运行时故障,因此 panic 比返回 error 更为恰当。这消除了每个构造点的 if err != nil 样板代码:

// 之前
loop, err := NewTurnLoop[string](config)
if err != nil { ... }

// 之后
loop := NewTurnLoop[string](config)  // GenInput 或 PrepareAgent 为 nil 时 panic

2. Run() 不再返回值(原先返回 error

此前 Run() 在对已运行或已完成的循环重复调用时返回 error。由于 Run 是异步启动主循环的生命周期方法,重复调用现在被静默视为空操作。调用方通过 Wait() 获取最终结果,该方法已返回 TurnLoopExitState

// 之前
if err := loop.Run(ctx); err != nil { ... }

// 之后
loop.Run(ctx)  // 重复调用为空操作
exitState, err := loop.Wait()

3. Resume() 拆分为 PrepareResume() + Run()

Resume 此前将 checkpoint 加载/校验(可能失败)与启动循环(不会有意义地失败)合为一体。现在拆分为两个步骤:

  • PrepareResume(ctx, checkPointID, newItems, ...opts) error — 从 Store 加载 checkpoint、校验数据并暂存 resume 负载。若 checkpoint 无法加载或校验失败则返回 error。
  • Run(ctx) — 启动循环(适用于全新启动和已准备好的 resume)。
// 之前
err = loop.Resume(ctx, checkPointID, newItems)

// 之后
if err := loop.PrepareResume(ctx, checkPointID, newItems); err != nil {
    return err  // checkpoint 加载或校验失败
}
loop.Run(ctx)

这种分离让调用方可以在进入循环生命周期之前处理 checkpoint 错误,同时让 Run 作为全新启动和 resume 启动的统一入口。

关键洞察

将取消从接口(CancellableAgent)改为 option(WithCancel),使取消生命周期与 agent 实现正交。单一 flowAgent 驱动状态机,无论嵌套多少层——内层 agent 通过 context 观察取消,安全点取消成为 compose 层已知如何持久化的普通 interrupt。所有取消模式在工作流过渡边界(没有子 agent 工作正在进行)无条件触发,CancelImmediate 额外添加宽限期以便子 agent 在强制中止前完成 checkpoint。

抢占确认通道和 CancelHandle 的添加提供了对异步操作的更好控制,并消除了抢占场景中的竞态条件。

总结

问题 解决方案
取消需要每种 agent 类型实现 CancellableAgent 接口 WithCancel 作为 AgentRunOption + cancelContext 状态机
复合 agent(sequential、loop、parallel、supervisor、planexecute)不支持取消 外层 flowAgent 持有生命周期;内层 agent 通过 context 访问;并行 agent 使用 interrupt 函数切片
安全点取消作为 sentinel error 阻止 checkpoint/resume compose.Interrupt 携带 typed cancelSafePointInfo——自动保存 checkpoint
工作流边界的安全点语义不明确 过渡边界对所有取消模式均安全(无子 agent 工作进行中);取消无条件触发
工作流过渡边界未检查取消 所有取消模式在 sequential/loop/parallel 过渡边界无条件触发
CancelImmediate 中止子 agent 时不允许 checkpoint 宽限期包装器给予子 agent 1 秒中断/checkpoint 时间后再强制中止
TurnLoop 延迟事件缓冲 Eager receive + 显式 CancelCheck 节点 + cancelMonitoredToolHandler 处理流
单个 item 的 trace 无法关联到执行 GenInputResult.RunCtx 覆盖单次 turn 上下文用于 PrepareAgent/run/OnAgentEvents
抢占恢复与灵活通道操作 UnboundedChan.TrySend/TakeAll/PushFront 方法
TurnLoop 方法在 Run 前无效 宽松 API:Push 缓冲、Stop 设标志、Wait 阻塞直到 Run 完成
Parent graph 在共享 cancel channel 中丢失 FromGraphInterrupt resolveInterruptCompletedTasks 中向上传播标志
Checkpoint 序列化时的 stream 错误 *StreamCancelledError gob 注册类型;agentEventWrapper.GobEncode 消费未消费的 stream
Graph interrupt 函数非幂等导致 double-fire panic sendInterruptsetGraphInterruptFunc 中持有 mutex 跨迭代
抢占信号与取消发起之间的竞态 Push(WithPreempt()) 返回的抢占确认通道
阻塞式取消操作 CancelHandle 接口及 Wait() 方法用于异步取消
TurnLoop 无法从保存的 checkpoint 重启 TurnLoop.Resume(...) 加载 checkpoint 并恢复第一个 turn
Turn 级 checkpoint 存储需要独立的 Store ExternalTurnState=true + WithExternalResumeItems 将存储与 TurnLoop 解耦
Resume 时无御针重建输入队列 GenResume 回调 + GenResumeResult.Consumed/Remaining
Stop() 只能发出退出信号,无法取消运行中的 agent Stop(WithAgentCancelMode(...)) + watchStopSignal 按世代触发
turnLoopStopSig done channel 单次使用,阻止升级 可重复 notify channel + 世代计数将永久停止与每次信号取消分离
NewTurnLoop 对编程错误(nil 回调)返回 error NewTurnLoop 直接返回 *TurnLoop[T]GenInput/PrepareAgent 为 nil 时 panic
Run() 重复调用返回 error,调用方必须处理 Run() 不返回值;重复调用为空操作;结果通过 Wait() 获取
Resume() 将可失败的 checkpoint 加载与不可失败的循环启动混为一体 PrepareResume() 返回 error 用于 checkpoint/校验;Run() 启动循环

@hi-pender hi-pender force-pushed the feat/agent_turn_loop branch from 167e5e8 to 5a7c43a Compare March 5, 2026 02:54
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from e986cc7 to aec39b6 Compare March 5, 2026 02:58
@codecov
Copy link
Copy Markdown

codecov bot commented Mar 5, 2026

Codecov Report

❌ Patch coverage is 84.58333% with 222 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (alpha/09@30c1995). Learn more about missing BASE report.

Files with missing lines Patch % Lines
adk/turn_loop.go 87.63% 45 Missing and 26 partials ⚠️
adk/cancel.go 82.09% 52 Missing and 13 partials ⚠️
adk/chatmodel.go 76.00% 29 Missing and 7 partials ⚠️
adk/flow.go 66.66% 13 Missing ⚠️
adk/runner.go 80.64% 4 Missing and 2 partials ⚠️
adk/agent_tool.go 64.28% 4 Missing and 1 partial ⚠️
adk/utils.go 64.28% 5 Missing ⚠️
compose/graph_run.go 86.84% 2 Missing and 3 partials ⚠️
adk/interrupt.go 76.47% 2 Missing and 2 partials ⚠️
adk/retry_chatmodel.go 0.00% 2 Missing and 2 partials ⚠️
... and 3 more
Additional details and impacted files
@@             Coverage Diff             @@
##             alpha/09     #835   +/-   ##
===========================================
  Coverage            ?   81.21%           
===========================================
  Files               ?      158           
  Lines               ?    18838           
  Branches            ?        0           
===========================================
  Hits                ?    15300           
  Misses              ?     2415           
  Partials            ?     1123           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from b48db19 to 3242e0d Compare March 5, 2026 09:22
shentongmartin added a commit that referenced this pull request Mar 6, 2026
- Simplify cancel state machine: consolidate markCompleted/markInterrupted/markError
  into single markDone, remove stateInterrupted and stateError
- Remove ErrExecutionInterrupted and ErrExecutionFailed sentinels, use
  ErrExecutionCompleted for all terminal states
- Add cancelMonitoredToolHandler to wrap streamable tool streams with
  cancel monitoring via cancelContextKey (ErrStreamCancelled on CancelImmediate)
- Rename turnLoopCancelSig -> turnLoopStopSig, align Stop/cancel terminology
- Remove unused ErrTurnLoopAlreadyStopped, keep Push returning bool
- Add context cancellation monitoring in TurnLoop.run() to unblock Receive()
- Document Stop() degradation when agent doesn't support WithCancel
- Run agentCancelFunc in goroutine to avoid deadlock in stop path
- Add comprehensive tests for cancelMonitoredToolHandler, cancelContextKey,
  TurnLoop context cancellation, default error propagation, and callback tests

Change-Id: I4fbe23619378b584206790d99bf75f17150cd075
shentongmartin added a commit that referenced this pull request Mar 19, 2026
- Simplify cancel state machine: consolidate markCompleted/markInterrupted/markError
  into single markDone, remove stateInterrupted and stateError
- Remove ErrExecutionInterrupted and ErrExecutionFailed sentinels, use
  ErrExecutionCompleted for all terminal states
- Add cancelMonitoredToolHandler to wrap streamable tool streams with
  cancel monitoring via cancelContextKey (ErrStreamCancelled on CancelImmediate)
- Rename turnLoopCancelSig -> turnLoopStopSig, align Stop/cancel terminology
- Remove unused ErrTurnLoopAlreadyStopped, keep Push returning bool
- Add context cancellation monitoring in TurnLoop.run() to unblock Receive()
- Document Stop() degradation when agent doesn't support WithCancel
- Run agentCancelFunc in goroutine to avoid deadlock in stop path
- Add comprehensive tests for cancelMonitoredToolHandler, cancelContextKey,
  TurnLoop context cancellation, default error propagation, and callback tests

Change-Id: I4fbe23619378b584206790d99bf75f17150cd075
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from 98c9018 to af5147a Compare March 19, 2026 09:44
@shentongmartin shentongmartin changed the base branch from feat/agent_turn_loop to alpha/09 March 19, 2026 09:44
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch 3 times, most recently from 96fb350 to b3cc8f2 Compare March 23, 2026 05:58
shentongmartin added a commit that referenced this pull request Mar 23, 2026
- Simplify cancel state machine: consolidate markCompleted/markInterrupted/markError
  into single markDone, remove stateInterrupted and stateError
- Remove ErrExecutionInterrupted and ErrExecutionFailed sentinels, use
  ErrExecutionCompleted for all terminal states
- Add cancelMonitoredToolHandler to wrap streamable tool streams with
  cancel monitoring via cancelContextKey (ErrStreamCancelled on CancelImmediate)
- Rename turnLoopCancelSig -> turnLoopStopSig, align Stop/cancel terminology
- Remove unused ErrTurnLoopAlreadyStopped, keep Push returning bool
- Add context cancellation monitoring in TurnLoop.run() to unblock Receive()
- Document Stop() degradation when agent doesn't support WithCancel
- Run agentCancelFunc in goroutine to avoid deadlock in stop path
- Add comprehensive tests for cancelMonitoredToolHandler, cancelContextKey,
  TurnLoop context cancellation, default error propagation, and callback tests

Change-Id: I4fbe23619378b584206790d99bf75f17150cd075
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch 2 times, most recently from 90b13c3 to 3ff710c Compare March 23, 2026 06:56
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from f1cac4b to e9c74a4 Compare March 23, 2026 07:40
…drop bugs

- Convert preemptSignal from boolean paused to holdCount counter to
  support multiple independent holders (run loop + Push callers)
- Move currentTC/currentRunCtx onto preemptSignal struct under same
  mutex, replacing the separate RWMutex on TurnLoop
- Add holdAndGetTurn() for atomic hold+snapshot in pushWithStrategy,
  eliminating the TOCTOU race where strategy could observe a stale turn
- Run loop now brackets each turn with holdRunLoop()/endTurnAndUnhold()
  so the unconditional end-of-turn release no longer clobbers a Push
  caller's hold, fixing silent preempt signal drops
- Fix pending ack channels not being closed when holdCount reaches 0,
  preventing goroutine leaks
- Rename methods for clarity: pause->holdRunLoop, release->unholdRunLoop,
  signalWithAck->requestPreempt, waitIfPaused->waitForPreemptOrUnhold,
  check->receivePreempt, pauseAndGetTurn->holdAndGetTurn
- Add comprehensive doc comments on preemptSignal lifecycle
- Add 6 preemptSignal unit tests and 5 integration race-condition tests

Change-Id: I049c56806e42b227b0fbe263d4af9d16f65f60e6
…vent deadlock

Change-Id: If1c1d8ad60ac73225736478c07459f2639ea304c
… reset

- In the done case of runAgentAndHandleEvents select, add non-blocking
  check on preemptDone to handle the select race where done wins over
  preemptDone — previously this would leak the CancelError and
  incorrectly save a checkpoint instead of treating it as a preempt.
- Only save checkpoint when stopSig.isStopped(), not on arbitrary
  handleErr — generic errors (panics, LLM failures) are not resumable.
- Apply same fix in cleanup(): remove runErr != nil from
  shouldSaveCheckpoint condition.
- Extract resetLocked() helper to deduplicate the identical reset body
  across unholdRunLoop, endTurnAndUnhold, and drainAll.

Change-Id: Ie93a0243b9ec9d46be2155a0654f5fdf38a501ec
… tests

- Remove ExternalTurnState, PrepareResume, and per-turn CheckpointID generation
- Add CheckpointID to TurnLoopConfig for declarative checkpoint-based resume
- Auto-detect resume vs fresh start via tryLoadCheckpoint on Run()
- Delete stale checkpoints on clean exit to prevent stale resumption
- Add CheckPointDeleter optional interface in core package for explicit deletion
- Add 15 new tests covering all checkpoint/resume edge cases:
  tryLoadCheckpoint paths, cleanup save/delete paths, GenResume error handling,
  ResumeWithParams, stale checkpoint deletion via context cancellation, etc.

Change-Id: Ic0c7bded8da11229a2816c22fbef3ab42ce74a85
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from 15db73d to 8908e8c Compare March 26, 2026 08:52
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from 8908e8c to 7e4b2ca Compare March 26, 2026 09:04
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from 7e4b2ca to e13edf3 Compare March 26, 2026 09:06
…sed signals

- compose/graph_manager.go: receiveWithListening now returns immediately
  when cancel channel is closed, preventing goroutine hang on subsequent
  receives from a closed channel (previously fell through to unreachable break)
- adk/turn_loop.go: watchStopSignal adds a dedicated case for l.stopSig.done
  to ensure stop signals are not missed on subsequent turns when the notify
  channel was already drained
- adk/turn_loop_test.go: wrap iteration loops in t.Run subtests for better
  isolation and diagnostics; remove unused result assertions

Change-Id: If6479185bb3984ab618b2414f711433bfb4a1c41
@shentongmartin shentongmartin force-pushed the refactor/eager_receive branch from e13edf3 to ab92a8e Compare March 26, 2026 09:08
…gents

- Add cancel-at-transition checks in sequential, loop, and parallel workflows
  Transition boundaries are unconditionally safe — any cancel mode fires
- Track child cancel contexts with activeChildren counter for grace period
- Wrap graph interrupt with grace period when children are active
- Defensive copy in wrapGraphInterruptWithGracePeriod to prevent slice aliasing
- Add cancelAsync helper eliminating time.Sleep-based test synchronization
- Add comprehensive tests for transition boundaries, resume, multi-level
  nesting, custom cancel-unaware agents, and grace period fallback

Change-Id: I1da9fbb365cfc45a7c5db7a26c1c18af93cbfa2a
…shWithStrategy

Move setTurn() before runner.Run()/ResumeWithParams() so that
holdAndGetTurn() always sees a non-nil TurnContext when a turn is active.
Previously, the agent's goroutine could signal agentStarted before the
run loop set currentTC, causing PushStrategy callbacks to receive nil.

Change-Id: I623ba5104a9ca9600cff3bce232e41cd1b1380ef
…double-fire of interrupt functions

Change-Id: If20978ee5836450e9811d0c82de9ea43e9dc4f97
…th concurrent preempts

Change-Id: Ibbcabb3349c42ee334191179cc076c0a5dcacc71
@shentongmartin shentongmartin merged commit 2117056 into alpha/09 Mar 26, 2026
16 checks passed
@shentongmartin shentongmartin deleted the refactor/eager_receive branch March 26, 2026 11:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants