Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 75 additions & 34 deletions agent/internal/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,48 @@ import (
"github.com/TerrifiedBug/vectorflow/agent/internal/logbuf"
)

// supervisedProcess abstracts an OS subprocess for lifecycle management.
// The real implementation wraps *exec.Cmd; tests substitute mocks.
type supervisedProcess interface {
Start() error
Wait() error
Pid() int
Signal(sig os.Signal) error
Kill() error
}

// procFactory creates and configures a new supervisedProcess.
type procFactory func(bin string, args, env []string, stdout, stderr io.Writer) supervisedProcess

// cmdProcess wraps *exec.Cmd to implement supervisedProcess.
type cmdProcess struct {
cmd *exec.Cmd
}

func (p *cmdProcess) Start() error { return p.cmd.Start() }
func (p *cmdProcess) Wait() error { return p.cmd.Wait() }
func (p *cmdProcess) Pid() int { return p.cmd.Process.Pid }
func (p *cmdProcess) Signal(sig os.Signal) error { return p.cmd.Process.Signal(sig) }
func (p *cmdProcess) Kill() error { return p.cmd.Process.Kill() }

// defaultProcFactory builds the real exec.Cmd-backed supervisedProcess.
func defaultProcFactory(bin string, args, env []string, stdout, stderr io.Writer) supervisedProcess {
cmd := exec.Command(bin, args...)
cmd.Env = env
cmd.Stdout = stdout
cmd.Stderr = stderr
return &cmdProcess{cmd: cmd}
}

// defaultBackoffFunc returns exponential backoff capped at 60s.
func defaultBackoffFunc(restarts int) time.Duration {
backoff := time.Duration(1<<minInt(restarts-1, 6)) * time.Second
if backoff > 60*time.Second {
backoff = 60 * time.Second
}
return backoff
}

type ProcessInfo struct {
PipelineID string
Version int
Expand All @@ -24,26 +66,34 @@ type ProcessInfo struct {
LogLevel string
Secrets map[string]string
ConfigChecksum string
cmd *exec.Cmd
proc supervisedProcess
configPath string
restarts int
done chan struct{}
logBuf *logbuf.RingBuffer
}

type Supervisor struct {
vectorBin string
mu sync.Mutex
processes map[string]*ProcessInfo // pipelineId -> process
basePort int
portSeq int
vectorBin string
mu sync.Mutex
processes map[string]*ProcessInfo // pipelineId -> process
basePort int
portSeq int
mkProc procFactory
startupDelay time.Duration
backoffFunc func(restarts int) time.Duration
stopTimeout time.Duration
}

func New(vectorBin string) *Supervisor {
return &Supervisor{
vectorBin: vectorBin,
processes: make(map[string]*ProcessInfo),
basePort: 8687, // prometheus_exporter ports start at 8688 (portSeq increments before use)
vectorBin: vectorBin,
processes: make(map[string]*ProcessInfo),
basePort: 8687, // prometheus_exporter ports start at 8688 (portSeq increments before use)
mkProc: defaultProcFactory,
startupDelay: 2 * time.Second,
backoffFunc: defaultBackoffFunc,
stopTimeout: 30 * time.Second,
}
}

Expand Down Expand Up @@ -78,36 +128,34 @@ func (s *Supervisor) startProcess(pipelineID, configPath string, version int, lo
args = append(args, "--config", sidecarPath)
}

cmd := exec.Command(s.vectorBin, args...)

// Inject environment variables
cmd.Env = os.Environ()
env := os.Environ()
if logLevel != "" {
cmd.Env = append(cmd.Env, "VECTOR_LOG="+logLevel)
env = append(env, "VECTOR_LOG="+logLevel)
}
for k, v := range secrets {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
env = append(env, fmt.Sprintf("%s=%s", k, v))
}

lb := logbuf.New(500)
cmd.Stdout = io.MultiWriter(os.Stdout, lb)
cmd.Stderr = io.MultiWriter(os.Stderr, lb)
stdout := io.MultiWriter(os.Stdout, lb)
stderr := io.MultiWriter(os.Stderr, lb)

if err := cmd.Start(); err != nil {
proc := s.mkProc(s.vectorBin, args, env, stdout, stderr)
if err := proc.Start(); err != nil {
return fmt.Errorf("start vector for pipeline %s: %w", pipelineID, err)
}

info := &ProcessInfo{
PipelineID: pipelineID,
Version: version,
PID: cmd.Process.Pid,
PID: proc.Pid(),
Status: "STARTING",
StartedAt: time.Now(),
MetricsPort: metricsPort,
APIPort: apiPort,
LogLevel: logLevel,
Secrets: secrets,
cmd: cmd,
proc: proc,
configPath: configPath,
done: make(chan struct{}),
logBuf: lb,
Expand All @@ -122,15 +170,15 @@ func (s *Supervisor) startProcess(pipelineID, configPath string, version int, lo

func (s *Supervisor) monitor(info *ProcessInfo, metricsPort, apiPort int) {
// Mark as running after brief startup delay
time.Sleep(2 * time.Second)
time.Sleep(s.startupDelay)
s.mu.Lock()
if p, ok := s.processes[info.PipelineID]; ok && p == info && p.Status == "STARTING" {
p.Status = "RUNNING"
}
s.mu.Unlock()

// Wait for process to exit
err := info.cmd.Wait()
err := info.proc.Wait()
close(info.done)

s.mu.Lock()
Expand All @@ -147,10 +195,7 @@ func (s *Supervisor) monitor(info *ProcessInfo, metricsPort, apiPort int) {

// Exponential backoff restart: 1s, 2s, 4s, 8s, ... max 60s
info.restarts++
backoff := time.Duration(1<<minInt(info.restarts-1, 6)) * time.Second
if backoff > 60*time.Second {
backoff = 60 * time.Second
}
backoff := s.backoffFunc(info.restarts)

slog.Info("restarting crashed pipeline", "pipeline", info.PipelineID, "backoff", backoff, "restarts", info.restarts)

Expand Down Expand Up @@ -185,19 +230,15 @@ func (s *Supervisor) Stop(pipelineID string) error {
}

func (s *Supervisor) stopProcess(info *ProcessInfo) error {
if info.cmd.Process == nil {
return nil
}

// Send SIGTERM
info.cmd.Process.Signal(syscall.SIGTERM)
info.proc.Signal(syscall.SIGTERM) //nolint:errcheck

// Wait up to 30s for graceful shutdown
// Wait up to stopTimeout for graceful shutdown
select {
case <-info.done:
case <-time.After(30 * time.Second):
case <-time.After(s.stopTimeout):
slog.Warn("pipeline did not stop after timeout, sending SIGKILL", "pipeline", info.PipelineID)
info.cmd.Process.Kill()
info.proc.Kill() //nolint:errcheck
<-info.done
}

Expand Down
Loading
Loading