Task Runner
Eshan Roy edited this page Jun 16, 2026 · 1 revision
The task runner schedules and executes workflow tasks with dependency resolution, bounded parallelism, retry support, and per-task timeouts.
Source: pkg/taskrunner/runner.go
Plan Phase → Task[] with Dependencies
│
▼
Runner.Schedule() → Kahn's Topological Sort → Execution Groups
│
▼
Runner.ExecuteGroup() → Bounded Parallel Execution (max 4)
│
├── Task 1 (context + timeout) → ExecuteFunc → TaskResult
├── Task 2 (context + timeout) → ExecuteFunc → TaskResult
└── ...
type Runner struct {
	tasks   []types.Task
	status  map[int]types.TaskStatus
	results map[int]TaskResult
	idToIdx map[int]int

	OnTaskStart  func(task types.Task)                // callback on task start
	OnTaskUpdate func(task types.Task, status string) // callback on status change
	TaskTimeout  time.Duration                        // per-task timeout (default 30min)
	MaxRetries   int                                  // retry attempts for failed tasks
	MaxParallel  int                                  // concurrent tasks per group (default 4)
}

Schedule() performs a topological sort to determine execution order:
- Build adjacency list and in-degree count from task dependencies
- Start with zero in-degree nodes (no dependencies)
- Process groups: each group contains tasks whose dependencies are all met
- Decrement in-degrees for dependents and add new zero-degree nodes
- If processed < total tasks, a circular dependency exists
Circular dependencies are detected at two levels:
- Self-reference: a task that depends on its own ID is caught immediately
- Cycle detection: Kahn's algorithm detects cycles as a side effect (processed < total)
Schedule() returns ErrCircularDependency in either case.
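
The scheduling steps and both detection levels described above can be sketched as follows. This is a minimal illustration, not the code in runner.go: the `Task` struct, its `DependsOn` field, and the `schedule` function are hypothetical stand-ins, and the choice to ignore unknown dependency IDs is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a hypothetical stand-in for types.Task; only the fields the
// scheduler needs are modelled here.
type Task struct {
	ID        int
	DependsOn []int
}

// ErrCircularDependency reuses the error name mentioned in the text.
var ErrCircularDependency = errors.New("circular dependency detected")

// schedule groups task IDs into levels using Kahn's algorithm. Every task in
// a level depends only on tasks from earlier levels.
func schedule(tasks []Task) ([][]int, error) {
	inDegree := make(map[int]int, len(tasks))
	dependents := make(map[int][]int, len(tasks))
	for _, t := range tasks {
		inDegree[t.ID] = 0
	}
	for _, t := range tasks {
		for _, dep := range t.DependsOn {
			if dep == t.ID {
				return nil, ErrCircularDependency // self-reference
			}
			if _, ok := inDegree[dep]; !ok {
				continue // assumption: unknown dependency IDs are ignored
			}
			inDegree[t.ID]++
			dependents[dep] = append(dependents[dep], t.ID)
		}
	}

	// Seed the first group with tasks that have no dependencies.
	var current []int
	for _, t := range tasks {
		if inDegree[t.ID] == 0 {
			current = append(current, t.ID)
		}
	}

	var groups [][]int
	processed := 0
	for len(current) > 0 {
		groups = append(groups, current)
		processed += len(current)
		var next []int
		for _, id := range current {
			for _, d := range dependents[id] {
				inDegree[d]--
				if inDegree[d] == 0 {
					next = append(next, d)
				}
			}
		}
		current = next
	}
	if processed < len(tasks) {
		return nil, ErrCircularDependency // leftover tasks sit on a cycle
	}
	return groups, nil
}

func main() {
	groups, err := schedule([]Task{
		{ID: 1},
		{ID: 2, DependsOn: []int{1}},
		{ID: 3, DependsOn: []int{1}},
		{ID: 4, DependsOn: []int{2, 3}},
	})
	fmt.Println(groups, err) // [[1] [2 3] [4]] <nil>
}
```

Tasks 2 and 3 land in the same group because both depend only on task 1, so they can run concurrently once it finishes.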
ExecuteGroup() runs all tasks in a group concurrently. Before execution, each task is checked:
- Skip tasks in a terminal state (done, failed, skipped, unrecoverable)
- Skip tasks whose dependencies failed or were skipped
- Only execute tasks with all dependencies in `StatusDone`
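
The pre-execution check above can be sketched as a small decision function. Only `StatusDone` is named in the text; the other status constants, the `Decision` type, and the `decide` function are hypothetical. Treating an unrecoverable dependency the same as a failed one is also an assumption.

```go
package main

import "fmt"

// TaskStatus stands in for types.TaskStatus. Only StatusDone is named in the
// text above; the other constant names are assumptions.
type TaskStatus string

const (
	StatusPending       TaskStatus = "pending"
	StatusDone          TaskStatus = "done"
	StatusFailed        TaskStatus = "failed"
	StatusSkipped       TaskStatus = "skipped"
	StatusUnrecoverable TaskStatus = "unrecoverable"
)

// Decision is a hypothetical result type for the pre-execution check.
type Decision string

const (
	Run   Decision = "run"   // every dependency is done
	Leave Decision = "leave" // task already reached a terminal state
	Skip  Decision = "skip"  // a dependency failed or was skipped
	Hold  Decision = "hold"  // a dependency has not finished yet
)

func isTerminal(s TaskStatus) bool {
	switch s {
	case StatusDone, StatusFailed, StatusSkipped, StatusUnrecoverable:
		return true
	}
	return false
}

// decide applies the three checks in order for one task.
func decide(id int, deps []int, status map[int]TaskStatus) Decision {
	if isTerminal(status[id]) {
		return Leave
	}
	ready := true
	for _, dep := range deps {
		switch status[dep] {
		case StatusDone:
			// dependency satisfied
		case StatusFailed, StatusSkipped, StatusUnrecoverable:
			return Skip // assumption: unrecoverable blocks dependents like failed does
		default:
			ready = false
		}
	}
	if !ready {
		return Hold
	}
	return Run
}

func main() {
	status := map[int]TaskStatus{1: StatusDone, 2: StatusFailed, 3: StatusPending, 4: StatusPending, 5: StatusPending}
	fmt.Println(decide(1, nil, status))         // leave
	fmt.Println(decide(3, []int{1}, status))    // run
	fmt.Println(decide(4, []int{1, 2}, status)) // skip
	fmt.Println(decide(5, []int{3}, status))    // hold
}
```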
Tasks within a group run concurrently, bounded by MaxParallel:
sem := make(chan struct{}, r.maxParallel()) // semaphore channel
// Each goroutine acquires sem before starting and releases it on completion

Failed tasks are retried up to MaxRetries times with linear backoff:
for attempt := 0; attempt < maxAttempts; attempt++ {
	result = fn(taskCtx, task)
	if result.Success || attempt == maxAttempts-1 {
		break
	}
	backoff := time.Duration(attempt+1) * time.Second
	// wait backoff before retry
}

Each task gets its own context derived from the parent:
- Parent context cancellation propagates to all running tasks
- Per-task timeout via `context.WithTimeout()`
- Context is checked both before goroutine launch and inside the goroutine
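
The three mechanisms above (semaphore-bounded concurrency, retry with linear backoff, and a per-task timeout context) can be combined roughly as follows. This is a sketch under stated assumptions, not the runner's actual code. `Result`, `ExecFunc`, `runTask`, `runGroup`, and `demo` are hypothetical names. It assumes that `maxAttempts` equals `MaxRetries + 1` and that one timeout covers all attempts of a task. The backoff unit is shortened from one second to one millisecond so the demo finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Result and ExecFunc are simplified stand-ins for TaskResult and ExecuteFunc.
type Result struct {
	Success  bool
	Attempts int
}

type ExecFunc func(ctx context.Context, id int) bool

const (
	taskTimeout = time.Second      // stand-in for Runner.TaskTimeout
	backoffUnit = time.Millisecond // the runner uses time.Second; shortened for the demo
)

// runTask retries fn with linear backoff under a single per-task timeout.
func runTask(parent context.Context, id int, fn ExecFunc, maxRetries int) Result {
	ctx, cancel := context.WithTimeout(parent, taskTimeout)
	defer cancel()
	var res Result
	maxAttempts := maxRetries + 1 // assumption: first try plus maxRetries retries
	for attempt := 0; attempt < maxAttempts; attempt++ {
		res.Attempts++
		res.Success = fn(ctx, id)
		if res.Success || attempt == maxAttempts-1 {
			break
		}
		timer := time.NewTimer(time.Duration(attempt+1) * backoffUnit)
		select {
		case <-timer.C: // backoff elapsed, try again
		case <-ctx.Done(): // timeout or parent cancellation stops the retries
			timer.Stop()
			return res
		}
	}
	return res
}

// runGroup runs each task in its own goroutine, at most maxParallel at once.
func runGroup(parent context.Context, ids []int, fn ExecFunc, maxRetries, maxParallel int) map[int]Result {
	sem := make(chan struct{}, maxParallel)
	results := make(map[int]Result, len(ids))
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, id := range ids {
		if parent.Err() != nil {
			break // context check before launching the goroutine
		}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on completion
			if parent.Err() != nil {
				return // context check inside the goroutine
			}
			res := runTask(parent, id, fn, maxRetries)
			mu.Lock()
			results[id] = res
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return results
}

// demo runs eight tasks (task 3 always fails) and reports the peak concurrency.
func demo() (peak int, results map[int]Result) {
	var mu sync.Mutex
	running := 0
	fn := func(ctx context.Context, id int) bool {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond)
		mu.Lock()
		running--
		mu.Unlock()
		return id != 3
	}
	results = runGroup(context.Background(), []int{1, 2, 3, 4, 5, 6, 7, 8}, fn, 2, 4)
	return peak, results
}

func main() {
	peak, results := demo()
	fmt.Println(peak <= 4, results[3], results[1]) // true {false 3} {true 1}
}
```

Waiting on the backoff with a `select` rather than a plain sleep lets a timeout or parent cancellation end the retry loop early. In this sketch a task keeps its semaphore slot while it backs off, which is a simplification.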
| Callback | When |
|---|---|
| `OnTaskStart(task)` | Before task execution begins |
| `OnTaskUpdate(task, status)` | After task completes (status = "done" or "failed") |
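
As an illustration of the callback order in the table, the sketch below fires both hooks around a single task. The `hooks` type and its `run` method are hypothetical, `Task` stands in for `types.Task`, and skipping nil callbacks is an assumption about how the runner behaves.

```go
package main

import "fmt"

// Task stands in for types.Task.
type Task struct {
	ID int
}

// hooks mirrors the two callback fields on Runner.
type hooks struct {
	OnTaskStart  func(task Task)
	OnTaskUpdate func(task Task, status string)
}

// run invokes the callbacks around exec, skipping any that are nil.
func (h hooks) run(task Task, exec func(Task) bool) {
	if h.OnTaskStart != nil {
		h.OnTaskStart(task)
	}
	status := "failed"
	if exec(task) {
		status = "done"
	}
	if h.OnTaskUpdate != nil {
		h.OnTaskUpdate(task, status)
	}
}

func main() {
	var events []string
	h := hooks{
		OnTaskStart:  func(t Task) { events = append(events, fmt.Sprintf("start %d", t.ID)) },
		OnTaskUpdate: func(t Task, s string) { events = append(events, fmt.Sprintf("%d %s", t.ID, s)) },
	}
	h.run(Task{ID: 1}, func(Task) bool { return true })
	h.run(Task{ID: 2}, func(Task) bool { return false })
	fmt.Println(events) // [start 1 1 done start 2 2 failed]
}
```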
type TaskResult struct {
	Success    bool
	Output     string
	Error      string
	CommitHash string // git commit hash if task created one
	DurationMs int64
	ToolCalls  int // number of tool calls during execution
}

| Method | Description |
|---|---|
| `Status(id)` | Current status of a task |
| `Results()` | Copy of all task results |
| `AllDone()` | True when no tasks are pending or running |
| `Summary()` | Counts: total, done, failed, skipped |
| `Tasks()` | Task list with updated statuses |
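
To make the semantics of `AllDone()` and `Summary()` concrete, here is a minimal sketch over a plain status map. The `Summary` struct, the status constant names, and the free functions `summarize` and `allDone` are hypothetical; the real methods live on `Runner` and may differ in shape.

```go
package main

import "fmt"

// TaskStatus stands in for types.TaskStatus; the constant names are assumed.
type TaskStatus string

const (
	StatusPending TaskStatus = "pending"
	StatusRunning TaskStatus = "running"
	StatusDone    TaskStatus = "done"
	StatusFailed  TaskStatus = "failed"
	StatusSkipped TaskStatus = "skipped"
)

// Summary is a hypothetical shape for the counts that Summary() reports.
type Summary struct {
	Total, Done, Failed, Skipped int
}

// summarize counts tasks per outcome; other states only add to Total.
func summarize(status map[int]TaskStatus) Summary {
	s := Summary{Total: len(status)}
	for _, st := range status {
		switch st {
		case StatusDone:
			s.Done++
		case StatusFailed:
			s.Failed++
		case StatusSkipped:
			s.Skipped++
		}
	}
	return s
}

// allDone reports whether no task is still pending or running.
func allDone(status map[int]TaskStatus) bool {
	for _, st := range status {
		if st == StatusPending || st == StatusRunning {
			return false
		}
	}
	return true
}

func main() {
	status := map[int]TaskStatus{1: StatusDone, 2: StatusFailed, 3: StatusSkipped, 4: StatusRunning}
	fmt.Println(summarize(status), allDone(status)) // {4 1 1 1} false
	status[4] = StatusDone
	fmt.Println(summarize(status), allDone(status)) // {4 2 1 1} true
}
```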
The task runner is used by the Execute phase (internal/workflow/execute.go):
- Plan phase produces `[]types.Task` with dependencies
- `Schedule()` produces execution groups
- Each group is executed via `ExecuteGroup()`
- The `ExecuteFunc` sends the task to the LLM with tool definitions
- The LLM generates tool calls, which are dispatched
- Results are fed back to the LLM iteratively
- On completion, a git commit is created for the task
- Status is updated and persisted to the session