diff --git a/agent/internal/supervisor/supervisor.go b/agent/internal/supervisor/supervisor.go
index ab063a58..529400e5 100644
--- a/agent/internal/supervisor/supervisor.go
+++ b/agent/internal/supervisor/supervisor.go
@@ -13,6 +13,48 @@ import (
 	"github.com/TerrifiedBug/vectorflow/agent/internal/logbuf"
 )
 
+// supervisedProcess abstracts an OS subprocess for lifecycle management.
+// The real implementation wraps *exec.Cmd; tests substitute mocks.
+type supervisedProcess interface {
+	Start() error
+	Wait() error
+	Pid() int
+	Signal(sig os.Signal) error
+	Kill() error
+}
+
+// procFactory creates and configures a new supervisedProcess.
+type procFactory func(bin string, args, env []string, stdout, stderr io.Writer) supervisedProcess
+
+// cmdProcess wraps *exec.Cmd to implement supervisedProcess.
+type cmdProcess struct {
+	cmd *exec.Cmd
+}
+
+func (p *cmdProcess) Start() error { return p.cmd.Start() }
+func (p *cmdProcess) Wait() error { return p.cmd.Wait() }
+func (p *cmdProcess) Pid() int { return p.cmd.Process.Pid }
+func (p *cmdProcess) Signal(sig os.Signal) error { return p.cmd.Process.Signal(sig) }
+func (p *cmdProcess) Kill() error { return p.cmd.Process.Kill() }
+
+// defaultProcFactory builds the real exec.Cmd-backed supervisedProcess.
+func defaultProcFactory(bin string, args, env []string, stdout, stderr io.Writer) supervisedProcess {
+	cmd := exec.Command(bin, args...)
+	cmd.Env = env
+	cmd.Stdout = stdout
+	cmd.Stderr = stderr
+	return &cmdProcess{cmd: cmd}
+}
+
+// defaultBackoffFunc returns exponential backoff capped at 60s.
+func defaultBackoffFunc(restarts int) time.Duration {
+	backoff := time.Duration(1<<(restarts-1)) * time.Second
+	if backoff > 60*time.Second {
+		backoff = 60 * time.Second
+	}
+	return backoff
+}
+
 type ProcessInfo struct {
 	PipelineID     string
 	Version        int
@@ -24,7 +66,7 @@ type ProcessInfo struct {
 	LogLevel       string
 	Secrets        map[string]string
 	ConfigChecksum string
-	cmd            *exec.Cmd
+	proc           supervisedProcess
 	configPath     string
 	restarts       int
 	done           chan struct{}
@@ -32,18 +74,26 @@ type ProcessInfo struct {
 }
 
 type Supervisor struct {
-	vectorBin string
-	mu        sync.Mutex
-	processes map[string]*ProcessInfo // pipelineId -> process
-	basePort  int
-	portSeq   int
+	vectorBin    string
+	mu           sync.Mutex
+	processes    map[string]*ProcessInfo // pipelineId -> process
+	basePort     int
+	portSeq      int
+	mkProc       procFactory
+	startupDelay time.Duration
+	backoffFunc  func(restarts int) time.Duration
+	stopTimeout  time.Duration
 }
 
 func New(vectorBin string) *Supervisor {
 	return &Supervisor{
-		vectorBin: vectorBin,
-		processes: make(map[string]*ProcessInfo),
-		basePort:  8687, // prometheus_exporter ports start at 8688 (portSeq increments before use)
+		vectorBin:    vectorBin,
+		processes:    make(map[string]*ProcessInfo),
+		basePort:     8687, // prometheus_exporter ports start at 8688 (portSeq increments before use)
+		mkProc:       defaultProcFactory,
+		startupDelay: 2 * time.Second,
+		backoffFunc:  defaultBackoffFunc,
+		stopTimeout:  30 * time.Second,
 	}
 }
 
@@ -78,36 +128,34 @@ func (s *Supervisor) startProcess(pipelineID, configPath string, version int, lo
 		args = append(args, "--config", sidecarPath)
 	}
 
-	cmd := exec.Command(s.vectorBin, args...)
-
-	// Inject environment variables
-	cmd.Env = os.Environ()
+	env := os.Environ()
 	if logLevel != "" {
-		cmd.Env = append(cmd.Env, "VECTOR_LOG="+logLevel)
+		env = append(env, "VECTOR_LOG="+logLevel)
 	}
 	for k, v := range secrets {
-		cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
+		env = append(env, fmt.Sprintf("%s=%s", k, v))
 	}
 
 	lb := logbuf.New(500)
-	cmd.Stdout = io.MultiWriter(os.Stdout, lb)
-	cmd.Stderr = io.MultiWriter(os.Stderr, lb)
+	stdout := io.MultiWriter(os.Stdout, lb)
+	stderr := io.MultiWriter(os.Stderr, lb)
 
-	if err := cmd.Start(); err != nil {
+	proc := s.mkProc(s.vectorBin, args, env, stdout, stderr)
+	if err := proc.Start(); err != nil {
 		return fmt.Errorf("start vector for pipeline %s: %w", pipelineID, err)
 	}
 
 	info := &ProcessInfo{
 		PipelineID:  pipelineID,
 		Version:     version,
-		PID:         cmd.Process.Pid,
+		PID:         proc.Pid(),
 		Status:      "STARTING",
 		StartedAt:   time.Now(),
 		MetricsPort: metricsPort,
 		APIPort:     apiPort,
 		LogLevel:    logLevel,
 		Secrets:     secrets,
-		cmd:        cmd,
+		proc:       proc,
 		configPath: configPath,
 		done:       make(chan struct{}),
 		logBuf:     lb,
@@ -122,7 +170,7 @@ func (s *Supervisor) startProcess(pipelineID, configPath string, version int, lo
 
 func (s *Supervisor) monitor(info *ProcessInfo, metricsPort, apiPort int) {
 	// Mark as running after brief startup delay
-	time.Sleep(2 * time.Second)
+	time.Sleep(s.startupDelay)
 	s.mu.Lock()
 	if p, ok := s.processes[info.PipelineID]; ok && p == info && p.Status == "STARTING" {
 		p.Status = "RUNNING"
@@ -130,7 +178,7 @@ func (s *Supervisor) monitor(info *ProcessInfo, metricsPort, apiPort int) {
 	s.mu.Unlock()
 
 	// Wait for process to exit
-	err := info.cmd.Wait()
+	err := info.proc.Wait()
 	close(info.done)
 
 	s.mu.Lock()
@@ -147,10 +195,7 @@ func (s *Supervisor) monitor(info *ProcessInfo, metricsPort, apiPort int) {
 
 	// Exponential backoff restart: 1s, 2s, 4s, 8s, ... max 60s
 	info.restarts++
-	backoff := time.Duration(1<<(info.restarts-1)) * time.Second
-	if backoff > 60*time.Second {
-		backoff = 60 * time.Second
-	}
+	backoff := s.backoffFunc(info.restarts)
 
 	slog.Info("restarting crashed pipeline",
 		"pipeline", info.PipelineID, "backoff", backoff, "restarts", info.restarts)
@@ -185,19 +230,15 @@ func (s *Supervisor) Stop(pipelineID string) error {
 }
 
 func (s *Supervisor) stopProcess(info *ProcessInfo) error {
-	if info.cmd.Process == nil {
-		return nil
-	}
-
 	// Send SIGTERM
-	info.cmd.Process.Signal(syscall.SIGTERM)
+	info.proc.Signal(syscall.SIGTERM) //nolint:errcheck
 
-	// Wait up to 30s for graceful shutdown
+	// Wait up to stopTimeout for graceful shutdown
 	select {
 	case <-info.done:
-	case <-time.After(30 * time.Second):
+	case <-time.After(s.stopTimeout):
 		slog.Warn("pipeline did not stop after timeout, sending SIGKILL", "pipeline", info.PipelineID)
-		info.cmd.Process.Kill()
+		info.proc.Kill() //nolint:errcheck
 		<-info.done
 	}
 
diff --git a/agent/internal/supervisor/supervisor_test.go b/agent/internal/supervisor/supervisor_test.go
new file mode 100644
index 00000000..e1eceef5
--- /dev/null
+++ b/agent/internal/supervisor/supervisor_test.go
@@ -0,0 +1,773 @@
+package supervisor
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"syscall"
+	"testing"
+	"time"
+)
+
+// mockProc implements supervisedProcess for controlled unit testing.
+//
+// Tests control when Wait() returns by calling exit() or by triggering Kill().
+// Both exit() and Kill() use a sync.Once, so exactly one value is ever delivered
+// to the Wait() call, mirroring real OS process semantics where Wait returns
+// exactly once.
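+// (A real *exec.Cmd behaves the same way: calling Wait a second time returns
+// an error rather than delivering the exit result again.)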
+// +// The stdout/stderr writers are stored so tests can simulate process log output +// flowing into the ring buffer. +type mockProc struct { + pid int + stdout io.Writer + stderr io.Writer + + startErr error + + mu sync.Mutex + signals []os.Signal + killed bool + + exitOnce sync.Once + exitCh chan error // buffered(1); exactly one value reaches Wait() +} + +func (m *mockProc) Start() error { + return m.startErr +} + +// Wait blocks until exit() or Kill() is called. +func (m *mockProc) Wait() error { + return <-m.exitCh +} + +func (m *mockProc) Pid() int { return m.pid } + +func (m *mockProc) Signal(sig os.Signal) error { + m.mu.Lock() + m.signals = append(m.signals, sig) + m.mu.Unlock() + return nil +} + +// Kill records the kill and unblocks Wait(), mirroring real SIGKILL semantics. +func (m *mockProc) Kill() error { + m.mu.Lock() + m.killed = true + m.mu.Unlock() + m.exitOnce.Do(func() { m.exitCh <- errors.New("killed") }) + return nil +} + +// exit makes Wait() return with err, simulating a normal process exit. +// If Kill() already fired, this is a no-op (sync.Once guard). +func (m *mockProc) exit(err error) { + m.exitOnce.Do(func() { m.exitCh <- err }) +} + +// receivedSIGTERM reports whether SIGTERM was sent to this process. +func (m *mockProc) receivedSIGTERM() bool { + m.mu.Lock() + defer m.mu.Unlock() + for _, sig := range m.signals { + if sig == syscall.SIGTERM { + return true + } + } + return false +} + +// wasKilled reports whether Kill() was called. +func (m *mockProc) wasKilled() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.killed +} + +// newMockFactory returns a procFactory and a buffered channel that receives +// each *mockProc as it is created. Tests receive from the channel to interact +// with each process (send output, trigger exit, inspect signals). +func newMockFactory(startErr error) (procFactory, chan *mockProc) { + ch := make(chan *mockProc, 16) + pidSeq := 0 + factory := func(bin string, args, env []string, stdout, stderr io.Writer) supervisedProcess { + pidSeq++ + mp := &mockProc{ + pid: pidSeq * 1000, + stdout: stdout, + stderr: stderr, + startErr: startErr, + exitCh: make(chan error, 1), + } + ch <- mp + return mp + } + return factory, ch +} + +// newTestSupervisor creates a Supervisor wired with a mock factory and +// near-zero timing so tests run fast without real sleep delays. +func newTestSupervisor(startErr error) (*Supervisor, chan *mockProc) { + factory, procs := newMockFactory(startErr) + s := New("/fake/vector") + s.mkProc = factory + s.startupDelay = 0 + s.backoffFunc = func(int) time.Duration { return 0 } + s.stopTimeout = 50 * time.Millisecond + return s, procs +} + +// tempConfig creates a placeholder config file in a temp dir and returns its +// path. The directory is automatically cleaned up via t.TempDir(). +func tempConfig(t *testing.T) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "pipeline.yaml") + if err := os.WriteFile(path, []byte("# placeholder"), 0600); err != nil { + t.Fatalf("tempConfig WriteFile: %v", err) + } + return path +} + +// drainProcs discards any remaining items from the factory channel so that +// goroutines blocked on channel send do not leak after a test completes. +func drainProcs(ch chan *mockProc) { + for { + select { + case <-ch: + default: + return + } + } +} + +// waitForStatus polls Statuses() until pipelineID reaches the given status +// or the timeout is exceeded. 
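+// Polling is a deliberate trade-off: it keeps the Supervisor itself free of
+// test-only synchronization hooks, at the cost of a few milliseconds per assertion.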
+func waitForStatus(t *testing.T, s *Supervisor, pipelineID, want string, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + for _, st := range s.Statuses() { + if st.PipelineID == pipelineID && st.Status == want { + return + } + } + time.Sleep(2 * time.Millisecond) + } + t.Errorf("timeout waiting for pipeline %q to reach status %q", pipelineID, want) +} + +// ── Start tests ────────────────────────────────────────────────────────────── + +// TestStart_Success verifies that a pipeline transitions STARTING → RUNNING +// after the (zero) startup delay, and that the PID is reflected correctly. +func TestStart_Success(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "info", nil); err != nil { + t.Fatalf("Start() error: %v", err) + } + + proc := <-procs + if proc.Pid() != 1000 { + t.Errorf("expected PID 1000, got %d", proc.Pid()) + } + + waitForStatus(t, s, "pipe1", "RUNNING", 500*time.Millisecond) +} + +// TestStart_DuplicatePipeline verifies that starting the same pipeline twice +// returns an "already running" error. +func TestStart_DuplicatePipeline(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatalf("first Start() error: %v", err) + } + + err := s.Start("pipe1", cfg, 1, "", nil) + if err == nil { + t.Fatal("expected error for duplicate pipeline, got nil") + } + if !strings.Contains(err.Error(), "already running") { + t.Errorf("unexpected error message: %v", err) + } +} + +// TestStart_ProcStartError verifies that a process whose Start() fails returns +// an error and does not register the pipeline in the active map. +func TestStart_ProcStartError(t *testing.T) { + s, procs := newTestSupervisor(errors.New("exec: no such file")) + cfg := tempConfig(t) + t.Cleanup(func() { drainProcs(procs) }) + + err := s.Start("pipe1", cfg, 1, "", nil) + if err == nil { + t.Fatal("expected error when proc.Start() fails, got nil") + } + if !strings.Contains(err.Error(), "start vector") { + t.Errorf("unexpected error message: %v", err) + } + + if statuses := s.Statuses(); len(statuses) != 0 { + t.Errorf("expected 0 active pipelines, got %d", len(statuses)) + } +} + +// TestStart_PortsAllocated verifies that each new pipeline receives unique +// metrics and API port numbers. +func TestStart_PortsAllocated(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatalf("Start pipe1: %v", err) + } + if err := s.Start("pipe2", cfg, 1, "", nil); err != nil { + t.Fatalf("Start pipe2: %v", err) + } + <-procs + <-procs + + statuses := s.Statuses() + if len(statuses) != 2 { + t.Fatalf("expected 2 statuses, got %d", len(statuses)) + } + + ports := map[int]bool{} + for _, st := range statuses { + if ports[st.MetricsPort] { + t.Errorf("duplicate MetricsPort %d", st.MetricsPort) + } + if ports[st.APIPort] { + t.Errorf("duplicate APIPort %d", st.APIPort) + } + ports[st.MetricsPort] = true + ports[st.APIPort] = true + } +} + +// ── Stop tests ─────────────────────────────────────────────────────────────── + +// TestStop_GracefulShutdown verifies that Stop sends SIGTERM, waits for the +// process to exit, and removes it from the active map. 
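+//
+// A helper goroutine stands in for a well-behaved Vector process: it polls for
+// SIGTERM and then exits cleanly, so Stop should finish without escalating to Kill.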
+func TestStop_GracefulShutdown(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + proc := <-procs + + // Respond to SIGTERM by exiting cleanly. + go func() { + for !proc.receivedSIGTERM() { + time.Sleep(1 * time.Millisecond) + } + proc.exit(nil) + }() + + if err := s.Stop("pipe1"); err != nil { + t.Fatalf("Stop() error: %v", err) + } + + if !proc.receivedSIGTERM() { + t.Error("expected SIGTERM to be sent") + } + if proc.wasKilled() { + t.Error("expected no SIGKILL for graceful shutdown") + } + if statuses := s.Statuses(); len(statuses) != 0 { + t.Errorf("expected 0 active pipelines after stop, got %d", len(statuses)) + } +} + +// TestStop_KillAfterTimeout verifies that Stop escalates to SIGKILL when the +// process does not exit within the stop timeout window. +// +// With stopTimeout=50ms: stopProcess sends SIGTERM, waits 50ms, then calls +// Kill(). Our mockProc.Kill() unblocks Wait() (via exitOnce), so the stop +// completes promptly after the timeout. +func TestStop_KillAfterTimeout(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + proc := <-procs + // proc ignores SIGTERM — Kill() will be required. + + if err := s.Stop("pipe1"); err != nil { + t.Fatalf("Stop() error: %v", err) + } + + if !proc.wasKilled() { + t.Error("expected SIGKILL after timeout") + } +} + +// TestStop_NonExistent verifies that stopping a non-existent pipeline is a no-op. +func TestStop_NonExistent(t *testing.T) { + s, _ := newTestSupervisor(nil) + if err := s.Stop("does-not-exist"); err != nil { + t.Errorf("expected nil error for non-existent pipeline, got: %v", err) + } +} + +// TestStop_RemovesFromActive verifies that after Stop the pipeline ID can be +// reused to start a new process. +func TestStop_RemovesFromActive(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + proc := <-procs + proc.exit(nil) // let it exit cleanly + + // Allow monitor to process the clean exit. + time.Sleep(20 * time.Millisecond) + + s.Stop("pipe1") + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatalf("Start after Stop returned error: %v", err) + } + proc2 := <-procs + proc2.exit(nil) +} + +// ── Crash and restart tests ────────────────────────────────────────────────── + +// TestCrashAndRestart verifies that a process crash triggers an automatic +// restart with a fresh process. +func TestCrashAndRestart(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + + proc1 := <-procs + proc1.exit(errors.New("exit status 1")) + + // Monitor should spawn a replacement process. + select { + case proc2 := <-procs: + if proc2.Pid() == proc1.Pid() { + t.Errorf("restarted process has same PID as crashed one") + } + proc2.exit(nil) // clean exit to stop further restarts + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout: no restarted process appeared") + } +} + +// TestCrashStatus verifies that a crashed pipeline transitions to CRASHED status +// before the restart goroutine runs. 
A non-zero backoff is required so that the +// CRASHED state is observable between the crash and the replacement process +// being added back to the map. +func TestCrashStatus(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + // Use a short but non-zero backoff so the CRASHED status is observable. + s.backoffFunc = func(int) time.Duration { return 100 * time.Millisecond } + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + + proc1 := <-procs + proc1.exit(errors.New("exit status 1")) + + waitForStatus(t, s, "pipe1", "CRASHED", 300*time.Millisecond) + + // Clean up the restarted proc. + select { + case proc2 := <-procs: + proc2.exit(nil) + case <-time.After(500 * time.Millisecond): + } +} + +// TestMultipleCrashRestarts verifies that backoffFunc is called for each crash. +// +// Note: each crash creates a fresh ProcessInfo (restarts starts at 0 and +// increments to 1 before calling backoffFunc). The restart counter therefore +// does not accumulate across the chain of crashes — backoffFunc always receives +// restarts=1. This is the documented current behaviour of the supervisor's +// per-ProcessInfo restart tracking. +func TestMultipleCrashRestarts(t *testing.T) { + var ( + mu sync.Mutex + backoffCalls []int + ) + + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + s.backoffFunc = func(restarts int) time.Duration { + mu.Lock() + backoffCalls = append(backoffCalls, restarts) + mu.Unlock() + return 0 + } + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + + const crashes = 3 + for i := 0; i < crashes; i++ { + proc := <-procs + proc.exit(errors.New("crash")) + } + + // Drain the final restarted proc. + select { + case proc := <-procs: + proc.exit(nil) + case <-time.After(500 * time.Millisecond): + } + + mu.Lock() + got := append([]int(nil), backoffCalls...) + mu.Unlock() + + if len(got) < crashes { + t.Errorf("expected at least %d backoff calls, got %d: %v", crashes, len(got), got) + return + } + // Each call receives restarts=1 because each restart creates a fresh + // ProcessInfo beginning at restarts=0. + for i, v := range got[:crashes] { + if v != 1 { + t.Errorf("backoff call %d: expected restarts=1 (fresh ProcessInfo), got %d", i, v) + } + } +} + +// TestDefaultBackoff verifies the exponential backoff function with a cap at 60s. +func TestDefaultBackoff(t *testing.T) { + cases := []struct { + restarts int + want time.Duration + }{ + {1, 1 * time.Second}, + {2, 2 * time.Second}, + {3, 4 * time.Second}, + {4, 8 * time.Second}, + {7, 60 * time.Second}, // 1<<6=64 → capped to 60 + {10, 60 * time.Second}, + } + for _, tc := range cases { + got := defaultBackoffFunc(tc.restarts) + if got != tc.want { + t.Errorf("defaultBackoffFunc(%d) = %v, want %v", tc.restarts, got, tc.want) + } + } +} + +// TestCleanExit_NoRestart verifies that a process exiting cleanly (nil error) +// is marked STOPPED and not restarted. +func TestCleanExit_NoRestart(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + + proc := <-procs + proc.exit(nil) + + waitForStatus(t, s, "pipe1", "STOPPED", 300*time.Millisecond) + + // No new process should be spawned after a clean exit. 
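+	// (On a nil Wait error the monitor marks the pipeline STOPPED and returns
+	// without consulting backoffFunc, so the factory is never invoked again.)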
+ select { + case <-procs: + t.Error("unexpected restart of cleanly exited process") + case <-time.After(100 * time.Millisecond): + } +} + +// ── Restart tests ──────────────────────────────────────────────────────────── + +// TestRestart verifies that Restart terminates the old process and spawns a new +// one with the new configuration. +func TestRestart(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + proc1 := <-procs + + go func() { + for !proc1.receivedSIGTERM() { + time.Sleep(1 * time.Millisecond) + } + proc1.exit(nil) + }() + + if err := s.Restart("pipe1", cfg, 2, "", nil); err != nil { + t.Fatalf("Restart() error: %v", err) + } + + select { + case proc2 := <-procs: + if proc2.Pid() == proc1.Pid() { + t.Error("restarted process should have a different PID") + } + proc2.exit(nil) + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for restarted process") + } +} + +// ── ShutdownAll tests ───────────────────────────────────────────────────────── + +// TestShutdownAll verifies that all active pipelines are stopped concurrently +// and none remain in the active map afterwards. +func TestShutdownAll(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + + const n = 3 + for i := 0; i < n; i++ { + if err := s.Start(fmt.Sprintf("pipe%d", i), cfg, 1, "", nil); err != nil { + t.Fatalf("Start pipe%d: %v", i, err) + } + } + + // All n procs are available; respond to SIGTERM by exiting. + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + proc := <-procs + for !proc.receivedSIGTERM() && !proc.wasKilled() { + time.Sleep(1 * time.Millisecond) + } + proc.exit(nil) + }() + } + + s.ShutdownAll() + wg.Wait() + + if statuses := s.Statuses(); len(statuses) != 0 { + t.Errorf("expected 0 active pipelines after ShutdownAll, got %d", len(statuses)) + } +} + +// ── Metadata tests ──────────────────────────────────────────────────────────── + +// TestStatuses verifies that Statuses returns the correct pipeline metadata. +func TestStatuses(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 42, "debug", map[string]string{"K": "V"}); err != nil { + t.Fatal(err) + } + <-procs + + statuses := s.Statuses() + if len(statuses) != 1 { + t.Fatalf("expected 1 status, got %d", len(statuses)) + } + st := statuses[0] + if st.PipelineID != "pipe1" { + t.Errorf("PipelineID: got %q, want %q", st.PipelineID, "pipe1") + } + if st.Version != 42 { + t.Errorf("Version: got %d, want 42", st.Version) + } + if st.PID != 1000 { + t.Errorf("PID: got %d, want 1000", st.PID) + } +} + +// TestUpdateVersion verifies that UpdateVersion changes the version field +// without restarting the process. +func TestUpdateVersion(t *testing.T) { + s, procs := newTestSupervisor(nil) + cfg := tempConfig(t) + t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) }) + + if err := s.Start("pipe1", cfg, 1, "", nil); err != nil { + t.Fatal(err) + } + <-procs + + s.UpdateVersion("pipe1", 99) + + for _, st := range s.Statuses() { + if st.PipelineID == "pipe1" && st.Version != 99 { + t.Errorf("Version: got %d, want 99", st.Version) + } + } +} + +// TestSetConfigChecksum verifies that SetConfigChecksum stores the checksum on +// the pipeline's status entry. 
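+//
+// Only the store-and-read round-trip is covered here; what callers do with the
+// checksum afterwards is outside the scope of this unit.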
+func TestSetConfigChecksum(t *testing.T) {
+	s, procs := newTestSupervisor(nil)
+	cfg := tempConfig(t)
+	t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) })
+
+	if err := s.Start("pipe1", cfg, 1, "", nil); err != nil {
+		t.Fatal(err)
+	}
+	<-procs
+
+	s.SetConfigChecksum("pipe1", "abc123")
+
+	for _, st := range s.Statuses() {
+		if st.PipelineID == "pipe1" && st.ConfigChecksum != "abc123" {
+			t.Errorf("ConfigChecksum: got %q, want %q", st.ConfigChecksum, "abc123")
+		}
+	}
+}
+
+// TestGetRecentLogs verifies that bytes written to the process's stdout writer
+// are buffered in the ring buffer and returned by GetRecentLogs, then cleared.
+func TestGetRecentLogs(t *testing.T) {
+	s, procs := newTestSupervisor(nil)
+	cfg := tempConfig(t)
+	t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) })
+
+	if err := s.Start("pipe1", cfg, 1, "", nil); err != nil {
+		t.Fatal(err)
+	}
+	proc := <-procs
+
+	// Write through the stdout writer that the factory provided to the mock.
+	fmt.Fprintln(proc.stdout, "log line 1")
+	fmt.Fprintln(proc.stdout, "log line 2")
+
+	logs := s.GetRecentLogs("pipe1")
+	if len(logs) != 2 {
+		t.Errorf("expected 2 log lines, got %d: %v", len(logs), logs)
+	}
+
+	// Second call: buffer was cleared after the first read.
+	if logs2 := s.GetRecentLogs("pipe1"); len(logs2) != 0 {
+		t.Errorf("expected empty after first read, got %d lines", len(logs2))
+	}
+
+	// Non-existent pipeline returns nil.
+	if got := s.GetRecentLogs("no-such-pipe"); got != nil {
+		t.Errorf("expected nil for missing pipeline, got %v", got)
+	}
+
+	proc.exit(nil)
+}
+
+// ── Goroutine leak / race detector tests ─────────────────────────────────────
+
+// TestStop_NoGoroutineLeak verifies that stopping a pipeline causes the
+// monitor goroutine to exit cleanly, leaving no leaked goroutines.
+func TestStop_NoGoroutineLeak(t *testing.T) {
+	s, procs := newTestSupervisor(nil)
+	cfg := tempConfig(t)
+	t.Cleanup(func() { drainProcs(procs) })
+
+	if err := s.Start("pipe1", cfg, 1, "", nil); err != nil {
+		t.Fatal(err)
+	}
+	proc := <-procs
+
+	// Allow the monitor goroutine to reach its Wait() call.
+	time.Sleep(20 * time.Millisecond)
+
+	before := runtime.NumGoroutine()
+
+	// Respond to SIGTERM immediately.
+	go func() {
+		for !proc.receivedSIGTERM() {
+			time.Sleep(1 * time.Millisecond)
+		}
+		proc.exit(nil)
+	}()
+
+	s.Stop("pipe1")
+
+	// Allow the scheduler to fully deschedule the exited goroutines.
+	time.Sleep(30 * time.Millisecond)
+
+	after := runtime.NumGoroutine()
+
+	if after >= before {
+		t.Errorf("goroutine leak: before=%d after=%d; expected count to decrease",
+			before, after)
+	}
+}
+
+// TestConcurrentPipelines verifies that multiple pipelines can run concurrently
+// without data races (this test is most valuable when run with -race).
+func TestConcurrentPipelines(t *testing.T) {
+	s, procs := newTestSupervisor(nil)
+	cfg := tempConfig(t)
+	t.Cleanup(func() { s.ShutdownAll(); drainProcs(procs) })
+
+	const n = 5
+	for i := 0; i < n; i++ {
+		id := fmt.Sprintf("pipe%d", i)
+		if err := s.Start(id, cfg, i, "", nil); err != nil {
+			t.Fatalf("Start %s: %v", id, err)
+		}
+	}
+
+	// Drain procs; each exits cleanly.
+	var wg sync.WaitGroup
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			proc := <-procs
+			proc.exit(nil)
+		}()
+	}
+
+	// Concurrently exercise read and write metadata paths.
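+	// These calls all touch the shared processes map, so any locking the
+	// supervisor misses will surface when the suite runs under -race.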
+ var rwg sync.WaitGroup + for i := 0; i < 20; i++ { + rwg.Add(1) + go func(i int) { + defer rwg.Done() + s.Statuses() + s.UpdateVersion(fmt.Sprintf("pipe%d", i%n), i) + s.SetConfigChecksum(fmt.Sprintf("pipe%d", i%n), fmt.Sprintf("chk%d", i)) + }(i) + } + + rwg.Wait() + wg.Wait() +} diff --git a/e2e/tests/pipeline-validation.spec.ts b/e2e/tests/pipeline-validation.spec.ts index 1aa006ad..0f9cbbf1 100644 --- a/e2e/tests/pipeline-validation.spec.ts +++ b/e2e/tests/pipeline-validation.spec.ts @@ -1,5 +1,7 @@ import { test, expect } from "@playwright/test"; -import { prisma, readSeedResult } from "../helpers/scenario-utils"; +import { readSeedResult } from "../helpers/scenario-utils"; +import { prisma } from "../../src/lib/prisma"; +import type { Prisma } from "../../src/generated/prisma"; import { saveGraphComponents } from "../../src/server/services/pipeline-graph"; import { addDependency } from "../../src/server/services/pipeline-dependency"; @@ -8,7 +10,7 @@ test.describe("Pipeline Validation", () => { const seed = await readSeedResult(); await expect( - prisma.$transaction((tx) => + prisma.$transaction((tx: Prisma.TransactionClient) => saveGraphComponents(tx, { pipelineId: seed.pipelineId, userId: seed.userId,