Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions src/node/services/agentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { EventEmitter } from "events";
import * as path from "path";
import { stat, readFile } from "fs/promises";
import { PlatformPaths } from "@/common/utils/paths";
import { log } from "@/node/services/log";
import { createMuxMessage } from "@/common/types/message";
import type { Config } from "@/node/config";
import type { AIService } from "@/node/services/aiService";
Expand Down Expand Up @@ -247,48 +248,57 @@ export class AgentSession {
private async emitHistoricalEvents(
listener: (event: AgentSessionChatEvent) => void
): Promise<void> {
// Read partial BEFORE iterating history so we can skip the corresponding
// placeholder message (which has empty parts). The partial has the real content.
const streamInfo = this.aiService.getStreamInfo(this.workspaceId);
const partial = await this.partialService.readPartial(this.workspaceId);
const partialHistorySequence = partial?.metadata?.historySequence;

// Load chat history (persisted messages from chat.jsonl)
const historyResult = await this.historyService.getHistory(this.workspaceId);
if (historyResult.success) {
for (const message of historyResult.data) {
// Skip the placeholder message if we have a partial with the same historySequence.
// The placeholder has empty parts; the partial has the actual content.
// Without this, both get loaded and the empty placeholder may be shown as "last message".
if (
partialHistorySequence !== undefined &&
message.metadata?.historySequence === partialHistorySequence
) {
continue;
// try/catch/finally guarantees caught-up is always sent, even if replay fails.
// Without caught-up, the frontend stays in "Loading workspace..." forever.
try {
// Read partial BEFORE iterating history so we can skip the corresponding
// placeholder message (which has empty parts). The partial has the real content.
const streamInfo = this.aiService.getStreamInfo(this.workspaceId);
const partial = await this.partialService.readPartial(this.workspaceId);
const partialHistorySequence = partial?.metadata?.historySequence;

// Load chat history (persisted messages from chat.jsonl)
const historyResult = await this.historyService.getHistory(this.workspaceId);
if (historyResult.success) {
for (const message of historyResult.data) {
// Skip the placeholder message if we have a partial with the same historySequence.
// The placeholder has empty parts; the partial has the actual content.
// Without this, both get loaded and the empty placeholder may be shown as "last message".
if (
partialHistorySequence !== undefined &&
message.metadata?.historySequence === partialHistorySequence
) {
continue;
}
// Add type: "message" for discriminated union (messages from chat.jsonl don't have it)
listener({ workspaceId: this.workspaceId, message: { ...message, type: "message" } });
}
// Add type: "message" for discriminated union (messages from chat.jsonl don't have it)
listener({ workspaceId: this.workspaceId, message: { ...message, type: "message" } });
}
}

if (streamInfo) {
await this.aiService.replayStream(this.workspaceId);
} else if (partial) {
// Add type: "message" for discriminated union (partials from disk don't have it)
listener({ workspaceId: this.workspaceId, message: { ...partial, type: "message" } });
}

// Replay init state BEFORE caught-up (treat as historical data)
// This ensures init events are buffered correctly by the frontend,
// preserving their natural timing characteristics from the hook execution.
await this.initStateManager.replayInit(this.workspaceId);
if (streamInfo) {
await this.aiService.replayStream(this.workspaceId);
} else if (partial) {
// Add type: "message" for discriminated union (partials from disk don't have it)
listener({ workspaceId: this.workspaceId, message: { ...partial, type: "message" } });
}

// Send caught-up after ALL historical data (including init events)
// This signals frontend that replay is complete and future events are real-time
listener({
workspaceId: this.workspaceId,
message: { type: "caught-up" },
});
// Replay init state BEFORE caught-up (treat as historical data)
// This ensures init events are buffered correctly by the frontend,
// preserving their natural timing characteristics from the hook execution.
await this.initStateManager.replayInit(this.workspaceId);
} catch (error) {
log.error("Failed to replay history for workspace", {
workspaceId: this.workspaceId,
error,
});
} finally {
// Send caught-up after ALL historical data (including init events)
// This signals frontend that replay is complete and future events are real-time
listener({
workspaceId: this.workspaceId,
message: { type: "caught-up" },
});
}
}

async ensureMetadata(args: {
Expand Down