|
1 | 1 | import { EventEmitter } from "events"; |
2 | | -import * as fs from "fs"; |
3 | | -import * as path from "path"; |
4 | | -import * as os from "os"; |
5 | 2 | import { |
6 | 3 | streamText, |
7 | 4 | stepCountIs, |
@@ -31,6 +28,8 @@ import type { HistoryService } from "./historyService"; |
31 | 28 | import { AsyncMutex } from "@/utils/concurrency/asyncMutex"; |
32 | 29 | import type { ToolPolicy } from "@/utils/tools/toolPolicy"; |
33 | 30 | import { StreamingTokenTracker } from "@/utils/main/StreamingTokenTracker"; |
| 31 | +import type { Runtime } from "@/runtime/Runtime"; |
| 32 | +import { execBuffered } from "@/utils/runtime/helpers"; |
34 | 33 |
|
35 | 34 | // Type definitions for stream parts with extended properties |
36 | 35 | interface ReasoningDeltaPart { |
@@ -107,7 +106,9 @@ interface WorkspaceStreamInfo { |
107 | 106 | // Track background processing promise for guaranteed cleanup |
108 | 107 | processingPromise: Promise<void>; |
109 | 108 | // Temporary directory for tool outputs (auto-cleaned when stream ends) |
110 | | - tempDir: string; |
| 109 | + runtimeTempDir: string; |
| 110 | + // Runtime for temp directory cleanup |
| 111 | + runtime: Runtime; |
111 | 112 | } |
112 | 113 |
|
113 | 114 | /** |
@@ -242,12 +243,26 @@ export class StreamManager extends EventEmitter { |
242 | 243 | * - Agent mistakes when copying/manipulating paths |
243 | 244 | * - Harder to read in tool outputs |
244 | 245 | * - Potential path length issues on some systems |
| 246 | + * |
| 247 | + * Uses the Runtime abstraction so temp directories work for both local and SSH runtimes. |
245 | 248 | */ |
246 | | - public createTempDirForStream(streamToken: StreamToken): string { |
247 | | - const homeDir = os.homedir(); |
248 | | - const tempDir = path.join(homeDir, ".cmux-tmp", streamToken); |
249 | | - fs.mkdirSync(tempDir, { recursive: true, mode: 0o700 }); |
250 | | - return tempDir; |
| 249 | + public async createTempDirForStream(streamToken: StreamToken, runtime: Runtime): Promise<string> { |
| 250 | + // Create directory and get absolute path (works for both local and remote) |
| 251 | + // Use 'cd' + 'pwd' to resolve ~ to absolute path |
| 252 | + const command = `mkdir -p ~/.cmux-tmp/${streamToken} && cd ~/.cmux-tmp/${streamToken} && pwd`; |
| 253 | + const result = await execBuffered(runtime, command, { |
| 254 | + cwd: "/", |
| 255 | + timeout: 10, |
| 256 | + }); |
| 257 | + |
| 258 | + if (result.exitCode !== 0) { |
| 259 | + throw new Error( |
| 260 | + `Failed to create temp directory ~/.cmux-tmp/${streamToken}: exit code ${result.exitCode}` |
| 261 | + ); |
| 262 | + } |
| 263 | + |
| 264 | + // Return absolute path (e.g., "/home/user/.cmux-tmp/abc123") |
| 265 | + return result.stdout.trim(); |
251 | 266 | } |
252 | 267 |
|
253 | 268 | /** |
@@ -429,7 +444,8 @@ export class StreamManager extends EventEmitter { |
429 | 444 | private createStreamAtomically( |
430 | 445 | workspaceId: WorkspaceId, |
431 | 446 | streamToken: StreamToken, |
432 | | - tempDir: string, |
| 447 | + runtimeTempDir: string, |
| 448 | + runtime: Runtime, |
433 | 449 | messages: ModelMessage[], |
434 | 450 | model: LanguageModel, |
435 | 451 | modelString: string, |
@@ -508,7 +524,8 @@ export class StreamManager extends EventEmitter { |
508 | 524 | lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write |
509 | 525 | partialWritePromise: undefined, // No write in flight initially |
510 | 526 | processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream |
511 | | - tempDir, // Stream-scoped temp directory for tool outputs |
| 527 | + runtimeTempDir, // Stream-scoped temp directory for tool outputs |
| 528 | + runtime, // Runtime for temp directory cleanup |
512 | 529 | }; |
513 | 530 |
|
514 | 531 | // Atomically register the stream |
@@ -961,13 +978,17 @@ export class StreamManager extends EventEmitter { |
961 | 978 | streamInfo.partialWriteTimer = undefined; |
962 | 979 | } |
963 | 980 |
|
964 | | - // Clean up stream temp directory |
965 | | - if (streamInfo.tempDir && fs.existsSync(streamInfo.tempDir)) { |
| 981 | + // Clean up stream temp directory using runtime |
| 982 | + if (streamInfo.runtimeTempDir) { |
966 | 983 | try { |
967 | | - fs.rmSync(streamInfo.tempDir, { recursive: true, force: true }); |
968 | | - log.debug(`Cleaned up temp dir: ${streamInfo.tempDir}`); |
| 984 | + const result = await streamInfo.runtime.exec(`rm -rf "${streamInfo.runtimeTempDir}"`, { |
| 985 | + cwd: "~", |
| 986 | + timeout: 10, |
| 987 | + }); |
| 988 | + await result.exitCode; // Wait for completion |
| 989 | + log.debug(`Cleaned up temp dir: ${streamInfo.runtimeTempDir}`); |
969 | 990 | } catch (error) { |
970 | | - log.error(`Failed to cleanup temp dir ${streamInfo.tempDir}:`, error); |
| 991 | + log.error(`Failed to cleanup temp dir ${streamInfo.runtimeTempDir}:`, error); |
971 | 992 | // Don't throw - cleanup is best-effort |
972 | 993 | } |
973 | 994 | } |
@@ -1090,6 +1111,7 @@ export class StreamManager extends EventEmitter { |
1090 | 1111 | modelString: string, |
1091 | 1112 | historySequence: number, |
1092 | 1113 | system: string, |
| 1114 | + runtime: Runtime, |
1093 | 1115 | abortSignal?: AbortSignal, |
1094 | 1116 | tools?: Record<string, Tool>, |
1095 | 1117 | initialMetadata?: Partial<CmuxMetadata>, |
@@ -1123,18 +1145,16 @@ export class StreamManager extends EventEmitter { |
1123 | 1145 | // Step 2: Use provided stream token or generate a new one |
1124 | 1146 | const streamToken = providedStreamToken ?? this.generateStreamToken(); |
1125 | 1147 |
|
1126 | | - // Step 3: Create temp directory for this stream |
1127 | | - // If token was provided, temp dir might already exist - that's fine |
1128 | | - const tempDir = path.join(os.homedir(), ".cmux-tmp", streamToken); |
1129 | | - if (!fs.existsSync(tempDir)) { |
1130 | | - fs.mkdirSync(tempDir, { recursive: true, mode: 0o700 }); |
1131 | | - } |
| 1148 | + // Step 3: Create temp directory for this stream using runtime |
| 1149 | + // If token was provided, temp dir might already exist - mkdir -p handles this |
| 1150 | + const runtimeTempDir = await this.createTempDirForStream(streamToken, runtime); |
1132 | 1151 |
|
1133 | 1152 | // Step 4: Atomic stream creation and registration |
1134 | 1153 | const streamInfo = this.createStreamAtomically( |
1135 | 1154 | typedWorkspaceId, |
1136 | 1155 | streamToken, |
1137 | | - tempDir, |
| 1156 | + runtimeTempDir, |
| 1157 | + runtime, |
1138 | 1158 | messages, |
1139 | 1159 | model, |
1140 | 1160 | modelString, |
|
0 commit comments