Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/node/services/backgroundProcessManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,50 @@ describe("BackgroundProcessManager", () => {
expect(output.elapsed_ms).toBeGreaterThanOrEqual(250);
expect(output.elapsed_ms).toBeLessThan(1000); // Didn't hang
});

it("should serialize concurrent getOutput calls to prevent duplicate output", async () => {
// This test verifies the fix for the race condition where parallel bash_output
// calls could both read from the same offset before either updates the position.
// Without serialization, both calls would return the same output.
const result = await manager.spawn(
runtime,
testWorkspaceId,
"echo 'line1'; echo 'line2'; echo 'line3'",
{ cwd: process.cwd(), displayName: "test" }
);

expect(result.success).toBe(true);
if (!result.success) return;

// Wait for all output to be written
await new Promise((resolve) => setTimeout(resolve, 200));

// Call getOutput twice in parallel - without serialization, both would
// read from offset 0 and return duplicate "line1\nline2\nline3"
const [output1, output2] = await Promise.all([
manager.getOutput(result.processId),
manager.getOutput(result.processId),
]);

expect(output1.success).toBe(true);
expect(output2.success).toBe(true);
if (!output1.success || !output2.success) return;

// Combine outputs - should contain all lines exactly once
const combinedOutput = output1.output + output2.output;
const line1Count = (combinedOutput.match(/line1/g) ?? []).length;
const line2Count = (combinedOutput.match(/line2/g) ?? []).length;
const line3Count = (combinedOutput.match(/line3/g) ?? []).length;

// Each line should appear exactly once across both outputs (no duplicates)
expect(line1Count).toBe(1);
expect(line2Count).toBe(1);
expect(line3Count).toBe(1);

// One call should get the content, the other should get empty (already read)
const hasContent = output1.output.trim().length > 0 || output2.output.trim().length > 0;
expect(hasContent).toBe(true);
});
});

describe("integration: spawn and getOutput", () => {
Expand Down
45 changes: 24 additions & 21 deletions src/node/services/backgroundProcessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Runtime, BackgroundHandle } from "@/node/runtime/Runtime";
import { spawnProcess } from "./backgroundProcessExecutor";
import { getErrorMessage } from "@/common/utils/errors";
import { log } from "./log";
import { AsyncMutex } from "@/node/utils/concurrency/asyncMutex";

import { EventEmitter } from "events";

Expand All @@ -20,7 +21,9 @@ export interface BackgroundProcessMeta {
}

/**
* Represents a background process with file-based output
* Represents a background process with file-based output.
* All per-process state is consolidated here so cleanup is automatic when
* the process is removed from the processes map.
*/
export interface BackgroundProcess {
id: string; // Process ID (display_name from the bash tool call)
Expand All @@ -36,14 +39,11 @@ export interface BackgroundProcess {
displayName?: string; // Human-readable name (e.g., "Dev Server")
/** True if this process is being waited on (foreground mode) */
isForeground: boolean;
}

/**
* Tracks read position for incremental output retrieval.
* Each call to getOutput() returns only new content since the last read.
*/
interface OutputReadPosition {
outputBytes: number;
/** Tracks read position for incremental output retrieval */
outputBytesRead: number;
/** Mutex to serialize getOutput() calls (prevents race condition when
* parallel tool calls read from same offset before position is updated) */
outputLock: AsyncMutex;
}

/**
Expand Down Expand Up @@ -89,11 +89,10 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
// NOTE: This map is in-memory only. Background processes use nohup/setsid so they
// could survive app restarts, but we kill all tracked processes on shutdown via
// dispose(). Rehydrating from meta.json on startup is out of scope for now.
// All per-process state (read position, output lock) is stored in BackgroundProcess
// so cleanup is automatic when the process is removed from this map.
private processes = new Map<string, BackgroundProcess>();

// Tracks read positions for incremental output retrieval
private readPositions = new Map<string, OutputReadPosition>();

// Base directory for process output files
private readonly bgOutputDir: string;
// Tracks foreground processes (started via runtime.exec) that can be backgrounded
Expand Down Expand Up @@ -218,6 +217,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
handle,
displayName: config.displayName,
isForeground: config.isForeground ?? false,
outputBytesRead: 0,
outputLock: new AsyncMutex(),
};

// Store process in map
Expand Down Expand Up @@ -324,6 +325,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
handle,
displayName,
isForeground: false, // Now in background
outputBytesRead: 0,
outputLock: new AsyncMutex(),
};

// Store process in map
Expand Down Expand Up @@ -473,15 +476,13 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
return { success: false, error: `Process not found: ${processId}` };
}

// Get or initialize read position
let pos = this.readPositions.get(processId);
if (!pos) {
pos = { outputBytes: 0 };
this.readPositions.set(processId, pos);
}
// Acquire per-process mutex to serialize concurrent getOutput() calls.
// This prevents race conditions where parallel tool calls both read from
// the same offset before either updates the read position.
await using _lock = await proc.outputLock.acquire();

log.debug(
`BackgroundProcessManager.getOutput: proc.outputDir=${proc.outputDir}, offset=${pos.outputBytes}`
`BackgroundProcessManager.getOutput: proc.outputDir=${proc.outputDir}, offset=${proc.outputBytesRead}`
);

// Pre-compile regex if filter is provided
Expand Down Expand Up @@ -514,11 +515,11 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
while (true) {
// Read new content via the handle (works for both local and SSH runtimes)
// Output is already unified in output.log (stdout + stderr via 2>&1)
const result = await proc.handle.readOutput(pos.outputBytes);
const result = await proc.handle.readOutput(proc.outputBytesRead);
accumulatedRaw += result.content;

// Update read position
pos.outputBytes = result.newOffset;
proc.outputBytesRead = result.newOffset;

// Refresh process status
const refreshedProc = await this.getProcess(processId);
Expand Down Expand Up @@ -699,6 +700,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
await Promise.all(matching.map((p) => this.terminate(p.id)));

// Remove from memory (output dirs left on disk for OS/workspace cleanup)
// All per-process state (outputBytesRead, outputLock) is stored in the
// BackgroundProcess object, so cleanup is automatic when we delete here.
for (const p of matching) {
this.processes.delete(p.id);
}
Expand Down