Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { describe, expect, it, vi } from 'vitest';
import type { Pty } from '@main/core/pty/pty';
import {
CONVERSATION_REPLACEMENT_SUSTAINED_MS,
ConversationSessionSupervisor,
} from './conversation-session-supervisor';

function fakePty(): Pty {
return {
write: vi.fn(),
resize: vi.fn(),
kill: vi.fn(),
onData: vi.fn(),
onExit: vi.fn(),
};
}

describe('ConversationSessionSupervisor', () => {
it('rejects and kills a spawn that returns after explicit stop invalidated its generation', () => {
const supervisor = new ConversationSessionSupervisor();
const token = supervisor.beginStart('session-1');
expect(token).toBeDefined();

supervisor.stop('session-1');

const pty = fakePty();
expect(supervisor.acceptSpawn('session-1', token!, pty)).toBe(false);
});

it('allows one replacement inside a failure window and then fails until sustained running resets it', () => {
vi.useFakeTimers();
try {
const supervisor = new ConversationSessionSupervisor();
const first = fakePty();
const second = fakePty();
const firstToken = supervisor.beginStart('session-1');
expect(supervisor.acceptSpawn('session-1', firstToken!, first)).toBe(true);

expect(supervisor.handleExit('session-1', first)).toEqual({ kind: 'replace' });
const secondToken = supervisor.beginStart('session-1', {
requireDesired: true,
});
expect(supervisor.acceptSpawn('session-1', secondToken!, second)).toBe(true);
expect(supervisor.handleExit('session-1', second)).toEqual({ kind: 'failed' });

const third = fakePty();
const thirdToken = supervisor.beginStart('session-2');
expect(supervisor.acceptSpawn('session-2', thirdToken!, third)).toBe(true);
expect(supervisor.handleExit('session-2', third)).toEqual({ kind: 'replace' });
const fourth = fakePty();
const fourthToken = supervisor.beginStart('session-2', {
requireDesired: true,
});
expect(supervisor.acceptSpawn('session-2', fourthToken!, fourth)).toBe(true);
vi.advanceTimersByTime(CONVERSATION_REPLACEMENT_SUSTAINED_MS);
expect(supervisor.handleExit('session-2', fourth)).toEqual({ kind: 'replace' });
} finally {
vi.useRealTimers();
}
});

it('clears the replacement failure window after a spawn failure', () => {
const supervisor = new ConversationSessionSupervisor();
const first = fakePty();
const firstToken = supervisor.beginStart('session-1');
expect(supervisor.acceptSpawn('session-1', firstToken!, first)).toBe(true);

expect(supervisor.handleExit('session-1', first)).toEqual({ kind: 'replace' });
const failedToken = supervisor.beginStart('session-1', {
requireDesired: true,
});
expect(failedToken).toBeDefined();
supervisor.failSpawn('session-1', failedToken!);

const retry = fakePty();
const retryToken = supervisor.beginStart('session-1', {
requireDesired: true,
});
expect(supervisor.acceptSpawn('session-1', retryToken!, retry)).toBe(true);
expect(supervisor.handleExit('session-1', retry)).toEqual({ kind: 'replace' });
});
});
133 changes: 133 additions & 0 deletions src/main/core/conversations/conversation-session-supervisor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import type { Pty } from '@main/core/pty/pty';

export type ConversationSpawnToken = {
generation: number;
};

type ConversationRuntime = {
desired: boolean;
pty?: Pty;
spawnInFlightGeneration?: number;
replacementGeneration: number;
replacementAttemptedInWindow: boolean;
stableTimer?: ReturnType<typeof setTimeout>;
};

export const CONVERSATION_REPLACEMENT_SUSTAINED_MS = 5_000;

export type ExitDecision =
| { kind: 'stale' }
| { kind: 'stopped' }
| { kind: 'replace' }
| { kind: 'failed' };

export class ConversationSessionSupervisor {
private runtimes = new Map<string, ConversationRuntime>();

beginStart(
sessionId: string,
options: { requireDesired?: boolean } = {}
): ConversationSpawnToken | undefined {
const runtime = this.getOrCreateRuntime(sessionId);
if (runtime.pty || runtime.spawnInFlightGeneration !== undefined) return undefined;
if (options.requireDesired === true && !runtime.desired) return undefined;

runtime.desired = true;
runtime.replacementGeneration += 1;
runtime.spawnInFlightGeneration = runtime.replacementGeneration;

return { generation: runtime.replacementGeneration };
}

acceptSpawn(sessionId: string, token: ConversationSpawnToken, pty: Pty): boolean {
const runtime = this.runtimes.get(sessionId);
if (!runtime || runtime.spawnInFlightGeneration !== token.generation) return false;

runtime.spawnInFlightGeneration = undefined;
if (!runtime.desired || runtime.replacementGeneration !== token.generation) return false;

runtime.pty = pty;
this.armStableTimer(runtime);
return true;
}

failSpawn(sessionId: string, token: ConversationSpawnToken): void {
const runtime = this.runtimes.get(sessionId);
if (!runtime || runtime.spawnInFlightGeneration !== token.generation) return;
runtime.spawnInFlightGeneration = undefined;
runtime.replacementAttemptedInWindow = false;
}

stop(sessionId: string): Pty | undefined {
const runtime = this.runtimes.get(sessionId);
if (!runtime) return undefined;

runtime.desired = false;
runtime.replacementGeneration += 1;
runtime.spawnInFlightGeneration = undefined;
this.clearStableTimer(runtime);

const pty = runtime.pty;
runtime.pty = undefined;
runtime.replacementAttemptedInWindow = false;
return pty;
}

isDesired(sessionId: string): boolean {
return this.runtimes.get(sessionId)?.desired === true;
}

handleExit(sessionId: string, pty: Pty): ExitDecision {
const runtime = this.runtimes.get(sessionId);
if (!runtime || runtime.pty !== pty) return { kind: 'stale' };

runtime.pty = undefined;
runtime.spawnInFlightGeneration = undefined;
this.clearStableTimer(runtime);

if (!runtime.desired) return { kind: 'stopped' };

if (runtime.replacementAttemptedInWindow) {
runtime.desired = false;
runtime.replacementAttemptedInWindow = false;
return { kind: 'failed' };
}

runtime.replacementAttemptedInWindow = true;
return { kind: 'replace' };
}

forget(sessionId: string): void {
const runtime = this.runtimes.get(sessionId);
if (runtime) this.clearStableTimer(runtime);
this.runtimes.delete(sessionId);
}

private getOrCreateRuntime(sessionId: string): ConversationRuntime {
let runtime = this.runtimes.get(sessionId);
if (!runtime) {
runtime = {
desired: false,
replacementGeneration: 0,
replacementAttemptedInWindow: false,
};
this.runtimes.set(sessionId, runtime);
}
return runtime;
}

private armStableTimer(runtime: ConversationRuntime): void {
this.clearStableTimer(runtime);
runtime.stableTimer = setTimeout(() => {
runtime.replacementAttemptedInWindow = false;
runtime.stableTimer = undefined;
}, CONVERSATION_REPLACEMENT_SUSTAINED_MS);
}

private clearStableTimer(runtime: ConversationRuntime): void {
if (runtime.stableTimer !== undefined) {
clearTimeout(runtime.stableTimer);
runtime.stableTimer = undefined;
}
}
}
Loading
Loading