Skip to content

Task Runner

Eshan Roy edited this page Jun 16, 2026 · 1 revision

Task Runner

The task runner schedules and executes workflow tasks with dependency resolution, bounded parallelism, retry support, and per-task timeouts.

Source: pkg/taskrunner/runner.go

Architecture

Plan Phase → Task[] with Dependencies
    │
    ▼
Runner.Schedule() → Kahn's Topological Sort → Execution Groups
    │
    ▼
Runner.ExecuteGroup() → Bounded Parallel Execution (max 4)
    │
    ├── Task 1 (context + timeout) → ExecuteFunc → TaskResult
    ├── Task 2 (context + timeout) → ExecuteFunc → TaskResult
    └── ...

Runner

type Runner struct {
    tasks       []types.Task
    status      map[int]types.TaskStatus
    results     map[int]TaskResult
    idToIdx     map[int]int
    OnTaskStart  func(task types.Task)        // callback on task start
    OnTaskUpdate func(task types.Task, status string) // callback on status change
    TaskTimeout  time.Duration                // per-task timeout (default 30min)
    MaxRetries   int                          // retry attempts for failed tasks
    MaxParallel  int                          // concurrent tasks per group (default 4)
}

Scheduling (Kahn's Algorithm)

Schedule() performs topological sort to determine execution order:

  1. Build adjacency list and in-degree count from task dependencies
  2. Start with zero in-degree nodes (no dependencies)
  3. Process groups: each group contains tasks whose dependencies are all met
  4. Decrement in-degrees for dependents and add new zero-degree nodes
  5. If processed < total tasks, a circular dependency exists

Circular Dependency Detection

Two levels of detection:

  • Self-reference: Task depending on its own ID is caught immediately
  • Cycle detection: Kahn's algorithm naturally detects cycles (processed < total)

Returns ErrCircularDependency on detection.

Execution

ExecuteGroup() runs all tasks in a group concurrently:

Pre-filtering

Before execution, each task is checked:

  • Skip tasks in terminal state (done, failed, skipped, unrecoverable)
  • Skip tasks whose dependencies failed or were skipped
  • Only execute tasks with all dependencies in StatusDone

Bounded Parallelism

Tasks within a group run concurrently, bounded by MaxParallel:

sem := make(chan struct{}, r.maxParallel()) // semaphore channel
// Each goroutine acquires sem before starting, releases on completion

Retry Support

Failed tasks are retried up to MaxRetries times with linear backoff:

for attempt := 0; attempt < maxAttempts; attempt++ {
    result = fn(taskCtx, task)
    if result.Success || attempt == maxAttempts-1 {
        break
    }
    backoff := time.Duration(attempt+1) * time.Second
    // wait backoff before retry
}

Context Propagation

Each task gets its own context derived from the parent:

  • Parent context cancellation propagates to all running tasks
  • Per-task timeout via context.WithTimeout()
  • Context checked both before goroutine launch and inside goroutine

Callbacks

Callback When
OnTaskStart(task) Before task execution begins
OnTaskUpdate(task, status) After task completes (status = "done" or "failed")

TaskResult

type TaskResult struct {
    Success    bool
    Output     string
    Error      string
    CommitHash string    // git commit hash if task created one
    DurationMs int64
    ToolCalls  int       // number of tool calls during execution
}

Query Methods

Method Description
Status(id) Current status of a task
Results() Copy of all task results
AllDone() True when no tasks are pending or running
Summary() Counts: total, done, failed, skipped
Tasks() Task list with updated statuses

Integration with Workflow

The task runner is used by the Execute phase (internal/workflow/execute.go):

  1. Plan phase produces []types.Task with dependencies
  2. Schedule() produces execution groups
  3. Each group is executed via ExecuteGroup()
  4. The ExecuteFunc sends the task to the LLM with tool definitions
  5. The LLM generates tool calls, which are dispatched
  6. Results are fed back to the LLM iteratively
  7. On completion, a git commit is created for the task
  8. Status is updated and persisted to the session

Clone this wiki locally