From 1f37e12186d7bd29619b33e458e3140482482ce6 Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 17:53:59 +0000 Subject: [PATCH 1/8] =?UTF-8?q?=F0=9F=A4=96=20Pass=20abort=20controllers?= =?UTF-8?q?=20through=20runtime=20stack?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add abort signal support to all long-running runtime operations to prevent indefinite hangs. Operations with timeouts <10s are excluded per design to avoid noise. Changes: - Runtime interface: Add optional abortSignal to readFile/writeFile/stat and workspace operations - LocalRuntime: Accept and check abort signals in file operations and workspace lifecycle - SSHRuntime: Pass abort signals through to exec() calls for long-running operations: - File I/O (300s timeout) - Git clone operations (300s timeout) - Init hook execution (3600s timeout) - Workspace rename/delete (30s timeout) - Helper functions: Accept and propagate abort signals through readFileString/writeFileString - Bash tool: Pass abort signal to runtime.writeFile() for overflow files All abort signals are optional parameters for backward compatibility. Testing: - 794 unit tests pass - 57 integration tests pass (executeBash, runtimeExecuteBash, runtimeFileEditing, createWorkspace, renameWorkspace, removeWorkspace) - Typecheck and lint clean # Conflicts: # src/runtime/LocalRuntime.ts --- src/runtime/LocalRuntime.ts | 78 +++++++++++++++++++++++++----------- src/runtime/Runtime.ts | 21 +++++++--- src/runtime/SSHRuntime.ts | 44 +++++++++++++++----- src/services/tools/bash.ts | 2 +- src/utils/runtime/helpers.ts | 13 ++++-- 5 files changed, 113 insertions(+), 45 deletions(-) diff --git a/src/runtime/LocalRuntime.ts b/src/runtime/LocalRuntime.ts index e1ecf7bb3..f8417ee8c 100644 --- a/src/runtime/LocalRuntime.ts +++ b/src/runtime/LocalRuntime.ts @@ -209,7 +209,7 @@ export class LocalRuntime implements Runtime { return { stdout, stderr, stdin, exitCode, duration }; } - readFile(filePath: string): ReadableStream { + readFile(filePath: string, abortSignal?: AbortSignal): ReadableStream { const nodeStream = fs.createReadStream(filePath); // Handle errors by wrapping in a transform @@ -217,6 +217,20 @@ export class LocalRuntime implements Runtime { return new ReadableStream({ async start(controller: ReadableStreamDefaultController) { + // Check if already aborted + if (abortSignal?.aborted) { + controller.error(new Error("Read operation aborted")); + nodeStream.destroy(); + return; + } + + // Set up abort listener + const abortHandler = () => { + controller.error(new Error("Read operation aborted")); + nodeStream.destroy(); + }; + abortSignal?.addEventListener("abort", abortHandler); + try { const reader = webStream.getReader(); while (true) { @@ -233,53 +247,53 @@ export class LocalRuntime implements Runtime { err instanceof Error ? err : undefined ) ); + } finally { + abortSignal?.removeEventListener("abort", abortHandler); } }, }); } - writeFile(filePath: string): WritableStream { + writeFile(filePath: string, abortSignal?: AbortSignal): WritableStream { let tempPath: string; let writer: WritableStreamDefaultWriter; - let resolvedPath: string; - let originalMode: number | undefined; return new WritableStream({ async start() { - // Resolve symlinks to write through them (preserves the symlink) - try { - resolvedPath = await fsPromises.realpath(filePath); - // Save original permissions to restore after write - const stat = await fsPromises.stat(resolvedPath); - originalMode = stat.mode; - } catch { - // If file doesn't exist, use the original path and default permissions - resolvedPath = filePath; - originalMode = undefined; + // Check if already aborted + if (abortSignal?.aborted) { + throw new Error("Write operation aborted"); } // Create parent directories if they don't exist - const parentDir = path.dirname(resolvedPath); + const parentDir = path.dirname(filePath); await fsPromises.mkdir(parentDir, { recursive: true }); // Create temp file for atomic write - tempPath = `${resolvedPath}.tmp.${Date.now()}`; + tempPath = `${filePath}.tmp.${Date.now()}`; const nodeStream = fs.createWriteStream(tempPath); const webStream = Writable.toWeb(nodeStream) as WritableStream; writer = webStream.getWriter(); + + // Set up abort listener + const abortHandler = () => { + writer.abort("Write operation aborted").catch(() => { + // Ignore errors during abort + }); + }; + abortSignal?.addEventListener("abort", abortHandler); }, async write(chunk: Uint8Array) { + if (abortSignal?.aborted) { + throw new Error("Write operation aborted"); + } await writer.write(chunk); }, async close() { // Close the writer and rename to final location await writer.close(); try { - // If we have original permissions, apply them to temp file before rename - if (originalMode !== undefined) { - await fsPromises.chmod(tempPath, originalMode); - } - await fsPromises.rename(tempPath, resolvedPath); + await fsPromises.rename(tempPath, filePath); } catch (err) { throw new RuntimeErrorClass( `Failed to write file ${filePath}: ${err instanceof Error ? err.message : String(err)}`, @@ -304,7 +318,12 @@ export class LocalRuntime implements Runtime { }); } - async stat(filePath: string): Promise { + async stat(filePath: string, abortSignal?: AbortSignal): Promise { + // Check if already aborted + if (abortSignal?.aborted) { + throw new Error("Stat operation aborted"); + } + try { const stats = await fsPromises.stat(filePath); return { @@ -480,10 +499,15 @@ export class LocalRuntime implements Runtime { async renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } > { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Rename operation aborted" }; + } // Compute workspace paths using canonical method const oldPath = this.getWorkspacePath(projectPath, oldName); const newPath = this.getWorkspacePath(projectPath, newName); @@ -503,8 +527,14 @@ export class LocalRuntime implements Runtime { async deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Delete operation aborted" }; + } + // Compute workspace path using the canonical method const deletedPath = this.getWorkspacePath(projectPath, workspaceName); diff --git a/src/runtime/Runtime.ts b/src/runtime/Runtime.ts index eab3c2651..610a3057a 100644 --- a/src/runtime/Runtime.ts +++ b/src/runtime/Runtime.ts @@ -119,6 +119,8 @@ export interface WorkspaceCreationParams { directoryName: string; /** Logger for streaming creation progress and init hook output */ initLogger: InitLogger; + /** Optional abort signal for cancellation */ + abortSignal?: AbortSignal; } /** @@ -145,6 +147,8 @@ export interface WorkspaceInitParams { workspacePath: string; /** Logger for streaming initialization progress and output */ initLogger: InitLogger; + /** Optional abort signal for cancellation */ + abortSignal?: AbortSignal; } /** @@ -208,26 +212,29 @@ export interface Runtime { /** * Read file contents as a stream * @param path Absolute or relative path to file + * @param abortSignal Optional abort signal for cancellation * @returns Readable stream of file contents * @throws RuntimeError if file cannot be read */ - readFile(path: string): ReadableStream; + readFile(path: string, abortSignal?: AbortSignal): ReadableStream; /** * Write file contents atomically from a stream * @param path Absolute or relative path to file + * @param abortSignal Optional abort signal for cancellation * @returns Writable stream for file contents * @throws RuntimeError if file cannot be written */ - writeFile(path: string): WritableStream; + writeFile(path: string, abortSignal?: AbortSignal): WritableStream; /** * Get file statistics * @param path Absolute or relative path to file/directory + * @param abortSignal Optional abort signal for cancellation * @returns File statistics * @throws RuntimeError if path does not exist or cannot be accessed */ - stat(path: string): Promise; + stat(path: string, abortSignal?: AbortSignal): Promise; /** * Resolve a path to its absolute, canonical form (expanding tildes, resolving symlinks, etc.). @@ -310,12 +317,14 @@ export interface Runtime { * @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name) * @param oldName Current workspace name * @param newName New workspace name + * @param abortSignal Optional abort signal for cancellation * @returns Promise resolving to Result with old/new paths on success, or error message */ renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } >; @@ -333,12 +342,14 @@ export interface Runtime { * @param projectPath Project root path (local path, used for git commands in LocalRuntime and to extract project name) * @param workspaceName Workspace name to delete * @param force If true, force deletion even with uncommitted changes or special conditions (submodules, etc.) + * @param abortSignal Optional abort signal for cancellation * @returns Promise resolving to Result with deleted path on success, or error message */ deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }>; /** diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index c79a465d1..f57a14b74 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -216,7 +216,7 @@ export class SSHRuntime implements Runtime { /** * Read file contents over SSH as a stream */ - readFile(path: string): ReadableStream { + readFile(path: string, abortSignal?: AbortSignal): ReadableStream { // Return stdout, but wrap to handle errors from exec() and exit code return new ReadableStream({ start: async (controller: ReadableStreamDefaultController) => { @@ -224,6 +224,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(`cat ${shescape.quote(path)}`, { cwd: this.config.srcBaseDir, timeout: 300, // 5 minutes - reasonable for large files + abortSignal, }); const reader = stream.stdout.getReader(); @@ -265,7 +266,7 @@ export class SSHRuntime implements Runtime { * Write file contents over SSH atomically from a stream * Preserves symlinks and file permissions by resolving and copying metadata */ - writeFile(path: string): WritableStream { + writeFile(path: string, abortSignal?: AbortSignal): WritableStream { const tempPath = `${path}.tmp.${Date.now()}`; // Resolve symlinks to get the actual target path, preserving the symlink itself // If target exists, save its permissions to restore after write @@ -281,6 +282,7 @@ export class SSHRuntime implements Runtime { execPromise ??= this.exec(writeCommand, { cwd: this.config.srcBaseDir, timeout: 300, // 5 minutes - reasonable for large files + abortSignal, }); return execPromise; }; @@ -318,12 +320,13 @@ export class SSHRuntime implements Runtime { /** * Get file statistics over SSH */ - async stat(path: string): Promise { + async stat(path: string, _abortSignal?: AbortSignal): Promise { // Use stat with format string to get: size, mtime, type // %s = size, %Y = mtime (seconds since epoch), %F = file type + // Note: timeout is <10s so no abort signal needed per requirement const stream = await this.exec(`stat -c '%s %Y %F' ${shescape.quote(path)}`, { cwd: this.config.srcBaseDir, - timeout: 10, // 10 seconds - stat should be fast + timeout: 10, // 10 seconds - stat should be fast (no abort needed per requirement) }); const [stdout, stderr, exitCode] = await Promise.all([ @@ -510,7 +513,8 @@ export class SSHRuntime implements Runtime { private async syncProjectToRemote( projectPath: string, workspacePath: string, - initLogger: InitLogger + initLogger: InitLogger, + abortSignal?: AbortSignal ): Promise { // Use timestamp-based bundle path to avoid conflicts (simpler than $$) const timestamp = Date.now(); @@ -576,6 +580,7 @@ export class SSHRuntime implements Runtime { const cloneStream = await this.exec(`git clone --quiet ${bundleTempPath} ${cloneDestPath}`, { cwd: "~", timeout: 300, // 5 minutes for clone + abortSignal, }); const [cloneStdout, cloneStderr, cloneExitCode] = await Promise.all([ @@ -597,6 +602,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 30, + abortSignal, } ); await createTrackingBranchesStream.exitCode; @@ -667,7 +673,8 @@ export class SSHRuntime implements Runtime { private async runInitHook( projectPath: string, workspacePath: string, - initLogger: InitLogger + initLogger: InitLogger, + abortSignal?: AbortSignal ): Promise { // Check if hook exists locally (we synced the project, so local check is sufficient) const hookExists = await checkInitHookExists(projectPath); @@ -688,6 +695,7 @@ export class SSHRuntime implements Runtime { const hookStream = await this.exec(hookCommand, { cwd: workspacePath, // Run in the workspace directory timeout: 3600, // 1 hour - generous timeout for init hooks + abortSignal, }); // Create line-buffered loggers @@ -791,13 +799,13 @@ export class SSHRuntime implements Runtime { } async initWorkspace(params: WorkspaceInitParams): Promise { - const { projectPath, branchName, trunkBranch, workspacePath, initLogger } = params; + const { projectPath, branchName, trunkBranch, workspacePath, initLogger, abortSignal } = params; try { // 1. Sync project to remote (opportunistic rsync with scp fallback) initLogger.logStep("Syncing project files to remote..."); try { - await this.syncProjectToRemote(projectPath, workspacePath, initLogger); + await this.syncProjectToRemote(projectPath, workspacePath, initLogger, abortSignal); } catch (error) { const errorMsg = getErrorMessage(error); initLogger.logStderr(`Failed to sync project: ${errorMsg}`); @@ -821,6 +829,7 @@ export class SSHRuntime implements Runtime { const checkoutStream = await this.exec(checkoutCmd, { cwd: workspacePath, // Use the full workspace path for git operations timeout: 300, // 5 minutes for git checkout (can be slow on large repos) + abortSignal, }); const [stdout, stderr, exitCode] = await Promise.all([ @@ -844,7 +853,7 @@ export class SSHRuntime implements Runtime { // Note: runInitHook calls logComplete() internally if hook exists const hookExists = await checkInitHookExists(projectPath); if (hookExists) { - await this.runInitHook(projectPath, workspacePath, initLogger); + await this.runInitHook(projectPath, workspacePath, initLogger, abortSignal); } else { // No hook - signal completion immediately initLogger.logComplete(0); @@ -865,10 +874,15 @@ export class SSHRuntime implements Runtime { async renameWorkspace( projectPath: string, oldName: string, - newName: string + newName: string, + abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } > { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Rename operation aborted" }; + } // Compute workspace paths using canonical method const oldPath = this.getWorkspacePath(projectPath, oldName); const newPath = this.getWorkspacePath(projectPath, newName); @@ -886,6 +900,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(moveCommand, { cwd: this.config.srcBaseDir, timeout: 30, + abortSignal, }); await stream.stdin.close(); @@ -920,8 +935,14 @@ export class SSHRuntime implements Runtime { async deleteWorkspace( projectPath: string, workspaceName: string, - force: boolean + force: boolean, + abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> { + // Check if already aborted + if (abortSignal?.aborted) { + return { success: false, error: "Delete operation aborted" }; + } + // Compute workspace path using canonical method const deletedPath = this.getWorkspacePath(projectPath, workspaceName); @@ -998,6 +1019,7 @@ export class SSHRuntime implements Runtime { const stream = await this.exec(removeCommand, { cwd: this.config.srcBaseDir, timeout: 30, + abortSignal, }); await stream.stdin.close(); diff --git a/src/services/tools/bash.ts b/src/services/tools/bash.ts index 5673175ee..d25f7ef9b 100644 --- a/src/services/tools/bash.ts +++ b/src/services/tools/bash.ts @@ -425,7 +425,7 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => { const fullOutput = lines.join("\n"); // Use runtime.writeFile() for SSH support - const writer = config.runtime.writeFile(overflowPath); + const writer = config.runtime.writeFile(overflowPath, abortSignal); const encoder = new TextEncoder(); const writerInstance = writer.getWriter(); await writerInstance.write(encoder.encode(fullOutput)); diff --git a/src/utils/runtime/helpers.ts b/src/utils/runtime/helpers.ts index b6b5e5731..d958e5cde 100644 --- a/src/utils/runtime/helpers.ts +++ b/src/utils/runtime/helpers.ts @@ -69,8 +69,12 @@ export async function execBuffered( /** * Read file contents as a UTF-8 string */ -export async function readFileString(runtime: Runtime, path: string): Promise { - const stream = runtime.readFile(path); +export async function readFileString( + runtime: Runtime, + path: string, + abortSignal?: AbortSignal +): Promise { + const stream = runtime.readFile(path, abortSignal); return streamToString(stream); } @@ -80,9 +84,10 @@ export async function readFileString(runtime: Runtime, path: string): Promise { - const stream = runtime.writeFile(path); + const stream = runtime.writeFile(path, abortSignal); const writer = stream.getWriter(); try { await writer.write(new TextEncoder().encode(content)); From d7addb521eefe035b48dc15897cb36b80308f01b Mon Sep 17 00:00:00 2001 From: Ammar Date: Mon, 27 Oct 2025 19:01:10 +0000 Subject: [PATCH 2/8] Remove abort signal handling from LocalRuntime operations Local file operations (readFile, writeFile, stat, renameWorkspace, deleteWorkspace) are fast and don't need cancellation support. The abort signal parameters remain in the interface for SSHRuntime, but LocalRuntime now ignores them with a comment explaining why. This simplifies LocalRuntime while keeping abort signal support where it matters most - in SSHRuntime's network-bound operations. --- src/runtime/LocalRuntime.ts | 61 ++++++------------------------------- 1 file changed, 10 insertions(+), 51 deletions(-) diff --git a/src/runtime/LocalRuntime.ts b/src/runtime/LocalRuntime.ts index f8417ee8c..e2e6bb761 100644 --- a/src/runtime/LocalRuntime.ts +++ b/src/runtime/LocalRuntime.ts @@ -209,7 +209,8 @@ export class LocalRuntime implements Runtime { return { stdout, stderr, stdin, exitCode, duration }; } - readFile(filePath: string, abortSignal?: AbortSignal): ReadableStream { + readFile(filePath: string, _abortSignal?: AbortSignal): ReadableStream { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) const nodeStream = fs.createReadStream(filePath); // Handle errors by wrapping in a transform @@ -217,20 +218,6 @@ export class LocalRuntime implements Runtime { return new ReadableStream({ async start(controller: ReadableStreamDefaultController) { - // Check if already aborted - if (abortSignal?.aborted) { - controller.error(new Error("Read operation aborted")); - nodeStream.destroy(); - return; - } - - // Set up abort listener - const abortHandler = () => { - controller.error(new Error("Read operation aborted")); - nodeStream.destroy(); - }; - abortSignal?.addEventListener("abort", abortHandler); - try { const reader = webStream.getReader(); while (true) { @@ -247,24 +234,18 @@ export class LocalRuntime implements Runtime { err instanceof Error ? err : undefined ) ); - } finally { - abortSignal?.removeEventListener("abort", abortHandler); } }, }); } - writeFile(filePath: string, abortSignal?: AbortSignal): WritableStream { + writeFile(filePath: string, _abortSignal?: AbortSignal): WritableStream { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) let tempPath: string; let writer: WritableStreamDefaultWriter; return new WritableStream({ async start() { - // Check if already aborted - if (abortSignal?.aborted) { - throw new Error("Write operation aborted"); - } - // Create parent directories if they don't exist const parentDir = path.dirname(filePath); await fsPromises.mkdir(parentDir, { recursive: true }); @@ -274,19 +255,8 @@ export class LocalRuntime implements Runtime { const nodeStream = fs.createWriteStream(tempPath); const webStream = Writable.toWeb(nodeStream) as WritableStream; writer = webStream.getWriter(); - - // Set up abort listener - const abortHandler = () => { - writer.abort("Write operation aborted").catch(() => { - // Ignore errors during abort - }); - }; - abortSignal?.addEventListener("abort", abortHandler); }, async write(chunk: Uint8Array) { - if (abortSignal?.aborted) { - throw new Error("Write operation aborted"); - } await writer.write(chunk); }, async close() { @@ -318,12 +288,8 @@ export class LocalRuntime implements Runtime { }); } - async stat(filePath: string, abortSignal?: AbortSignal): Promise { - // Check if already aborted - if (abortSignal?.aborted) { - throw new Error("Stat operation aborted"); - } - + async stat(filePath: string, _abortSignal?: AbortSignal): Promise { + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) try { const stats = await fsPromises.stat(filePath); return { @@ -500,14 +466,11 @@ export class LocalRuntime implements Runtime { projectPath: string, oldName: string, newName: string, - abortSignal?: AbortSignal + _abortSignal?: AbortSignal ): Promise< { success: true; oldPath: string; newPath: string } | { success: false; error: string } > { - // Check if already aborted - if (abortSignal?.aborted) { - return { success: false, error: "Rename operation aborted" }; - } + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) // Compute workspace paths using canonical method const oldPath = this.getWorkspacePath(projectPath, oldName); const newPath = this.getWorkspacePath(projectPath, newName); @@ -528,13 +491,9 @@ export class LocalRuntime implements Runtime { projectPath: string, workspaceName: string, force: boolean, - abortSignal?: AbortSignal + _abortSignal?: AbortSignal ): Promise<{ success: true; deletedPath: string } | { success: false; error: string }> { - // Check if already aborted - if (abortSignal?.aborted) { - return { success: false, error: "Delete operation aborted" }; - } - + // Note: _abortSignal ignored for local operations (fast, no need for cancellation) // Compute workspace path using the canonical method const deletedPath = this.getWorkspacePath(projectPath, workspaceName); From da4b5c4cca1943dae77956b051d5c2058f6f8442 Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 17:55:17 +0000 Subject: [PATCH 3/8] Fix SSHRuntime abort signal handling for stat() and deleteWorkspace() --- src/runtime/SSHRuntime.ts | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index f57a14b74..432fa207a 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -320,13 +320,13 @@ export class SSHRuntime implements Runtime { /** * Get file statistics over SSH */ - async stat(path: string, _abortSignal?: AbortSignal): Promise { + async stat(path: string, abortSignal?: AbortSignal): Promise { // Use stat with format string to get: size, mtime, type // %s = size, %Y = mtime (seconds since epoch), %F = file type - // Note: timeout is <10s so no abort signal needed per requirement const stream = await this.exec(`stat -c '%s %Y %F' ${shescape.quote(path)}`, { cwd: this.config.srcBaseDir, - timeout: 10, // 10 seconds - stat should be fast (no abort needed per requirement) + timeout: 10, // 10 seconds - stat should be fast + abortSignal, }); const [stdout, stderr, exitCode] = await Promise.all([ @@ -547,6 +547,13 @@ export class SSHRuntime implements Runtime { log.debug(`Creating bundle: ${command}`); const proc = spawn("bash", ["-c", command]); + // Handle abort signal + const abortHandler = () => { + proc.kill(); + reject(new Error("Bundle creation aborted")); + }; + abortSignal?.addEventListener("abort", abortHandler); + streamProcessToLogger(proc, initLogger, { logStdout: false, logStderr: true, @@ -558,7 +565,10 @@ export class SSHRuntime implements Runtime { }); proc.on("close", (code) => { - if (code === 0) { + abortSignal?.removeEventListener("abort", abortHandler); + if (abortSignal?.aborted) { + reject(new Error("Bundle creation aborted")); + } else if (code === 0) { resolve(); } else { reject(new Error(`Failed to create bundle: ${stderr}`)); @@ -566,6 +576,7 @@ export class SSHRuntime implements Runtime { }); proc.on("error", (err) => { + abortSignal?.removeEventListener("abort", abortHandler); reject(err); }); }); @@ -616,6 +627,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 10, + abortSignal, } ); @@ -633,6 +645,7 @@ export class SSHRuntime implements Runtime { { cwd: "~", timeout: 10, + abortSignal, } ); await removeOriginStream.exitCode; @@ -643,6 +656,7 @@ export class SSHRuntime implements Runtime { const rmStream = await this.exec(`rm ${bundleTempPath}`, { cwd: "~", timeout: 10, + abortSignal, }); const rmExitCode = await rmStream.exitCode; @@ -657,6 +671,7 @@ export class SSHRuntime implements Runtime { const rmStream = await this.exec(`rm -f ${bundleTempPath}`, { cwd: "~", timeout: 10, + abortSignal, }); await rmStream.exitCode; } catch { @@ -747,7 +762,7 @@ export class SSHRuntime implements Runtime { async createWorkspace(params: WorkspaceCreationParams): Promise { try { - const { projectPath, branchName, initLogger } = params; + const { projectPath, branchName, initLogger, abortSignal } = params; // Compute workspace path using canonical method const workspacePath = this.getWorkspacePath(projectPath, branchName); @@ -768,6 +783,7 @@ export class SSHRuntime implements Runtime { const mkdirStream = await this.exec(parentDirCommand, { cwd: "/tmp", timeout: 10, + abortSignal, }); const mkdirExitCode = await mkdirStream.exitCode; if (mkdirExitCode !== 0) { @@ -966,6 +982,7 @@ export class SSHRuntime implements Runtime { const checkStream = await this.exec(checkScript, { cwd: this.config.srcBaseDir, timeout: 10, + abortSignal, }); await checkStream.stdin.close(); From d388f1b11972d4bb51ebd5774d1913ee3b2d4a66 Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 18:02:56 +0000 Subject: [PATCH 4/8] Restore symlink resolution logic in LocalRuntime.writeFile() During rebase conflict resolution, symlink resolution logic was accidentally removed from LocalRuntime.writeFile(). This restores: - resolvedPath: Resolves symlinks to write through them - originalMode: Preserves file permissions across writes The abort signal parameter addition is preserved. --- src/runtime/LocalRuntime.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/runtime/LocalRuntime.ts b/src/runtime/LocalRuntime.ts index e2e6bb761..69c9a61fb 100644 --- a/src/runtime/LocalRuntime.ts +++ b/src/runtime/LocalRuntime.ts @@ -243,15 +243,29 @@ export class LocalRuntime implements Runtime { // Note: _abortSignal ignored for local operations (fast, no need for cancellation) let tempPath: string; let writer: WritableStreamDefaultWriter; + let resolvedPath: string; + let originalMode: number | undefined; return new WritableStream({ async start() { + // Resolve symlinks to write through them (preserves the symlink) + try { + resolvedPath = await fsPromises.realpath(filePath); + // Save original permissions to restore after write + const stat = await fsPromises.stat(resolvedPath); + originalMode = stat.mode; + } catch { + // If file doesn't exist, use the original path and default permissions + resolvedPath = filePath; + originalMode = undefined; + } + // Create parent directories if they don't exist - const parentDir = path.dirname(filePath); + const parentDir = path.dirname(resolvedPath); await fsPromises.mkdir(parentDir, { recursive: true }); // Create temp file for atomic write - tempPath = `${filePath}.tmp.${Date.now()}`; + tempPath = `${resolvedPath}.tmp.${Date.now()}`; const nodeStream = fs.createWriteStream(tempPath); const webStream = Writable.toWeb(nodeStream) as WritableStream; writer = webStream.getWriter(); @@ -263,7 +277,11 @@ export class LocalRuntime implements Runtime { // Close the writer and rename to final location await writer.close(); try { - await fsPromises.rename(tempPath, filePath); + // If we have original permissions, apply them to temp file before rename + if (originalMode !== undefined) { + await fsPromises.chmod(tempPath, originalMode); + } + await fsPromises.rename(tempPath, resolvedPath); } catch (err) { throw new RuntimeErrorClass( `Failed to write file ${filePath}: ${err instanceof Error ? err.message : String(err)}`, From 7855eaae096079295e6b10b1ef604d1bb6c92a4e Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 18:26:43 +0000 Subject: [PATCH 5/8] Add abort signal pre-checks to prevent starting cancelled operations AbortSignal events don't fire retroactively, so if a signal is already aborted when a method is called, the abort listener never fires and the operation runs to completion despite being cancelled. Added pre-checks at operation start in: - SSHRuntime.exec(): Short-circuit before spawning SSH process - SSHRuntime.syncProjectToRemote(): Check at function entry and before bundle spawn This ensures operations cancelled between sequential steps don't start. Addresses Codex feedback. --- src/runtime/SSHRuntime.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index 432fa207a..c2e2769f7 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -81,6 +81,11 @@ export class SSHRuntime implements Runtime { async exec(command: string, options: ExecOptions): Promise { const startTime = performance.now(); + // Short-circuit if already aborted + if (options.abortSignal?.aborted) { + throw new RuntimeErrorClass("Operation aborted before execution", "exec"); + } + // Build command parts const parts: string[] = []; @@ -516,6 +521,11 @@ export class SSHRuntime implements Runtime { initLogger: InitLogger, abortSignal?: AbortSignal ): Promise { + // Short-circuit if already aborted + if (abortSignal?.aborted) { + throw new Error("Sync operation aborted before starting"); + } + // Use timestamp-based bundle path to avoid conflicts (simpler than $$) const timestamp = Date.now(); const bundleTempPath = `~/.cmux-bundle-${timestamp}.bundle`; @@ -541,6 +551,12 @@ export class SSHRuntime implements Runtime { // Step 2: Create bundle locally and pipe to remote file via SSH initLogger.logStep(`Creating git bundle...`); await new Promise((resolve, reject) => { + // Check if aborted before spawning + if (abortSignal?.aborted) { + reject(new Error("Bundle creation aborted")); + return; + } + const sshArgs = this.buildSSHArgs(true); const command = `cd ${shescape.quote(projectPath)} && git bundle create - --all | ssh ${sshArgs.join(" ")} "cat > ${bundleTempPath}"`; From e8f704ed653a028a84df32a38f30903186fbce88 Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 18:29:27 +0000 Subject: [PATCH 6/8] Refactor: Move abort signal handling into streamProcessToLogger Rather than having callers manually set up abort handlers and cleanup, streamProcessToLogger now accepts an optional abortSignal and returns a cleanup function. This centralizes the abort handling logic and makes call sites cleaner. Benefits: - Reduces boilerplate at call sites - Ensures consistent cleanup (no risk of forgetting removeEventListener) - Single place to maintain abort handling logic Addresses code review feedback. --- src/runtime/SSHRuntime.ts | 14 ++++---------- src/runtime/streamProcess.ts | 24 ++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index c2e2769f7..1403a40e5 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -563,16 +563,10 @@ export class SSHRuntime implements Runtime { log.debug(`Creating bundle: ${command}`); const proc = spawn("bash", ["-c", command]); - // Handle abort signal - const abortHandler = () => { - proc.kill(); - reject(new Error("Bundle creation aborted")); - }; - abortSignal?.addEventListener("abort", abortHandler); - - streamProcessToLogger(proc, initLogger, { + const cleanup = streamProcessToLogger(proc, initLogger, { logStdout: false, logStderr: true, + abortSignal, }); let stderr = ""; @@ -581,7 +575,7 @@ export class SSHRuntime implements Runtime { }); proc.on("close", (code) => { - abortSignal?.removeEventListener("abort", abortHandler); + cleanup(); if (abortSignal?.aborted) { reject(new Error("Bundle creation aborted")); } else if (code === 0) { @@ -592,7 +586,7 @@ export class SSHRuntime implements Runtime { }); proc.on("error", (err) => { - abortSignal?.removeEventListener("abort", abortHandler); + cleanup(); reject(err); }); }); diff --git a/src/runtime/streamProcess.ts b/src/runtime/streamProcess.ts index e2ca0eeec..38685f270 100644 --- a/src/runtime/streamProcess.ts +++ b/src/runtime/streamProcess.ts @@ -16,6 +16,7 @@ import type { InitLogger } from "./Runtime"; * @param process Child process to stream from * @param initLogger Logger to stream output to * @param options Configuration for which streams to log + * @returns Cleanup function to remove abort signal listener (call this in process close/error handlers) */ export function streamProcessToLogger( process: ChildProcess, @@ -27,15 +28,27 @@ export function streamProcessToLogger( logStderr?: boolean; /** Optional: Command string to log before streaming starts */ command?: string; + /** Optional: Abort signal to kill process on cancellation */ + abortSignal?: AbortSignal; } -): void { - const { logStdout = false, logStderr = true, command } = options ?? {}; +): () => void { + const { logStdout = false, logStderr = true, command, abortSignal } = options ?? {}; // Log the command being executed (if provided) if (command) { initLogger.logStep(`Executing: ${command}`); } + // Set up abort signal handler + const abortHandler = abortSignal + ? () => { + process.kill(); + } + : null; + if (abortHandler && abortSignal) { + abortSignal.addEventListener("abort", abortHandler); + } + // Drain stdout (prevent pipe overflow) if (process.stdout) { process.stdout.on("data", (data: Buffer) => { @@ -65,4 +78,11 @@ export function streamProcessToLogger( // Otherwise drain silently to prevent buffer overflow }); } + + // Return cleanup function to remove abort listener + return () => { + if (abortHandler && abortSignal) { + abortSignal.removeEventListener("abort", abortHandler); + } + }; } From bf59808a88065121675221c2c427c10736cc969f Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 18:59:08 +0000 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=A4=96=20feat:=20thread=20abort=20sig?= =?UTF-8?q?nals=20through=20file=20edit=20tools?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add abortSignal parameter to all file edit tools and helpers: - executeFileEditOperation() accepts and passes abortSignal - fileExists() accepts and passes abortSignal to runtime.stat() - file_edit_replace_string passes abortSignal from AI SDK - file_edit_replace_lines passes abortSignal from AI SDK - file_edit_insert passes abortSignal from AI SDK These tools now receive abort signals from AI SDK tool execution context and properly pass them to runtime operations (stat, readFile, writeFile). SSH runtime will cancel operations when abort is triggered. Local runtime ignores abort signals (fast operations). --- src/services/tools/file_edit_insert.ts | 20 +++++++++++-------- src/services/tools/file_edit_operation.ts | 8 +++++--- src/services/tools/file_edit_replace_lines.ts | 6 +++++- .../tools/file_edit_replace_string.ts | 6 +++++- src/utils/runtime/fileExists.ts | 9 +++++++-- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/services/tools/file_edit_insert.ts b/src/services/tools/file_edit_insert.ts index e65a376a9..ca03cc7c0 100644 --- a/src/services/tools/file_edit_insert.ts +++ b/src/services/tools/file_edit_insert.ts @@ -18,12 +18,15 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) return tool({ description: TOOL_DEFINITIONS.file_edit_insert.description, inputSchema: TOOL_DEFINITIONS.file_edit_insert.schema, - execute: async ({ - file_path, - line_offset, - content, - create, - }): Promise => { + execute: async ( + { + file_path, + line_offset, + content, + create, + }, + { abortSignal } + ): Promise => { try { // Validate no redundant path prefix (must come first to catch absolute paths) const redundantPrefixValidation = validateNoRedundantPrefix( @@ -57,7 +60,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) const resolvedPath = config.runtime.normalizePath(file_path, config.cwd); // Check if file exists using runtime - const exists = await fileExists(config.runtime, resolvedPath); + const exists = await fileExists(config.runtime, resolvedPath, abortSignal); if (!exists) { if (!create) { @@ -69,7 +72,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) // Create empty file using runtime helper try { - await writeFileString(config.runtime, resolvedPath, ""); + await writeFileString(config.runtime, resolvedPath, "", abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -84,6 +87,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) return executeFileEditOperation({ config, filePath: file_path, + abortSignal, operation: (originalContent) => { const lines = originalContent.split("\n"); diff --git a/src/services/tools/file_edit_operation.ts b/src/services/tools/file_edit_operation.ts index 5c06e3902..6ed67fde2 100644 --- a/src/services/tools/file_edit_operation.ts +++ b/src/services/tools/file_edit_operation.ts @@ -27,6 +27,7 @@ interface ExecuteFileEditOperationOptions { operation: ( originalContent: string ) => FileEditOperationResult | Promise>; + abortSignal?: AbortSignal; } /** @@ -37,6 +38,7 @@ export async function executeFileEditOperation({ config, filePath, operation, + abortSignal, }: ExecuteFileEditOperationOptions): Promise< FileEditErrorResult | (FileEditDiffSuccessBase & TMetadata) > { @@ -69,7 +71,7 @@ export async function executeFileEditOperation({ // Check if file exists and get stats using runtime let fileStat; try { - fileStat = await config.runtime.stat(resolvedPath); + fileStat = await config.runtime.stat(resolvedPath, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -98,7 +100,7 @@ export async function executeFileEditOperation({ // Read file content using runtime helper let originalContent: string; try { - originalContent = await readFileString(config.runtime, resolvedPath); + originalContent = await readFileString(config.runtime, resolvedPath, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { @@ -119,7 +121,7 @@ export async function executeFileEditOperation({ // Write file using runtime helper try { - await writeFileString(config.runtime, resolvedPath, operationResult.newContent); + await writeFileString(config.runtime, resolvedPath, operationResult.newContent, abortSignal); } catch (err) { if (err instanceof RuntimeError) { return { diff --git a/src/services/tools/file_edit_replace_lines.ts b/src/services/tools/file_edit_replace_lines.ts index 30be0a586..13b641e87 100644 --- a/src/services/tools/file_edit_replace_lines.ts +++ b/src/services/tools/file_edit_replace_lines.ts @@ -26,11 +26,15 @@ export const createFileEditReplaceLinesTool: ToolFactory = (config: ToolConfigur return tool({ description: TOOL_DEFINITIONS.file_edit_replace_lines.description, inputSchema: TOOL_DEFINITIONS.file_edit_replace_lines.schema, - execute: async (args: LineReplaceArgs): Promise => { + execute: async ( + args: LineReplaceArgs, + { abortSignal } + ): Promise => { const result = await executeFileEditOperation({ config, filePath: args.file_path, operation: (originalContent) => handleLineReplace(args, originalContent), + abortSignal, }); // handleLineReplace always returns lines_replaced and line_delta, diff --git a/src/services/tools/file_edit_replace_string.ts b/src/services/tools/file_edit_replace_string.ts index 8c255a068..abdeaf788 100644 --- a/src/services/tools/file_edit_replace_string.ts +++ b/src/services/tools/file_edit_replace_string.ts @@ -26,11 +26,15 @@ export const createFileEditReplaceStringTool: ToolFactory = (config: ToolConfigu return tool({ description: TOOL_DEFINITIONS.file_edit_replace_string.description, inputSchema: TOOL_DEFINITIONS.file_edit_replace_string.schema, - execute: async (args: StringReplaceArgs): Promise => { + execute: async ( + args: StringReplaceArgs, + { abortSignal } + ): Promise => { return executeFileEditOperation({ config, filePath: args.file_path, operation: (originalContent) => handleStringReplace(args, originalContent), + abortSignal, }); }, }); diff --git a/src/utils/runtime/fileExists.ts b/src/utils/runtime/fileExists.ts index 7f370faef..7504c9ac7 100644 --- a/src/utils/runtime/fileExists.ts +++ b/src/utils/runtime/fileExists.ts @@ -4,11 +4,16 @@ import type { Runtime } from "@/runtime/Runtime"; * Check if a path exists using runtime.stat() * @param runtime Runtime instance to use * @param path Path to check + * @param abortSignal Optional abort signal to cancel the operation * @returns True if path exists, false otherwise */ -export async function fileExists(runtime: Runtime, path: string): Promise { +export async function fileExists( + runtime: Runtime, + path: string, + abortSignal?: AbortSignal +): Promise { try { - await runtime.stat(path); + await runtime.stat(path, abortSignal); return true; } catch { return false; From 01ccb54842394a15b4430762b26e08a1b1a3106f Mon Sep 17 00:00:00 2001 From: Ammar Date: Tue, 28 Oct 2025 19:06:20 +0000 Subject: [PATCH 8/8] =?UTF-8?q?=F0=9F=A4=96=20style:=20fix=20prettier=20fo?= =?UTF-8?q?rmatting=20in=20file=5Fedit=5Finsert.ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/tools/file_edit_insert.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/services/tools/file_edit_insert.ts b/src/services/tools/file_edit_insert.ts index ca03cc7c0..340dc838a 100644 --- a/src/services/tools/file_edit_insert.ts +++ b/src/services/tools/file_edit_insert.ts @@ -19,12 +19,7 @@ export const createFileEditInsertTool: ToolFactory = (config: ToolConfiguration) description: TOOL_DEFINITIONS.file_edit_insert.description, inputSchema: TOOL_DEFINITIONS.file_edit_insert.schema, execute: async ( - { - file_path, - line_offset, - content, - create, - }, + { file_path, line_offset, content, create }, { abortSignal } ): Promise => { try {