diff --git a/.cmux/init b/.cmux/init new file mode 100755 index 000000000..7b4901009 --- /dev/null +++ b/.cmux/init @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "Installing dependencies with bun..." +bun install +echo "Dependencies installed successfully!" + diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 50e2b8c88..67ced4604 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -10,6 +10,7 @@ - [Workspaces](./workspaces.md) - [Forking](./fork.md) + - [Init Hooks](./init-hooks.md) - [Models](./models.md) - [Keyboard Shortcuts](./keybinds.md) - [Vim Mode](./vim-mode.md) diff --git a/docs/init-hooks.md b/docs/init-hooks.md new file mode 100644 index 000000000..eedfbd6ee --- /dev/null +++ b/docs/init-hooks.md @@ -0,0 +1,48 @@ +# Init Hooks + +Add a `.cmux/init` executable script to your project root to run commands when creating new workspaces. + +## Example + +```bash +#!/bin/bash +set -e + +bun install +bun run build +``` + +Make it executable: + +```bash +chmod +x .cmux/init +``` + +## Behavior + +- **Runs once** per workspace on creation +- **Streams output** to the workspace UI in real-time +- **Non-blocking** - workspace is immediately usable, even while hook runs +- **Exit codes preserved** - failures are logged but don't prevent workspace usage + +The init script runs in the workspace directory with the workspace's environment. + +## Use Cases + +- Install dependencies (`npm install`, `bun install`, etc.) +- Run build steps +- Generate code or configs +- Set up databases or services +- Warm caches + +## Output + +Init output appears in a banner at the top of the workspace. Click to expand/collapse the log. The banner shows: + +- Script path (`.cmux/init`) +- Status (running, success, or exit code on failure) +- Full stdout/stderr output + +## Idempotency + +The hook runs every time you create a workspace, even if you delete and recreate with the same name. Make your script idempotent if you're modifying shared state. diff --git a/src/components/AIView.tsx b/src/components/AIView.tsx index 84bc8dd3f..6151eec5d 100644 --- a/src/components/AIView.tsx +++ b/src/components/AIView.tsx @@ -230,8 +230,10 @@ const AIViewInner: React.FC = ({ const mergedMessages = mergeConsecutiveStreamErrors(workspaceState.messages); const editCutoffHistoryId = mergedMessages.find( - (msg): msg is Exclude => - msg.type !== "history-hidden" && msg.historyId === editingMessage.id + (msg): msg is Exclude => + msg.type !== "history-hidden" && + msg.type !== "workspace-init" && + msg.historyId === editingMessage.id )?.historyId; if (!editCutoffHistoryId) { @@ -277,8 +279,10 @@ const AIViewInner: React.FC = ({ // When editing, find the cutoff point const editCutoffHistoryId = editingMessage ? mergedMessages.find( - (msg): msg is Exclude => - msg.type !== "history-hidden" && msg.historyId === editingMessage.id + (msg): msg is Exclude => + msg.type !== "history-hidden" && + msg.type !== "workspace-init" && + msg.historyId === editingMessage.id )?.historyId : undefined; @@ -381,6 +385,15 @@ const AIViewInner: React.FC = ({

No Messages Yet

Send a message below to begin

+

