Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/langgraph/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ngaf/langgraph",
"version": "0.0.4",
"version": "0.0.6",
"peerDependencies": {
"@ngaf/chat": "*",
"@ngaf/licensing": "*",
Expand Down
100 changes: 92 additions & 8 deletions libs/langgraph/src/lib/internals/stream-manager.bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ export function createStreamManagerBridge<T, ResolvedBag extends BagTemplate = B
// batch shouldn't tear down the entire UI (causes message DOM
// teardown + streaming renderer reset = visible jank).
} else {
subjects.messages$.next(normalized);
// Preserve existing ids by content so the final-id swap doesn't
// tear down the chat-message DOM (and its streaming-md renderer).
subjects.messages$.next(preserveIds(subjects.messages$.value, normalized));
}
storeMessageMetadata(normalized, event);
syncSubagentsFromMessages(normalized);
Expand All @@ -363,11 +365,16 @@ export function createStreamManagerBridge<T, ResolvedBag extends BagTemplate = B
if (Array.isArray(stateMessages) && stateMessages.length > 0) {
// Defensive: only sync when state carries messages. An empty
// values payload shouldn't wipe the UI mid-stream.
if (options.toMessage) {
subjects.messages$.next(stateMessages.map(options.toMessage));
} else {
subjects.messages$.next(stateMessages as BaseMessage[]);
}
const projected = options.toMessage
? stateMessages.map(options.toMessage)
: (stateMessages as BaseMessage[]);
// Preserve existing message ids when content matches. Server-
// echoed human messages and final AI messages often arrive with
// different ids than the optimistic / partial we already have —
// letting that id swap reach chat-message-list's track-by-id
// tears down DOM mid-stream. preserveIds maps new messages to
// existing-id-by-content where possible.
subjects.messages$.next(preserveIds(subjects.messages$.value, projected));
syncSubagentsFromMessages(stateMessages as BaseMessage[]);
subagentManager.reconstructFromMessages(
stateMessages as BaseMessage[],
Expand Down Expand Up @@ -670,16 +677,93 @@ function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMe
const merged = [...existing];
for (const msg of incoming) {
const id = (msg as unknown as Record<string, unknown>)['id'];
const idx = id ? merged.findIndex(m => (m as unknown as Record<string, unknown>)['id'] === id) : -1;
let idx = id ? merged.findIndex(m => (m as unknown as Record<string, unknown>)['id'] === id) : -1;
// Fallback: match by (role, content) when ids differ. This is the path
// that fires when the server echoes back our optimistic human message
// with a server-assigned id, or when partial AI tokens carry a chunk
// id but the final canonical message has a run id. Preserving the
// existing id here keeps track-by-id stable in the chat list and
// prevents DOM teardown + animation restarts mid-stream.
if (idx < 0) {
idx = findContentMatch(merged, msg);
}
if (idx >= 0) {
merged[idx] = msg;
const existingId = (merged[idx] as unknown as Record<string, unknown>)['id'];
// Keep the *existing* id so downstream track-by-id sees stable identity.
// The replacement carries the latest content + metadata.
merged[idx] = existingId
? ({ ...(msg as object), id: existingId } as BaseMessage)
: msg;
} else {
merged.push(msg);
}
}
return merged;
}

/**
* Replace the incoming messages' ids with the existing array's ids whenever
* (role, content) matches positionally and the existing id differs. Keeps
* track-by-id stable across server echoes and final-id swaps.
*/
function preserveIds(existing: BaseMessage[], incoming: BaseMessage[]): BaseMessage[] {
if (existing.length === 0) return incoming;
const usedExisting = new Set<number>();
return incoming.map((msg, i) => {
const inRaw = msg as unknown as Record<string, unknown>;
const inId = inRaw['id'];
// First try same-position match (the dominant case).
let matchIdx = -1;
if (i < existing.length && !usedExisting.has(i) && sameRoleAndContent(existing[i], msg)) {
matchIdx = i;
} else {
// Fallback: any unused existing message with matching role+content.
matchIdx = existing.findIndex((m, j) => !usedExisting.has(j) && sameRoleAndContent(m, msg));
}
if (matchIdx < 0) return msg;
usedExisting.add(matchIdx);
const existingId = (existing[matchIdx] as unknown as Record<string, unknown>)['id'];
if (!existingId || existingId === inId) return msg;
return { ...(msg as object), id: existingId } as BaseMessage;
});
}

function sameRoleAndContent(a: BaseMessage, b: BaseMessage): boolean {
const aType = typeof a._getType === 'function' ? a._getType() : (a as unknown as Record<string, unknown>)['type'];
const bType = typeof b._getType === 'function' ? b._getType() : (b as unknown as Record<string, unknown>)['type'];
if (aType !== bType) return false;
const aContent = typeof a.content === 'string' ? a.content : JSON.stringify(a.content);
const bContent = typeof b.content === 'string' ? b.content : JSON.stringify(b.content);
if (aContent === bContent) return true;
// For AI messages we accept prefix relationships (streaming → final).
if (aType === 'ai' && typeof aContent === 'string' && typeof bContent === 'string') {
return aContent.length > 0 && (bContent.startsWith(aContent) || aContent.startsWith(bContent));
}
return false;
}

function findContentMatch(merged: BaseMessage[], incoming: BaseMessage): number {
const inRaw = incoming as unknown as Record<string, unknown>;
const inType = typeof incoming._getType === 'function' ? incoming._getType() : (inRaw['type'] as string | undefined);
const inContent = typeof incoming.content === 'string' ? incoming.content : JSON.stringify(incoming.content);
// Only worth matching for human messages (where the optimistic→echo
// mismatch happens) and for AI messages where content is a strict prefix
// of the existing (token-streaming + final-id swap pattern).
for (let i = merged.length - 1; i >= 0; i--) {
const m = merged[i] as unknown as Record<string, unknown>;
const mType = typeof (merged[i] as BaseMessage)._getType === 'function'
? (merged[i] as BaseMessage)._getType()
: (m['type'] as string | undefined);
if (mType !== inType) continue;
const mContent = typeof (merged[i] as BaseMessage).content === 'string'
? (merged[i] as BaseMessage).content as string
: JSON.stringify((merged[i] as BaseMessage).content);
if (inType === 'human' && mContent === inContent) return i;
if (inType === 'ai' && (mContent === inContent || (typeof mContent === 'string' && typeof inContent === 'string' && (inContent.startsWith(mContent) || mContent.startsWith(inContent))))) return i;
}
return -1;
}

function toSubagentRefs(
subagents: Map<string, TrackedSubagent>,
): Map<string, SubagentStreamRef> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('FetchStreamTransport', () => {
]);
});

it('requests the stream modes required for values, messages, tools, and custom events', async () => {
it('requests the stream modes required for values, messages, and custom events', async () => {
mocks.runsStream.mockReturnValue((async function* () {
yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } };
})());
Expand All @@ -87,7 +87,6 @@ describe('FetchStreamTransport', () => {
'values',
'messages-tuple',
'updates',
'tools',
'custom',
]),
}),
Expand Down
6 changes: 5 additions & 1 deletion libs/langgraph/src/lib/transport/fetch-stream.transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ function buildRunPayload(
}

function defaultStreamMode(): StreamMode[] {
return ['values', 'messages-tuple', 'updates', 'tools', 'custom'];
// 'tools' is intentionally omitted: not supported by langgraph_api < 0.9.x
// Servers reject the entire request with HTTP 422 if any stream_mode in
// the array is unknown to them. Tool-call data is still derivable from
// the messages stream.
return ['values', 'messages-tuple', 'updates', 'custom'];
}

function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent {
Expand Down
Loading