diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index b15d13cc9d..5d8ab4387f 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -110,6 +110,7 @@ export class AgentSession { this.compactionHandler = new CompactionHandler({ workspaceId: this.workspaceId, historyService: this.historyService, + partialService: this.partialService, emitter: this.emitter, }); diff --git a/src/node/services/compactionHandler.test.ts b/src/node/services/compactionHandler.test.ts index c5f591de7d..8e498ae7ed 100644 --- a/src/node/services/compactionHandler.test.ts +++ b/src/node/services/compactionHandler.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, beforeEach, mock } from "bun:test"; import { CompactionHandler } from "./compactionHandler"; import type { HistoryService } from "./historyService"; +import type { PartialService } from "./partialService"; import type { EventEmitter } from "events"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { StreamEndEvent } from "@/common/types/stream"; @@ -48,6 +49,26 @@ const createMockHistoryService = () => { }; }; +const createMockPartialService = () => { + let deletePartialResult: Result = Ok(undefined); + + const deletePartial = mock((_) => Promise.resolve(deletePartialResult)); + const readPartial = mock((_) => Promise.resolve(null)); + const writePartial = mock((_, __) => Promise.resolve(Ok(undefined))); + const commitToHistory = mock((_) => Promise.resolve(Ok(undefined))); + + return { + deletePartial, + readPartial, + writePartial, + commitToHistory, + // Allow setting mock return values + mockDeletePartial: (result: Result) => { + deletePartialResult = result; + }, + }; +}; + const createMockEmitter = (): { emitter: EventEmitter; events: EmittedEvent[] } => { const events: EmittedEvent[] = []; const emitter = { @@ -112,6 +133,7 @@ const setupSuccessfulCompaction = ( describe("CompactionHandler", () => { let handler: CompactionHandler; let mockHistoryService: ReturnType; + let mockPartialService: ReturnType; let mockEmitter: EventEmitter; let emittedEvents: EmittedEvent[]; const workspaceId = "test-workspace"; @@ -122,10 +144,12 @@ describe("CompactionHandler", () => { emittedEvents = events; mockHistoryService = createMockHistoryService(); + mockPartialService = createMockPartialService(); handler = new CompactionHandler({ workspaceId, historyService: mockHistoryService as unknown as HistoryService, + partialService: mockPartialService as unknown as PartialService, emitter: mockEmitter, }); }); @@ -209,6 +233,23 @@ describe("CompactionHandler", () => { ); }); + it("should delete partial.json before clearing history (race condition fix)", async () => { + const compactionReq = createCompactionRequest(); + mockHistoryService.mockGetHistory(Ok([compactionReq])); + mockHistoryService.mockClearHistory(Ok([0])); + mockHistoryService.mockAppendToHistory(Ok(undefined)); + + const event = createStreamEndEvent("Summary"); + await handler.handleCompletion(event); + + // deletePartial should be called once before clearHistory + expect(mockPartialService.deletePartial.mock.calls).toHaveLength(1); + expect(mockPartialService.deletePartial.mock.calls[0][0]).toBe(workspaceId); + + // Verify deletePartial was called (we can't easily verify order without more complex mocking, + // but the important thing is that it IS called during compaction) + }); + it("should call clearHistory() and appendToHistory()", async () => { const compactionReq = createCompactionRequest(); mockHistoryService.mockGetHistory(Ok([compactionReq])); diff --git a/src/node/services/compactionHandler.ts b/src/node/services/compactionHandler.ts index c24c20ea10..d9d9a998ed 100644 --- a/src/node/services/compactionHandler.ts +++ b/src/node/services/compactionHandler.ts @@ -1,5 +1,6 @@ import type { EventEmitter } from "events"; import type { HistoryService } from "./historyService"; +import type { PartialService } from "./partialService"; import type { StreamEndEvent } from "@/common/types/stream"; import type { WorkspaceChatMessage, DeleteMessage } from "@/common/orpc/types"; import type { Result } from "@/common/types/result"; @@ -13,6 +14,7 @@ import { log } from "@/node/services/log"; interface CompactionHandlerOptions { workspaceId: string; historyService: HistoryService; + partialService: PartialService; emitter: EventEmitter; } @@ -27,12 +29,14 @@ interface CompactionHandlerOptions { export class CompactionHandler { private readonly workspaceId: string; private readonly historyService: HistoryService; + private readonly partialService: PartialService; private readonly emitter: EventEmitter; private readonly processedCompactionRequestIds: Set = new Set(); constructor(options: CompactionHandlerOptions) { this.workspaceId = options.workspaceId; this.historyService = options.historyService; + this.partialService = options.partialService; this.emitter = options.emitter; } @@ -106,6 +110,18 @@ export class CompactionHandler { const historicalUsage = usageHistory.length > 0 ? sumUsageHistory(usageHistory) : undefined; + // CRITICAL: Delete partial.json BEFORE clearing history + // This prevents a race condition where: + // 1. CompactionHandler clears history and appends summary + // 2. sendQueuedMessages triggers commitToHistory + // 3. commitToHistory finds stale partial.json and appends it to history + // By deleting partial first, commitToHistory becomes a no-op + const deletePartialResult = await this.partialService.deletePartial(this.workspaceId); + if (!deletePartialResult.success) { + log.warn(`Failed to delete partial before compaction: ${deletePartialResult.error}`); + // Continue anyway - the partial may not exist, which is fine + } + // Clear entire history and get deleted sequences const clearResult = await this.historyService.clearHistory(this.workspaceId); if (!clearResult.success) {