+ 💡 Tip: Add a{" "} + + .cmux/init + {" "} + hook to your project to run setup commands +
+ (e.g., install dependencies, build) when creating new workspaces +

) : ( <> @@ -388,12 +401,17 @@ const AIViewInner: React.FC = ({ const isAtCutoff = editCutoffHistoryId !== undefined && msg.type !== "history-hidden" && + msg.type !== "workspace-init" && msg.historyId === editCutoffHistoryId; return (
; + className?: string; +} + +export const InitMessage = React.memo(({ message, className }) => { + const isError = message.status === "error"; + + return ( +
+
+ 🔧 +
+ {message.status === "running" ? ( + Running init hook... + ) : message.status === "success" ? ( + ✅ Init hook completed successfully + ) : ( + + Init hook exited with code {message.exitCode}. Workspace is ready, but some setup + failed. + + )} +
{message.hookPath}
+
+
+ {message.lines.length > 0 && ( +
+          {message.lines.join("\n")}
+        
+ )} +
+ ); +}); + +InitMessage.displayName = "InitMessage"; diff --git a/src/components/Messages/MessageRenderer.tsx b/src/components/Messages/MessageRenderer.tsx index e8824e750..47d45ce6a 100644 --- a/src/components/Messages/MessageRenderer.tsx +++ b/src/components/Messages/MessageRenderer.tsx @@ -6,6 +6,7 @@ import { ToolMessage } from "./ToolMessage"; import { ReasoningMessage } from "./ReasoningMessage"; import { StreamErrorMessage } from "./StreamErrorMessage"; import { HistoryHiddenMessage } from "./HistoryHiddenMessage"; +import { InitMessage } from "./InitMessage"; interface MessageRendererProps { message: DisplayedMessage; @@ -46,6 +47,8 @@ export const MessageRenderer = React.memo( return ; case "history-hidden": return ; + case "workspace-init": + return ; default: console.error("don't know how to render message", message); return null; diff --git a/src/constants/ipc-constants.ts b/src/constants/ipc-constants.ts index be42fd9ad..ce3a3ffb0 100644 --- a/src/constants/ipc-constants.ts +++ b/src/constants/ipc-constants.ts @@ -25,7 +25,6 @@ export const IPC_CHANNELS = { WORKSPACE_REMOVE: "workspace:remove", WORKSPACE_RENAME: "workspace:rename", WORKSPACE_FORK: "workspace:fork", - WORKSPACE_STREAM_META: "workspace:streamMeta", WORKSPACE_SEND_MESSAGE: "workspace:sendMessage", WORKSPACE_RESUME_STREAM: "workspace:resumeStream", WORKSPACE_INTERRUPT_STREAM: "workspace:interruptStream", diff --git a/src/debug/agentSessionCli.ts b/src/debug/agentSessionCli.ts index ab2ef5f1f..6254d0027 100644 --- a/src/debug/agentSessionCli.ts +++ b/src/debug/agentSessionCli.ts @@ -8,6 +8,7 @@ import { Config } from "@/config"; import { HistoryService } from "@/services/historyService"; import { PartialService } from "@/services/partialService"; import { AIService } from "@/services/aiService"; +import { InitStateManager } from "@/services/initStateManager"; import { AgentSession, type AgentSessionChatEvent } from "@/services/agentSession"; import { isCaughtUpMessage, @@ -216,6 +217,7 @@ async function main(): Promise { const historyService = new HistoryService(config); const partialService = new PartialService(config, historyService); const aiService = new AIService(config, historyService, partialService); + const initStateManager = new InitStateManager(config); ensureProvidersConfig(config); const session = new AgentSession({ @@ -224,6 +226,7 @@ async function main(): Promise { historyService, partialService, aiService, + initStateManager, }); session.ensureMetadata({ diff --git a/src/preload.ts b/src/preload.ts index a42e597e9..7fc5d49e5 100644 --- a/src/preload.ts +++ b/src/preload.ts @@ -73,7 +73,7 @@ const api: IPCApi = { openTerminal: (workspacePath) => ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_OPEN_TERMINAL, workspacePath), - onChat: (workspaceId, callback) => { + onChat: (workspaceId: string, callback) => { const channel = getChatChannel(workspaceId); const handler = (_event: unknown, data: WorkspaceChatMessage) => { callback(data); diff --git a/src/services/agentSession.ts b/src/services/agentSession.ts index 394d631cd..27ccadc23 100644 --- a/src/services/agentSession.ts +++ b/src/services/agentSession.ts @@ -6,6 +6,7 @@ import type { Config } from "@/config"; import type { AIService } from "@/services/aiService"; import type { HistoryService } from "@/services/historyService"; import type { PartialService } from "@/services/partialService"; +import type { InitStateManager } from "@/services/initStateManager"; import type { WorkspaceMetadata } from "@/types/workspace"; import type { WorkspaceChatMessage, StreamErrorMessage, SendMessageOptions } from "@/types/ipc"; import type { SendMessageError } from "@/types/errors"; @@ -36,6 +37,7 @@ interface AgentSessionOptions { historyService: HistoryService; partialService: PartialService; aiService: AIService; + initStateManager: InitStateManager; } export class AgentSession { @@ -44,14 +46,18 @@ export class AgentSession { private readonly historyService: HistoryService; private readonly partialService: PartialService; private readonly aiService: AIService; + private readonly initStateManager: InitStateManager; private readonly emitter = new EventEmitter(); private readonly aiListeners: Array<{ event: string; handler: (...args: unknown[]) => void }> = []; + private readonly initListeners: Array<{ event: string; handler: (...args: unknown[]) => void }> = + []; private disposed = false; constructor(options: AgentSessionOptions) { assert(options, "AgentSession requires options"); - const { workspaceId, config, historyService, partialService, aiService } = options; + const { workspaceId, config, historyService, partialService, aiService, initStateManager } = + options; assert(typeof workspaceId === "string", "workspaceId must be a string"); const trimmedWorkspaceId = workspaceId.trim(); @@ -62,8 +68,10 @@ export class AgentSession { this.historyService = historyService; this.partialService = partialService; this.aiService = aiService; + this.initStateManager = initStateManager; this.attachAiListeners(); + this.attachInitListeners(); } dispose(): void { @@ -75,6 +83,10 @@ export class AgentSession { this.aiService.off(event, handler as never); } this.aiListeners.length = 0; + for (const { event, handler } of this.initListeners) { + this.initStateManager.off(event, handler as never); + } + this.initListeners.length = 0; this.emitter.removeAllListeners(); } @@ -121,6 +133,7 @@ export class AgentSession { private async emitHistoricalEvents( listener: (event: AgentSessionChatEvent) => void ): Promise { + // Load chat history (persisted messages from chat.jsonl) const historyResult = await this.historyService.getHistory(this.workspaceId); if (historyResult.success) { for (const message of historyResult.data) { @@ -128,6 +141,7 @@ export class AgentSession { } } + // Check for interrupted streams (active streaming state) const streamInfo = this.aiService.getStreamInfo(this.workspaceId); const partial = await this.partialService.readPartial(this.workspaceId); @@ -137,6 +151,13 @@ export class AgentSession { listener({ workspaceId: this.workspaceId, message: partial }); } + // Replay init state BEFORE caught-up (treat as historical data) + // This ensures init events are buffered correctly by the frontend, + // preserving their natural timing characteristics from the hook execution. + await this.initStateManager.replayInit(this.workspaceId); + + // Send caught-up after ALL historical data (including init events) + // This signals frontend that replay is complete and future events are real-time listener({ workspaceId: this.workspaceId, message: { type: "caught-up" }, @@ -405,7 +426,35 @@ export class AgentSession { this.aiService.on("error", errorHandler as never); } - private emitChatEvent(message: WorkspaceChatMessage): void { + private attachInitListeners(): void { + const forward = (event: string, handler: (payload: WorkspaceChatMessage) => void) => { + const wrapped = (...args: unknown[]) => { + const [payload] = args; + if ( + typeof payload === "object" && + payload !== null && + "workspaceId" in payload && + (payload as { workspaceId: unknown }).workspaceId !== this.workspaceId + ) { + return; + } + // Strip workspaceId from payload before forwarding (WorkspaceInitEvent doesn't include it) + const { workspaceId: _, ...message } = payload as WorkspaceChatMessage & { + workspaceId: string; + }; + handler(message as WorkspaceChatMessage); + }; + this.initListeners.push({ event, handler: wrapped }); + this.initStateManager.on(event, wrapped as never); + }; + + forward("init-start", (payload) => this.emitChatEvent(payload)); + forward("init-output", (payload) => this.emitChatEvent(payload)); + forward("init-end", (payload) => this.emitChatEvent(payload)); + } + + // Public method to emit chat events (used by init hooks and other workspace events) + emitChatEvent(message: WorkspaceChatMessage): void { this.assertNotDisposed("emitChatEvent"); this.emitter.emit("chat-event", { workspaceId: this.workspaceId, diff --git a/src/services/bashExecutionService.ts b/src/services/bashExecutionService.ts new file mode 100644 index 000000000..623165f03 --- /dev/null +++ b/src/services/bashExecutionService.ts @@ -0,0 +1,188 @@ +import { spawn } from "child_process"; +import type { ChildProcess } from "child_process"; +import { log } from "./log"; + +/** + * Configuration for bash execution + */ +export interface BashExecutionConfig { + /** Working directory for command execution */ + cwd: string; + /** Environment secrets to inject (e.g., API keys) */ + secrets?: Record; + /** Whether to spawn as detached process group (default: true) */ + detached?: boolean; + /** Nice level for process priority (-20 to 19) */ + niceness?: number; +} + +/** + * Callbacks for streaming execution mode + */ +export interface StreamingCallbacks { + /** Called for each complete line from stdout */ + onStdout: (line: string) => void; + /** Called for each complete line from stderr */ + onStderr: (line: string) => void; + /** Called when process exits */ + onExit: (exitCode: number) => void; +} + +/** + * Wraps a ChildProcess to make it disposable for use with `using` statements. + * Always kills the entire process group with SIGKILL to prevent zombie processes. + * SIGKILL cannot be caught or ignored, guaranteeing immediate cleanup. + */ +export class DisposableProcess implements Disposable { + private disposed = false; + + constructor(private readonly process: ChildProcess) {} + + [Symbol.dispose](): void { + // Prevent double-signalling if dispose is called multiple times + if (this.disposed || this.process.pid === undefined) { + return; + } + + this.disposed = true; + + try { + // Kill entire process group with SIGKILL - cannot be caught/ignored + process.kill(-this.process.pid, "SIGKILL"); + } catch { + // Fallback: try killing just the main process + try { + this.process.kill("SIGKILL"); + } catch { + // Process already dead - ignore + } + } + } + + get child(): ChildProcess { + return this.process; + } +} + +/** + * Centralized bash execution service. + * + * All workspace command execution goes through this service to: + * - Maintain consistent environment setup across all bash execution + * - Provide single abstraction point for future host migration (containers, remote, etc.) + * - Eliminate duplication between init hooks and bash tool + * + * Provides two execution modes: + * - Streaming: Line-by-line output callbacks (for init hooks, real-time feedback) + * - Buffered: Collect all output, return at end (for bash tool, LLM consumption) + */ +export class BashExecutionService { + /** + * Create standardized bash environment. + * Prevents interactive prompts that would block execution. + */ + private createBashEnvironment(secrets?: Record): NodeJS.ProcessEnv { + return { + ...process.env, + // Inject secrets as environment variables + ...(secrets ?? {}), + // Prevent interactive editors from blocking bash execution + // Critical for git operations like rebase/commit that try to open editors + GIT_EDITOR: "true", // Git-specific editor (highest priority) + GIT_SEQUENCE_EDITOR: "true", // For interactive rebase sequences + EDITOR: "true", // General fallback for non-git commands + VISUAL: "true", // Another common editor environment variable + // Prevent git from prompting for credentials + // Critical for operations like fetch/pull that might try to authenticate + // Without this, git can hang waiting for user input if credentials aren't configured + GIT_TERMINAL_PROMPT: "0", // Disables git credential prompts + }; + } + + /** + * Execute bash command with streaming output. + * + * Output is emitted line-by-line through callbacks as it arrives. + * Used by init hooks for real-time progress feedback. + * + * @param script Bash script to execute + * @param config Execution configuration + * @param callbacks Output and exit callbacks + * @returns DisposableProcess that can be killed with `using` statement + */ + executeStreaming( + script: string, + config: BashExecutionConfig, + callbacks: StreamingCallbacks + ): DisposableProcess { + log.debug(`BashExecutionService: Executing streaming command in ${config.cwd}`); + log.debug( + `BashExecutionService: Script: ${script.substring(0, 100)}${script.length > 100 ? "..." : ""}` + ); + + const spawnCommand = config.niceness !== undefined ? "nice" : "bash"; + const spawnArgs = + config.niceness !== undefined + ? ["-n", config.niceness.toString(), "bash", "-c", script] + : ["-c", script]; + + const child = spawn(spawnCommand, spawnArgs, { + cwd: config.cwd, + env: this.createBashEnvironment(config.secrets), + stdio: ["ignore", "pipe", "pipe"], + // Spawn as detached process group leader to prevent zombie processes + // When bash spawns background processes, detached:true allows killing + // the entire group via process.kill(-pid) + detached: config.detached ?? true, + }); + + log.debug(`BashExecutionService: Spawned process with PID ${child.pid ?? "unknown"}`); + + // Line-by-line streaming with incremental buffers + let outBuf = ""; + let errBuf = ""; + + const flushLines = (buf: string, isStderr: boolean): string => { + const lines = buf.split(/\r?\n/); + // Keep the last partial line in buffer; emit full lines + const partial = lines.pop() ?? ""; + for (const line of lines) { + if (line.length === 0) continue; + if (isStderr) { + callbacks.onStderr(line); + } else { + callbacks.onStdout(line); + } + } + return partial; + }; + + child.stdout?.on("data", (chunk: Buffer) => { + outBuf += chunk.toString("utf8"); + outBuf = flushLines(outBuf, false); + }); + + child.stderr?.on("data", (chunk: Buffer) => { + errBuf += chunk.toString("utf8"); + errBuf = flushLines(errBuf, true); + }); + + child.on("close", (code: number | null) => { + log.debug(`BashExecutionService: Process exited with code ${code ?? "unknown"}`); + // Flush any remaining partial lines + if (outBuf.trim().length > 0) { + callbacks.onStdout(outBuf); + } + if (errBuf.trim().length > 0) { + callbacks.onStderr(errBuf); + } + callbacks.onExit(code ?? 0); + }); + + child.on("error", (error: Error) => { + log.error(`BashExecutionService: Process error:`, error); + }); + + return new DisposableProcess(child); + } +} diff --git a/src/services/initStateManager.test.ts b/src/services/initStateManager.test.ts new file mode 100644 index 000000000..936ea3b1e --- /dev/null +++ b/src/services/initStateManager.test.ts @@ -0,0 +1,255 @@ +import * as fs from "fs/promises"; +import * as path from "path"; +import * as os from "os"; +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import { Config } from "@/config"; +import { InitStateManager } from "./initStateManager"; +import type { WorkspaceInitEvent } from "@/types/ipc"; + +describe("InitStateManager", () => { + let tempDir: string; + let config: Config; + let manager: InitStateManager; + + beforeEach(async () => { + // Create temp directory as cmux root + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "init-state-test-")); + + // Create sessions directory + const sessionsDir = path.join(tempDir, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + + // Config constructor takes rootDir directly + config = new Config(tempDir); + manager = new InitStateManager(config); + }); + + afterEach(async () => { + await fs.rm(tempDir, { recursive: true, force: true }); + }); + + describe("lifecycle", () => { + it("should track init hook lifecycle (start → output → end)", async () => { + const workspaceId = "test-workspace"; + const events: Array = []; + + // Subscribe to events + manager.on("init-start", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-output", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-end", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + + // Start init + manager.startInit(workspaceId, "/path/to/hook"); + expect(manager.getInitState(workspaceId)).toBeTruthy(); + expect(manager.getInitState(workspaceId)?.status).toBe("running"); + + // Append output + manager.appendOutput(workspaceId, "Installing deps...", false); + manager.appendOutput(workspaceId, "Done!", false); + expect(manager.getInitState(workspaceId)?.lines).toEqual([ + { line: "Installing deps...", isError: false, timestamp: expect.any(Number) as number }, + { line: "Done!", isError: false, timestamp: expect.any(Number) as number }, + ]); + + // End init (await to ensure event fires) + await manager.endInit(workspaceId, 0); + expect(manager.getInitState(workspaceId)?.status).toBe("success"); + expect(manager.getInitState(workspaceId)?.exitCode).toBe(0); + + // Verify events + expect(events).toHaveLength(4); // start + 2 outputs + end + expect(events[0].type).toBe("init-start"); + expect(events[1].type).toBe("init-output"); + expect(events[2].type).toBe("init-output"); + expect(events[3].type).toBe("init-end"); + }); + + it("should track stderr lines with isError flag", () => { + const workspaceId = "test-workspace"; + manager.startInit(workspaceId, "/path/to/hook"); + + manager.appendOutput(workspaceId, "stdout line", false); + manager.appendOutput(workspaceId, "stderr line", true); + + const state = manager.getInitState(workspaceId); + expect(state?.lines).toEqual([ + { line: "stdout line", isError: false, timestamp: expect.any(Number) as number }, + { line: "stderr line", isError: true, timestamp: expect.any(Number) as number }, + ]); + }); + + it("should set status to error on non-zero exit code", async () => { + const workspaceId = "test-workspace"; + manager.startInit(workspaceId, "/path/to/hook"); + await manager.endInit(workspaceId, 1); + + const state = manager.getInitState(workspaceId); + expect(state?.status).toBe("error"); + expect(state?.exitCode).toBe(1); + }); + }); + + describe("persistence", () => { + it("should persist state to disk on endInit", async () => { + const workspaceId = "test-workspace"; + manager.startInit(workspaceId, "/path/to/hook"); + manager.appendOutput(workspaceId, "Line 1", false); + manager.appendOutput(workspaceId, "Line 2", true); + await manager.endInit(workspaceId, 0); + + // Read from disk + const diskState = await manager.readInitStatus(workspaceId); + expect(diskState).toBeTruthy(); + expect(diskState?.status).toBe("success"); + expect(diskState?.exitCode).toBe(0); + expect(diskState?.lines).toEqual([ + { line: "Line 1", isError: false, timestamp: expect.any(Number) as number }, + { line: "Line 2", isError: true, timestamp: expect.any(Number) as number }, + ]); + }); + + it("should replay from in-memory state when available", async () => { + const workspaceId = "test-workspace"; + const events: Array = []; + + manager.on("init-start", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-output", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-end", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + + // Create state + manager.startInit(workspaceId, "/path/to/hook"); + manager.appendOutput(workspaceId, "Line 1", false); + await manager.endInit(workspaceId, 0); + + events.length = 0; // Clear events + + // Replay from in-memory + await manager.replayInit(workspaceId); + + expect(events).toHaveLength(3); // start + output + end + expect(events[0].type).toBe("init-start"); + expect(events[1].type).toBe("init-output"); + expect(events[2].type).toBe("init-end"); + }); + + it("should replay from disk when not in memory", async () => { + const workspaceId = "test-workspace"; + const events: Array = []; + + // Create and persist state + manager.startInit(workspaceId, "/path/to/hook"); + manager.appendOutput(workspaceId, "Line 1", false); + manager.appendOutput(workspaceId, "Error line", true); + await manager.endInit(workspaceId, 1); + + // Clear in-memory state (simulate process restart) + manager.clearInMemoryState(workspaceId); + expect(manager.getInitState(workspaceId)).toBeUndefined(); + + // Subscribe to events + manager.on("init-start", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-output", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-end", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + + // Replay from disk + await manager.replayInit(workspaceId); + + expect(events).toHaveLength(4); // start + 2 outputs + end + expect(events[0].type).toBe("init-start"); + expect(events[1].type).toBe("init-output"); + expect((events[1] as { line: string }).line).toBe("Line 1"); + expect(events[2].type).toBe("init-output"); + expect((events[2] as { line: string }).line).toBe("Error line"); + expect((events[2] as { isError?: boolean }).isError).toBe(true); + expect(events[3].type).toBe("init-end"); + expect((events[3] as { exitCode: number }).exitCode).toBe(1); + }); + + it("should not replay if no state exists", async () => { + const workspaceId = "nonexistent-workspace"; + const events: Array = []; + + manager.on("init-start", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-output", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + manager.on("init-end", (event: WorkspaceInitEvent & { workspaceId: string }) => + events.push(event) + ); + + await manager.replayInit(workspaceId); + + expect(events).toHaveLength(0); + }); + }); + + describe("cleanup", () => { + it("should delete persisted state from disk", async () => { + const workspaceId = "test-workspace"; + manager.startInit(workspaceId, "/path/to/hook"); + await manager.endInit(workspaceId, 0); + + // Verify state exists + const stateBeforeDelete = await manager.readInitStatus(workspaceId); + expect(stateBeforeDelete).toBeTruthy(); + + // Delete + await manager.deleteInitStatus(workspaceId); + + // Verify deleted + const stateAfterDelete = await manager.readInitStatus(workspaceId); + expect(stateAfterDelete).toBeNull(); + }); + + it("should clear in-memory state", () => { + const workspaceId = "test-workspace"; + manager.startInit(workspaceId, "/path/to/hook"); + + expect(manager.getInitState(workspaceId)).toBeTruthy(); + + manager.clearInMemoryState(workspaceId); + + expect(manager.getInitState(workspaceId)).toBeUndefined(); + }); + }); + + describe("error handling", () => { + it("should handle appendOutput with no active state", () => { + const workspaceId = "nonexistent-workspace"; + // Should not throw + manager.appendOutput(workspaceId, "Line", false); + }); + + it("should handle endInit with no active state", async () => { + const workspaceId = "nonexistent-workspace"; + // Should not throw + await manager.endInit(workspaceId, 0); + }); + + it("should handle deleteInitStatus for nonexistent file", async () => { + const workspaceId = "nonexistent-workspace"; + // Should not throw + await manager.deleteInitStatus(workspaceId); + }); + }); +}); diff --git a/src/services/initStateManager.ts b/src/services/initStateManager.ts new file mode 100644 index 000000000..495c977f1 --- /dev/null +++ b/src/services/initStateManager.ts @@ -0,0 +1,251 @@ +import { EventEmitter } from "events"; +import type { Config } from "@/config"; +import { EventStore } from "@/utils/eventStore"; +import type { WorkspaceInitEvent } from "@/types/ipc"; +import { log } from "@/services/log"; + +/** + * Output line with timestamp for replay timing. + */ +export interface TimedLine { + line: string; + isError: boolean; // true if from stderr + timestamp: number; +} + +/** + * Persisted state for init hooks. + * Stored in ~/.cmux/sessions/{workspaceId}/init-status.json + */ +export interface InitStatus { + status: "running" | "success" | "error"; + hookPath: string; + startTime: number; + lines: TimedLine[]; + exitCode: number | null; + endTime: number | null; // When init-end event occurred +} + +/** + * In-memory state for active init hooks. + * Currently identical to InitStatus, but kept separate for future extension. + */ +type InitHookState = InitStatus; + +/** + * InitStateManager - Manages init hook lifecycle with persistence and replay. + * + * Uses EventStore abstraction for state management: + * - In-memory Map for active init hooks (via EventStore) + * - Disk persistence to init-status.json for replay across page reloads + * - EventEmitter for streaming events to AgentSession + * - Permanent storage (never auto-deleted, unlike stream partials) + * + * Key differences from StreamManager: + * - Simpler state machine (running → success/error, no abort) + * - No throttling (init hooks emit discrete lines, not streaming tokens) + * - Permanent persistence (init logs kept forever as workspace metadata) + * + * Lifecycle: + * 1. startInit() - Create in-memory state, emit init-start + * 2. appendOutput() - Accumulate lines, emit init-output + * 3. endInit() - Finalize state, write to disk, emit init-end + * 4. State remains in memory until cleared or process restart + * 5. replayInit() - Re-emit events from in-memory or disk state (via EventStore) + */ +export class InitStateManager extends EventEmitter { + private readonly store: EventStore; + + constructor(config: Config) { + super(); + this.store = new EventStore( + config, + "init-status.json", + (state) => this.serializeInitEvents(state), + (event) => this.emit(event.type, event), + "InitStateManager" + ); + } + + /** + * Serialize InitHookState into array of events for replay. + * Used by EventStore.replay() to reconstruct the event stream. + */ + private serializeInitEvents( + state: InitHookState & { workspaceId?: string } + ): Array { + const events: Array = []; + const workspaceId = state.workspaceId ?? "unknown"; + + // Emit init-start + events.push({ + type: "init-start", + workspaceId, + hookPath: state.hookPath, + timestamp: state.startTime, + }); + + // Emit init-output for each accumulated line with original timestamps + for (const timedLine of state.lines) { + events.push({ + type: "init-output", + workspaceId, + line: timedLine.line, + isError: timedLine.isError, + timestamp: timedLine.timestamp, // Use original timestamp for replay + }); + } + + // Emit init-end (only if completed) + if (state.exitCode !== null) { + events.push({ + type: "init-end", + workspaceId, + exitCode: state.exitCode, + timestamp: state.endTime ?? state.startTime, + }); + } + + return events; + } + + /** + * Start tracking a new init hook execution. + * Creates in-memory state and emits init-start event. + */ + startInit(workspaceId: string, hookPath: string): void { + const startTime = Date.now(); + + const state: InitHookState = { + status: "running", + hookPath, + startTime, + lines: [], + exitCode: null, + endTime: null, + }; + + this.store.setState(workspaceId, state); + + log.debug(`Init hook started for workspace ${workspaceId}: ${hookPath}`); + + // Emit init-start event + this.emit("init-start", { + type: "init-start", + workspaceId, + hookPath, + timestamp: startTime, + } satisfies WorkspaceInitEvent & { workspaceId: string }); + } + + /** + * Append output line from init hook. + * Accumulates in state and emits init-output event. + */ + appendOutput(workspaceId: string, line: string, isError: boolean): void { + const state = this.store.getState(workspaceId); + + if (!state) { + log.error(`appendOutput called for workspace ${workspaceId} with no active init state`); + return; + } + + const timestamp = Date.now(); + + // Store line with isError flag and timestamp + state.lines.push({ line, isError, timestamp }); + + // Emit init-output event + this.emit("init-output", { + type: "init-output", + workspaceId, + line, + isError, + timestamp, + } satisfies WorkspaceInitEvent & { workspaceId: string }); + } + + /** + * Finalize init hook execution. + * Updates state, persists to disk, and emits init-end event. + */ + async endInit(workspaceId: string, exitCode: number): Promise { + const state = this.store.getState(workspaceId); + + if (!state) { + log.error(`endInit called for workspace ${workspaceId} with no active init state`); + return; + } + + const endTime = Date.now(); + state.status = exitCode === 0 ? "success" : "error"; + state.exitCode = exitCode; + state.endTime = endTime; + + // Persist to disk (fire-and-forget, errors logged internally by EventStore) + await this.store.persist(workspaceId, state); + + log.info( + `Init hook ${state.status} for workspace ${workspaceId} (exit code ${exitCode}, duration ${endTime - state.startTime}ms)` + ); + + // Emit init-end event + this.emit("init-end", { + type: "init-end", + workspaceId, + exitCode, + timestamp: endTime, + } satisfies WorkspaceInitEvent & { workspaceId: string }); + + // Keep state in memory for replay (unlike streams which delete immediately) + } + + /** + * Get current in-memory init state for a workspace. + * Returns undefined if no init state exists. + */ + getInitState(workspaceId: string): InitHookState | undefined { + return this.store.getState(workspaceId); + } + + /** + * Read persisted init status from disk. + * Returns null if no status file exists. + */ + async readInitStatus(workspaceId: string): Promise { + return this.store.readPersisted(workspaceId); + } + + /** + * Replay init events for a workspace. + * Delegates to EventStore.replay() which: + * 1. Checks in-memory state first, then falls back to disk + * 2. Serializes state into events via serializeInitEvents() + * 3. Emits events (init-start, init-output*, init-end) + * + * This is called during AgentSession.emitHistoricalEvents() to ensure + * init state is visible after page reloads. + */ + async replayInit(workspaceId: string): Promise { + // Pass workspaceId as context for serialization + await this.store.replay(workspaceId, { workspaceId }); + } + + /** + * Delete persisted init status from disk. + * Useful for testing or manual cleanup. + * Does NOT clear in-memory state (for active replay). + */ + async deleteInitStatus(workspaceId: string): Promise { + await this.store.deletePersisted(workspaceId); + } + + /** + * Clear in-memory state for a workspace. + * Useful for testing or cleanup after workspace deletion. + * Does NOT delete disk file (use deleteInitStatus for that). + */ + clearInMemoryState(workspaceId: string): void { + this.store.deleteState(workspaceId); + } +} diff --git a/src/services/ipcMain.ts b/src/services/ipcMain.ts index 465d9ad21..18b1e2382 100644 --- a/src/services/ipcMain.ts +++ b/src/services/ipcMain.ts @@ -1,6 +1,7 @@ import assert from "@/utils/assert"; import type { BrowserWindow, IpcMain as ElectronIpcMain } from "electron"; import { spawn, spawnSync } from "child_process"; +import * as fs from "fs"; import * as fsPromises from "fs/promises"; import * as path from "path"; import type { Config, ProjectConfig } from "@/config"; @@ -28,13 +29,15 @@ import { createBashTool } from "@/services/tools/bash"; import type { BashToolResult } from "@/types/tools"; import { secretsToRecord } from "@/types/secrets"; import { DisposableTempDir } from "@/services/tempDir"; +import { BashExecutionService } from "@/services/bashExecutionService"; +import { InitStateManager } from "@/services/initStateManager"; /** * IpcMain - Manages all IPC handlers and service coordination * * This class encapsulates: * - All ipcMain handler registration - * - Service lifecycle management (AIService, HistoryService, PartialService) + * - Service lifecycle management (AIService, HistoryService, PartialService, InitStateManager) * - Event forwarding from services to renderer * * Design: @@ -47,12 +50,84 @@ export class IpcMain { private readonly historyService: HistoryService; private readonly partialService: PartialService; private readonly aiService: AIService; + private readonly bashService: BashExecutionService; + private readonly initStateManager: InitStateManager; private readonly sessions = new Map(); private readonly sessionSubscriptions = new Map< string, { chat: () => void; metadata: () => void } >(); private mainWindow: BrowserWindow | null = null; + + // Run optional .cmux/init hook for a newly created workspace and stream its output + private async startWorkspaceInitHook(params: { + projectPath: string; + worktreePath: string; + workspaceId: string; + }): Promise { + const { projectPath, worktreePath, workspaceId } = params; + const hookPath = path.join(projectPath, ".cmux", "init"); + + // Check if hook exists and is executable + const exists = await fsPromises + .access(hookPath, fs.constants.X_OK) + .then(() => true) + .catch(() => false); + + if (!exists) { + log.debug(`No init hook found at ${hookPath}`); + return; // Nothing to do + } + + log.info(`Starting init hook for workspace ${workspaceId}: ${hookPath}`); + + // Start init hook tracking (creates in-memory state + emits init-start event) + // This MUST complete before we return so replayInit() finds state + this.initStateManager.startInit(workspaceId, hookPath); + + // Launch the hook process (don't await completion) + void (() => { + try { + const startTime = Date.now(); + + // Execute init hook through centralized bash service + // Quote path to handle spaces and special characters + this.bashService.executeStreaming( + `"${hookPath}"`, + { + cwd: worktreePath, + detached: false, // Don't need process group for simple script execution + }, + { + onStdout: (line) => { + this.initStateManager.appendOutput(workspaceId, line, false); + }, + onStderr: (line) => { + this.initStateManager.appendOutput(workspaceId, line, true); + }, + onExit: (exitCode) => { + const duration = Date.now() - startTime; + const status = exitCode === 0 ? "success" : "error"; + log.info( + `Init hook ${status} for workspace ${workspaceId} (exit code ${exitCode}, duration ${duration}ms)` + ); + // Finalize init state (automatically emits init-end event and persists to disk) + void this.initStateManager.endInit(workspaceId, exitCode); + }, + } + ); + } catch (error) { + log.error(`Failed to run init hook for workspace ${workspaceId}:`, error); + // Report error through init state manager + this.initStateManager.appendOutput( + workspaceId, + error instanceof Error ? error.message : String(error), + true + ); + void this.initStateManager.endInit(workspaceId, -1); + } + })(); + } private registered = false; constructor(config: Config) { @@ -60,6 +135,8 @@ export class IpcMain { this.historyService = new HistoryService(config); this.partialService = new PartialService(config, this.historyService); this.aiService = new AIService(config, this.historyService, this.partialService); + this.bashService = new BashExecutionService(); + this.initStateManager = new InitStateManager(config); } private getOrCreateSession(workspaceId: string): AgentSession { @@ -78,6 +155,7 @@ export class IpcMain { historyService: this.historyService, partialService: this.partialService, aiService: this.aiService, + initStateManager: this.initStateManager, }); const chatUnsubscribe = session.onChatEvent((event) => { @@ -247,6 +325,14 @@ export class IpcMain { const session = this.getOrCreateSession(workspaceId); session.emitMetadata(completeMetadata); + // Start optional .cmux/init hook (waits for state creation, then returns) + // This ensures replayInit() will find state when frontend subscribes + await this.startWorkspaceInitHook({ + projectPath, + worktreePath: result.path, + workspaceId, + }); + // Return complete metadata with paths for frontend return { success: true, diff --git a/src/stores/WorkspaceStore.ts b/src/stores/WorkspaceStore.ts index b91782b32..ec649630b 100644 --- a/src/stores/WorkspaceStore.ts +++ b/src/stores/WorkspaceStore.ts @@ -9,20 +9,7 @@ import { updatePersistedState } from "@/hooks/usePersistedState"; import { getRetryStateKey } from "@/constants/storage"; import { CUSTOM_EVENTS } from "@/constants/events"; import { useSyncExternalStore } from "react"; -import { - isCaughtUpMessage, - isStreamError, - isDeleteMessage, - isStreamStart, - isStreamDelta, - isStreamEnd, - isStreamAbort, - isToolCallStart, - isToolCallDelta, - isToolCallEnd, - isReasoningDelta, - isReasoningEnd, -} from "@/types/ipc"; +import { isCaughtUpMessage, isStreamError, isDeleteMessage } from "@/types/ipc"; import { MapStore } from "./MapStore"; import { createDisplayUsage } from "@/utils/tokens/displayUsage"; import { WorkspaceConsumerManager } from "./WorkspaceConsumerManager"; @@ -122,6 +109,96 @@ export class WorkspaceStore { private historicalMessages = new Map(); private pendingStreamEvents = new Map(); + /** + * Map of event types to their handlers. This is the single source of truth for: + * 1. Which events should be buffered during replay (the keys) + * 2. How to process those events (the values) + * + * By keeping check and processing in one place, we make it structurally impossible + * to buffer an event type without having a handler for it. + */ + private readonly bufferedEventHandlers: Record< + string, + ( + workspaceId: string, + aggregator: StreamingMessageAggregator, + data: WorkspaceChatMessage + ) => void + > = { + "stream-start": (workspaceId, aggregator, data) => { + aggregator.handleStreamStart(data as never); + if (this.onModelUsed) { + this.onModelUsed((data as { model: string }).model); + } + updatePersistedState(getRetryStateKey(workspaceId), { + attempt: 0, + retryStartTime: Date.now(), + }); + this.states.bump(workspaceId); + }, + "stream-delta": (workspaceId, aggregator, data) => { + aggregator.handleStreamDelta(data as never); + this.states.bump(workspaceId); + }, + "stream-end": (workspaceId, aggregator, data) => { + aggregator.handleStreamEnd(data as never); + aggregator.clearTokenState((data as { messageId: string }).messageId); + + if (this.handleCompactionCompletion(workspaceId, aggregator, data)) { + return; + } + + this.states.bump(workspaceId); + this.checkAndBumpRecencyIfChanged(); + this.finalizeUsageStats(workspaceId, (data as { metadata?: never }).metadata); + }, + "stream-abort": (workspaceId, aggregator, data) => { + aggregator.clearTokenState((data as { messageId: string }).messageId); + aggregator.handleStreamAbort(data as never); + + if (this.handleCompactionAbort(workspaceId, aggregator, data)) { + return; + } + + this.states.bump(workspaceId); + this.dispatchResumeCheck(workspaceId); + this.finalizeUsageStats(workspaceId, (data as { metadata?: never }).metadata); + }, + "tool-call-start": (workspaceId, aggregator, data) => { + aggregator.handleToolCallStart(data as never); + this.states.bump(workspaceId); + }, + "tool-call-delta": (workspaceId, aggregator, data) => { + aggregator.handleToolCallDelta(data as never); + this.states.bump(workspaceId); + }, + "tool-call-end": (workspaceId, aggregator, data) => { + aggregator.handleToolCallEnd(data as never); + this.states.bump(workspaceId); + this.consumerManager.scheduleCalculation(workspaceId, aggregator); + }, + "reasoning-delta": (workspaceId, aggregator, data) => { + aggregator.handleReasoningDelta(data as never); + this.states.bump(workspaceId); + }, + "reasoning-end": (workspaceId, aggregator, data) => { + aggregator.handleReasoningEnd(data as never); + this.states.bump(workspaceId); + }, + "init-start": (workspaceId, aggregator, data) => { + aggregator.handleMessage(data); + this.states.bump(workspaceId); + }, + "init-output": (workspaceId, aggregator, data) => { + aggregator.handleMessage(data); + this.states.bump(workspaceId); + }, + "init-end": (workspaceId, aggregator, data) => { + aggregator.handleMessage(data); + this.states.bump(workspaceId); + }, + }; + // Cache of last known recency per workspace (for change detection) private recencyCache = new Map(); @@ -780,18 +857,12 @@ export class WorkspaceStore { return this.aggregators.get(workspaceId)!; } - private isStreamEvent(data: WorkspaceChatMessage): boolean { - return ( - isStreamStart(data) || - isStreamDelta(data) || - isStreamEnd(data) || - isStreamAbort(data) || - isToolCallStart(data) || - isToolCallDelta(data) || - isToolCallEnd(data) || - isReasoningDelta(data) || - isReasoningEnd(data) - ); + /** + * Check if data is a buffered event type by checking the handler map. + * This ensures isStreamEvent() and processStreamEvent() can never fall out of sync. + */ + private isBufferedEvent(data: WorkspaceChatMessage): boolean { + return "type" in data && data.type in this.bufferedEventHandlers; } private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void { @@ -835,8 +906,21 @@ export class WorkspaceStore { return; } - // Buffer stream events until caught up (so they have full historical context) - if (!isCaughtUp && this.isStreamEvent(data)) { + // OPTIMIZATION: Buffer stream events until caught-up to reduce excess re-renders + // When first subscribing to a workspace, we receive: + // 1. Historical messages from chat.jsonl (potentially hundreds of messages) + // 2. Partial stream state (if stream was interrupted) + // 3. Active stream events (if currently streaming) + // + // Without buffering, each event would trigger a separate re-render as messages + // arrive one-by-one over IPC. By buffering until "caught-up", we: + // - Load all historical messages in one batch (O(1) render instead of O(N)) + // - Replay buffered stream events after history is loaded + // - Provide correct context for stream continuation (history is complete) + // + // This is especially important for workspaces with long histories (100+ messages), + // where unbuffered rendering would cause visible lag and UI stutter. + if (!isCaughtUp && this.isBufferedEvent(data)) { const pending = this.pendingStreamEvents.get(workspaceId) ?? []; pending.push(data); this.pendingStreamEvents.set(workspaceId, pending); @@ -852,6 +936,7 @@ export class WorkspaceStore { aggregator: StreamingMessageAggregator, data: WorkspaceChatMessage ): void { + // Handle non-buffered special events first if (isStreamError(data)) { aggregator.handleStreamError(data); this.states.bump(workspaceId); @@ -862,120 +947,30 @@ export class WorkspaceStore { if (isDeleteMessage(data)) { aggregator.handleDeleteMessage(data); this.states.bump(workspaceId); - this.checkAndBumpRecencyIfChanged(); // Message deleted, update recency + this.checkAndBumpRecencyIfChanged(); return; } - if (isStreamStart(data)) { - aggregator.handleStreamStart(data); - if (this.onModelUsed) { - this.onModelUsed(data.model); - } - updatePersistedState(getRetryStateKey(workspaceId), { - attempt: 0, - retryStartTime: Date.now(), - }); - this.states.bump(workspaceId); - return; - } - - if (isStreamDelta(data)) { - aggregator.handleStreamDelta(data); - // Always bump for chat components to see deltas - // Sidebar components won't re-render because getWorkspaceSidebarState() returns cached object - this.states.bump(workspaceId); - return; - } - - if (isStreamEnd(data)) { - aggregator.handleStreamEnd(data); - aggregator.clearTokenState(data.messageId); - - // Early return if compaction handled (async replacement in progress) - if (this.handleCompactionCompletion(workspaceId, aggregator, data)) { - return; - } - - // Normal stream-end handling - this.states.bump(workspaceId); - this.checkAndBumpRecencyIfChanged(); // Stream ended, update recency - - // Update usage stats and schedule consumer calculation - // MUST happen after aggregator.handleStreamEnd() stores the metadata - this.finalizeUsageStats(workspaceId, data.metadata); - - return; - } - - if (isStreamAbort(data)) { - aggregator.clearTokenState(data.messageId); - aggregator.handleStreamAbort(data); - - // Check if this was a compaction stream that got interrupted - if (this.handleCompactionAbort(workspaceId, aggregator, data)) { - // Compaction abort handled, don't do normal abort processing - return; - } - - // Normal abort handling - this.states.bump(workspaceId); - this.dispatchResumeCheck(workspaceId); - - // Update usage stats if available (abort may have usage if stream completed processing) - // MUST happen after aggregator.handleStreamAbort() stores the metadata - this.finalizeUsageStats(workspaceId, data.metadata); - - return; - } - - if (isToolCallStart(data)) { - aggregator.handleToolCallStart(data); - this.states.bump(workspaceId); - return; - } - - if (isToolCallDelta(data)) { - aggregator.handleToolCallDelta(data); - this.states.bump(workspaceId); - return; - } - - if (isToolCallEnd(data)) { - aggregator.handleToolCallEnd(data); - this.states.bump(workspaceId); - - // Bump consumers on tool-end for real-time updates during streaming - // Tools complete before stream-end, so we want breakdown to update immediately - this.consumerManager.scheduleCalculation(workspaceId, aggregator); - - return; - } - - if (isReasoningDelta(data)) { - aggregator.handleReasoningDelta(data); - this.states.bump(workspaceId); - return; - } - - if (isReasoningEnd(data)) { - aggregator.handleReasoningEnd(data); - this.states.bump(workspaceId); + // Try buffered event handlers (single source of truth) + if ("type" in data && data.type in this.bufferedEventHandlers) { + this.bufferedEventHandlers[data.type](workspaceId, aggregator, data); return; } // Regular messages (CmuxMessage without type field) const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; - if (!isCaughtUp) { - if ("role" in data && !("type" in data)) { - const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; - historicalMsgs.push(data); - this.historicalMessages.set(workspaceId, historicalMsgs); - } - } else { + if (!isCaughtUp && "role" in data && !("type" in data)) { + // Buffer historical CmuxMessages + const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; + historicalMsgs.push(data); + this.historicalMessages.set(workspaceId, historicalMsgs); + } else if (isCaughtUp) { + // Process live events immediately (after history loaded) aggregator.handleMessage(data); this.states.bump(workspaceId); - this.checkAndBumpRecencyIfChanged(); // New message, update recency + this.checkAndBumpRecencyIfChanged(); } + // Note: Init events and stream events are handled by isStreamEvent() buffering above } } diff --git a/src/types/ipc.ts b/src/types/ipc.ts index e513ba3b7..9a50745f3 100644 --- a/src/types/ipc.ts +++ b/src/types/ipc.ts @@ -53,6 +53,25 @@ export interface DeleteMessage { historySequences: number[]; } +// Workspace init hook events (persisted to init-status.json, not chat.jsonl) +export type WorkspaceInitEvent = + | { + type: "init-start"; + hookPath: string; + timestamp: number; + } + | { + type: "init-output"; + line: string; + timestamp: number; + isError?: boolean; + } + | { + type: "init-end"; + exitCode: number; + timestamp: number; + }; + // Union type for workspace chat messages export type WorkspaceChatMessage = | CmuxMessage @@ -67,7 +86,8 @@ export type WorkspaceChatMessage = | ToolCallDeltaEvent | ToolCallEndEvent | ReasoningDeltaEvent - | ReasoningEndEvent; + | ReasoningEndEvent + | WorkspaceInitEvent; // Type guard for caught up messages export function isCaughtUpMessage(msg: WorkspaceChatMessage): msg is CaughtUpMessage { @@ -129,6 +149,25 @@ export function isReasoningEnd(msg: WorkspaceChatMessage): msg is ReasoningEndEv return "type" in msg && msg.type === "reasoning-end"; } +// Type guards for init events +export function isInitStart( + msg: WorkspaceChatMessage +): msg is Extract { + return "type" in msg && msg.type === "init-start"; +} + +export function isInitOutput( + msg: WorkspaceChatMessage +): msg is Extract { + return "type" in msg && msg.type === "init-output"; +} + +export function isInitEnd( + msg: WorkspaceChatMessage +): msg is Extract { + return "type" in msg && msg.type === "init-end"; +} + // Type guard for stream stats events // Options for sendMessage and resumeStream diff --git a/src/types/message.ts b/src/types/message.ts index 4da548409..a5cfaf311 100644 --- a/src/types/message.ts +++ b/src/types/message.ts @@ -166,6 +166,16 @@ export type DisplayedMessage = id: string; // Display ID for UI/React keys hiddenCount: number; // Number of messages hidden historySequence: number; // Global ordering across all messages + } + | { + type: "workspace-init"; + id: string; // Display ID for UI/React keys + historySequence: number; // Position in message stream (-1 for ephemeral, non-persisted events) + status: "running" | "success" | "error"; + hookPath: string; // Path to the init script being executed + lines: string[]; // Accumulated output lines (stderr prefixed with "ERROR:") + exitCode: number | null; // Final exit code (null while running) + timestamp: number; }; // Helper to create a simple text message diff --git a/src/utils/eventStore.test.ts b/src/utils/eventStore.test.ts new file mode 100644 index 000000000..9c46919a8 --- /dev/null +++ b/src/utils/eventStore.test.ts @@ -0,0 +1,251 @@ +import { describe, it, expect, beforeEach, afterEach } from "@jest/globals"; +import * as fs from "fs/promises"; +import * as path from "path"; +import { EventStore } from "./eventStore"; +import type { Config } from "@/config"; + +// Test types +interface TestState { + id: string; + value: number; + items: string[]; +} + +interface TestEvent { + type: "start" | "item" | "end"; + id: string; + data?: string | number; +} + +describe("EventStore", () => { + const testSessionDir = path.join(__dirname, "../../test-sessions"); + const testWorkspaceId = "test-workspace-123"; + const testFilename = "test-state.json"; + + let mockConfig: Config; + let store: EventStore; + let emittedEvents: TestEvent[] = []; + + // Test serializer: converts state into events + const serializeState = (state: TestState & { workspaceId?: string }): TestEvent[] => { + const events: TestEvent[] = []; + events.push({ type: "start", id: state.workspaceId ?? state.id, data: state.value }); + for (const item of state.items) { + events.push({ type: "item", id: state.workspaceId ?? state.id, data: item }); + } + events.push({ type: "end", id: state.workspaceId ?? state.id, data: state.items.length }); + return events; + }; + + // Test emitter: captures events + const emitEvent = (event: TestEvent): void => { + emittedEvents.push(event); + }; + + beforeEach(async () => { + // Create test session directory + try { + await fs.access(testSessionDir); + } catch { + await fs.mkdir(testSessionDir, { recursive: true }); + } + + mockConfig = { + cmuxDir: path.join(__dirname, "../.."), + sessionsDir: testSessionDir, + getSessionDir: (workspaceId: string) => path.join(testSessionDir, workspaceId), + } as unknown as Config; + + emittedEvents = []; + + store = new EventStore(mockConfig, testFilename, serializeState, emitEvent, "TestStore"); + }); + + afterEach(async () => { + // Clean up test files + try { + await fs.access(testSessionDir); + await fs.rm(testSessionDir, { recursive: true, force: true }); + } catch { + // Directory doesn't exist, nothing to clean up + } + }); + + describe("State Management", () => { + it("should store and retrieve in-memory state", () => { + const state: TestState = { id: "test", value: 42, items: ["a", "b"] }; + + store.setState(testWorkspaceId, state); + const retrieved = store.getState(testWorkspaceId); + + expect(retrieved).toEqual(state); + }); + + it("should return undefined for non-existent state", () => { + const retrieved = store.getState("non-existent"); + expect(retrieved).toBeUndefined(); + }); + + it("should delete in-memory state", () => { + const state: TestState = { id: "test", value: 42, items: [] }; + + store.setState(testWorkspaceId, state); + expect(store.hasState(testWorkspaceId)).toBe(true); + + store.deleteState(testWorkspaceId); + expect(store.hasState(testWorkspaceId)).toBe(false); + expect(store.getState(testWorkspaceId)).toBeUndefined(); + }); + + it("should check if state exists", () => { + expect(store.hasState(testWorkspaceId)).toBe(false); + + store.setState(testWorkspaceId, { id: "test", value: 1, items: [] }); + expect(store.hasState(testWorkspaceId)).toBe(true); + }); + + it("should get all active workspace IDs", () => { + store.setState("workspace-1", { id: "1", value: 1, items: [] }); + store.setState("workspace-2", { id: "2", value: 2, items: [] }); + + const ids = store.getActiveWorkspaceIds(); + expect(ids).toHaveLength(2); + expect(ids).toContain("workspace-1"); + expect(ids).toContain("workspace-2"); + }); + }); + + describe("Persistence", () => { + it("should persist state to disk", async () => { + const state: TestState = { id: "test", value: 99, items: ["x", "y", "z"] }; + + await store.persist(testWorkspaceId, state); + + // Verify file exists + const workspaceDir = path.join(testSessionDir, testWorkspaceId); + const filePath = path.join(workspaceDir, testFilename); + try { + await fs.access(filePath); + } catch { + throw new Error(`File ${filePath} does not exist`); + } + + // Verify content + const content = await fs.readFile(filePath, "utf-8"); + const parsed = JSON.parse(content) as TestState; + expect(parsed).toEqual(state); + }); + + it("should read persisted state from disk", async () => { + const state: TestState = { id: "test", value: 123, items: ["foo", "bar"] }; + + await store.persist(testWorkspaceId, state); + const retrieved = await store.readPersisted(testWorkspaceId); + + expect(retrieved).toEqual(state); + }); + + it("should return null for non-existent persisted state", async () => { + const retrieved = await store.readPersisted("non-existent"); + expect(retrieved).toBeNull(); + }); + + it("should delete persisted state from disk", async () => { + const state: TestState = { id: "test", value: 456, items: [] }; + + await store.persist(testWorkspaceId, state); + await store.deletePersisted(testWorkspaceId); + + const retrieved = await store.readPersisted(testWorkspaceId); + expect(retrieved).toBeNull(); + }); + + it("should not throw when deleting non-existent persisted state", async () => { + // Should complete without throwing (logs error but doesn't throw) + await store.deletePersisted("non-existent"); + // If we get here, it didn't throw + expect(true).toBe(true); + }); + }); + + describe("Replay", () => { + it("should replay events from in-memory state", async () => { + const state: TestState = { id: "mem", value: 10, items: ["a", "b", "c"] }; + store.setState(testWorkspaceId, state); + + await store.replay(testWorkspaceId, { workspaceId: testWorkspaceId }); + + expect(emittedEvents).toHaveLength(5); // start + 3 items + end + expect(emittedEvents[0]).toEqual({ type: "start", id: testWorkspaceId, data: 10 }); + expect(emittedEvents[1]).toEqual({ type: "item", id: testWorkspaceId, data: "a" }); + expect(emittedEvents[2]).toEqual({ type: "item", id: testWorkspaceId, data: "b" }); + expect(emittedEvents[3]).toEqual({ type: "item", id: testWorkspaceId, data: "c" }); + expect(emittedEvents[4]).toEqual({ type: "end", id: testWorkspaceId, data: 3 }); + }); + + it("should replay events from disk state when not in memory", async () => { + const state: TestState = { id: "disk", value: 20, items: ["x"] }; + + await store.persist(testWorkspaceId, state); + // Don't set in-memory state + + await store.replay(testWorkspaceId, { workspaceId: testWorkspaceId }); + + expect(emittedEvents).toHaveLength(3); // start + 1 item + end + expect(emittedEvents[0]).toEqual({ type: "start", id: testWorkspaceId, data: 20 }); + expect(emittedEvents[1]).toEqual({ type: "item", id: testWorkspaceId, data: "x" }); + expect(emittedEvents[2]).toEqual({ type: "end", id: testWorkspaceId, data: 1 }); + }); + + it("should prefer in-memory state over disk state", async () => { + const diskState: TestState = { id: "disk", value: 1, items: [] }; + const memState: TestState = { id: "mem", value: 2, items: [] }; + + await store.persist(testWorkspaceId, diskState); + store.setState(testWorkspaceId, memState); + + await store.replay(testWorkspaceId, { workspaceId: testWorkspaceId }); + + expect(emittedEvents[0]).toEqual({ type: "start", id: testWorkspaceId, data: 2 }); // Memory value + }); + + it("should do nothing when replaying non-existent state", async () => { + await store.replay("non-existent", { workspaceId: "non-existent" }); + expect(emittedEvents).toHaveLength(0); + }); + + it("should pass context to serializer", async () => { + const state: TestState = { id: "original", value: 100, items: [] }; + store.setState(testWorkspaceId, state); + + await store.replay(testWorkspaceId, { workspaceId: "override-id" }); + + // Serializer should use workspaceId from context + expect(emittedEvents[0]).toEqual({ type: "start", id: "override-id", data: 100 }); + }); + }); + + describe("Integration", () => { + it("should handle full lifecycle: set → persist → delete memory → replay from disk", async () => { + const state: TestState = { id: "lifecycle", value: 777, items: ["test"] }; + + // Set in memory + store.setState(testWorkspaceId, state); + expect(store.hasState(testWorkspaceId)).toBe(true); + + // Persist to disk + await store.persist(testWorkspaceId, state); + + // Clear memory + store.deleteState(testWorkspaceId); + expect(store.hasState(testWorkspaceId)).toBe(false); + + // Replay from disk + await store.replay(testWorkspaceId, { workspaceId: testWorkspaceId }); + + // Verify events were emitted + expect(emittedEvents).toHaveLength(3); + expect(emittedEvents[0].data).toBe(777); + }); + }); +}); diff --git a/src/utils/eventStore.ts b/src/utils/eventStore.ts new file mode 100644 index 000000000..34b8fde7f --- /dev/null +++ b/src/utils/eventStore.ts @@ -0,0 +1,195 @@ +import { SessionFileManager } from "@/utils/sessionFile"; +import type { Config } from "@/config"; +import { log } from "@/services/log"; + +/** + * EventStore - Generic state management with persistence and replay for workspace events. + * + * This abstraction captures the common pattern between InitStateManager and StreamManager: + * 1. In-memory Map for active state + * 2. Disk persistence for crash recovery / page reload + * 3. Replay by serializing state into events and emitting them + * + * Type parameters: + * - TState: The state object stored in memory/disk (e.g., InitStatus, WorkspaceStreamInfo) + * - TEvent: The event type emitted (e.g., WorkspaceInitEvent) + * + * Design pattern: + * - Composition over inheritance (doesn't extend EventEmitter directly) + * - Subclasses provide serialization logic (state → events) + * - Handles common operations (get/set/delete state, persist, replay) + * + * Example usage: + * + * class InitStateManager { + * private store = new EventStore( + * config, + * "init-status.json", + * (state) => this.serializeInitEvents(state), + * (event) => this.emit(event.type, event) + * ); + * + * async replayInit(workspaceId: string) { + * await this.store.replay(workspaceId); + * } + * } + */ +export class EventStore { + private stateMap = new Map(); + private readonly fileManager: SessionFileManager; + private readonly serializeState: (state: TState) => TEvent[]; + private readonly emitEvent: (event: TEvent) => void; + private readonly storeName: string; + + /** + * Create a new EventStore. + * + * @param config - Config object for SessionFileManager + * @param filename - Filename for persisted state (e.g., "init-status.json") + * @param serializeState - Function to convert state into array of events for replay + * @param emitEvent - Function to emit a single event (typically wraps EventEmitter.emit) + * @param storeName - Name for logging (e.g., "InitStateManager") + */ + constructor( + config: Config, + filename: string, + serializeState: (state: TState) => TEvent[], + emitEvent: (event: TEvent) => void, + storeName = "EventStore" + ) { + this.fileManager = new SessionFileManager(config, filename); + this.serializeState = serializeState; + this.emitEvent = emitEvent; + this.storeName = storeName; + } + + /** + * Get in-memory state for a workspace. + * Returns undefined if no state exists. + */ + getState(workspaceId: string): TState | undefined { + return this.stateMap.get(workspaceId); + } + + /** + * Set in-memory state for a workspace. + */ + setState(workspaceId: string, state: TState): void { + this.stateMap.set(workspaceId, state); + } + + /** + * Delete in-memory state for a workspace. + * Does NOT delete the persisted file (use deletePersisted for that). + */ + deleteState(workspaceId: string): void { + this.stateMap.delete(workspaceId); + } + + /** + * Check if in-memory state exists for a workspace. + */ + hasState(workspaceId: string): boolean { + return this.stateMap.has(workspaceId); + } + + /** + * Read persisted state from disk. + * Returns null if no file exists. + */ + async readPersisted(workspaceId: string): Promise { + return this.fileManager.read(workspaceId); + } + + /** + * Write state to disk. + * Logs errors but doesn't throw (fire-and-forget pattern). + */ + async persist(workspaceId: string, state: TState): Promise { + const result = await this.fileManager.write(workspaceId, state); + if (!result.success) { + log.error(`[${this.storeName}] Failed to persist state for ${workspaceId}: ${result.error}`); + } + } + + /** + * Delete persisted state from disk. + * Does NOT clear in-memory state (use deleteState for that). + */ + async deletePersisted(workspaceId: string): Promise { + const result = await this.fileManager.delete(workspaceId); + if (!result.success) { + log.error( + `[${this.storeName}] Failed to delete persisted state for ${workspaceId}: ${result.error}` + ); + } + } + + /** + * Replay events for a workspace. + * Checks in-memory state first, falls back to disk. + * Emits events using the provided emitEvent function. + * + * @param workspaceId - Workspace ID to replay events for + * @param context - Optional context to pass to serializeState (e.g., workspaceId) + */ + async replay(workspaceId: string, context?: Record): Promise { + // Try in-memory state first (most recent) + let state: TState | undefined = this.stateMap.get(workspaceId); + + // Fall back to disk if not in memory + if (!state) { + const diskState = await this.fileManager.read(workspaceId); + if (!diskState) { + return; // No state to replay + } + state = diskState; + } + + // Augment state with context for serialization + const augmentedState = { ...state, ...context }; + + // Serialize state into events and emit them + const events = this.serializeState(augmentedState); + for (const event of events) { + this.emitEvent(event); + } + } + + /** + * Get all workspace IDs with in-memory state. + * Useful for debugging or cleanup. + */ + getActiveWorkspaceIds(): string[] { + return Array.from(this.stateMap.keys()); + } +} + +/** + * FUTURE REFACTORING: StreamManager Pattern + * + * StreamManager (src/services/streamManager.ts) follows a similar pattern to InitStateManager + * but has NOT been refactored to use EventStore yet due to: + * 1. Complexity: StreamManager is 1332 LoC with intricate state machine logic + * 2. Risk: Heavily tested streaming infrastructure (40+ integration tests) + * 3. Lifecycle differences: Streams auto-cleanup on completion, init logs persist forever + * + * Future refactoring could extract: + * - WorkspaceStreamInfo state management (workspaceStreams Map) + * - Replay logic (replayStream method at line 1244) + * - Partial persistence (currently using PartialService) + * + * Key differences to handle: + * - StreamManager has complex throttling (partialWriteTimer, PARTIAL_WRITE_THROTTLE_MS) + * - Different persistence strategy (partial.json → chat.jsonl → delete partial) + * - AbortController integration for stream cancellation + * - Token tracking and usage statistics + * + * Pattern for adoption: + * 1. Extract WorkspaceStreamInfo → MessagePart[] serialization into helper + * 2. Create EventStore instance for stream state (similar to InitStateManager) + * 3. Replace manual replay loop (line 1270-1272) with store.replay() + * 4. Keep existing throttling and persistence strategies (out of scope for EventStore) + * + * See InitStateManager refactor (this PR) for reference implementation. + */ diff --git a/src/utils/messages/StreamingMessageAggregator.init.test.ts b/src/utils/messages/StreamingMessageAggregator.init.test.ts new file mode 100644 index 000000000..084608bb5 --- /dev/null +++ b/src/utils/messages/StreamingMessageAggregator.init.test.ts @@ -0,0 +1,76 @@ +import { StreamingMessageAggregator } from "./StreamingMessageAggregator"; + +interface InitDisplayedMessage { + type: "workspace-init"; + status: "running" | "success" | "error"; + lines: string[]; + exitCode: number | null; +} + +describe("Init display after cleanup changes", () => { + it("should display init messages correctly", () => { + const aggregator = new StreamingMessageAggregator(); + + // Simulate init start + aggregator.handleMessage({ + type: "init-start", + hookPath: "/test/.cmux/init", + timestamp: Date.now(), + }); + + let messages = aggregator.getDisplayedMessages(); + expect(messages).toHaveLength(1); + expect(messages[0].type).toBe("workspace-init"); + expect((messages[0] as InitDisplayedMessage).status).toBe("running"); + + // Simulate init output + aggregator.handleMessage({ + type: "init-output", + line: "Installing dependencies...", + timestamp: Date.now(), + isError: false, + }); + + messages = aggregator.getDisplayedMessages(); + expect(messages).toHaveLength(1); + expect((messages[0] as InitDisplayedMessage).lines).toContain("Installing dependencies..."); + + // Simulate init end + aggregator.handleMessage({ + type: "init-end", + exitCode: 0, + timestamp: Date.now(), + }); + + messages = aggregator.getDisplayedMessages(); + expect(messages).toHaveLength(1); + expect((messages[0] as InitDisplayedMessage).status).toBe("success"); + expect((messages[0] as InitDisplayedMessage).exitCode).toBe(0); + }); + + it("should handle init-output without init-start (defensive)", () => { + const aggregator = new StreamingMessageAggregator(); + + // This might crash with non-null assertion if initState is null + expect(() => { + aggregator.handleMessage({ + type: "init-output", + line: "Some output", + timestamp: Date.now(), + isError: false, + }); + }).not.toThrow(); + }); + + it("should handle init-end without init-start (defensive)", () => { + const aggregator = new StreamingMessageAggregator(); + + expect(() => { + aggregator.handleMessage({ + type: "init-end", + exitCode: 0, + timestamp: Date.now(), + }); + }).not.toThrow(); + }); +}); diff --git a/src/utils/messages/StreamingMessageAggregator.test.ts b/src/utils/messages/StreamingMessageAggregator.test.ts index 7eb9a1afe..ffce06fc9 100644 --- a/src/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/utils/messages/StreamingMessageAggregator.test.ts @@ -1,692 +1,137 @@ -import { describe, it, expect } from "bun:test"; +import { describe, test, expect } from "bun:test"; import { StreamingMessageAggregator } from "./StreamingMessageAggregator"; -import type { StreamEndEvent } from "@/types/stream"; -import type { DynamicToolPart } from "@/types/toolParts"; describe("StreamingMessageAggregator", () => { - it("should preserve temporal ordering of text and tool parts", () => { - const aggregator = new StreamingMessageAggregator(); - - // Simulate a stream-end event with interleaved content - const streamEndEvent: StreamEndEvent = { - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-1", - metadata: { - model: "claude-3", - }, - parts: [ - { type: "text", text: "Let me check the weather for you." }, - { - type: "dynamic-tool", - toolCallId: "tool-1", - toolName: "getWeather", - state: "output-available", - input: { city: "SF" }, - output: { temp: 72 }, - }, - ], - }; - - // Process the event - aggregator.handleStreamEnd(streamEndEvent); - - // Get the resulting message - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(1); - - const message = messages[0]; - expect(message.parts).toHaveLength(2); - - // Verify temporal order: text first, then tool - expect(message.parts[0].type).toBe("text"); - if (message.parts[0].type === "text") { - expect(message.parts[0].text).toBe("Let me check the weather for you."); - } - - expect(message.parts[1].type).toBe("dynamic-tool"); - const toolPart = message.parts[1] as DynamicToolPart; - expect(toolPart.toolName).toBe("getWeather"); - }); - - it("should split messages into DisplayedMessages correctly", () => { - const aggregator = new StreamingMessageAggregator(); - - // Add a user message - aggregator.handleMessage({ - id: "user-1", - role: "user", - parts: [{ type: "text", text: "Hello world" }], - metadata: { historySequence: 0 }, - }); - - // Add an assistant message with text and tool - const streamEndEvent: StreamEndEvent = { - type: "stream-end", - workspaceId: "test-ws", - messageId: "assistant-1", - metadata: { - model: "claude-3", - }, - parts: [ - { type: "text", text: "I'll help you with that." }, - { - type: "dynamic-tool", - toolCallId: "tool-1", - toolName: "searchFiles", - state: "output-available", - input: { pattern: "*.ts" }, - output: ["file1.ts", "file2.ts"], - }, - ], - }; - aggregator.handleStreamEnd(streamEndEvent); - - // Get DisplayedMessages - const displayedMessages = aggregator.getDisplayedMessages(); - - // Should have 3 messages: user, assistant text, tool - expect(displayedMessages).toHaveLength(3); - - // Check user message - expect(displayedMessages[0].type).toBe("user"); - if (displayedMessages[0].type === "user") { - expect(displayedMessages[0].content).toBe("Hello world"); - } - - // Check assistant text message - expect(displayedMessages[1].type).toBe("assistant"); - if (displayedMessages[1].type === "assistant") { - expect(displayedMessages[1].content).toBe("I'll help you with that."); - expect(displayedMessages[1].isStreaming).toBe(false); - } - - // Check tool message - expect(displayedMessages[2].type).toBe("tool"); - if (displayedMessages[2].type === "tool") { - expect(displayedMessages[2].toolName).toBe("searchFiles"); - expect(displayedMessages[2].status).toBe("completed"); - expect(displayedMessages[2].args).toEqual({ pattern: "*.ts" }); - expect(displayedMessages[2].result).toEqual(["file1.ts", "file2.ts"]); - } - }); - - it("should properly interleave text and tool calls temporally", () => { - const aggregator = new StreamingMessageAggregator(); - - // Start streaming - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "msg-interleaved", - model: "claude-3", - historySequence: 0, - }); - - // Stream first part of text - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-interleaved", - delta: "Let me search for that. ", - tokens: 0, - timestamp: Date.now(), - }); - - // Tool call interrupts - aggregator.handleToolCallStart({ - type: "tool-call-start", - workspaceId: "test-ws", - messageId: "msg-interleaved", - toolCallId: "tool-search", - toolName: "searchFiles", - args: { query: "test" }, - tokens: 0, - timestamp: Date.now(), - }); - - // More text after tool call - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-interleaved", - delta: "I found the following results: ", - tokens: 0, - timestamp: Date.now(), - }); - - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-interleaved", - delta: "file1.ts and file2.ts", - tokens: 0, - timestamp: Date.now(), - }); - - // Tool call completes - aggregator.handleToolCallEnd({ - type: "tool-call-end", - workspaceId: "test-ws", - messageId: "msg-interleaved", - toolCallId: "tool-search", - toolName: "searchFiles", - result: ["file1.ts", "file2.ts"], - }); - - // Get the message and verify structure - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(1); - - const message = messages[0]; - // Should have 4 parts: text, tool, text, text (deltas not merged during streaming) - expect(message.parts).toHaveLength(4); - - // First text part (before tool) - expect(message.parts[0].type).toBe("text"); - if (message.parts[0].type === "text") { - expect(message.parts[0].text).toBe("Let me search for that. "); - } - - // Tool part in the middle - expect(message.parts[1].type).toBe("dynamic-tool"); - const toolPart = message.parts[1] as DynamicToolPart; - expect(toolPart.toolName).toBe("searchFiles"); - expect(toolPart.state).toBe("output-available"); - - // Second and third text parts (after tool) - separate deltas not yet merged - expect(message.parts[2].type).toBe("text"); - expect(message.parts[3].type).toBe("text"); - if (message.parts[2].type === "text" && message.parts[3].type === "text") { - expect(message.parts[2].text).toBe("I found the following results: "); - expect(message.parts[3].text).toBe("file1.ts and file2.ts"); - } - - // Test DisplayedMessages split - const displayedMessages = aggregator.getDisplayedMessages(); - // Should have 3 displayed messages: text, tool, text - expect(displayedMessages).toHaveLength(3); - - expect(displayedMessages[0].type).toBe("assistant"); - if (displayedMessages[0].type === "assistant") { - expect(displayedMessages[0].content).toBe("Let me search for that. "); - } - - expect(displayedMessages[1].type).toBe("tool"); - if (displayedMessages[1].type === "tool") { - expect(displayedMessages[1].toolName).toBe("searchFiles"); - } - - expect(displayedMessages[2].type).toBe("assistant"); - if (displayedMessages[2].type === "assistant") { - expect(displayedMessages[2].content).toBe( - "I found the following results: file1.ts and file2.ts" - ); - expect(displayedMessages[2].isStreaming).toBe(true); - } - }); - - it("should preserve temporal ordering after stream-end", () => { - const aggregator = new StreamingMessageAggregator(); - - // Start streaming - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "msg-end-test", - model: "claude-3", - historySequence: 0, - }); - - // Stream first text - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-end-test", - delta: "First part. ", - tokens: 0, - timestamp: Date.now(), - }); - - // Tool interrupts - aggregator.handleToolCallStart({ - type: "tool-call-start", - workspaceId: "test-ws", - messageId: "msg-end-test", - toolCallId: "tool-1", - toolName: "readFile", - args: { file: "test.ts" }, - tokens: 0, - timestamp: Date.now(), - }); - - // More text after tool - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-end-test", - delta: "Second part after tool.", - tokens: 0, - timestamp: Date.now(), - }); - - // End stream with complete content - should preserve temporal ordering - aggregator.handleStreamEnd({ - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-end-test", - metadata: { - model: "claude-3", - }, - parts: [ - { type: "text", text: "First part. " }, - { - type: "dynamic-tool", - toolCallId: "tool-1", - toolName: "readFile", - state: "output-available", - input: { file: "test.ts" }, - output: "file contents", - }, - { type: "text", text: "Second part after tool." }, - ], - }); - - // Verify temporal ordering is preserved - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(1); - - const message = messages[0]; - expect(message.parts).toHaveLength(3); - - // First text part - expect(message.parts[0].type).toBe("text"); - if (message.parts[0].type === "text") { - expect(message.parts[0].text).toBe("First part. "); - } - - // Tool in the middle - expect(message.parts[1].type).toBe("dynamic-tool"); - - // Second text part - should be preserved, not merged - expect(message.parts[2].type).toBe("text"); - if (message.parts[2].type === "text") { - expect(message.parts[2].text).toBe("Second part after tool."); - } - - // Verify DisplayedMessages also maintains order - const displayed = aggregator.getDisplayedMessages(); - expect(displayed).toHaveLength(3); - expect(displayed[0].type).toBe("assistant"); - expect(displayed[1].type).toBe("tool"); - expect(displayed[2].type).toBe("assistant"); - }); - - it("should handle streaming to non-streaming transition smoothly", () => { - const aggregator = new StreamingMessageAggregator(); - - // Start streaming - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "msg-2", - model: "claude-3", - historySequence: 0, - }); - - // Add some content - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-2", - delta: "Hello, ", - tokens: 0, - timestamp: Date.now(), - }); - - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-2", - delta: "world!", - tokens: 0, - timestamp: Date.now(), - }); - - // End streaming - aggregator.handleStreamEnd({ - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-2", - metadata: { - model: "claude-3", - }, - parts: [{ type: "text", text: "Hello, world!" }], - }); - - // Verify the message content - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(1); - - // Raw parts are separate deltas (2 parts: "Hello, " and "world!") - expect(messages[0].parts).toHaveLength(2); - const firstPart = messages[0].parts[0]; - if (firstPart.type === "text") { - expect(firstPart.text).toBe("Hello, "); - } - - // DisplayedMessages should merge them - const displayedMessages = aggregator.getDisplayedMessages(); - expect(displayedMessages).toHaveLength(1); - if (displayedMessages[0].type === "assistant") { - expect(displayedMessages[0].content).toBe("Hello, world!"); - } - }); - - it("should preserve sequence numbers when loading historical messages", () => { - const aggregator = new StreamingMessageAggregator(); - - // Simulate historical messages with existing history sequences - const historicalMessages = [ - { - id: "hist-1", - role: "user" as const, - parts: [{ type: "text" as const, text: "First message" }], - metadata: { historySequence: 0 }, - }, - { - id: "hist-2", - role: "assistant" as const, - parts: [{ type: "text" as const, text: "Second message" }], - metadata: { historySequence: 1 }, - }, - { - id: "hist-3", - role: "user" as const, - parts: [{ type: "text" as const, text: "Third message" }], - metadata: { historySequence: 2 }, - }, - ]; - - // Load historical messages in batch - aggregator.loadHistoricalMessages(historicalMessages); - - // Verify all messages retained their history sequences - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(3); - expect(messages[0].metadata?.historySequence).toBe(0); - expect(messages[1].metadata?.historySequence).toBe(1); - expect(messages[2].metadata?.historySequence).toBe(2); - - // Now add a new streaming message - backend must provide historySequence - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "new-msg", - model: "claude-3", - historySequence: 3, // Backend assigns this - }); - - // Add some content so it appears in DisplayedMessages - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "new-msg", - delta: "New streaming content", - tokens: 0, - timestamp: Date.now(), - }); - - // Verify new message has correct history sequence (from backend) - const updatedMessages = aggregator.getAllMessages(); - expect(updatedMessages).toHaveLength(4); - expect(updatedMessages[3].metadata?.historySequence).toBe(3); - - // Verify temporal ordering in DisplayedMessages - const displayedMessages = aggregator.getDisplayedMessages(); - expect(displayedMessages).toHaveLength(4); - expect(displayedMessages[0].historySequence).toBe(0); - expect(displayedMessages[1].historySequence).toBe(1); - expect(displayedMessages[2].historySequence).toBe(2); - expect(displayedMessages[3].historySequence).toBe(3); - }); - - it("should handle addMessage() storing messages as-is", () => { - const aggregator = new StreamingMessageAggregator(); - - // Add a message with history sequence from backend - const messageWithSeq = { - id: "msg-with-seq", - role: "user" as const, - parts: [{ type: "text" as const, text: "Has history sequence" }], - metadata: { historySequence: 5 }, - }; - - aggregator.addMessage(messageWithSeq); - - // Verify history sequence was preserved - const messages = aggregator.getAllMessages(); - expect(messages[0].metadata?.historySequence).toBe(5); - - // Add another message with different history sequence - const anotherMessage = { - id: "msg-2", - role: "user" as const, - parts: [{ type: "text" as const, text: "Another message" }], - metadata: { historySequence: 10 }, - }; - - aggregator.addMessage(anotherMessage); - - // Verify both messages retained their backend-assigned sequences - const updatedMessages = aggregator.getAllMessages(); - expect(updatedMessages[0].metadata?.historySequence).toBe(5); - expect(updatedMessages[1].metadata?.historySequence).toBe(10); - }); -}); - -it("should clear TODOs on reconnection stream-end", () => { - const aggregator = new StreamingMessageAggregator(); - - // Simulate a streaming session where TODOs are written - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "msg-1", - model: "claude-3", - historySequence: 1, - }); - - // Add a tool call that writes TODOs - aggregator.handleToolCallStart({ - type: "tool-call-start", - workspaceId: "test-ws", - messageId: "msg-1", - toolCallId: "tool-1", - toolName: "todo_write", - args: { - todos: [ - { content: "Fix bug", status: "in_progress" as const }, - { content: "Write test", status: "pending" as const }, - ], - }, - tokens: 10, - timestamp: Date.now(), - }); - - // Complete the tool call successfully - aggregator.handleToolCallEnd({ - type: "tool-call-end", - workspaceId: "test-ws", - messageId: "msg-1", - toolCallId: "tool-1", - toolName: "todo_write", - result: { success: true, count: 2 }, - }); - - // Verify TODOs were set - expect(aggregator.getCurrentTodos()).toHaveLength(2); - - // Simulate reconnection case: handleStreamEnd called without active stream - // (User reconnects after stream completed) - const streamEndEvent: StreamEndEvent = { - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-2", - metadata: { model: "claude-3" }, - parts: [{ type: "text", text: "Reconnection response" }], - }; - - // Note: No handleStreamStart for msg-2, so no active stream exists - aggregator.handleStreamEnd(streamEndEvent); - - // Verify TODOs were cleared on stream-end (even in reconnection case) - expect(aggregator.getCurrentTodos()).toHaveLength(0); -}); - -describe("Part-level timestamps", () => { - it("should assign timestamps to text/reasoning parts during streaming", () => { - const aggregator = new StreamingMessageAggregator(); - const startTime = Date.now(); - - // Start a stream - aggregator.handleStreamStart({ - type: "stream-start", - workspaceId: "test-ws", - messageId: "msg-1", - model: "claude-3", - historySequence: 1, - }); - - // Add text deltas - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-1", - delta: "First part ", - tokens: 2, - timestamp: startTime, - }); - - aggregator.handleStreamDelta({ - type: "stream-delta", - workspaceId: "test-ws", - messageId: "msg-1", - delta: "second part", - tokens: 2, - timestamp: startTime + 100, - }); - - // Add reasoning delta - aggregator.handleReasoningDelta({ - type: "reasoning-delta", - workspaceId: "test-ws", - messageId: "msg-1", - delta: "thinking...", - tokens: 1, - timestamp: startTime + 200, - }); - - // End stream - aggregator.handleStreamEnd({ - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-1", - metadata: { - model: "claude-3", - historySequence: 1, - }, - parts: [], - }); - - // Check that parts have timestamps - const messages = aggregator.getAllMessages(); - expect(messages).toHaveLength(1); - const msg = messages[0]; - - // Text parts should have timestamps - const textParts = msg.parts.filter((p) => p.type === "text"); - expect(textParts.length).toBeGreaterThan(0); - for (const part of textParts) { - if (part.type === "text") { - expect(part.timestamp).toBeNumber(); + describe("init state reference stability", () => { + test("should return new array reference when state changes", () => { + const aggregator = new StreamingMessageAggregator(); + + // Start init hook + aggregator.handleMessage({ + type: "init-start", + hookPath: "/test/init", + timestamp: Date.now(), + }); + + const messages1 = aggregator.getDisplayedMessages(); + + // Add output to change state + aggregator.handleMessage({ + type: "init-output", + line: "Line 1", + isError: false, + timestamp: Date.now(), + }); + + const messages2 = aggregator.getDisplayedMessages(); + + // Array references should be different when state changes + expect(messages1).not.toBe(messages2); + }); + + test("should return new lines array reference when init state changes", () => { + const aggregator = new StreamingMessageAggregator(); + + // Start init hook + aggregator.handleMessage({ + type: "init-start", + hookPath: "/test/init", + timestamp: Date.now(), + }); + + const messages1 = aggregator.getDisplayedMessages(); + const initMsg1 = messages1.find((m) => m.type === "workspace-init"); + expect(initMsg1).toBeDefined(); + + // Add output + aggregator.handleMessage({ + type: "init-output", + line: "Line 1", + isError: false, + timestamp: Date.now(), + }); + + const messages2 = aggregator.getDisplayedMessages(); + const initMsg2 = messages2.find((m) => m.type === "workspace-init"); + expect(initMsg2).toBeDefined(); + + // Lines array should be a NEW reference (critical for React.memo) + if (initMsg1?.type === "workspace-init" && initMsg2?.type === "workspace-init") { + expect(initMsg1.lines).not.toBe(initMsg2.lines); + expect(initMsg2.lines).toHaveLength(1); + expect(initMsg2.lines[0]).toBe("Line 1"); } - } + }); - // Reasoning parts should have timestamps - const reasoningParts = msg.parts.filter((p) => p.type === "reasoning"); - expect(reasoningParts.length).toBeGreaterThan(0); - for (const part of reasoningParts) { - if (part.type === "reasoning") { - expect(part.timestamp).toBeNumber(); + test("should create new init message object on each state change", () => { + const aggregator = new StreamingMessageAggregator(); + + // Start init hook + aggregator.handleMessage({ + type: "init-start", + hookPath: "/test/init", + timestamp: Date.now(), + }); + + const messages1 = aggregator.getDisplayedMessages(); + const initMsg1 = messages1.find((m) => m.type === "workspace-init"); + + // Add multiple outputs + aggregator.handleMessage({ + type: "init-output", + line: "Line 1", + isError: false, + timestamp: Date.now(), + }); + + const messages2 = aggregator.getDisplayedMessages(); + const initMsg2 = messages2.find((m) => m.type === "workspace-init"); + + aggregator.handleMessage({ + type: "init-output", + line: "Line 2", + isError: false, + timestamp: Date.now(), + }); + + const messages3 = aggregator.getDisplayedMessages(); + const initMsg3 = messages3.find((m) => m.type === "workspace-init"); + + // Each message object should be a new reference + expect(initMsg1).not.toBe(initMsg2); + expect(initMsg2).not.toBe(initMsg3); + + // Lines arrays should be different references + if ( + initMsg1?.type === "workspace-init" && + initMsg2?.type === "workspace-init" && + initMsg3?.type === "workspace-init" + ) { + expect(initMsg1.lines).not.toBe(initMsg2.lines); + expect(initMsg2.lines).not.toBe(initMsg3.lines); + + // Verify content progression + expect(initMsg1.lines).toHaveLength(0); + expect(initMsg2.lines).toHaveLength(1); + expect(initMsg3.lines).toHaveLength(2); } - } - }); - - it("should preserve individual part timestamps when displaying", () => { - const aggregator = new StreamingMessageAggregator(); - const startTime = 1000; - - // Simulate stream-end with pre-timestamped parts - aggregator.handleStreamEnd({ - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-1", - metadata: { - model: "claude-3", - historySequence: 1, - timestamp: startTime, // Message-level timestamp - }, - parts: [ - { type: "text", text: "First", timestamp: startTime }, - { type: "text", text: " second", timestamp: startTime + 100 }, - { type: "reasoning", text: "thinking", timestamp: startTime + 200 }, - ], }); - // Get displayed messages - const displayed = aggregator.getDisplayedMessages(); - - // Should have merged text parts into one display message and one reasoning message - const assistantMsgs = displayed.filter((m) => m.type === "assistant"); - const reasoningMsgs = displayed.filter((m) => m.type === "reasoning"); + test("should return same cached reference when state has not changed", () => { + const aggregator = new StreamingMessageAggregator(); - expect(assistantMsgs).toHaveLength(1); - expect(reasoningMsgs).toHaveLength(1); + // Start init hook + aggregator.handleMessage({ + type: "init-start", + hookPath: "/test/init", + timestamp: Date.now(), + }); - // Assistant message should use the timestamp of the first text part - expect(assistantMsgs[0].timestamp).toBe(startTime); - - // Reasoning message should use its part's timestamp - expect(reasoningMsgs[0].timestamp).toBe(startTime + 200); - }); + const messages1 = aggregator.getDisplayedMessages(); + const messages2 = aggregator.getDisplayedMessages(); - it("should use message-level timestamp as fallback when parts don't have timestamps", () => { - const aggregator = new StreamingMessageAggregator(); - const messageTimestamp = 5000; - - // Load a message without part-level timestamps (e.g., from old history) - aggregator.handleStreamEnd({ - type: "stream-end", - workspaceId: "test-ws", - messageId: "msg-1", - metadata: { - model: "claude-3", - historySequence: 1, - timestamp: messageTimestamp, - }, - parts: [ - { type: "text", text: "No timestamp" }, - { type: "reasoning", text: "thinking" }, - ], + // When no state changes, cache should return same reference + expect(messages1).toBe(messages2); }); - - const displayed = aggregator.getDisplayedMessages(); - const assistantMsgs = displayed.filter((m) => m.type === "assistant"); - const reasoningMsgs = displayed.filter((m) => m.type === "reasoning"); - - // Both should fall back to message-level timestamp - expect(assistantMsgs[0].timestamp).toBe(messageTimestamp); - expect(reasoningMsgs[0].timestamp).toBe(messageTimestamp); }); }); diff --git a/src/utils/messages/StreamingMessageAggregator.ts b/src/utils/messages/StreamingMessageAggregator.ts index b7ea83a1d..67af75874 100644 --- a/src/utils/messages/StreamingMessageAggregator.ts +++ b/src/utils/messages/StreamingMessageAggregator.ts @@ -14,6 +14,7 @@ import type { import type { TodoItem } from "@/types/tools"; import type { WorkspaceChatMessage, StreamErrorMessage, DeleteMessage } from "@/types/ipc"; +import { isInitStart, isInitOutput, isInitEnd } from "@/types/ipc"; import type { DynamicToolPart, DynamicToolPartPending, @@ -50,6 +51,15 @@ export class StreamingMessageAggregator { // Current TODO list (updated when todo_write succeeds) private currentTodos: TodoItem[] = []; + // Workspace init hook state (ephemeral, not persisted to history) + private initState: { + status: "running" | "success" | "error"; + hookPath: string; + lines: string[]; + exitCode: number | null; + timestamp: number; + } | null = null; + // Workspace creation timestamp (used for recency calculation) private readonly createdAt?: string; @@ -407,10 +417,6 @@ export class StreamingMessageAggregator { return; } - console.log( - `[Aggregator] tool-call-start: toolName=${data.toolName}, args=${JSON.stringify(data.args).substring(0, 50)}..., tokens=${data.tokens}` - ); - // Add tool part to maintain temporal order const toolPart: DynamicToolPartPending = { type: "dynamic-tool", @@ -429,10 +435,6 @@ export class StreamingMessageAggregator { } handleToolCallDelta(data: ToolCallDeltaEvent): void { - const deltaStr = String(data.delta); - console.log( - `[Aggregator] tool-call-delta: toolName=${data.toolName}, delta=${deltaStr.substring(0, 20)}..., tokens=${data.tokens}` - ); // Track delta for token counting and TPS calculation this.trackDelta(data.messageId, data.tokens, data.timestamp, "tool-args"); // Tool deltas are for display - args are in dynamic-tool part @@ -495,6 +497,35 @@ export class StreamingMessageAggregator { } handleMessage(data: WorkspaceChatMessage): void { + // Handle init hook events (ephemeral, not persisted to history) + if (isInitStart(data)) { + this.initState = { + status: "running", + hookPath: data.hookPath, + lines: [], + exitCode: null, + timestamp: data.timestamp, + }; + this.invalidateCache(); + return; + } + + if (isInitOutput(data)) { + if (!this.initState) return; // Defensive: shouldn't happen but handle gracefully + const line = data.isError ? `ERROR: ${data.line}` : data.line; + this.initState.lines.push(line.trimEnd()); + this.invalidateCache(); + return; + } + + if (isInitEnd(data)) { + if (!this.initState) return; // Defensive: shouldn't happen but handle gracefully + this.initState.exitCode = data.exitCode; + this.initState.status = data.exitCode === 0 ? "success" : "error"; + this.invalidateCache(); + return; + } + // Handle regular messages (user messages, historical messages) // Check if it's a CmuxMessage (has role property but no type) if ("role" in data && !("type" in data)) { @@ -717,6 +748,21 @@ export class StreamingMessageAggregator { } } + // Add init state if present (ephemeral, appears at top) + if (this.initState) { + const initMessage: DisplayedMessage = { + type: "workspace-init", + id: "workspace-init", + historySequence: -1, // Appears before all history + status: this.initState.status, + hookPath: this.initState.hookPath, + lines: [...this.initState.lines], // Shallow copy for React.memo change detection + exitCode: this.initState.exitCode, + timestamp: this.initState.timestamp, + }; + displayedMessages.unshift(initMessage); + } + // Limit to last N messages for DOM performance // Full history is still maintained internally for token counting if (displayedMessages.length > MAX_DISPLAYED_MESSAGES) { diff --git a/src/utils/messages/messageUtils.ts b/src/utils/messages/messageUtils.ts index 22bcfd229..a8a34f22f 100644 --- a/src/utils/messages/messageUtils.ts +++ b/src/utils/messages/messageUtils.ts @@ -8,7 +8,12 @@ import type { DisplayedMessage } from "@/types/message"; * - For multi-part messages, only show on the last part */ export function shouldShowInterruptedBarrier(msg: DisplayedMessage): boolean { - if (msg.type === "user" || msg.type === "stream-error" || msg.type === "history-hidden") + if ( + msg.type === "user" || + msg.type === "stream-error" || + msg.type === "history-hidden" || + msg.type === "workspace-init" + ) return false; // Only show on the last part of multi-part messages diff --git a/src/utils/sessionFile.ts b/src/utils/sessionFile.ts new file mode 100644 index 000000000..79f457fc8 --- /dev/null +++ b/src/utils/sessionFile.ts @@ -0,0 +1,86 @@ +import * as fs from "fs/promises"; +import * as path from "path"; +import type { Result } from "@/types/result"; +import { Ok, Err } from "@/types/result"; +import type { Config } from "@/config"; +import { workspaceFileLocks } from "@/utils/concurrency/workspaceFileLocks"; + +/** + * Shared utility for managing JSON files in workspace session directories. + * Provides consistent file locking, error handling, and path resolution. + * + * Used by PartialService, InitStateManager, and other services that need + * to persist state to ~/.cmux/sessions/{workspaceId}/. + */ +export class SessionFileManager { + private readonly config: Config; + private readonly fileName: string; + private readonly fileLocks = workspaceFileLocks; + + constructor(config: Config, fileName: string) { + this.config = config; + this.fileName = fileName; + } + + private getFilePath(workspaceId: string): string { + return path.join(this.config.getSessionDir(workspaceId), this.fileName); + } + + /** + * Read JSON file from workspace session directory. + * Returns null if file doesn't exist (not an error). + */ + async read(workspaceId: string): Promise { + try { + const filePath = this.getFilePath(workspaceId); + const data = await fs.readFile(filePath, "utf-8"); + return JSON.parse(data) as T; + } catch (error) { + if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { + return null; // File doesn't exist + } + // Log other errors but don't fail + console.error(`Error reading ${this.fileName}:`, error); + return null; + } + } + + /** + * Write JSON file to workspace session directory with file locking. + * Creates session directory if it doesn't exist. + */ + async write(workspaceId: string, data: T): Promise> { + return this.fileLocks.withLock(workspaceId, async () => { + try { + const sessionDir = this.config.getSessionDir(workspaceId); + await fs.mkdir(sessionDir, { recursive: true }); + const filePath = this.getFilePath(workspaceId); + await fs.writeFile(filePath, JSON.stringify(data, null, 2)); + return Ok(undefined); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + return Err(`Failed to write ${this.fileName}: ${message}`); + } + }); + } + + /** + * Delete JSON file from workspace session directory with file locking. + * Idempotent - no error if file doesn't exist. + */ + async delete(workspaceId: string): Promise> { + return this.fileLocks.withLock(workspaceId, async () => { + try { + const filePath = this.getFilePath(workspaceId); + await fs.unlink(filePath); + return Ok(undefined); + } catch (error) { + if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { + return Ok(undefined); // Already deleted + } + const message = error instanceof Error ? error.message : String(error); + return Err(`Failed to delete ${this.fileName}: ${message}`); + } + }); + } +} diff --git a/tests/ipcMain/setup.ts b/tests/ipcMain/setup.ts index 60a95d18f..c0cb3ba01 100644 --- a/tests/ipcMain/setup.ts +++ b/tests/ipcMain/setup.ts @@ -19,7 +19,7 @@ export interface TestEnvironment { mockIpcRenderer: Electron.IpcRenderer; mockWindow: BrowserWindow; tempDir: string; - sentEvents: Array<{ channel: string; data: unknown }>; + sentEvents: Array<{ channel: string; data: unknown; timestamp: number }>; } /** @@ -27,14 +27,14 @@ export interface TestEnvironment { */ function createMockBrowserWindow(): { window: BrowserWindow; - sentEvents: Array<{ channel: string; data: unknown }>; + sentEvents: Array<{ channel: string; data: unknown; timestamp: number }>; } { - const sentEvents: Array<{ channel: string; data: unknown }> = []; + const sentEvents: Array<{ channel: string; data: unknown; timestamp: number }> = []; const mockWindow = { webContents: { send: (channel: string, data: unknown) => { - sentEvents.push({ channel, data }); + sentEvents.push({ channel, data, timestamp: Date.now() }); }, openDevTools: jest.fn(), } as unknown as WebContents, diff --git a/tests/ipcMain/workspaceInitHook.test.ts b/tests/ipcMain/workspaceInitHook.test.ts new file mode 100644 index 000000000..052bac5cb --- /dev/null +++ b/tests/ipcMain/workspaceInitHook.test.ts @@ -0,0 +1,428 @@ +import { shouldRunIntegrationTests, createTestEnvironment, cleanupTestEnvironment } from "./setup"; +import { IPC_CHANNELS, getChatChannel } from "../../src/constants/ipc-constants"; +import { generateBranchName, createWorkspace } from "./helpers"; +import type { WorkspaceChatMessage, WorkspaceInitEvent } from "../../src/types/ipc"; +import { isInitStart, isInitOutput, isInitEnd } from "../../src/types/ipc"; +import * as path from "path"; +import * as os from "os"; + +// Skip all tests if TEST_INTEGRATION is not set +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +/** + * Create a temp git repo with a .cmux/init hook that writes to stdout/stderr and exits with a given code + */ +async function createTempGitRepoWithInitHook(options: { + exitCode: number; + stdoutLines?: string[]; + stderrLines?: string[]; + sleepBetweenLines?: number; // milliseconds +}): Promise { + const fs = await import("fs/promises"); + const { exec } = await import("child_process"); + const { promisify } = await import("util"); + const execAsync = promisify(exec); + + // Use mkdtemp to avoid race conditions + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "cmux-test-init-hook-")); + + // Initialize git repo + await execAsync(`git init`, { cwd: tempDir }); + await execAsync(`git config user.email "test@example.com" && git config user.name "Test User"`, { + cwd: tempDir, + }); + await execAsync(`echo "test" > README.md && git add . && git commit -m "Initial commit"`, { + cwd: tempDir, + }); + + // Create .cmux directory + const cmuxDir = path.join(tempDir, ".cmux"); + await fs.mkdir(cmuxDir, { recursive: true }); + + // Create init hook script + const hookPath = path.join(cmuxDir, "init"); + const sleepCmd = options.sleepBetweenLines ? `sleep ${options.sleepBetweenLines / 1000}` : ""; + + const stdoutCmds = (options.stdoutLines ?? []) + .map((line, idx) => { + const needsSleep = sleepCmd && idx < (options.stdoutLines?.length ?? 0) - 1; + return `echo "${line}"${needsSleep ? `\n${sleepCmd}` : ""}`; + }) + .join("\n"); + + const stderrCmds = (options.stderrLines ?? []).map((line) => `echo "${line}" >&2`).join("\n"); + + const scriptContent = `#!/usr/bin/env bash +${stdoutCmds} +${stderrCmds} +exit ${options.exitCode} +`; + + await fs.writeFile(hookPath, scriptContent, { mode: 0o755 }); + + return tempDir; +} + +/** + * Cleanup temporary git repository + */ +async function cleanupTempGitRepo(repoPath: string): Promise { + const fs = await import("fs/promises"); + const maxRetries = 3; + let lastError: unknown; + + for (let i = 0; i < maxRetries; i++) { + try { + await fs.rm(repoPath, { recursive: true, force: true }); + return; + } catch (error) { + lastError = error; + if (i < maxRetries - 1) { + await new Promise((resolve) => setTimeout(resolve, 100 * (i + 1))); + } + } + } + console.warn(`Failed to cleanup temp git repo after ${maxRetries} attempts:`, lastError); +} + +describeIntegration("IpcMain workspace init hook integration tests", () => { + test.concurrent( + "should stream init hook output and allow workspace usage on hook success", + async () => { + const env = await createTestEnvironment(); + const tempGitRepo = await createTempGitRepoWithInitHook({ + exitCode: 0, + stdoutLines: ["Installing dependencies...", "Build complete!"], + stderrLines: ["Warning: deprecated package"], + }); + + try { + const branchName = generateBranchName("init-hook-success"); + + // Create workspace (which will trigger the hook) + const createResult = await createWorkspace(env.mockIpcRenderer, tempGitRepo, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) return; + + const workspaceId = createResult.metadata.id; + + // Wait for hook to complete by polling sentEvents + const deadline = Date.now() + 10000; + let initEvents: WorkspaceInitEvent[] = []; + while (Date.now() < deadline) { + // Filter sentEvents for this workspace's init events on chat channel + initEvents = env.sentEvents + .filter((e) => e.channel === getChatChannel(workspaceId)) + .map((e) => e.data as WorkspaceChatMessage) + .filter( + (msg) => isInitStart(msg) || isInitOutput(msg) || isInitEnd(msg) + ) as WorkspaceInitEvent[]; + + // Check if we have the end event + const hasEnd = initEvents.some((e) => isInitEnd(e)); + if (hasEnd) break; + + // Wait a bit before checking again + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Verify we got the end event + const successEndEvent = initEvents.find((e) => isInitEnd(e)); + if (!successEndEvent) { + throw new Error("Hook did not complete in time"); + } + + // Verify event sequence + expect(initEvents.length).toBeGreaterThan(0); + + // First event should be start + const startEvent = initEvents.find((e) => isInitStart(e)); + expect(startEvent).toBeDefined(); + if (startEvent && isInitStart(startEvent)) { + expect(startEvent.hookPath).toContain(".cmux/init"); + } + + // Should have output and error lines + const outputEvents = initEvents.filter((e) => isInitOutput(e) && !e.isError) as Extract< + WorkspaceInitEvent, + { type: "init-output" } + >[]; + const errorEvents = initEvents.filter((e) => isInitOutput(e) && e.isError) as Extract< + WorkspaceInitEvent, + { type: "init-output" } + >[]; + + expect(outputEvents.length).toBe(2); + expect(outputEvents[0].line).toBe("Installing dependencies..."); + expect(outputEvents[1].line).toBe("Build complete!"); + + expect(errorEvents.length).toBe(1); + expect(errorEvents[0].line).toBe("Warning: deprecated package"); + + // Last event should be end with exitCode 0 + const finalEvent = initEvents[initEvents.length - 1]; + expect(isInitEnd(finalEvent)).toBe(true); + if (isInitEnd(finalEvent)) { + expect(finalEvent.exitCode).toBe(0); + } + + // Workspace should be usable - verify getInfo succeeds + const info = await env.mockIpcRenderer.invoke(IPC_CHANNELS.WORKSPACE_GET_INFO, workspaceId); + expect(info).not.toBeNull(); + expect(info.id).toBe(workspaceId); + } finally { + await cleanupTestEnvironment(env); + await cleanupTempGitRepo(tempGitRepo); + } + }, + 15000 + ); + + test.concurrent( + "should stream init hook output and allow workspace usage on hook failure", + async () => { + const env = await createTestEnvironment(); + const tempGitRepo = await createTempGitRepoWithInitHook({ + exitCode: 1, + stdoutLines: ["Starting setup..."], + stderrLines: ["ERROR: Failed to install dependencies"], + }); + + try { + const branchName = generateBranchName("init-hook-failure"); + + // Create workspace + const createResult = await createWorkspace(env.mockIpcRenderer, tempGitRepo, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) return; + + const workspaceId = createResult.metadata.id; + + // Wait for hook to complete by polling sentEvents + const deadline = Date.now() + 10000; + let initEvents: WorkspaceInitEvent[] = []; + while (Date.now() < deadline) { + initEvents = env.sentEvents + .filter((e) => e.channel === getChatChannel(workspaceId)) + .map((e) => e.data as WorkspaceChatMessage) + .filter( + (msg) => isInitStart(msg) || isInitOutput(msg) || isInitEnd(msg) + ) as WorkspaceInitEvent[]; + + const hasEnd = initEvents.some((e) => isInitEnd(e)); + if (hasEnd) break; + + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + const failureEndEvent = initEvents.find((e) => isInitEnd(e)); + if (!failureEndEvent) { + throw new Error("Hook did not complete in time"); + } + + // Verify we got events + expect(initEvents.length).toBeGreaterThan(0); + + // Should have start event + const failureStartEvent = initEvents.find((e) => isInitStart(e)); + expect(failureStartEvent).toBeDefined(); + + // Should have output and error + const failureOutputEvents = initEvents.filter((e) => isInitOutput(e) && !e.isError); + const failureErrorEvents = initEvents.filter((e) => isInitOutput(e) && e.isError); + expect(failureOutputEvents.length).toBeGreaterThanOrEqual(1); + expect(failureErrorEvents.length).toBeGreaterThanOrEqual(1); + + // Last event should be end with exitCode 1 + const failureFinalEvent = initEvents[initEvents.length - 1]; + expect(isInitEnd(failureFinalEvent)).toBe(true); + if (isInitEnd(failureFinalEvent)) { + expect(failureFinalEvent.exitCode).toBe(1); + } + + // CRITICAL: Workspace should remain usable even after hook failure + const info = await env.mockIpcRenderer.invoke(IPC_CHANNELS.WORKSPACE_GET_INFO, workspaceId); + expect(info).not.toBeNull(); + expect(info.id).toBe(workspaceId); + } finally { + await cleanupTestEnvironment(env); + await cleanupTempGitRepo(tempGitRepo); + } + }, + 15000 + ); + + test.concurrent( + "should not emit meta events when no init hook exists", + async () => { + const env = await createTestEnvironment(); + // Create repo without .cmux/init hook + const fs = await import("fs/promises"); + const { exec } = await import("child_process"); + const { promisify } = await import("util"); + const execAsync = promisify(exec); + + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "cmux-test-no-hook-")); + + try { + // Initialize git repo without hook + await execAsync(`git init`, { cwd: tempDir }); + await execAsync( + `git config user.email "test@example.com" && git config user.name "Test User"`, + { cwd: tempDir } + ); + await execAsync(`echo "test" > README.md && git add . && git commit -m "Initial commit"`, { + cwd: tempDir, + }); + + const branchName = generateBranchName("no-hook"); + + // Create workspace + const createResult = await createWorkspace(env.mockIpcRenderer, tempDir, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) return; + + const workspaceId = createResult.metadata.id; + + // Wait a bit to ensure no events are emitted + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Verify no init events were sent on chat channel + const initEvents = env.sentEvents + .filter((e) => e.channel === getChatChannel(workspaceId)) + .map((e) => e.data as WorkspaceChatMessage) + .filter((msg) => isInitStart(msg) || isInitOutput(msg) || isInitEnd(msg)); + + expect(initEvents.length).toBe(0); + + // Workspace should still be usable + const info = await env.mockIpcRenderer.invoke( + IPC_CHANNELS.WORKSPACE_GET_INFO, + createResult.metadata.id + ); + expect(info).not.toBeNull(); + } finally { + await cleanupTestEnvironment(env); + await cleanupTempGitRepo(tempDir); + } + }, + 15000 + ); + + test.concurrent( + "should persist init state to disk for replay across page reloads", + async () => { + const env = await createTestEnvironment(); + const fs = await import("fs/promises"); + const repoPath = await createTempGitRepoWithInitHook({ + exitCode: 0, + stdoutLines: ["Installing dependencies", "Done!"], + stderrLines: [], + }); + + try { + const branchName = generateBranchName("replay-test"); + const createResult = await createWorkspace(env.mockIpcRenderer, repoPath, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) return; + + const workspaceId = createResult.metadata.id; + + // Wait for init hook to complete + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify init-status.json exists on disk + const initStatusPath = path.join(env.config.getSessionDir(workspaceId), "init-status.json"); + const statusExists = await fs + .access(initStatusPath) + .then(() => true) + .catch(() => false); + expect(statusExists).toBe(true); + + // Read and verify persisted state + const statusContent = await fs.readFile(initStatusPath, "utf-8"); + const status = JSON.parse(statusContent); + expect(status.status).toBe("success"); + expect(status.exitCode).toBe(0); + expect(status.lines).toEqual([ + { line: "Installing dependencies", isError: false, timestamp: expect.any(Number) }, + { line: "Done!", isError: false, timestamp: expect.any(Number) }, + ]); + expect(status.hookPath).toContain(".cmux/init"); + expect(status.startTime).toBeGreaterThan(0); + expect(status.endTime).toBeGreaterThan(status.startTime); + } finally { + await cleanupTestEnvironment(env); + await cleanupTempGitRepo(repoPath); + } + }, + 15000 + ); +}); + +test.concurrent( + "should receive init events with natural timing (not batched)", + async () => { + const env = await createTestEnvironment(); + + // Create project with slow init hook (100ms sleep between lines) + const tempGitRepo = await createTempGitRepoWithInitHook({ + exitCode: 0, + stdoutLines: ["Line 1", "Line 2", "Line 3", "Line 4"], + sleepBetweenLines: 100, // 100ms between each echo + }); + + try { + const branchName = generateBranchName("timing-test"); + const startTime = Date.now(); + + // Create workspace - init hook will start immediately + const createResult = await createWorkspace(env.mockIpcRenderer, tempGitRepo, branchName); + expect(createResult.success).toBe(true); + if (!createResult.success) return; + + const workspaceId = createResult.metadata.id; + + // Wait for all init events to arrive + const deadline = Date.now() + 10000; + let initOutputEvents: Array<{ timestamp: number; line: string }> = []; + + while (Date.now() < deadline) { + const currentEvents = env.sentEvents + .filter((e) => e.channel === getChatChannel(workspaceId)) + .filter((e) => isInitOutput(e.data as WorkspaceChatMessage)); + + initOutputEvents = currentEvents.map((e) => ({ + timestamp: e.timestamp, // Use timestamp from when event was sent + line: (e.data as { line: string }).line, + })); + + if (initOutputEvents.length >= 4) break; + await new Promise((resolve) => setTimeout(resolve, 50)); + } + + expect(initOutputEvents.length).toBe(4); + + // Calculate time between consecutive events + const timeDiffs = initOutputEvents + .slice(1) + .map((event, i) => event.timestamp - initOutputEvents[i].timestamp); + + // ASSERTION: If streaming in real-time, events should be ~100ms apart + // If batched/replayed, events will be <10ms apart + const avgTimeDiff = timeDiffs.reduce((a, b) => a + b, 0) / timeDiffs.length; + + // Real-time streaming: expect at least 70ms average (accounting for variance) + // Batched replay: would be <10ms + expect(avgTimeDiff).toBeGreaterThan(70); + + // Also verify first event arrives early (not waiting for hook to complete) + const firstEventDelay = initOutputEvents[0].timestamp - startTime; + expect(firstEventDelay).toBeLessThan(1000); // Should arrive reasonably quickly (bash startup + git worktree setup) + } finally { + await cleanupTestEnvironment(env); + await cleanupTempGitRepo(tempGitRepo); + } + }, + 15000 +);