A worker lifecycle library for Go — manage background goroutines with panic recovery, configurable restart, tracing, and structured shutdown.
Built on suture for Erlang-style supervisor trees. Part of the ColdBrew framework.
import "github.com/go-coldbrew/workers"

Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown.
Every worker runs inside its own supervisor subtree. This means:
- Each worker gets panic recovery and restart independently
- Workers can dynamically spawn child workers via WorkerContext
- When a parent worker stops, all its children stop (scoped lifecycle)
- The supervisor tree prevents cascading failures and CPU-burn restart storms
Create workers with NewWorker and run them with Run:
workers.Run(ctx, []*workers.Worker{
workers.NewWorker("kafka", consume),
workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true),
})
Common patterns are provided as helpers:
- EveryInterval — periodic execution on a fixed interval
- ChannelWorker — consume items from a channel one at a time
- BatchChannelWorker — collect items into batches, flush on size or timer
Manager workers can spawn and remove child workers at runtime using the Add, Remove, and Children methods on WorkerContext. Children join the parent's supervisor subtree and get full framework guarantees (tracing, panic recovery, restart). See [Example_dynamicWorkerPool].
Example (Dynamic Worker Pool)
Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick. This demonstrates the pattern used by services like route-store where worker configs are loaded from a database periodically.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
// Simulate config that changes over 3 ticks.
// Tick 1: start worker-a
// Tick 2: add worker-b
// Tick 3: remove worker-a
configs := [][]string{
{"worker-a"},
{"worker-a", "worker-b"},
{"worker-b"},
}
tick := 0
manager := workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error {
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if tick >= len(configs) {
continue
}
desired := map[string]bool{}
for _, name := range configs[tick] {
desired[name] = true
}
tick++
// Remove workers no longer desired.
for _, name := range ctx.Children() {
if !desired[name] {
ctx.Remove(name)
}
}
// Add new workers (Add is a no-op replacement if already running).
for name := range desired {
name := name
ctx.Add(workers.NewWorker(name, func(ctx workers.WorkerContext) error {
<-ctx.Done()
return ctx.Err()
}))
}
time.Sleep(10 * time.Millisecond) // let children start
fmt.Printf("tick %d: children=%v\n", tick, ctx.Children())
}
}
})
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
fmt.Println("pool shut down")
}

Output:

tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down
Example (Standalone)
Standalone usage with signal handling — no ColdBrew required.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
// For the example, use a short timeout.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{
workers.NewWorker("kafka", func(ctx workers.WorkerContext) error {
fmt.Println("consuming messages")
<-ctx.Done()
return ctx.Err()
}),
})
fmt.Println("shutdown complete")
}

Output:

consuming messages
shutdown complete
- func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error
- func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error
- func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error
- func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
- func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
- type BaseMetrics
- func (BaseMetrics) ObserveRunDuration(string, time.Duration)
- func (BaseMetrics) SetActiveWorkers(int)
- func (BaseMetrics) WorkerFailed(string, error)
- func (BaseMetrics) WorkerPanicked(string)
- func (BaseMetrics) WorkerRestarted(string, int)
- func (BaseMetrics) WorkerStarted(string)
- func (BaseMetrics) WorkerStopped(string)
- type Metrics
- type RunOption
- type Worker
- func NewWorker(name string, run func(WorkerContext) error) *Worker
- func (w *Worker) Every(d time.Duration) *Worker
- func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker
- func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
- func (w *Worker) WithFailureDecay(decay float64) *Worker
- func (w *Worker) WithFailureThreshold(threshold float64) *Worker
- func (w *Worker) WithMetrics(m Metrics) *Worker
- func (w *Worker) WithRestart(restart bool) *Worker
- func (w *Worker) WithTimeout(d time.Duration) *Worker
- type WorkerContext
func BatchChannelWorker
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error

BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.
Example
BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
ch := make(chan int, 10)
for i := 1; i <= 6; i++ {
ch <- i
}
close(ch)
fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(ctx workers.WorkerContext, batch []int) error {
fmt.Println(batch)
return nil
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
w := workers.NewWorker("batcher", fn)
workers.Run(ctx, []*workers.Worker{w})
}

