Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/debug/agentSessionCli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { parseArgs } from "util";
import { Config } from "@/config";
import { HistoryService } from "@/services/historyService";
import { PartialService } from "@/services/partialService";
import { AIService } from "@/services/aiService";
import { InitStateManager } from "@/services/initStateManager";
import { AIService } from "@/services/aiService";
import { AgentSession, type AgentSessionChatEvent } from "@/services/agentSession";
import {
isCaughtUpMessage,
Expand Down Expand Up @@ -209,8 +209,8 @@ async function main(): Promise<void> {

const historyService = new HistoryService(config);
const partialService = new PartialService(config, historyService);
const aiService = new AIService(config, historyService, partialService);
const initStateManager = new InitStateManager(config);
const aiService = new AIService(config, historyService, partialService, initStateManager);
ensureProvidersConfig(config);

const session = new AgentSession({
Expand Down
4 changes: 3 additions & 1 deletion src/debug/replay-history.ts
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { parseArgs } from "util";
import { defaultConfig } from "@/config";
import type { CmuxMessage } from "@/types/message";
import { createCmuxMessage } from "@/types/message";
import { InitStateManager } from "@/services/initStateManager";
import { AIService } from "@/services/aiService";
import { HistoryService } from "@/services/historyService";
import { PartialService } from "@/services/partialService";
Expand Down Expand Up @@ -123,7 +124,8 @@ async function main() {
const config = defaultConfig;
const historyService = new HistoryService(config);
const partialService = new PartialService(config, historyService);
const aiService = new AIService(config, historyService, partialService);
const initStateManager = new InitStateManager(config);
const aiService = new AIService(config, historyService, partialService, initStateManager);

const modelString = values.model ?? "openai:gpt-5-codex";
const thinkingLevel = (values.thinking ?? "high") as "low" | "medium" | "high";
Expand Down
4 changes: 3 additions & 1 deletion src/services/aiService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { describe, it, expect, beforeEach } from "bun:test";
import { AIService } from "./aiService";
import { HistoryService } from "./historyService";
import { PartialService } from "./partialService";
import { InitStateManager } from "./initStateManager";
import { Config } from "@/config";

describe("AIService", () => {
Expand All @@ -15,7 +16,8 @@ describe("AIService", () => {
const config = new Config();
const historyService = new HistoryService(config);
const partialService = new PartialService(config, historyService);
service = new AIService(config, historyService, partialService);
const initStateManager = new InitStateManager(config);
service = new AIService(config, historyService, partialService, initStateManager);
});

// Note: These tests are placeholders as Bun doesn't support Jest mocking
Expand Down
44 changes: 31 additions & 13 deletions src/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { CmuxMessage, CmuxTextPart } from "@/types/message";
import { createCmuxMessage } from "@/types/message";
import type { Config } from "@/config";
import { StreamManager } from "./streamManager";
import type { InitStateManager } from "./initStateManager";
import type { SendMessageError } from "@/types/errors";
import { getToolsForModel } from "@/utils/tools/tools";
import { createRuntime } from "@/runtime/runtimeFactory";
Expand Down Expand Up @@ -108,17 +109,24 @@ export class AIService extends EventEmitter {
private readonly historyService: HistoryService;
private readonly partialService: PartialService;
private readonly config: Config;
private readonly initStateManager: InitStateManager;
private readonly mockModeEnabled: boolean;
private readonly mockScenarioPlayer?: MockScenarioPlayer;

constructor(config: Config, historyService: HistoryService, partialService: PartialService) {
constructor(
config: Config,
historyService: HistoryService,
partialService: PartialService,
initStateManager: InitStateManager
) {
super();
// Increase max listeners to accommodate multiple concurrent workspace listeners
// Each workspace subscribes to stream events, and we expect >10 concurrent workspaces
this.setMaxListeners(50);
this.config = config;
this.historyService = historyService;
this.partialService = partialService;
this.initStateManager = initStateManager;
this.streamManager = new StreamManager(historyService, partialService);
void this.ensureSessionsDir();
this.setupStreamEventForwarding();
Expand Down Expand Up @@ -423,12 +431,17 @@ export class AIService extends EventEmitter {

// Get tool names early for mode transition sentinel (stub config, no workspace context needed)
const earlyRuntime = createRuntime({ type: "local", srcBaseDir: process.cwd() });
const earlyAllTools = await getToolsForModel(modelString, {
cwd: process.cwd(),
runtime: earlyRuntime,
runtimeTempDir: os.tmpdir(),
secrets: {},
});
const earlyAllTools = await getToolsForModel(
modelString,
{
cwd: process.cwd(),
runtime: earlyRuntime,
runtimeTempDir: os.tmpdir(),
secrets: {},
},
"", // Empty workspace ID for early stub config
this.initStateManager
);
const earlyTools = applyToolPolicy(earlyAllTools, toolPolicy);
const toolNamesForSentinel = Object.keys(earlyTools);

Expand Down Expand Up @@ -533,12 +546,17 @@ export class AIService extends EventEmitter {
const runtimeTempDir = await this.streamManager.createTempDirForStream(streamToken, runtime);

// Get model-specific tools with workspace path (correct for local or remote)
const allTools = await getToolsForModel(modelString, {
cwd: workspacePath,
runtime,
secrets: secretsToRecord(projectSecrets),
runtimeTempDir,
});
const allTools = await getToolsForModel(
modelString,
{
cwd: workspacePath,
runtime,
secrets: secretsToRecord(projectSecrets),
runtimeTempDir,
},
workspaceId,
this.initStateManager
);

// Apply tool policy to filter tools (if policy provided)
const tools = applyToolPolicy(allTools, toolPolicy);
Expand Down
30 changes: 23 additions & 7 deletions src/services/initStateManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ describe("InitStateManager", () => {
manager.appendOutput(workspaceId, "Installing deps...", false);
manager.appendOutput(workspaceId, "Done!", false);
expect(manager.getInitState(workspaceId)?.lines).toEqual([
{ line: "Installing deps...", isError: false, timestamp: expect.any(Number) as number },
{ line: "Done!", isError: false, timestamp: expect.any(Number) as number },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "Installing deps...", isError: false, timestamp: expect.any(Number) },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "Done!", isError: false, timestamp: expect.any(Number) },
]);

// End init (await to ensure event fires)
Expand All @@ -79,8 +81,10 @@ describe("InitStateManager", () => {

const state = manager.getInitState(workspaceId);
expect(state?.lines).toEqual([
{ line: "stdout line", isError: false, timestamp: expect.any(Number) as number },
{ line: "stderr line", isError: true, timestamp: expect.any(Number) as number },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "stdout line", isError: false, timestamp: expect.any(Number) },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "stderr line", isError: true, timestamp: expect.any(Number) },
]);
});

Expand Down Expand Up @@ -109,8 +113,10 @@ describe("InitStateManager", () => {
expect(diskState?.status).toBe("success");
expect(diskState?.exitCode).toBe(0);
expect(diskState?.lines).toEqual([
{ line: "Line 1", isError: false, timestamp: expect.any(Number) as number },
{ line: "Line 2", isError: true, timestamp: expect.any(Number) as number },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "Line 1", isError: false, timestamp: expect.any(Number) },
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
{ line: "Line 2", isError: true, timestamp: expect.any(Number) },
]);
});

Expand Down Expand Up @@ -221,15 +227,25 @@ describe("InitStateManager", () => {
expect(stateAfterDelete).toBeNull();
});

it("should clear in-memory state", () => {
it("should clear in-memory state", async () => {
const workspaceId = "test-workspace";
manager.startInit(workspaceId, "/path/to/hook");

expect(manager.getInitState(workspaceId)).toBeTruthy();

// Get the init promise before clearing
const initPromise = manager.waitForInit(workspaceId);

// Clear in-memory state (rejects internal promise, but waitForInit catches it)
manager.clearInMemoryState(workspaceId);

// Verify state is cleared
expect(manager.getInitState(workspaceId)).toBeUndefined();

// waitForInit never throws - it resolves even when init is canceled
// This allows tools to proceed and fail naturally with their own errors
// eslint-disable-next-line @typescript-eslint/await-thenable
await expect(initPromise).resolves.toBeUndefined();
});
});

Expand Down
119 changes: 115 additions & 4 deletions src/services/initStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,29 @@ type InitHookState = InitStatus;
* - Permanent persistence (init logs kept forever as workspace metadata)
*
* Lifecycle:
* 1. startInit() - Create in-memory state, emit init-start
* 1. startInit() - Create in-memory state, emit init-start, create completion promise
* 2. appendOutput() - Accumulate lines, emit init-output
* 3. endInit() - Finalize state, write to disk, emit init-end
* 3. endInit() - Finalize state, write to disk, emit init-end, resolve promise
* 4. State remains in memory until cleared or process restart
* 5. replayInit() - Re-emit events from in-memory or disk state (via EventStore)
*
* Waiting: Tools use waitForInit() which returns a promise that resolves when
* init completes. This promise is stored in initPromises map and resolved by
* endInit(). No event listeners needed, eliminating race conditions.
*/
export class InitStateManager extends EventEmitter {
private readonly store: EventStore<InitHookState, WorkspaceInitEvent & { workspaceId: string }>;

/**
* Promise-based completion tracking for running inits.
* Each running init has a promise that resolves when endInit() is called.
* Multiple tools can await the same promise without race conditions.
*/
private readonly initPromises = new Map<
string,
{ promise: Promise<void>; resolve: () => void; reject: (error: Error) => void }
>();

constructor(config: Config) {
super();
this.store = new EventStore(
Expand Down Expand Up @@ -111,7 +125,7 @@ export class InitStateManager extends EventEmitter {

/**
* Start tracking a new init hook execution.
* Creates in-memory state and emits init-start event.
* Creates in-memory state, completion promise, and emits init-start event.
*/
startInit(workspaceId: string, hookPath: string): void {
const startTime = Date.now();
Expand All @@ -127,6 +141,21 @@ export class InitStateManager extends EventEmitter {

this.store.setState(workspaceId, state);

// Create completion promise for this init
// This allows multiple tools to await the same init without event listeners
let resolve: () => void;
let reject: (error: Error) => void;
const promise = new Promise<void>((res, rej) => {
resolve = res;
reject = rej;
});

this.initPromises.set(workspaceId, {
promise,
resolve: resolve!,
reject: reject!,
});

log.debug(`Init hook started for workspace ${workspaceId}: ${hookPath}`);

// Emit init-start event
Expand Down Expand Up @@ -167,7 +196,7 @@ export class InitStateManager extends EventEmitter {

/**
* Finalize init hook execution.
* Updates state, persists to disk, and emits init-end event.
* Updates state, persists to disk, emits init-end event, and resolves completion promise.
*/
async endInit(workspaceId: string, exitCode: number): Promise<void> {
const state = this.store.getState(workspaceId);
Expand Down Expand Up @@ -197,6 +226,13 @@ export class InitStateManager extends EventEmitter {
timestamp: endTime,
} satisfies WorkspaceInitEvent & { workspaceId: string });

// Resolve completion promise for waiting tools
const promiseEntry = this.initPromises.get(workspaceId);
if (promiseEntry) {
promiseEntry.resolve();
this.initPromises.delete(workspaceId);
}

// Keep state in memory for replay (unlike streams which delete immediately)
}

Expand Down Expand Up @@ -244,8 +280,83 @@ export class InitStateManager extends EventEmitter {
* Clear in-memory state for a workspace.
* Useful for testing or cleanup after workspace deletion.
* Does NOT delete disk file (use deleteInitStatus for that).
*
* Also cancels any running init promises to prevent orphaned waiters.
*/
clearInMemoryState(workspaceId: string): void {
this.store.deleteState(workspaceId);

// Cancel any running init promise for this workspace
const promiseEntry = this.initPromises.get(workspaceId);
if (promiseEntry) {
promiseEntry.reject(new Error(`Workspace ${workspaceId} was deleted`));
this.initPromises.delete(workspaceId);
}
}

/**
* Wait for workspace initialization to complete.
* Used by tools (bash, file_*) to ensure files are ready before executing.
*
* Behavior:
* - No init state: Returns immediately (init not needed or backwards compat)
* - Init succeeded/failed: Returns immediately (tools proceed regardless of init outcome)
* - Init running: Waits for completion promise (up to 5 minutes, then proceeds anyway)
*
* This method NEVER throws - tools should always proceed. If init fails or times out,
* the tool will either succeed (if init wasn't critical) or fail with its own error
* (e.g., file not found). This provides better error messages than blocking on init.
*
* Promise-based approach eliminates race conditions:
* - Multiple tools share the same promise (no duplicate listeners)
* - No event cleanup needed (promise auto-resolves once)
* - Timeout races handled by Promise.race()
*
* @param workspaceId Workspace ID to wait for
*/
async waitForInit(workspaceId: string): Promise<void> {
const state = this.getInitState(workspaceId);

// No init state - proceed immediately (backwards compat or init not needed)
if (!state) {
return;
}

// Init already completed (success or failure) - proceed immediately
// Tools should work regardless of init outcome
if (state.status !== "running") {
return;
}

// Init is running - wait for completion promise with timeout
const promiseEntry = this.initPromises.get(workspaceId);

if (!promiseEntry) {
// State says running but no promise exists (shouldn't happen, but handle gracefully)
log.error(`Init state is running for ${workspaceId} but no promise found, proceeding`);
return;
}

const INIT_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {
log.error(
`Init timeout for ${workspaceId} after 5 minutes - tools will proceed anyway. ` +
`Init will continue in background.`
);
resolve(); // Resolve, don't reject - tools should proceed
}, INIT_TIMEOUT_MS);
});

// Race between completion and timeout
// Both resolve (no rejection), so tools always proceed
try {
await Promise.race([promiseEntry.promise, timeoutPromise]);
} catch (error) {
// Init promise was rejected (e.g., workspace deleted)
// Log and proceed anyway - let the tool fail with its own error if needed
const errorMsg = error instanceof Error ? error.message : String(error);
log.error(`Init wait interrupted for ${workspaceId}: ${errorMsg} - proceeding anyway`);
}
}
}
10 changes: 8 additions & 2 deletions src/services/ipcMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,14 @@ export class IpcMain {
this.config = config;
this.historyService = new HistoryService(config);
this.partialService = new PartialService(config, this.historyService);
this.aiService = new AIService(config, this.historyService, this.partialService);
this.bashService = new BashExecutionService();
this.initStateManager = new InitStateManager(config);
this.aiService = new AIService(
config,
this.historyService,
this.partialService,
this.initStateManager
);
this.bashService = new BashExecutionService();
}

private getOrCreateSession(workspaceId: string): AgentSession {
Expand Down Expand Up @@ -919,6 +924,7 @@ export class IpcMain {

// Create bash tool with workspace's cwd and secrets
// All IPC bash calls are from UI (background operations) - use truncate to avoid temp file spam
// No init wait needed - IPC calls are user-initiated, not AI tool use
const bashTool = createBashTool({
cwd: workspacePath, // Bash executes in the workspace directory
runtime,
Expand Down
Loading