diff --git a/src/runtime/LocalRuntime.ts b/src/runtime/LocalRuntime.ts index e1ecf7bb3..69c9a61fb 100644 --- a/src/runtime/LocalRuntime.ts +++ b/src/runtime/LocalRuntime.ts @@ -209,7 +209,8 @@ export class LocalRuntime implements Runtime { return { stdout, stderr, stdin, exitCode, duration }; } - readFile(filePath: string): ReadableStream { + readFile(filePath: string, _abortSignal?: AbortSignal): ReadableStream { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) const nodeStream = fs.createReadStream(filePath); // Handle errors by wrapping in a transform @@ -238,7 +239,8 @@ export class LocalRuntime implements Runtime { }); } - writeFile(filePath: string): WritableStream { + writeFile(filePath: string, _abortSignal?: AbortSignal): WritableStream { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) let tempPath: string; let writer: WritableStreamDefaultWriter; let resolvedPath: string; @@ -304,7 +306,8 @@ export class LocalRuntime implements Runtime { }); } - async stat(filePath: string): Promise { + async stat(filePath: string, _abortSignal?: AbortSignal): Promise { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) try { const stats = await fsPromises.stat(filePath); return { @@ -480,10 +483,12 @@ export class LocalRuntime implements Runtime { async renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + _abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } > { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) // Compute workspace paths using canonical method const oldPath = this.getWorkspacePath(projectPath, oldName); const newPath = this.getWorkspacePath(projectPath, newName); @@ -503,8 +508,10 @@ export class LocalRuntime implements Runtime { async deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + _abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) // Compute workspace path using the canonical method const deletedPath = this.getWorkspacePath(projectPath, workspaceName); diff --git a/src/runtime/Runtime.ts b/src/runtime/Runtime.ts index eab3c2651..610a3057a 100644 --- a/src/runtime/Runtime.ts +++ b/src/runtime/Runtime.ts @@ -119,6 +119,8 @@ export interface WorkspaceCreationParams { directoryName: string; /** Logger for streaming creation progress and init hook output */ initLogger: InitLogger; + /** Optional abort signal for cancellation */ + abortSignal?: AbortSignal; } /** @@ -145,6 +147,8 @@ export interface WorkspaceInitParams { workspacePath: string; /** Logger for streaming initialization progress and output */ initLogger: InitLogger; + /** Optional abort signal for cancellation */ + abortSignal?: AbortSignal; } /** @@ -208,26 +212,29 @@ export interface Runtime { /** * Read file contents as a stream * @param path Absolute or relative path to file + * @param abortSignal Optional abort signal for cancellation * @returns Readable stream of file contents * @throws RuntimeError if file cannot be read */ - readFile(path: string): ReadableStream; + readFile(path: string, abortSignal?: AbortSignal): ReadableStream; /** * Write file contents atomically from a stream * @param path Absolute or relative path to file + * @param abortSignal Optional abort signal for cancellation * @returns Writable stream for file contents * @throws RuntimeError if file cannot be written */ - writeFile(path: string): WritableStream; + writeFile(path: string, abortSignal?: AbortSignal): WritableStream; /** * Get file statistics * @param path Absolute or relative path to file/directory + * @param abortSignal Optional abort signal for cancellation * @returns File statistics * @throws RuntimeError if path does not exist or cannot be accessed */ - stat(path: string): Promise; + stat(path: string, abortSignal?: AbortSignal): Promise; /** * Resolve a path to its absolute, canonical form (expanding tildes, resolving symlinks, etc.). @@ -310,12 +317,14 @@ export interface Runtime { * @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name) * @param oldName Current workspace name * @param newName New workspace name + * @param abortSignal Optional abort signal for cancellation * @returns Promise resolving to Result with old/new paths on success, or error message */ renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } >; @@ -333,12 +342,14 @@ export interface Runtime { * @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name) * @param workspaceName Workspace name to delete * @param force If true, force deletion even with uncommitted changes or special conditions (submodules, etc.) + * @param abortSignal Optional abort signal for cancellation * @returns Promise resolving to Result with deleted path on success, or error message */ deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }>; /** diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index c79a465d1..1403a40e5 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -81,6 +81,11 @@ export class SSHRuntime implements Runtime { async exec(command: string, options: ExecOptions): Promise { const startTime = performance.now(); + // Short-circuit if already aborted + if (options.abortSignal?.aborted) { + throw new RuntimeErrorClass("Operation aborted before execution", "exec"); + } + // Build command parts const parts: string[] = []; @@ -216,7 +221,7 @@ export class SSHRuntime implements Runtime { /** * Read file contents over SSH as a stream */ - readFile(path: string): ReadableStream { + readFile(path: string, abortSignal?: AbortSignal): ReadableStream { // Return stdout, but wrap to handle errors from exec() and exit code return new ReadableStream({ start: async (controller: ReadableStreamDefaultController) => { @@ -224,6 +229,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(`cat ${shescape.quote(path)}`, { cwd: this.config.srcBaseDir, timeout: 300, // 5 minutes - reasonable for large files + abortSignal, }); const reader = stream.stdout.getReader(); @@ -265,7 +271,7 @@ export class SSHRuntime implements Runtime { * Write file contents over SSH atomically from a stream * Preserves symlinks and file permissions by resolving and copying metadata */ - writeFile(path: string): WritableStream { + writeFile(path: string, abortSignal?: AbortSignal): WritableStream { const tempPath = `${path}.tmp.${Date.now()}`; // Resolve symlinks to get the actual target path, preserving the symlink itself // If target exists, save its permissions to restore after write @@ -281,6 +287,7 @@ export class SSHRuntime implements Runtime { execPromise ??= this.exec(writeCommand, { cwd: this.config.srcBaseDir, timeout: 300, // 5 minutes - reasonable for large files + abortSignal, }); return execPromise; }; @@ -318,12 +325,13 @@ export class SSHRuntime implements Runtime { /** * Get file statistics over SSH */ - async stat(path: string): Promise { + async stat(path: string, abortSignal?: AbortSignal): Promise { // Use stat with format string to get: size, mtime, type // %s = size, %Y = mtime (seconds since epoch), %F = file type const stream = await this.exec(`stat -c '%s %Y %F' ${shescape.quote(path)}`, { cwd: this.config.srcBaseDir, timeout: 10, // 10 seconds - stat should be fast + abortSignal, }); const [stdout, stderr, exitCode] = await Promise.all([ @@ -510,8 +518,14 @@ export class SSHRuntime implements Runtime { private async syncProjectToRemote( projectPath: string, workspacePath: string, - initLogger: InitLogger + initLogger: InitLogger, + abortSignal?: AbortSignal ): Promise { + // Short-circuit if already aborted + if (abortSignal?.aborted) { + throw new Error("Sync operation aborted before starting"); + } + // Use timestamp-based bundle path to avoid conflicts (simpler than $$) const timestamp = Date.now(); const bundleTempPath = `~/.cmux-bundle-${timestamp}.bundle`; @@ -537,15 +551,22 @@ export class SSHRuntime implements Runtime { // Step 2: Create bundle locally and pipe to remote file via SSH initLogger.logStep(`Creating git bundle...`); await new Promise((resolve, reject) => { + // Check if aborted before spawning + if (abortSignal?.aborted) { + reject(new Error("Bundle creation aborted")); + return; + } + const sshArgs = this.buildSSHArgs(true); const command = `cd ${shescape.quote(projectPath)} && git bundle create - --all | ssh ${sshArgs.join(" ")} "cat > ${bundleTempPath}"`; log.debug(`Creating bundle: ${command}`); const proc = spawn("bash", ["-c", command]); - streamProcessToLogger(proc, initLogger, { + const cleanup = streamProcessToLogger(proc, initLogger, { logStdout: false, logStderr: true, + abortSignal, }); let stderr = ""; @@ -554,7 +575,10 @@ export class SSHRuntime implements Runtime { }); proc.on("close", (code) => { - if (code === 0) { + cleanup(); + if (abortSignal?.aborted) { + reject(new Error("Bundle creation aborted")); + } else if (code === 0) { resolve(); } else { reject(new Error(`Failed to create bundle: ${stderr}`)); @@ -562,6 +586,7 @@ export class SSHRuntime implements Runtime { }); proc.on("error", (err) => { + cleanup(); reject(err); }); }); @@ -576,6 +601,7 @@ export class SSHRuntime implements Runtime { const cloneStream = await this.exec(`git clone --quiet ${bundleTempPath} ${cloneDestPath}`, { cwd: "~", timeout: 300, // 5 minutes for clone + abortSignal, }); const [cloneStdout, cloneStderr, cloneExitCode] = await Promise.all([ @@ -597,6 +623,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 30, + abortSignal, } ); await createTrackingBranchesStream.exitCode; @@ -610,6 +637,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 10, + abortSignal, } ); @@ -627,6 +655,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 10, + abortSignal, } ); await removeOriginStream.exitCode; @@ -637,6 +666,7 @@ export class SSHRuntime implements Runtime { const rmStream = await this.exec(`rm ${bundleTempPath}`, { cwd: "~", timeout: 10, + abortSignal, }); const rmExitCode = await rmStream.exitCode; @@ -651,6 +681,7 @@ export class SSHRuntime implements Runtime { const rmStream = await this.exec(`rm -f ${bundleTempPath}`, { cwd: "~", timeout: 10, + abortSignal, }); await rmStream.exitCode; } catch { @@ -667,7 +698,8 @@ export class SSHRuntime implements Runtime { private async runInitHook( projectPath: string, workspacePath: string, - initLogger: InitLogger + initLogger: InitLogger, + abortSignal?: AbortSignal ): Promise { // Check if hook exists locally (we synced the project, so local check is sufficient) const hookExists = await checkInitHookExists(projectPath); @@ -688,6 +720,7 @@ export class SSHRuntime implements Runtime { const hookStream = await this.exec(hookCommand, { cwd: workspacePath, // Run in the workspace directory timeout: 3600, // 1 hour - generous timeout for init hooks + abortSignal, }); // Create line-buffered loggers @@ -739,7 +772,7 @@ export class SSHRuntime implements Runtime { async createWorkspace(params: WorkspaceCreationParams): Promise { try { - const { projectPath, branchName, initLogger } = params; + const { projectPath, branchName, initLogger, abortSignal } = params; // Compute workspace path using canonical method const workspacePath = this.getWorkspacePath(projectPath, branchName); @@ -760,6 +793,7 @@ export class SSHRuntime implements Runtime { const mkdirStream = await this.exec(parentDirCommand, { cwd: "/tmp", timeout: 10, + abortSignal, }); const mkdirExitCode = await mkdirStream.exitCode; if (mkdirExitCode !== 0) { @@ -791,13 +825,13 @@ export class SSHRuntime implements Runtime { } async initWorkspace(params: WorkspaceInitParams): Promise { - const { projectPath, branchName, trunkBranch, workspacePath, initLogger } = params; + const { projectPath, branchName, trunkBranch, workspacePath, initLogger, abortSignal } = params; try { // 1. Sync project to remote (opportunistic rsync with scp fallback) initLogger.logStep("Syncing project files to remote..."); try { - await this.syncProjectToRemote(projectPath, workspacePath, initLogger); + await this.syncProjectToRemote(projectPath, workspacePath, initLogger, abortSignal); } catch (error) { const errorMsg = getErrorMessage(error); initLogger.logStderr(`Failed to sync project: ${errorMsg}`); @@ -821,6 +855,7 @@ export class SSHRuntime implements Runtime { const checkoutStream = await this.exec(checkoutCmd, { cwd: workspacePath, // Use the full workspace path for git operations timeout: 300, // 5 minutes for git checkout (can be slow on large repos) + abortSignal, }); const [stdout, stderr, exitCode] = await Promise.all([ @@ -844,7 +879,7 @@ export class SSHRuntime implements Runtime { // Note: runInitHook calls logComplete() internally if hook exists const hookExists = await checkInitHookExists(projectPath); if (hookExists) { - await this.runInitHook(projectPath, workspacePath, initLogger); + await this.runInitHook(projectPath, workspacePath, initLogger, abortSignal); } else { // No hook - signal completion immediately initLogger.logComplete(0); @@ -865,10 +900,15 @@ export class SSHRuntime implements Runtime { async renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } > { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Rename operation aborted" }; + } // Compute workspace paths using canonical method const oldPath = this.getWorkspacePath(projectPath, oldName); const newPath = this.getWorkspacePath(projectPath, newName); @@ -886,6 +926,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(moveCommand, { cwd: this.config.srcBaseDir, timeout: 30, + abortSignal, }); await stream.stdin.close(); @@ -920,8 +961,14 @@ export class SSHRuntime implements Runtime { async deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Delete operation aborted" }; + } + // Compute workspace path using canonical method const deletedPath = this.getWorkspacePath(projectPath, workspaceName); @@ -945,6 +992,7 @@ export class SSHRuntime implements Runtime { const checkStream = await this.exec(checkScript, { cwd: this.config.srcBaseDir, timeout: 10, + abortSignal, }); await checkStream.stdin.close(); @@ -998,6 +1046,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(removeCommand, { cwd: this.config.srcBaseDir, timeout: 30, + abortSignal, }); await stream.stdin.close(); diff --git a/src/runtime/streamProcess.ts b/src/runtime/streamProcess.ts index e2ca0eeec..38685f270 100644 --- a/src/runtime/streamProcess.ts +++ b/src/runtime/streamProcess.ts @@ -16,6 +16,7 @@ import type { InitLogger } from "./Runtime"; * @param process Child process to stream from * @param initLogger Logger to stream output to * @param options Configuration for which streams to log + * @returns Cleanup function to remove abort signal listener (call this in process close/error handlers) */ export function streamProcessToLogger( process: ChildProcess, @@ -27,15 +28,27 @@ export function streamProcessToLogger( logStderr?: boolean; /** Optional: Command string to log before streaming starts */ command?: string; + /** Optional: Abort signal to kill process on cancellation */ + abortSignal?: AbortSignal; } -): void { - const { logStdout = false, logStderr = true, command } = options ?? {}; +): () => void { + const { logStdout = false, logStderr = true, command, abortSignal } = options ?? {}; // Log the command being executed (if provided) if (command) { initLogger.logStep(`Executing: ${command}`); } + // Set up abort signal handler + const abortHandler = abortSignal + ? () => { + process.kill(); + } + : null; + if (abortHandler && abortSignal) { + abortSignal.addEventListener("abort", abortHandler); + } + // Drain stdout (prevent pipe overflow) if (process.stdout) { process.stdout.on("data", (data: Buffer) => { @@ -65,4 +78,11 @@ export function streamProcessToLogger( // Otherwise drain silently to prevent buffer overflow }); } + + // Return cleanup function to remove abort listener + return () => { + if (abortHandler && abortSignal) { + abortSignal.removeEventListener("abort", abortHandler); + } + }; } diff --git a/src/services/tools/bash.ts b/src/services/tools/bash.ts index 5673175ee..d25f7ef9b 100644 --- a/src/services/tools/bash.ts +++ b/src/services/tools/bash.ts @@ -425,7 +425,7 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => { const fullOutput = lines.join("\n"); // Use runtime.writeFile() for SSH support - const writer = config.runtime.writeFile(overflowPath); + const writer = config.runtime.writeFile(overflowPath, abortSignal); const encoder = new TextEncoder(); const writerInstance = writer.getWriter(); await writerInstance.write(encoder.encode(fullOutput)); diff --git a/src/services/tools/file_edit_insert.ts b/src/services/tools/file_edit_insert.ts index e65a376a9..340dc838a 100644 --- a/src/services/tools/file_edit_insert.ts +++ b/src/services/tools/file_edit_insert.ts @@ -18,12 +18,10 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) return tool({ description: TOOL_DEFINITIONS.file_edit_insert.description, inputSchema: TOOL_DEFINITIONS.file_edit_insert.schema, - execute: async ({ - file_path, - line_offset, - content, - create, - }): Promise => { + execute: async ( + { file_path, line_offset, content, create }, + { abortSignal } + ): Promise => { try { // Validate no redundant path prefix (must come first to catch absolute paths) const redundantPrefixValidation = validateNoRedundantPrefix( @@ -57,7 +55,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) const resolvedPath = config.runtime.normalizePath(file_path, config.cwd); // Check if file exists using runtime - const exists = await fileExists(config.runtime, resolvedPath); + const exists = await fileExists(config.runtime, resolvedPath, abortSignal); if (!exists) { if (!create) { @@ -69,7 +67,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) // Create empty file using runtime helper try { - await writeFileString(config.runtime, resolvedPath, ""); + await writeFileString(config.runtime, resolvedPath, "", abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -84,6 +82,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) return executeFileEditOperation({ config, filePath: file_path, + abortSignal, operation: (originalContent) => { const lines = originalContent.split("\n"); diff --git a/src/services/tools/file_edit_operation.ts b/src/services/tools/file_edit_operation.ts index 5c06e3902..6ed67fde2 100644 --- a/src/services/tools/file_edit_operation.ts +++ b/src/services/tools/file_edit_operation.ts @@ -27,6 +27,7 @@ interface ExecuteFileEditOperationOptions { operation: ( originalContent: string ) => FileEditOperationResult | Promise>; + abortSignal?: AbortSignal; } /** @@ -37,6 +38,7 @@ export async function executeFileEditOperation({ config, filePath, operation, + abortSignal, }: ExecuteFileEditOperationOptions): Promise< FileEditErrorResult | (FileEditDiffSuccessBase & TMetadata) > { @@ -69,7 +71,7 @@ export async function executeFileEditOperation({ // Check if file exists and get stats using runtime let fileStat; try { - fileStat = await config.runtime.stat(resolvedPath); + fileStat = await config.runtime.stat(resolvedPath, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -98,7 +100,7 @@ export async function executeFileEditOperation({ // Read file content using runtime helper let originalContent: string; try { - originalContent = await readFileString(config.runtime, resolvedPath); + originalContent = await readFileString(config.runtime, resolvedPath, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -119,7 +121,7 @@ export async function executeFileEditOperation({ // Write file using runtime helper try { - await writeFileString(config.runtime, resolvedPath, operationResult.newContent); + await writeFileString(config.runtime, resolvedPath, operationResult.newContent, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { diff --git a/src/services/tools/file_edit_replace_lines.ts b/src/services/tools/file_edit_replace_lines.ts index 30be0a586..13b641e87 100644 --- a/src/services/tools/file_edit_replace_lines.ts +++ b/src/services/tools/file_edit_replace_lines.ts @@ -26,11 +26,15 @@ export const createFileEditReplaceLinesTool: ToolFactory = (config: ToolConfigur return tool({ description: TOOL_DEFINITIONS.file_edit_replace_lines.description, inputSchema: TOOL_DEFINITIONS.file_edit_replace_lines.schema, - execute: async (args: LineReplaceArgs): Promise => { + execute: async ( + args: LineReplaceArgs, + { abortSignal } + ): Promise => { const result = await executeFileEditOperation({ config, filePath: args.file_path, operation: (originalContent) => handleLineReplace(args, originalContent), + abortSignal, }); // handleLineReplace always returns lines_replaced and line_delta, diff --git a/src/services/tools/file_edit_replace_string.ts b/src/services/tools/file_edit_replace_string.ts index 8c255a068..abdeaf788 100644 --- a/src/services/tools/file_edit_replace_string.ts +++ b/src/services/tools/file_edit_replace_string.ts @@ -26,11 +26,15 @@ export const createFileEditReplaceStringTool: ToolFactory = (config: ToolConfigu return tool({ description: TOOL_DEFINITIONS.file_edit_replace_string.description, inputSchema: TOOL_DEFINITIONS.file_edit_replace_string.schema, - execute: async (args: StringReplaceArgs): Promise => { + execute: async ( + args: StringReplaceArgs, + { abortSignal } + ): Promise => { return executeFileEditOperation({ config, filePath: args.file_path, operation: (originalContent) => handleStringReplace(args, originalContent), + abortSignal, }); }, }); diff --git a/src/utils/runtime/fileExists.ts b/src/utils/runtime/fileExists.ts index 7f370faef..7504c9ac7 100644 --- a/src/utils/runtime/fileExists.ts +++ b/src/utils/runtime/fileExists.ts @@ -4,11 +4,16 @@ import type { Runtime } from "@/runtime/Runtime"; * Check if a path exists using runtime.stat() * @param runtime Runtime instance to use * @param path Path to check + * @param abortSignal Optional abort signal to cancel the operation * @returns True if path exists, false otherwise */ -export async function fileExists(runtime: Runtime, path: string): Promise { +export async function fileExists( + runtime: Runtime, + path: string, + abortSignal?: AbortSignal +): Promise { try { - await runtime.stat(path); + await runtime.stat(path, abortSignal); return true; } catch { return false; diff --git a/src/utils/runtime/helpers.ts b/src/utils/runtime/helpers.ts index b6b5e5731..d958e5cde 100644 --- a/src/utils/runtime/helpers.ts +++ b/src/utils/runtime/helpers.ts @@ -69,8 +69,12 @@ export async function execBuffered( /** * Read file contents as a UTF-8 string */ -export async function readFileString(runtime: Runtime, path: string): Promise { - const stream = runtime.readFile(path); +export async function readFileString( + runtime: Runtime, + path: string, + abortSignal?: AbortSignal +): Promise { + const stream = runtime.readFile(path, abortSignal); return streamToString(stream); } @@ -80,9 +84,10 @@ export async function readFileString(runtime: Runtime, path: string): Promise { - const stream = runtime.writeFile(path); + const stream = runtime.writeFile(path, abortSignal); const writer = stream.getWriter(); try { await writer.write(new TextEncoder().encode(content));