Output:

[1 2 3]
[4 5 6]
func ChannelWorker
func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error

ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.
Example
ChannelWorker consumes items from a channel one at a time.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
ch := make(chan string, 3)
ch <- "hello"
ch <- "world"
ch <- "!"
close(ch)
fn := workers.ChannelWorker(ch, func(ctx workers.WorkerContext, item string) error {
fmt.Println(item)
return nil
})
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
w := workers.NewWorker("consumer", fn)
workers.Run(ctx, []*workers.Worker{w})
}

Output:

hello
world
!
func EveryInterval
func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error

EveryInterval wraps fn in a ticker loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on WithRestart).
Example
EveryInterval wraps a function in a ticker loop.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
count := 0
fn := workers.EveryInterval(20*time.Millisecond, func(ctx workers.WorkerContext) error {
count++
fmt.Printf("tick %d\n", count)
return nil
})
w := workers.NewWorker("periodic", fn)
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}

Output:

tick 1
tick 2
func Run
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error

Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.
Example
Run multiple workers concurrently. All workers start together and stop when the context is cancelled.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w1 := workers.NewWorker("api-poller", func(ctx workers.WorkerContext) error {
fmt.Println("api-poller started")
<-ctx.Done()
return ctx.Err()
})
w2 := workers.NewWorker("cache-warmer", func(ctx workers.WorkerContext) error {
fmt.Println("cache-warmer started")
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w1, w2})
fmt.Println("all workers stopped")
}

Output:

api-poller started
cache-warmer started
all workers stopped
func RunWorker
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)

RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without RestartOnFail.
Example
RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w := workers.NewWorker("single", func(ctx workers.WorkerContext) error {
fmt.Println("running")
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.RunWorker(ctx, w)
fmt.Println("done")
}

Output:

running
done
type BaseMetrics
BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:
type myMetrics struct {
workers.BaseMetrics // forward-compatible
client *statsd.Client
}
func (m *myMetrics) WorkerStarted(name string) {
m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
type BaseMetrics struct{}

func (BaseMetrics) ObserveRunDuration

func (BaseMetrics) ObserveRunDuration(string, time.Duration)

func (BaseMetrics) SetActiveWorkers

func (BaseMetrics) SetActiveWorkers(int)

func (BaseMetrics) WorkerFailed

func (BaseMetrics) WorkerFailed(string, error)

func (BaseMetrics) WorkerPanicked

func (BaseMetrics) WorkerPanicked(string)

func (BaseMetrics) WorkerRestarted

func (BaseMetrics) WorkerRestarted(string, int)

func (BaseMetrics) WorkerStarted

func (BaseMetrics) WorkerStarted(string)

func (BaseMetrics) WorkerStopped

func (BaseMetrics) WorkerStopped(string)

type Metrics
Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.
type Metrics interface {
WorkerStarted(name string)
WorkerStopped(name string)
WorkerPanicked(name string)
WorkerFailed(name string, err error)
WorkerRestarted(name string, attempt int)
ObserveRunDuration(name string, duration time.Duration)
SetActiveWorkers(count int)
}

func NewPrometheusMetrics

func NewPrometheusMetrics(namespace string) Metrics

NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).
type RunOption
RunOption configures the behavior of Run.
type RunOption func(*runConfig)

func WithMetrics

func WithMetrics(m Metrics) RunOption

WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics{} is used.
type Worker
Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.
type Worker struct {
// contains filtered or unexported fields
}

func NewWorker

func NewWorker(name string, run func(WorkerContext) error) *Worker

NewWorker creates a Worker with the given name and run function. The run function should block until ctx is cancelled or an error occurs.
Example
A simple worker that runs until cancelled.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
w := workers.NewWorker("greeter", func(ctx workers.WorkerContext) error {
fmt.Printf("worker %q started (attempt %d)\n", ctx.Name(), ctx.Attempt())
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}

Output:

worker "greeter" started (attempt 0)
func (*Worker) Every
func (w *Worker) Every(d time.Duration) *Worker

Every wraps the run function in a ticker loop that calls it at the given interval. The original run function is called once per tick. If it returns an error, the behavior depends on WithRestart: if true, the ticker worker restarts; if false, it exits.
Example
A periodic worker that runs a function on a fixed interval.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
count := 0
w := workers.NewWorker("ticker", func(ctx workers.WorkerContext) error {
count++
fmt.Printf("tick %d\n", count)
return nil
}).Every(20 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
}

Output:

tick 1
tick 2
func (*Worker) WithBackoffJitter
func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker

WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.
func (*Worker) WithFailureBackoff
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker

WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
func (*Worker) WithFailureDecay
func (w *Worker) WithFailureDecay(decay float64) *Worker

WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
func (*Worker) WithFailureThreshold
func (w *Worker) WithFailureThreshold(threshold float64) *Worker

WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
func (*Worker) WithMetrics
func (w *Worker) WithMetrics(m Metrics) *Worker

WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerContext or Run options.
func (*Worker) WithRestart
func (w *Worker) WithRestart(restart bool) *Worker

WithRestart configures whether the worker should be restarted on failure. When true, the supervisor restarts the worker with backoff on non-context errors.
Example
A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
attempt := 0
w := workers.NewWorker("resilient", func(ctx workers.WorkerContext) error {
attempt++
if attempt <= 2 {
return fmt.Errorf("transient error")
}
fmt.Printf("succeeded on attempt %d\n", attempt)
<-ctx.Done()
return ctx.Err()
}).WithRestart(true)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
workers.Run(ctx, []*workers.Worker{w})
// This example demonstrates restart behavior. Log output from the
// supervisor is expected between restarts. The worker prints on success.
}

func (*Worker) WithTimeout

func (w *Worker) WithTimeout(d time.Duration) *Worker

WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.
type WorkerContext
WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.
type WorkerContext interface {
context.Context
// Name returns the worker's name.
Name() string
// Attempt returns the restart attempt number (0 on first run).
Attempt() int
// Add adds or replaces a child worker by name under the same supervisor.
// If a worker with the same name already exists, it is removed first.
// Children get full framework guarantees (tracing, panic recovery, restart).
Add(w *Worker)
// Remove stops a child worker by name.
Remove(name string)
// Children returns the names of currently running child workers.
Children() []string
}

Example (Add)
A manager worker that dynamically spawns and removes child workers using WorkerContext.Add, Remove, and Children.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
// Spawn two child workers dynamically.
ctx.Add(workers.NewWorker("child-a", func(ctx workers.WorkerContext) error {
fmt.Printf("%s started\n", ctx.Name())
<-ctx.Done()
return ctx.Err()
}))
ctx.Add(workers.NewWorker("child-b", func(ctx workers.WorkerContext) error {
fmt.Printf("%s started\n", ctx.Name())
<-ctx.Done()
return ctx.Err()
}))
// Give children time to start.
time.Sleep(30 * time.Millisecond)
fmt.Printf("children: %v\n", ctx.Children())
// Remove one child.
ctx.Remove("child-a")
time.Sleep(30 * time.Millisecond)
fmt.Printf("after remove: %v\n", ctx.Children())
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
}

Output:

child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]
Example (Add Replace)
Replace a child worker by adding one with the same name. The old worker is stopped and the new one takes its place.
package main
import (
"context"
"fmt"
"time"
"github.com/go-coldbrew/workers"
)
func main() {
manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
fmt.Println("processor v1")
<-ctx.Done()
return ctx.Err()
}))
time.Sleep(30 * time.Millisecond)
// Replace with a new version — old one is stopped automatically.
ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
fmt.Println("processor v2")
<-ctx.Done()
return ctx.Err()
}))
time.Sleep(30 * time.Millisecond)
<-ctx.Done()
return ctx.Err()
})
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
workers.Run(ctx, []*workers.Worker{manager})
}

Output:

processor v1
processor v2
Generated by gomarkdoc