feat(adk): implement cancel mechanism for ChatModelAgent#797

Merged
hi-pender merged 24 commits into feat/agent_turn_loop from feat/agent_turn_loop_cancel_v2
Feb 24, 2026

@hi-pender hi-pender commented Feb 21, 2026

TurnLoop Cancel & Checkpoint Integration

Overview

This PR integrates the Cancel and Checkpoint mechanisms into TurnLoop, enabling:

  1. External Cancel: Cancel a running Agent from outside TurnLoop via the WithCancel pattern
  2. Checkpoint Storage: Save Agent state on Cancel/Interrupt for later recovery
  3. Resume from Checkpoint: Continue execution from a saved checkpoint via WithTurnLoopResume

Background

Business Scenarios

| Scenario | Description | Requirements |
|---|---|---|
| User Cancel | User clicks the "Stop" button to interrupt the Agent | External Cancel |
| Message Preemption | A new message interrupts the running Agent | Cancel + New Agent |
| Pod Migration | K8s scheduling migrates the Pod, so state must be saved | Cancel + Checkpoint |
| Human-in-the-loop | Agent pauses at a specific point and waits for human confirmation | Interrupt + Resume |

Design Goals

| Goal | Description |
|---|---|
| External Cancellability | Support canceling Agent from outside TurnLoop |
| State Persistence | Support saving and restoring execution state |
| Flexible Cancel Modes | Support multiple cancel modes (immediate, after tool call, etc.) |
| Checkpoint-based Resume | Resume execution from saved checkpoint across different TurnLoop instances |

Architecture

Layer Structure

┌─────────────────────────────────────────────────────────────┐
│                        TurnLoop                              │
│  (Message-driven loop, manages multi-turn conversation)      │
│  - WithCancel: Returns ctx + TurnLoopCancelFunc              │
│  - Run: Drives Agent execution with checkpoint support       │
├─────────────────────────────────────────────────────────────┤
│                         Runner                               │
│  (Agent execution entry, manages Checkpoint storage)         │
│  - RunWithCancel: Run with cancel support                    │
│  - ResumeWithCancel: Resume from checkpoint                  │
├─────────────────────────────────────────────────────────────┤
│                     flowAgent                                │
│  (Agent wrapper, handles RunPath and event forwarding)       │
├─────────────────────────────────────────────────────────────┤
│                   ChatModelAgent                             │
│  (Concrete Agent implementation, ReAct loop)                 │
│  - buildNoToolsRunFunc: No-tools mode with cancel support    │
│  - buildReActRunFunc: ReAct mode with cancel support         │
├─────────────────────────────────────────────────────────────┤
│              cancelableChatModel / cancelableTool            │
│  (Cancel signal detection wrappers)                          │
├─────────────────────────────────────────────────────────────┤
│                    compose.Graph                             │
│  (Underlying execution engine, Interrupt/Resume mechanism)   │
└─────────────────────────────────────────────────────────────┘

Signal Flow

TurnLoop.WithCancel(ctx) → turnLoopCancelSig
         ↓
TurnLoop.Run(ctx)
         ↓
Runner.RunWithCancel() → Agent.RunWithCancel() → cancelSig
         ↓
User calls TurnLoopCancelFunc
         ↓
turnLoopCancelSig.done closed
         ↓
TurnLoop detects signal, calls Agent's CancelFunc
         ↓
cancelSig.done closed
         ↓
cancelableChatModel detects signal
         ↓
Returns compose.Interrupt error
         ↓
Agent sends AgentEvent{Action: {Interrupted: ...}}
         ↓
Runner saves checkpoint (if configured)
         ↓
TurnLoop returns TurnLoopInterruptError
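
The two-level hand-off in this flow (a loop-level signal whose closure triggers an agent-level signal) can be sketched in isolation. This is a minimal illustration only: the `signal` type, `forward` helper and their method names are hypothetical and are not taken from the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a hypothetical one-shot cancel signal: done is closed at most once.
type signal struct {
	once sync.Once
	done chan struct{}
}

func newSignal() *signal { return &signal{done: make(chan struct{})} }

// fire closes done; repeated calls are no-ops thanks to sync.Once.
func (s *signal) fire() { s.once.Do(func() { close(s.done) }) }

// fired reports, without blocking, whether the signal has been closed.
func (s *signal) fired() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// forward relays an outer (loop-level) signal to an inner (agent-level) one.
// The returned channel is closed once the relay has happened.
func forward(outer, inner *signal) <-chan struct{} {
	relayed := make(chan struct{})
	go func() {
		<-outer.done
		inner.fire()
		close(relayed)
	}()
	return relayed
}

func main() {
	loopSig, agentSig := newSignal(), newSignal()
	relayed := forward(loopSig, agentSig)

	loopSig.fire() // stands in for the caller invoking the cancel function
	<-relayed
	fmt.Println("agent signal fired:", agentSig.fired())
}
```

Closing a channel rather than sending on it lets any number of wrappers observe the same cancellation without coordinating with each other.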

Key Changes

1. TurnLoop Cancel Support

New API: WithCancel

// WithCancel returns a new context and a cancel function for external cancellation.
// Each call creates an independent cancel signal, allowing multiple concurrent Runs.
func (l *TurnLoop[T]) WithCancel(ctx context.Context) (context.Context, TurnLoopCancelFunc)

// TurnLoopCancelFunc doesn't require ctx parameter (already bound in WithCancel)
type TurnLoopCancelFunc func(opts ...CancelOption) error

Usage Example

ctx, cancel := turnLoop.WithCancel(context.Background())

go func() {
    err := turnLoop.Run(ctx)
    if interruptErr, ok := err.(*TurnLoopInterruptError[T]); ok {
        // Handle interrupt - can resume later
        saveCheckpointID(interruptErr.CheckpointID)
    }
}()

// Later: cancel from external signal (user click, pod migration, etc.)
cancel(WithCancelMode(CancelImmediate))

2. TurnLoop Checkpoint Support

New Run Option: WithTurnLoopResume

// WithTurnLoopResume allows resuming from a previously saved checkpoint
func WithTurnLoopResume[T any](checkPointID string, item T) TurnLoopRunOption[T]

TurnLoopConfig.Store

type TurnLoopConfig[T any] struct {
    // ... existing fields
    Store CheckPointStore  // Optional: enables checkpoint storage
}

TurnLoopInterruptError

type TurnLoopInterruptError[T any] struct {
    Item              T                    // Original input item
    CheckpointID      string               // Checkpoint ID for resume
    InterruptContexts []*InterruptCtx      // Interrupt chain information
}

3. ChatModelAgent No-Tools Mode Fix

Issue: When ChatModelAgent has no tools configured, buildNoToolsRunFunc ignored the cancelSig parameter, so the Cancel mechanism had no effect in no-tools mode.

Fix:

  1. Wrap the model with wrapModelForCancelable when cancelSig is provided
  2. Handle the compose.Interrupt error and convert it to a proper AgentEvent

func (a *ChatModelAgent) buildNoToolsRunFunc(_ context.Context) runFunc {
    return func(ctx context.Context, ..., cs *cancelSig, ...) {
        wrappedModel := buildModelWrappers(a.model, ...)
        
        // NEW: Wrap for cancel support
        if cs != nil {
            wrappedModel = wrapModelForCancelable(wrappedModel, cs)
        }
        
        // ... chain execution ...
        
        // NEW: Handle Interrupt error (same as buildReActRunFunc)
        info, ok := compose.ExtractInterruptInfo(err)
        if ok {
            event := CompositeInterrupt(ctx, info, data, is)
            generator.Send(event)
        }
    }
}

Signal Handling in TurnLoop

State Machine

                    ┌─────────────────────────────────────────┐
                    │              Main Loop                   │
    ┌───────────┐   │   ┌───────────┐       ┌───────────┐    │
    │           │   │   │           │       │           │    │
───▶│   Idle    │───┼──▶│  Running  │──────▶│   Idle    │────┼───▶
    │  (wait)   │   │   │  (agent)  │       │  (wait)   │    │
    └───────────┘   │   └───────────┘       └───────────┘    │
         │          │        │ │                             │
         │          │        │ │ External Cancel             │
         │          │        │ └────────────┐                │
         │          │        │              │                │
         │          │        ▼              ▼                │
         │          │   ┌───────────┐  ┌───────────┐         │
         │          │   │  Cancel   │  │ Interrupt │         │
         │          │   │  (stop)   │  │  (pause)  │         │
         │          │   └───────────┘  └───────────┘         │
         │          │        │              │                │
         │          │        ▼              ▼                │
         │          │   ┌─────────────────────────┐          │
         │          │   │  Return Error/Resume    │          │
         │          │   └─────────────────────────┘          │
         │          └────────────────────────────────────────┘
         ▼
    ┌───────────┐
    │   Exit    │  (ErrLoopExit / Error)
    └───────────┘

Signal Priority in select

select {
case <-frontDone:           // Front() completed (message preview)
case <-done:                // handleEvents completed
case <-cs.done:             // External cancel signal (highest priority for cancel)
    externalCancelled = true
    cancelFunc(nCtx, ...)   // Trigger Agent cancel
}
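
Note that a Go `select` does not order its cases: when several are ready, one is chosen pseudo-randomly. If the cancel case must win whenever it is ready, a common idiom is a non-blocking check on that channel before the blocking `select`. The sketch below shows the idiom with hypothetical names (`pick`, `cancel`, `done`); it is not the PR's loop.

```go
package main

import "fmt"

// pick reports which channel was observed, always preferring cancel
// when it is already closed at the time of the call.
func pick(cancel, done <-chan struct{}) string {
	// First pass: non-blocking check of the high-priority channel.
	select {
	case <-cancel:
		return "cancel"
	default:
	}
	// Second pass: block on both. If both become ready during this wait,
	// the choice is pseudo-random again.
	select {
	case <-cancel:
		return "cancel"
	case <-done:
		return "done"
	}
}

func main() {
	cancel := make(chan struct{})
	done := make(chan struct{})
	close(cancel)
	close(done)
	fmt.Println(pick(cancel, done))
}
```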

Usage Examples

1. Basic Cancel with Checkpoint

store := NewCheckPointStore()

loop, _ := NewTurnLoop(TurnLoopConfig[string]{
    Source:   source,
    GenInput: genInput,
    GetAgent: getAgent,
    Store:    store,  // Enable checkpoint
})

ctx, cancel := loop.WithCancel(context.Background())

done := make(chan error)
go func() {
    done <- loop.Run(ctx)
}()

// Wait for some signal (user cancel, pod migration, etc.)
<-cancelSignal
cancel(WithCancelMode(CancelImmediate))

err := <-done
if interruptErr, ok := err.(*TurnLoopInterruptError[string]); ok {
    // Save checkpoint ID for later resume
    persistCheckpointID(interruptErr.CheckpointID, interruptErr.Item)
}

2. Resume from Checkpoint

// Later (possibly in different Pod/instance)
checkpointID, item := loadPersistedCheckpoint()

loop, _ := NewTurnLoop(TurnLoopConfig[string]{
    Source:   source,
    GenInput: genInput,
    GetAgent: getAgent,
    Store:    store,  // Same store (or connected to same backend)
})

err := loop.Run(context.Background(), WithTurnLoopResume(checkpointID, item))
// Agent continues from where it was interrupted
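
The idea of resuming from a checkpoint across instances can be shown with a toy model: progress is saved under an ID when a run is interrupted, and a later run with the same ID picks up from the saved position. Everything here (`store`, `runSteps`, the integer step index) is a hypothetical simplification and says nothing about how Agent state is actually serialized.

```go
package main

import "fmt"

// store maps a checkpoint ID to the index of the next step to execute.
type store map[string]int

// runSteps executes steps from the saved position up to total.
// If stopAt is reached, it saves the position and reports finished=false.
// Pass a negative stopAt to run without interruption.
func runSteps(s store, id string, total, stopAt int) (executed []int, finished bool) {
	for i := s[id]; i < total; i++ { // s[id] is 0 when no checkpoint exists
		if i == stopAt {
			s[id] = i // remember where to pick up
			return executed, false
		}
		executed = append(executed, i)
	}
	delete(s, id) // run completed, checkpoint no longer needed
	return executed, true
}

func main() {
	s := store{}
	first, ok1 := runSteps(s, "cp-1", 5, 2)   // interrupted before step 2
	second, ok2 := runSteps(s, "cp-1", 5, -1) // resumed, runs to completion
	fmt.Println(first, ok1)
	fmt.Println(second, ok2)
}
```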

3. Internal Tool Interrupt with Checkpoint

// Tool that requires human confirmation
interruptTool := NewTool("confirm", func(ctx context.Context, input string) (string, error) {
    return "", compose.Interrupt(ctx, "needs human confirmation")
})

agent, _ := NewChatModelAgent(ctx, &ChatModelAgentConfig{
    Model: model,
    Tools: []BaseTool{interruptTool},
})

loop, _ := NewTurnLoop(TurnLoopConfig[string]{
    Source:   source,
    GetAgent: func(ctx context.Context, item string) (Agent, error) {
        return agent, nil
    },
    Store: store,
})

err := loop.Run(ctx)
if interruptErr, ok := err.(*TurnLoopInterruptError[string]); ok {
    // Show interrupt info to user, wait for confirmation
    // Then resume with the same checkpoint ID
}

Test Coverage

New Test Cases

| Test | Description |
|---|---|
| TestTurnLoop_ExternalCancel_WithStore | External cancel triggers interrupt, saves checkpoint, then resumes |
| TestTurnLoop_InternalToolInterrupt_WithCheckpoint_ThenResume | Tool interrupt saves checkpoint, then resumes to completion |

Test Scenarios Covered

  1. External Cancel Flow:

    • TurnLoop.WithCancel() creates cancel context
    • Run() starts with checkpoint ID
    • External cancel triggers Agent interrupt
    • Checkpoint is saved
    • Run() returns TurnLoopInterruptError
    • Resume with WithTurnLoopResume() continues execution
  2. Internal Tool Interrupt Flow:

    • Tool returns compose.Interrupt error
    • Agent converts to InterruptInfo
    • Runner saves checkpoint
    • TurnLoop returns TurnLoopInterruptError
    • Resume with WithTurnLoopResume() continues from checkpoint

Design Decisions

| Decision | Choice | Rationale |
|---|---|---|
| TurnLoop cancel mechanism | WithCancel pattern | Cancel:Run = 1:1, follows Go context conventions |
| TurnLoopCancelFunc signature | No ctx parameter | Context already bound in WithCancel, simplifies API |
| Interrupt detection | Check Action.Interrupted | Consistent with internal interrupt mechanism |
| No-tools mode cancel | Wrap model with cancelable | Same mechanism as ReAct mode, ensures consistency |

Migration Notes

Breaking Changes

None. All changes are additive.

New Dependencies

  • TurnLoopConfig.Store: Optional field, no changes required for existing code
  • WithTurnLoopResume: New option, existing Run() calls unaffected

Related Documents

- Add comments for exported functions (WithCancelMode, WithCancelTimeout, WithPreemptive, etc.)
- Refactor cancelableTool to reduce cyclomatic complexity (43 -> smaller functions)
- Fix file formatting (remove trailing blank lines in react.go)
Replace atomic.Int32 and atomic.Bool (Go 1.19+) with int32 and atomic functions for Go 1.18 compatibility:
- atomic.Int32 -> int32 + atomic.AddInt32/LoadInt32
- atomic.Bool -> int32 + atomic.StoreInt32/LoadInt32

codecov bot commented Feb 21, 2026

Codecov Report

❌ Patch coverage is 70.33898% with 175 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (feat/agent_turn_loop@de163c0). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| adk/cancel_wrapper.go | 35.48% | 94 Missing and 6 partials ⚠️ |
| adk/turn_loop.go | 80.38% | 31 Missing and 10 partials ⚠️ |
| adk/flow.go | 72.34% | 10 Missing and 3 partials ⚠️ |
| adk/chatmodel.go | 85.18% | 10 Missing and 2 partials ⚠️ |
| adk/runner.go | 82.92% | 6 Missing and 1 partial ⚠️ |
| adk/react.go | 96.49% | 1 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@                   Coverage Diff                   @@
##             feat/agent_turn_loop     #797   +/-   ##
=======================================================
  Coverage                        ?   79.52%           
=======================================================
  Files                           ?      148           
  Lines                           ?    16261           
  Branches                        ?        0           
=======================================================
  Hits                            ?    12931           
  Misses                          ?     2367           
  Partials                        ?      963           


- Remove TestChatModelAgentImplementsCancellableRun (replaced by compile-time assertion)
- Fix incorrect variable reference (slowTool -> st)
- Remove incorrect assertion in CancelWithCheckpoint test
Cancel should produce Interrupted event, not Err event
- TestRunWithCancel_Streaming: test cancel with EnableStreaming=true
  - CancelImmediate_DuringModelStream
  - CancelAfterToolCall_Streaming
- TestResumeWithCancel: test checkpoint + cancel workflow
  - RunWithCancel_ThenResumeWithCancel: first cancel, then resume normally
  - ResumeWithCancel_ThenCancel: first cancel, then resume and cancel again
Create new agent and runner instances for resume instead of modifying
the original slowModel fields, which could be read by background goroutines.
Explain that when Cancel is triggered, the inner model's Generate/Stream
continues in a background goroutine, so we must create new instances for
Resume to avoid concurrent read/write on model fields.
- Run now calls runWithCancel(withCancel=false)
- RunWithCancel calls runWithCancel(withCancel=true)
- Resume now calls resumeWithCancel(withCancel=false)
- ResumeWithCancel calls resumeWithCancel(withCancel=true)
- Remove duplicate resume() method
Comment thread adk/interface.go Outdated
Comment thread adk/cancel_wrapper.go
Comment thread adk/cancel_wrapper.go Outdated
@hi-pender hi-pender merged commit 60ba11e into feat/agent_turn_loop Feb 24, 2026
16 checks passed
@hi-pender hi-pender deleted the feat/agent_turn_loop_cancel_v2 branch February 24, 2026 06:29