From 5dcdee066f1090f697943effd95c19e708064f09 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 10 Apr 2026 12:09:19 -0700 Subject: [PATCH 1/2] fix(agent): filter SDK metadata from messages/partial events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit normalizeMessages() had two code paths: event['messages'] (returned unfiltered) and event['data'] (filtered by isMessageLike). In production, FetchStreamTransport's normalizeSdkEvent wraps the raw SDK data array—which includes metadata objects like { langgraph_node, langgraph_triggers }—into event.messages. These metadata objects lack content/type/id fields, causing messageContent() to return undefined and crashing the content classifier's detectType() on undefined.length. The fix applies the existing isMessageLike filter to the event['messages'] path. Tests now simulate post-normalization event shapes matching what FetchStreamTransport produces. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internals/stream-manager.bridge.spec.ts | 82 +++++++++++++++++++ .../lib/internals/stream-manager.bridge.ts | 5 +- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts b/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts index 1af26243d..744661f47 100644 --- a/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/agent/src/lib/internals/stream-manager.bridge.spec.ts @@ -139,6 +139,88 @@ describe('createStreamManagerBridge', () => { } ); + it.each(['messages/partial', 'messages/complete'] as const)( + 'filters metadata from normalized SDK %s events (messages array path)', + async (type) => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + // Simulate post-normalizeSdkEvent shape: messages array includes metadata + // This is what FetchStreamTransport produces in production + transport.emit([{ + type, + messages: [ + { id: 'ai-1', type: 'ai', content: 'Hello' }, + { langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] }, + ], + data: [ + { id: 'ai-1', type: 'ai', content: 'Hello' }, + { langgraph_node: 'chatbot', langgraph_triggers: ['start:chatbot'] }, + ], + } as any]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + // Only the real message should be in messages$, not the metadata + expect(subjects.messages$.value).toHaveLength(1); + expect(subjects.messages$.value[0]).toMatchObject({ id: 'ai-1', content: 'Hello' }); + destroy$.next(); + } + ); + + it('does not accumulate metadata across multiple messages/partial events', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + + // First values event — sets up the human message + transport.emit([{ + type: 'values', + values: { messages: [{ id: 'h-1', type: 'human', content: 'hi' }] }, + } as any]); + + // Simulate multiple messages/partial events (production SDK shape) + for (let i = 0; i < 5; i++) { + transport.emit([{ + type: 'messages/partial', + messages: [ + { id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) }, + { langgraph_node: 'chatbot' }, + ], + data: [ + { id: 'ai-1', type: 'ai', content: 'Hello'.slice(0, i + 1) }, + { langgraph_node: 'chatbot' }, + ], + } as any]); + } + + transport.close(); + await new Promise(r => setTimeout(r, 10)); + + // Should only have human + AI messages, no accumulated metadata + expect(subjects.messages$.value).toHaveLength(2); + expect(subjects.messages$.value[0]).toMatchObject({ id: 'h-1', content: 'hi' }); + expect(subjects.messages$.value[1]).toMatchObject({ id: 'ai-1', content: 'Hello' }); + destroy$.next(); + }); + it('ignores late events from the previous stream after threadId changes', async () => { const transport = new MockAgentTransport(); const subjects = makeSubjects(); diff --git a/libs/agent/src/lib/internals/stream-manager.bridge.ts b/libs/agent/src/lib/internals/stream-manager.bridge.ts index fbce3cc0b..ba9adec47 100644 --- a/libs/agent/src/lib/internals/stream-manager.bridge.ts +++ b/libs/agent/src/lib/internals/stream-manager.bridge.ts @@ -255,7 +255,10 @@ function isMessagesEvent(type: StreamEvent['type']): boolean { function normalizeMessages(event: StreamEvent): unknown[] | null { const directMessages = event['messages']; if (Array.isArray(directMessages)) { - return directMessages; + // Filter out non-message metadata objects (e.g. { langgraph_node, langgraph_triggers }) + // that the LangGraph SDK includes alongside real messages in messages/* events. + const filtered = directMessages.filter(isMessageLike); + return filtered.length > 0 ? filtered : null; } const data = event['data']; From a00541e9871471e0ceac34fa0d862f237b7efcf8 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Fri, 10 Apr 2026 13:25:04 -0700 Subject: [PATCH 2/2] fix(chat): wrap content classifier update in untracked() to prevent NG0600 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit classifyMessage() is called during Angular template rendering (in the AI message template via @let). The classifier's update() method writes to signals (typeSignal.set, markdownSignal.set, etc.), which Angular 21's stricter signal write guards flag as NG0600 — writing signals during change detection is forbidden. Wrapping update() in untracked() opts out of the reactive graph for this imperative push-based API. The template reads the classifier's signals after the update call returns, so reactivity is preserved. Verified with multi-turn streaming conversation against production LangGraph backend — markdown renders correctly, zero console errors. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/lib/streaming/content-classifier.ts | 132 +++++++++--------- 1 file changed, 69 insertions(+), 63 deletions(-) diff --git a/libs/chat/src/lib/streaming/content-classifier.ts b/libs/chat/src/lib/streaming/content-classifier.ts index b1d02eedf..1fc5cb4b0 100644 --- a/libs/chat/src/lib/streaming/content-classifier.ts +++ b/libs/chat/src/lib/streaming/content-classifier.ts @@ -1,5 +1,5 @@ // SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0 -import { signal, type Signal } from '@angular/core'; +import { signal, untracked, type Signal } from '@angular/core'; import type { Spec } from '@json-render/core'; import { createPartialJsonParser } from '@cacheplane/partial-json'; import { createParseTreeStore, type ElementAccumulationState, type ParseTreeStore } from './parse-tree-store'; @@ -98,82 +98,88 @@ export function createContentClassifier(): ContentClassifier { } function update(content: string): void { - const currentType = typeSignal(); + // Wrap in untracked() because this is called during template rendering + // (via classifyMessage in ChatComponent's AI message template). Angular's + // NG0600 forbids writing signals during change detection; untracked() + // opts out of the reactive graph for this imperative push-based update. + untracked(() => { + const currentType = typeSignal(); + + if (currentType === 'undetermined') { + const detected = detectType(content); + if (detected === 'undetermined') return; + + typeSignal.set(detected); + + if (detected === 'markdown') { + markdownSignal.set(content); + processedLength = content.length; + } else if (detected === 'json-render') { + streamingSignal.set(true); + // Find where JSON starts (skip whitespace) + jsonStartIndex = 0; + for (let i = 0; i < content.length; i++) { + if (content[i] !== ' ' && content[i] !== '\t' && content[i] !== '\n' && content[i] !== '\r') { + jsonStartIndex = i; + break; + } + } + const jsonContent = content.slice(jsonStartIndex); + try { + initJsonStore(jsonContent); + } catch (err) { + errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); + } + processedLength = content.length; + } else if (detected === 'a2ui') { + streamingSignal.set(true); + a2uiParser = createA2uiMessageParser(); + a2uiStore = createA2uiSurfaceStore(); + jsonStartIndex = content.indexOf(A2UI_PREFIX) + A2UI_PREFIX.length; + const a2uiContent = content.slice(jsonStartIndex); + if (a2uiContent.length > 0) { + try { + const msgs = a2uiParser.push(a2uiContent); + for (const msg of msgs) a2uiStore.apply(msg); + a2uiSurfacesSignal.set(a2uiStore.surfaces()); + } catch (err) { + errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); + } + } + processedLength = content.length; + } + return; + } - if (currentType === 'undetermined') { - const detected = detectType(content); - if (detected === 'undetermined') return; + // Compute delta + const delta = content.slice(processedLength); + processedLength = content.length; - typeSignal.set(detected); + if (delta.length === 0) return; - if (detected === 'markdown') { + if (currentType === 'markdown' || currentType === 'mixed') { markdownSignal.set(content); - processedLength = content.length; - } else if (detected === 'json-render') { - streamingSignal.set(true); - // Find where JSON starts (skip whitespace) - jsonStartIndex = 0; - for (let i = 0; i < content.length; i++) { - if (content[i] !== ' ' && content[i] !== '\t' && content[i] !== '\n' && content[i] !== '\r') { - jsonStartIndex = i; - break; + } else if (currentType === 'json-render') { + if (store) { + try { + store.push(delta); + syncJsonSignals(); + } catch (err) { + errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); } } - const jsonContent = content.slice(jsonStartIndex); - try { - initJsonStore(jsonContent); - } catch (err) { - errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); - } - processedLength = content.length; - } else if (detected === 'a2ui') { - streamingSignal.set(true); - a2uiParser = createA2uiMessageParser(); - a2uiStore = createA2uiSurfaceStore(); - jsonStartIndex = content.indexOf(A2UI_PREFIX) + A2UI_PREFIX.length; - const a2uiContent = content.slice(jsonStartIndex); - if (a2uiContent.length > 0) { + } else if (currentType === 'a2ui') { + if (a2uiParser && a2uiStore) { try { - const msgs = a2uiParser.push(a2uiContent); + const msgs = a2uiParser.push(delta); for (const msg of msgs) a2uiStore.apply(msg); a2uiSurfacesSignal.set(a2uiStore.surfaces()); } catch (err) { errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); } } - processedLength = content.length; } - return; - } - - // Compute delta - const delta = content.slice(processedLength); - processedLength = content.length; - - if (delta.length === 0) return; - - if (currentType === 'markdown' || currentType === 'mixed') { - markdownSignal.set(content); - } else if (currentType === 'json-render') { - if (store) { - try { - store.push(delta); - syncJsonSignals(); - } catch (err) { - errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); - } - } - } else if (currentType === 'a2ui') { - if (a2uiParser && a2uiStore) { - try { - const msgs = a2uiParser.push(delta); - for (const msg of msgs) a2uiStore.apply(msg); - a2uiSurfacesSignal.set(a2uiStore.surfaces()); - } catch (err) { - errorsSignal.update(prev => [...prev, err instanceof Error ? err.message : String(err)]); - } - } - } + }); } function dispose(): void {