Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ export class AIService extends EventEmitter {
const earlyAllTools = await getToolsForModel(modelString, {
cwd: process.cwd(),
runtime: earlyRuntime,
tempDir: os.tmpdir(),
runtimeTempDir: os.tmpdir(),
secrets: {},
});
const earlyTools = applyToolPolicy(earlyAllTools, toolPolicy);
Expand Down Expand Up @@ -523,14 +523,14 @@ export class AIService extends EventEmitter {

// Generate stream token and create temp directory for tools
const streamToken = this.streamManager.generateStreamToken();
const tempDir = this.streamManager.createTempDirForStream(streamToken);
const runtimeTempDir = await this.streamManager.createTempDirForStream(streamToken, runtime);

// Get model-specific tools with workspace path (correct for local or remote)
const allTools = await getToolsForModel(modelString, {
cwd: workspacePath,
runtime,
secrets: secretsToRecord(projectSecrets),
tempDir,
runtimeTempDir,
});

// Apply tool policy to filter tools (if policy provided)
Expand Down Expand Up @@ -695,6 +695,7 @@ export class AIService extends EventEmitter {
modelString,
historySequence,
systemMessage,
runtime,
abortSignal,
tools,
{
Expand Down
2 changes: 1 addition & 1 deletion src/services/ipcMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ export class IpcMain {
runtime,
secrets: secretsToRecord(projectSecrets),
niceness: options?.niceness,
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
overflow_policy: "truncate",
});

Expand Down
7 changes: 7 additions & 0 deletions src/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { HistoryService } from "./historyService";
import type { PartialService } from "./partialService";
import { createAnthropic } from "@ai-sdk/anthropic";
import { shouldRunIntegrationTests, validateApiKeys } from "../../tests/testUtils";
import { createRuntime } from "@/runtime/runtimeFactory";

// Skip integration tests if TEST_INTEGRATION is not set
const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip;
Expand Down Expand Up @@ -38,6 +39,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
let streamManager: StreamManager;
let mockHistoryService: HistoryService;
let mockPartialService: PartialService;
const runtime = createRuntime({ type: "local", srcBaseDir: "/tmp" });

beforeEach(() => {
mockHistoryService = createMockHistoryService();
Expand Down Expand Up @@ -85,6 +87,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
"anthropic:claude-sonnet-4-5",
1,
"You are a helpful assistant",
runtime,
undefined,
{}
);
Expand All @@ -102,6 +105,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
"anthropic:claude-sonnet-4-5",
2,
"You are a helpful assistant",
runtime,
undefined,
{}
);
Expand Down Expand Up @@ -273,6 +277,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
"anthropic:claude-sonnet-4-5",
1,
"system",
runtime,
undefined,
{}
),
Expand All @@ -283,6 +288,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
"anthropic:claude-sonnet-4-5",
2,
"system",
runtime,
undefined,
{}
),
Expand All @@ -293,6 +299,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
"anthropic:claude-sonnet-4-5",
3,
"system",
runtime,
undefined,
{}
),
Expand Down
66 changes: 43 additions & 23 deletions src/services/streamManager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import { EventEmitter } from "events";
import * as fs from "fs";
import * as path from "path";
import * as os from "os";
import {
streamText,
stepCountIs,
Expand Down Expand Up @@ -31,6 +28,8 @@ import type { HistoryService } from "./historyService";
import { AsyncMutex } from "@/utils/concurrency/asyncMutex";
import type { ToolPolicy } from "@/utils/tools/toolPolicy";
import { StreamingTokenTracker } from "@/utils/main/StreamingTokenTracker";
import type { Runtime } from "@/runtime/Runtime";
import { execBuffered } from "@/utils/runtime/helpers";

// Type definitions for stream parts with extended properties
interface ReasoningDeltaPart {
Expand Down Expand Up @@ -107,7 +106,9 @@ interface WorkspaceStreamInfo {
// Track background processing promise for guaranteed cleanup
processingPromise: Promise<void>;
// Temporary directory for tool outputs (auto-cleaned when stream ends)
tempDir: string;
runtimeTempDir: string;
// Runtime for temp directory cleanup
runtime: Runtime;
}

/**
Expand Down Expand Up @@ -242,12 +243,26 @@ export class StreamManager extends EventEmitter {
* - Agent mistakes when copying/manipulating paths
* - Harder to read in tool outputs
* - Potential path length issues on some systems
*
* Uses the Runtime abstraction so temp directories work for both local and SSH runtimes.
*/
public createTempDirForStream(streamToken: StreamToken): string {
const homeDir = os.homedir();
const tempDir = path.join(homeDir, ".cmux-tmp", streamToken);
fs.mkdirSync(tempDir, { recursive: true, mode: 0o700 });
return tempDir;
public async createTempDirForStream(streamToken: StreamToken, runtime: Runtime): Promise<string> {
// Create directory and get absolute path (works for both local and remote)
// Use 'cd' + 'pwd' to resolve ~ to absolute path
const command = `mkdir -p ~/.cmux-tmp/${streamToken} && cd ~/.cmux-tmp/${streamToken} && pwd`;
const result = await execBuffered(runtime, command, {
cwd: "/",
timeout: 10,
});

if (result.exitCode !== 0) {
throw new Error(
`Failed to create temp directory ~/.cmux-tmp/${streamToken}: exit code ${result.exitCode}`
);
}

// Return absolute path (e.g., "/home/user/.cmux-tmp/abc123")
return result.stdout.trim();
}

/**
Expand Down Expand Up @@ -429,7 +444,8 @@ export class StreamManager extends EventEmitter {
private createStreamAtomically(
workspaceId: WorkspaceId,
streamToken: StreamToken,
tempDir: string,
runtimeTempDir: string,
runtime: Runtime,
messages: ModelMessage[],
model: LanguageModel,
modelString: string,
Expand Down Expand Up @@ -508,7 +524,8 @@ export class StreamManager extends EventEmitter {
lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write
partialWritePromise: undefined, // No write in flight initially
processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream
tempDir, // Stream-scoped temp directory for tool outputs
runtimeTempDir, // Stream-scoped temp directory for tool outputs
runtime, // Runtime for temp directory cleanup
};

// Atomically register the stream
Expand Down Expand Up @@ -961,13 +978,17 @@ export class StreamManager extends EventEmitter {
streamInfo.partialWriteTimer = undefined;
}

// Clean up stream temp directory
if (streamInfo.tempDir && fs.existsSync(streamInfo.tempDir)) {
// Clean up stream temp directory using runtime
if (streamInfo.runtimeTempDir) {
try {
fs.rmSync(streamInfo.tempDir, { recursive: true, force: true });
log.debug(`Cleaned up temp dir: ${streamInfo.tempDir}`);
const result = await streamInfo.runtime.exec(`rm -rf "${streamInfo.runtimeTempDir}"`, {
cwd: "~",
timeout: 10,
});
await result.exitCode; // Wait for completion
log.debug(`Cleaned up temp dir: ${streamInfo.runtimeTempDir}`);
} catch (error) {
log.error(`Failed to cleanup temp dir ${streamInfo.tempDir}:`, error);
log.error(`Failed to cleanup temp dir ${streamInfo.runtimeTempDir}:`, error);
// Don't throw - cleanup is best-effort
}
}
Expand Down Expand Up @@ -1090,6 +1111,7 @@ export class StreamManager extends EventEmitter {
modelString: string,
historySequence: number,
system: string,
runtime: Runtime,
abortSignal?: AbortSignal,
tools?: Record<string, Tool>,
initialMetadata?: Partial<CmuxMetadata>,
Expand Down Expand Up @@ -1123,18 +1145,16 @@ export class StreamManager extends EventEmitter {
// Step 2: Use provided stream token or generate a new one
const streamToken = providedStreamToken ?? this.generateStreamToken();

// Step 3: Create temp directory for this stream
// If token was provided, temp dir might already exist - that's fine
const tempDir = path.join(os.homedir(), ".cmux-tmp", streamToken);
if (!fs.existsSync(tempDir)) {
fs.mkdirSync(tempDir, { recursive: true, mode: 0o700 });
}
// Step 3: Create temp directory for this stream using runtime
// If token was provided, temp dir might already exist - mkdir -p handles this
const runtimeTempDir = await this.createTempDirForStream(streamToken, runtime);

// Step 4: Atomic stream creation and registration
const streamInfo = this.createStreamAtomically(
typedWorkspaceId,
streamToken,
tempDir,
runtimeTempDir,
runtime,
messages,
model,
modelString,
Expand Down
27 changes: 14 additions & 13 deletions src/services/tools/bash.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function createTestBashTool(options?: { niceness?: number }) {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
...options,
});

Expand Down Expand Up @@ -164,7 +164,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
overflow_policy: "truncate",
});

Expand Down Expand Up @@ -203,7 +203,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
overflow_policy: "truncate",
});

Expand Down Expand Up @@ -235,7 +235,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
overflow_policy: "truncate",
});

Expand Down Expand Up @@ -271,7 +271,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
// overflow_policy not specified - should default to tmpfile
});

Expand All @@ -289,7 +289,8 @@ describe("bash tool", () => {
expect(result.error).toContain("saved to");
expect(result.error).not.toContain("[OUTPUT TRUNCATED");

// Verify temp file was created
// Verify temp file was created in runtimeTempDir
expect(fs.existsSync(tempDir.path)).toBe(true);
const files = fs.readdirSync(tempDir.path);
const bashFiles = files.filter((f) => f.startsWith("bash-"));
expect(bashFiles.length).toBe(1);
Expand All @@ -303,7 +304,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate ~50KB of output (well over 16KB display limit, under 100KB file limit)
Expand Down Expand Up @@ -355,7 +356,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate ~150KB of output (exceeds 100KB file limit)
Expand Down Expand Up @@ -398,7 +399,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate output that exceeds display limit but not file limit
Expand Down Expand Up @@ -440,7 +441,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate a single line exceeding 1KB limit, then try to output more
Expand Down Expand Up @@ -480,7 +481,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate ~15KB of output (just under 16KB display limit)
Expand Down Expand Up @@ -510,7 +511,7 @@ describe("bash tool", () => {
const tool = createBashTool({
cwd: process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

// Generate exactly 300 lines (hits line limit exactly)
Expand Down Expand Up @@ -1250,7 +1251,7 @@ describe("SSH runtime redundant cd detection", () => {
const tool = createBashTool({
cwd,
runtime: sshRuntime,
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

return {
Expand Down
5 changes: 4 additions & 1 deletion src/services/tools/bash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,10 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
try {
// Use 8 hex characters for short, memorable temp file IDs
const fileId = Math.random().toString(16).substring(2, 10);
const overflowPath = path.join(config.tempDir, `bash-${fileId}.txt`);
// Write to runtime temp directory (managed by StreamManager)
// Use path.posix.join to preserve forward slashes for SSH runtime
// (config.runtimeTempDir is always a POSIX path like /home/user/.cmux-tmp/token)
const overflowPath = path.posix.join(config.runtimeTempDir, `bash-${fileId}.txt`);
const fullOutput = lines.join("\n");

// Use runtime.writeFile() for SSH support
Expand Down
8 changes: 4 additions & 4 deletions src/services/tools/file_edit_insert.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function createTestFileEditInsertTool(options?: { cwd?: string }) {
const tool = createFileEditInsertTool({
cwd: options?.cwd ?? process.cwd(),
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: tempDir.path,
runtimeTempDir: tempDir.path,
});

return {
Expand Down Expand Up @@ -214,7 +214,7 @@ describe("file_edit_insert tool", () => {
const tool = createFileEditInsertTool({
cwd: testDir,
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: "/tmp",
runtimeTempDir: "/tmp",
});
const args: FileEditInsertToolArgs = {
file_path: nonExistentPath,
Expand All @@ -240,7 +240,7 @@ describe("file_edit_insert tool", () => {
const tool = createFileEditInsertTool({
cwd: testDir,
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: "/tmp",
runtimeTempDir: "/tmp",
});
const args: FileEditInsertToolArgs = {
file_path: nestedPath,
Expand All @@ -267,7 +267,7 @@ describe("file_edit_insert tool", () => {
const tool = createFileEditInsertTool({
cwd: testDir,
runtime: createRuntime({ type: "local", srcBaseDir: "/tmp" }),
tempDir: "/tmp",
runtimeTempDir: "/tmp",
});
const args: FileEditInsertToolArgs = {
file_path: testFilePath,
Expand Down
Loading