diff --git a/flocks/server/routes/event.py b/flocks/server/routes/event.py index 502a0f2d..fcc7f61c 100644 --- a/flocks/server/routes/event.py +++ b/flocks/server/routes/event.py @@ -147,7 +147,7 @@ async def publish_event(event_type: str, properties: dict = None, directory: str if event_type == "message.part.updated": text_len = properties.get("part", {}).get("text", "") if properties else "" delta = properties.get("delta", "") if properties else "" - log.info("event.publish.part_updated", { + log.debug("event.publish.part_updated", { "clients": broadcaster.client_count, "text_length": len(text_len) if text_len else 0, "delta_length": len(delta) if delta else 0, diff --git a/flocks/session/streaming/stream_processor.py b/flocks/session/streaming/stream_processor.py index c31a3fa5..0d52b00a 100644 --- a/flocks/session/streaming/stream_processor.py +++ b/flocks/session/streaming/stream_processor.py @@ -287,7 +287,7 @@ async def _handle_reasoning_delta(self, event: ReasoningDeltaEvent) -> None: if should_publish: self._last_reasoning_event_time[event.id] = current_time - log.info("stream.reasoning.publishing_delta", { + log.debug("stream.reasoning.publishing_delta", { "delta_len": len(event.text), "total_len": len(part.text), "reasoning_id": event.id, @@ -1055,7 +1055,7 @@ async def _handle_text_delta(self, event: TextDeltaEvent) -> None: visible_delta = self._compute_visible_delta(self._last_visible_text, visible_text) if len(self.current_text_part.text) <= 100 or len(self.current_text_part.text) % 100 < len(event.text): - log.info("stream.text.delta", { + log.debug("stream.text.delta", { "delta_length": len(event.text), "total_length": len(self.current_text_part.text), }) @@ -1086,7 +1086,7 @@ async def _handle_text_delta(self, event: TextDeltaEvent) -> None: if should_publish: self._last_text_event_time = current_time if len(self.current_text_part.text) <= 100 or len(self.current_text_part.text) % 100 < len(event.text): - log.info("stream.text.publishing_delta", { + log.debug("stream.text.publishing_delta", { "delta_len": len(visible_delta), "part_id": self.current_text_part.id, "throttled": time_since_last < self._text_event_throttle_ms, diff --git a/webui/src/components/common/SessionChat.tsx b/webui/src/components/common/SessionChat.tsx index 842657fd..1b37e14b 100644 --- a/webui/src/components/common/SessionChat.tsx +++ b/webui/src/components/common/SessionChat.tsx @@ -16,13 +16,9 @@ * - 消息复制、时间戳等可选功能 */ -import { useState, useCallback, useRef, useEffect, useMemo } from 'react'; +import { useState, useCallback, useRef, useEffect, useMemo, memo } from 'react'; import { Send, Loader2, ChevronDown, Square, Copy, User } from 'lucide-react'; -import ReactMarkdown from 'react-markdown'; -import rehypeHighlight from 'rehype-highlight'; -import rehypeRaw from 'rehype-raw'; -import remarkGfm from 'remark-gfm'; -import 'highlight.js/styles/github-dark.css'; +import { StreamingMarkdown } from './StreamingMarkdown'; import { useTranslation } from 'react-i18next'; import LoadingSpinner from './LoadingSpinner'; import { QuestionTool } from './QuestionTool'; @@ -225,7 +221,7 @@ export default function SessionChat({ const scrollToBottom = useCallback(() => { if (!isAtBottomRef.current) return; requestAnimationFrame(() => { - messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + messagesEndRef.current?.scrollIntoView({ behavior: 'instant' }); }); }, []); @@ -1002,7 +998,7 @@ export interface ChatMessageBubbleProps { compactedMessages?: MergedMessage[]; } -export function ChatMessageBubble({ +function ChatMessageBubbleInner({ message, isActive = false, pendingQuestions, @@ -1112,37 +1108,10 @@ export function ChatMessageBubble({ {nodeRefMatch[2]} )} -
- - {children} - - ); - } - return {children}; - }, - }} - > - {displayText} - -
+ ); })()} @@ -1364,3 +1333,31 @@ export function ChatToolPart({ part, pendingQuestion, onAnswer, onReject }: Chat ); } + +/** + * Memoized export of ChatMessageBubble. + * + * Fast path (O(1) field checks, aligned with Open WebUI's approach): + * - structural props: isActive, role, finish, parts.length + * - content probe: last part's text/thinking field + * + * Only triggers a re-render when something actually visible has changed, + * avoiding unnecessary reconciliation during high-frequency streaming. + */ +export const ChatMessageBubble = memo(ChatMessageBubbleInner, (prev, next) => { + if (prev.isActive !== next.isActive) return false; + if (prev.showActions !== next.showActions) return false; + if (prev.message.finish !== next.message.finish) return false; + const prevParts = prev.message.parts as any[] | undefined; + const nextParts = next.message.parts as any[] | undefined; + if ((prevParts?.length ?? 0) !== (nextParts?.length ?? 0)) return false; + if (prev.pendingQuestions !== next.pendingQuestions) return false; + // O(1) content probe on the last part — covers the streaming delta case + const prevLast = prevParts?.[prevParts.length - 1]; + const nextLast = nextParts?.[nextParts.length - 1]; + return ( + prevLast?.text === nextLast?.text && + prevLast?.thinking === nextLast?.thinking && + prevLast?.state?.status === nextLast?.state?.status + ); +}); diff --git a/webui/src/components/common/StreamingMarkdown.test.tsx b/webui/src/components/common/StreamingMarkdown.test.tsx new file mode 100644 index 00000000..c063d7c5 --- /dev/null +++ b/webui/src/components/common/StreamingMarkdown.test.tsx @@ -0,0 +1,175 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { renderHook, act } from '@testing-library/react'; + +// Extract the hook for isolated testing by re-implementing it here. +// This mirrors the approach in useSessions.test.ts: test pure logic directly. +import { useState, useEffect, useRef } from 'react'; + +function useStreamingContent(content: string, isStreaming: boolean): string { + const [displayContent, setDisplayContent] = useState(content); + const pendingRafRef = useRef(null); + const latestContentRef = useRef(content); + + useEffect(() => { + latestContentRef.current = content; + if (!isStreaming) { + if (pendingRafRef.current !== null) { + cancelAnimationFrame(pendingRafRef.current); + pendingRafRef.current = null; + } + setDisplayContent(content); + } else if (pendingRafRef.current === null) { + pendingRafRef.current = requestAnimationFrame(() => { + pendingRafRef.current = null; + setDisplayContent(latestContentRef.current); + }); + } + }, [content, isStreaming]); + + useEffect( + () => () => { + if (pendingRafRef.current !== null) { + cancelAnimationFrame(pendingRafRef.current); + } + }, + [], + ); + + return displayContent; +} + +// ─── rAF fake setup ────────────────────────────────────────────────────────── + +type RafCallback = (time: number) => void; + +let rafQueue: RafCallback[] = []; +let rafIdCounter = 0; + +function setupFakeRaf() { + vi.stubGlobal('requestAnimationFrame', (cb: RafCallback) => { + rafIdCounter++; + rafQueue.push(cb); + return rafIdCounter; + }); + vi.stubGlobal('cancelAnimationFrame', (id: number) => { + // Mark cancelled by removing; simplified — good enough for these tests + rafQueue = rafQueue.filter((_, i) => i !== id - 1); + }); +} + +function flushRaf() { + const pending = [...rafQueue]; + rafQueue = []; + pending.forEach(cb => cb(performance.now())); +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +describe('useStreamingContent', () => { + beforeEach(() => { + rafQueue = []; + rafIdCounter = 0; + setupFakeRaf(); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it('returns initial content immediately on mount', () => { + const { result } = renderHook(() => useStreamingContent('hello', false)); + expect(result.current).toBe('hello'); + }); + + it('non-streaming: updates displayContent synchronously when content changes', () => { + const { result, rerender } = renderHook( + ({ content, isStreaming }) => useStreamingContent(content, isStreaming), + { initialProps: { content: 'a', isStreaming: false } }, + ); + + expect(result.current).toBe('a'); + + act(() => { + rerender({ content: 'b', isStreaming: false }); + }); + + expect(result.current).toBe('b'); + }); + + it('streaming: does not update displayContent until rAF fires', () => { + const { result, rerender } = renderHook( + ({ content, isStreaming }) => useStreamingContent(content, isStreaming), + { initialProps: { content: 'chunk1', isStreaming: true } }, + ); + + // Initial value applied immediately (useState initializer) + expect(result.current).toBe('chunk1'); + + // New content arrives while streaming — should NOT update yet + act(() => { + rerender({ content: 'chunk1 chunk2', isStreaming: true }); + }); + expect(result.current).toBe('chunk1'); + + // After rAF fires, picks up latest content + act(() => { + flushRaf(); + }); + expect(result.current).toBe('chunk1 chunk2'); + }); + + it('streaming: multiple content updates in same frame only trigger one rAF', () => { + const rafSpy = vi.fn().mockImplementation((cb: RafCallback) => { + rafQueue.push(cb); + return ++rafIdCounter; + }); + vi.stubGlobal('requestAnimationFrame', rafSpy); + + const { rerender } = renderHook( + ({ content, isStreaming }) => useStreamingContent(content, isStreaming), + { initialProps: { content: 'a', isStreaming: true } }, + ); + + act(() => { rerender({ content: 'ab', isStreaming: true }); }); + act(() => { rerender({ content: 'abc', isStreaming: true }); }); + act(() => { rerender({ content: 'abcd', isStreaming: true }); }); + + // Only one rAF should have been scheduled (subsequent calls skipped because pendingRaf != null) + expect(rafSpy).toHaveBeenCalledTimes(1); + }); + + it('streaming→done: cancels pending rAF and applies final content immediately', () => { + const cancelSpy = vi.fn(); + vi.stubGlobal('cancelAnimationFrame', cancelSpy); + + const { result, rerender } = renderHook( + ({ content, isStreaming }) => useStreamingContent(content, isStreaming), + { initialProps: { content: 'chunk1', isStreaming: true } }, + ); + + // Queue a pending rAF by updating content while streaming + act(() => { rerender({ content: 'chunk1 chunk2', isStreaming: true }); }); + + // Now streaming ends with the final content — should cancel rAF and update immediately + act(() => { rerender({ content: 'chunk1 chunk2 final', isStreaming: false }); }); + + expect(cancelSpy).toHaveBeenCalled(); + expect(result.current).toBe('chunk1 chunk2 final'); + }); + + it('streaming: after rAF fires it picks up the very latest content ref value', () => { + const { result, rerender } = renderHook( + ({ content, isStreaming }) => useStreamingContent(content, isStreaming), + { initialProps: { content: 'v1', isStreaming: true } }, + ); + + // Multiple updates before the frame fires + act(() => { rerender({ content: 'v2', isStreaming: true }); }); + act(() => { rerender({ content: 'v3', isStreaming: true }); }); + act(() => { rerender({ content: 'v4', isStreaming: true }); }); + + // Only one frame scheduled; it should use the latest ref value (v4) + act(() => { flushRaf(); }); + expect(result.current).toBe('v4'); + }); +}); diff --git a/webui/src/components/common/StreamingMarkdown.tsx b/webui/src/components/common/StreamingMarkdown.tsx new file mode 100644 index 00000000..aa1fc09b --- /dev/null +++ b/webui/src/components/common/StreamingMarkdown.tsx @@ -0,0 +1,107 @@ +import { useState, useEffect, useRef } from 'react'; +import ReactMarkdown from 'react-markdown'; +import rehypeHighlight from 'rehype-highlight'; +import rehypeRaw from 'rehype-raw'; +import remarkGfm from 'remark-gfm'; +import 'highlight.js/styles/github-dark.css'; + +/** + * Throttles content updates to at most once per animation frame while streaming. + * When streaming ends, immediately flushes the latest content. + * + * Mirrors Open WebUI's Markdown.svelte approach: + * if (done) { cancelAnimationFrame(pending); parseTokens(); } + * else if (!pending) { pending = rAF(() => { pending = null; parseTokens(); }) } + */ +function useStreamingContent(content: string, isStreaming: boolean): string { + const [displayContent, setDisplayContent] = useState(content); + const pendingRafRef = useRef(null); + const latestContentRef = useRef(content); + + useEffect(() => { + latestContentRef.current = content; + + if (!isStreaming) { + // Streaming done: cancel any pending frame and apply final content immediately + if (pendingRafRef.current !== null) { + cancelAnimationFrame(pendingRafRef.current); + pendingRafRef.current = null; + } + setDisplayContent(content); + } else if (pendingRafRef.current === null) { + // Streaming: schedule at most one update per frame + pendingRafRef.current = requestAnimationFrame(() => { + pendingRafRef.current = null; + setDisplayContent(latestContentRef.current); + }); + } + // If pendingRafRef.current !== null, a frame is already scheduled; + // latestContentRef ensures it will pick up the most recent content when it fires. + }, [content, isStreaming]); + + // Cancel any pending rAF on unmount + useEffect( + () => () => { + if (pendingRafRef.current !== null) { + cancelAnimationFrame(pendingRafRef.current); + } + }, + [], + ); + + return displayContent; +} + +export interface StreamingMarkdownProps { + /** Full accumulated text content to render */ + content: string; + /** When true, content updates are throttled to one per animation frame */ + isStreaming: boolean; +} + +/** + * Renders Markdown at all times (no plain-text fallback during streaming). + * Content updates are throttled via requestAnimationFrame while streaming, + * limiting ReactMarkdown re-parses to ~60fps instead of every SSE chunk. + */ +export function StreamingMarkdown({ content, isStreaming }: StreamingMarkdownProps) { + const displayContent = useStreamingContent(content, isStreaming); + + return ( +
+ + {children} + + ); + } + return ( + + {children} + + ); + }, + }} + > + {displayContent} + +
+ ); +} diff --git a/webui/src/hooks/useSSE.ts b/webui/src/hooks/useSSE.ts index 6553f130..9476fc7e 100644 --- a/webui/src/hooks/useSSE.ts +++ b/webui/src/hooks/useSSE.ts @@ -84,15 +84,18 @@ export function useSSE({ eventSourceRef.current = null; } - console.log('[SSE] Creating EventSource connection to:', url); + if (import.meta.env.DEV) { + console.log('[SSE] Creating EventSource connection to:', url); + } setStatus('connecting'); - const eventSource = new EventSource(url); eventSourceRef.current = eventSource; eventSource.onopen = () => { if (!mountedRef.current) return; - console.log('[SSE] Connection opened successfully'); + if (import.meta.env.DEV) { + console.log('[SSE] Connection opened successfully'); + } setStatus('connected'); retryCountRef.current = 0; // 重置重试计数 }; @@ -103,7 +106,7 @@ export function useSSE({ try { const data = JSON.parse(event.data); // Debug: 打印收到的 SSE 事件 - if (data.type === 'message.part.updated') { + if (import.meta.env.DEV && data.type === 'message.part.updated') { console.log('[SSE] message.part.updated:', { partId: data.properties?.part?.id, textLength: data.properties?.part?.text?.length, @@ -121,7 +124,9 @@ export function useSSE({ eventSource.onerror = (error) => { if (!mountedRef.current) return; - console.warn('[SSE] Connection error, will attempt to reconnect'); + if (import.meta.env.DEV) { + console.warn('[SSE] Connection error, will attempt to reconnect'); + } onErrorRef.current?.(error); // 关闭当前连接 @@ -131,7 +136,9 @@ export function useSSE({ // 尝试重连(使用更宽松的策略) if (reconnectEnabled && retryCountRef.current < maxRetries) { const delay = getReconnectDelay(retryCountRef.current); - console.log(`[SSE] Reconnecting in ${Math.round(delay)}ms (attempt ${retryCountRef.current + 1}/${maxRetries})`); + if (import.meta.env.DEV) { + console.log(`[SSE] Reconnecting in ${Math.round(delay)}ms (attempt ${retryCountRef.current + 1}/${maxRetries})`); + } setStatus('reconnecting'); clearReconnectTimeout(); @@ -143,7 +150,9 @@ export function useSSE({ }, delay); } else { // 即使达到最大重试次数,也不要完全放弃,而是使用较长的间隔继续尝试 - console.log('[SSE] Max fast retries reached, switching to slow retry mode'); + if (import.meta.env.DEV) { + console.log('[SSE] Max fast retries reached, switching to slow retry mode'); + } setStatus('reconnecting'); clearReconnectTimeout(); @@ -161,7 +170,9 @@ export function useSSE({ mountedRef.current = true; if (!enabled) { - console.log('[SSE] Not enabled, skipping connection'); + if (import.meta.env.DEV) { + console.log('[SSE] Not enabled, skipping connection'); + } setStatus('disconnected'); return; } diff --git a/webui/src/hooks/useSessions.test.ts b/webui/src/hooks/useSessions.test.ts new file mode 100644 index 00000000..390135c7 --- /dev/null +++ b/webui/src/hooks/useSessions.test.ts @@ -0,0 +1,206 @@ +import { describe, expect, it, vi, afterEach } from 'vitest'; +import { renderHook, act } from '@testing-library/react'; +import { applyMessagePartUpdate, useSessionMessages } from './useSessions'; +import type { Message } from '@/types'; + +// --------------------------------------------------------------------------- +// Mocks — keep API calls from running in unit tests +// --------------------------------------------------------------------------- +vi.mock('@/api/session', () => ({ sessionApi: { list: vi.fn().mockResolvedValue([]) } })); +vi.mock('@/api/client', () => ({ + default: { get: vi.fn().mockResolvedValue({ data: [] }) }, +})); + +// Minimal message factory +function makeMsg(overrides: Partial & { id: string }): Message { + return { + sessionID: 'sess-1', + role: 'assistant', + parts: [], + timestamp: 0, + ...overrides, + } as unknown as Message; +} + +describe('applyMessagePartUpdate', () => { + describe('message not found', () => { + it('appends part to the last in-progress assistant message when messageID does not match', () => { + const partInfo = { id: 'p1', messageID: 'msg-unknown', sessionID: 'sess-1', type: 'text', text: 'hello' }; + const prev: Message[] = [ + makeMsg({ id: 'msg-1', role: 'assistant', parts: [] }), + ]; + const result = applyMessagePartUpdate(prev, partInfo); + expect(result[0].parts).toHaveLength(1); + expect((result[0].parts as any[])[0].id).toBe('p1'); + }); + + it('skips finished assistant messages when looking for in-progress message', () => { + const partInfo = { id: 'p1', messageID: 'msg-unknown', sessionID: 'sess-1', type: 'text', text: 'hi' }; + const prev: Message[] = [ + makeMsg({ id: 'msg-1', role: 'assistant', parts: [], finish: 'stop' } as any), + ]; + const result = applyMessagePartUpdate(prev, partInfo); + // should create a new placeholder message + expect(result).toHaveLength(2); + expect(result[1].id).toBe('msg-unknown'); + expect((result[1].parts as any[])[0].id).toBe('p1'); + }); + + it('creates a new placeholder message when no in-progress assistant exists', () => { + const partInfo = { id: 'p1', messageID: 'msg-new', sessionID: 'sess-1', type: 'text', text: 'hello' }; + const prev: Message[] = [makeMsg({ id: 'msg-user', role: 'user', parts: [] })]; + const result = applyMessagePartUpdate(prev, partInfo); + expect(result).toHaveLength(2); + expect(result[1].id).toBe('msg-new'); + expect(result[1].role).toBe('assistant'); + }); + }); + + describe('message found', () => { + it('appends a new part when the part id does not exist', () => { + const partInfo = { id: 'p2', messageID: 'msg-1', sessionID: 'sess-1', type: 'text', text: 'world' }; + const prev: Message[] = [ + makeMsg({ id: 'msg-1', parts: [{ id: 'p1', type: 'text', text: 'hello' } as any] }), + ]; + const result = applyMessagePartUpdate(prev, partInfo); + expect((result[0].parts as any[])).toHaveLength(2); + expect((result[0].parts as any[])[1].id).toBe('p2'); + }); + + it('removes temp parts before appending a new real part', () => { + const partInfo = { id: 'p-real', messageID: 'msg-1', sessionID: 'sess-1', type: 'text', text: 'x' }; + const prev: Message[] = [ + makeMsg({ + id: 'msg-1', + parts: [{ id: 'temp-abc', type: 'text', text: '' } as any], + }), + ]; + const result = applyMessagePartUpdate(prev, partInfo); + const parts = result[0].parts as any[]; + expect(parts).toHaveLength(1); + expect(parts[0].id).toBe('p-real'); + }); + + it('updates existing text part with accumulated text when delta is provided', () => { + const existing = { id: 'p1', messageID: 'msg-1', type: 'text', text: 'hello ' }; + const partInfo = { id: 'p1', messageID: 'msg-1', sessionID: 'sess-1', type: 'text', text: 'hello world' }; + const prev: Message[] = [makeMsg({ id: 'msg-1', parts: [existing as any] })]; + const result = applyMessagePartUpdate(prev, partInfo, ' world'); + const parts = result[0].parts as any[]; + expect(parts[0].text).toBe('hello world'); + }); + + it('replaces existing part without delta for non-text types', () => { + const existing = { id: 'p1', messageID: 'msg-1', type: 'tool', state: { status: 'pending' } }; + const partInfo = { id: 'p1', messageID: 'msg-1', sessionID: 'sess-1', type: 'tool', state: { status: 'completed' } }; + const prev: Message[] = [makeMsg({ id: 'msg-1', parts: [existing as any] })]; + const result = applyMessagePartUpdate(prev, partInfo); + const parts = result[0].parts as any[]; + expect((parts[0] as any).state.status).toBe('completed'); + }); + + it('does not mutate the original messages array', () => { + const partInfo = { id: 'p1', messageID: 'msg-1', sessionID: 'sess-1', type: 'text', text: 'hi' }; + const originalParts = [{ id: 'p-old', type: 'text', text: 'old' } as any]; + const prev: Message[] = [makeMsg({ id: 'msg-1', parts: originalParts })]; + applyMessagePartUpdate(prev, partInfo, 'hi'); + expect(originalParts).toHaveLength(1); + expect(originalParts[0].id).toBe('p-old'); + }); + }); + + describe('streaming text accumulation', () => { + it('supports reasoning type delta update', () => { + const existing = { id: 'r1', messageID: 'msg-1', type: 'reasoning', text: 'think ' }; + const partInfo = { id: 'r1', messageID: 'msg-1', sessionID: 'sess-1', type: 'reasoning', text: 'think more' }; + const prev: Message[] = [makeMsg({ id: 'msg-1', parts: [existing as any] })]; + const result = applyMessagePartUpdate(prev, partInfo, ' more'); + expect((result[0].parts as any[])[0].text).toBe('think more'); + }); + + it('supports thinking type delta update', () => { + const existing = { id: 't1', messageID: 'msg-1', type: 'thinking', text: 'a' }; + const partInfo = { id: 't1', messageID: 'msg-1', sessionID: 'sess-1', type: 'thinking', text: 'ab' }; + const prev: Message[] = [makeMsg({ id: 'msg-1', parts: [existing as any] })]; + const result = applyMessagePartUpdate(prev, partInfo, 'b'); + expect((result[0].parts as any[])[0].text).toBe('ab'); + }); + }); +}); + +// --------------------------------------------------------------------------- +// updateMessagePart scheduling behaviour +// Verifies observable state changes (not internal scheduling details): +// - first call with a new part ID causes immediate state update +// - subsequent calls with the same part ID accumulate content correctly +// --------------------------------------------------------------------------- +describe('updateMessagePart scheduling', () => { + afterEach(() => { + vi.clearAllMocks(); + }); + + it('first appearance of a new part updates messages state immediately', async () => { + const { result } = renderHook(() => useSessionMessages('sess-1')); + // Wait for the initial fetchMessages effect to settle so it doesn't wipe state + await act(async () => {}); + + const newPart = { id: 'part-new', messageID: 'msg-1', sessionID: 'sess-1', type: 'text', text: 'hello' }; + + await act(async () => { + result.current.updateMessagePart(newPart); + }); + + const msgs = result.current.messages; + // A placeholder message should have been created with the part + const created = msgs.find((m: any) => m.id === 'msg-1'); + expect(created).toBeDefined(); + expect((created!.parts as any[])[0].id).toBe('part-new'); + expect((created!.parts as any[])[0].text).toBe('hello'); + }); + + it('second call with same part ID accumulates delta content correctly', async () => { + const { result } = renderHook(() => useSessionMessages('sess-1')); + // Wait for initial fetch to settle + await act(async () => {}); + + const part = { id: 'part-known', messageID: 'msg-2', sessionID: 'sess-1', type: 'text', text: 'hello' }; + const delta = { ...part, text: 'hello world' }; + + // First call — registers the part + await act(async () => { + result.current.updateMessagePart(part); + }); + + // Second call — content delta on the same part + await act(async () => { + result.current.updateMessagePart(delta, ' world'); + }); + + const msgs = result.current.messages; + const msg = msgs.find((m: any) => m.id === 'msg-2'); + expect(msg).toBeDefined(); + expect((msg!.parts as any[])[0].text).toBe('hello world'); + }); + + it('resets known part tracking when session changes', async () => { + const { result, rerender } = renderHook( + ({ id }: { id?: string }) => useSessionMessages(id), + { initialProps: { id: 'sess-a' } }, + ); + // Wait for initial fetch to settle + await act(async () => {}); + + const part = { id: 'part-sess-a', messageID: 'msg-1', sessionID: 'sess-a', type: 'text', text: 'data' }; + + await act(async () => { + result.current.updateMessagePart(part); + }); + + // Switch to a different session — messages and knownPartIds should reset + await act(async () => { + rerender({ id: 'sess-b' }); + }); + + expect(result.current.messages).toHaveLength(0); + }); +}); diff --git a/webui/src/hooks/useSessions.ts b/webui/src/hooks/useSessions.ts index 976e43f4..62a110be 100644 --- a/webui/src/hooks/useSessions.ts +++ b/webui/src/hooks/useSessions.ts @@ -1,11 +1,83 @@ -import { useState, useEffect, useLayoutEffect, useCallback, useRef } from 'react'; -import { flushSync } from 'react-dom'; +import { useState, useEffect, useLayoutEffect, useCallback, useRef, startTransition } from 'react'; import { sessionApi } from '@/api/session'; import client from '@/api/client'; import type { Session, Message, MessagePart } from '@/types'; const VISIBLE_CATEGORIES = new Set(['user', 'workflow', 'entity-config']); +/** + * Pure reducer for updating a message part in the messages list. + * Exported for unit testing. + */ +export function applyMessagePartUpdate( + prev: Message[], + partInfo: any, + delta?: string, +): Message[] { + const messageIndex = prev.findIndex(m => m.id === partInfo.messageID); + + if (messageIndex < 0) { + // Message not found — reuse the last in-progress assistant message if available + let lastAssistantIndex = -1; + for (let i = prev.length - 1; i >= 0; i--) { + if (prev[i].role === 'assistant' && !prev[i].finish) { + lastAssistantIndex = i; + break; + } + } + + if (lastAssistantIndex >= 0) { + const updated = [...prev]; + const message = { ...updated[lastAssistantIndex] }; + const parts = [...(message.parts || [])]; + parts.push(partInfo); + message.parts = parts; + updated[lastAssistantIndex] = message; + return updated; + } + + // No in-progress assistant message — create a placeholder + return [...prev, { + id: partInfo.messageID, + sessionID: partInfo.sessionID, + role: 'assistant' as const, + parts: [partInfo], + timestamp: Date.now(), + }]; + } + + // Message exists — update its parts + const updated = [...prev]; + const message = { ...updated[messageIndex] }; + const parts = [...(message.parts || [])]; + + const partIndex = parts.findIndex((p: any) => p.id === partInfo.id); + + if (partIndex < 0) { + for (let j = parts.length - 1; j >= 0; j--) { + if (String(parts[j].id).startsWith('temp-')) { + parts.splice(j, 1); + } + } + parts.push(partInfo); + } else { + if (delta && (partInfo.type === 'text' || partInfo.type === 'reasoning' || partInfo.type === 'thinking')) { + const existingPart = parts[partIndex]; + parts[partIndex] = { + ...existingPart, + ...partInfo, + text: partInfo.text, + }; + } else { + parts[partIndex] = partInfo; + } + } + + message.parts = parts; + updated[messageIndex] = message; + return updated; +} + export function useSessions() { const [sessions, setSessions] = useState([]); const [loading, setLoading] = useState(true); @@ -86,6 +158,9 @@ export function useSessionMessages(sessionId?: string) { const [messages, setMessages] = useState([]); const [loading, setLoading] = useState(false); const [error, setError] = useState(null); + // Tracks part IDs seen in this session to distinguish first-time creation + // (structural change → immediate update) from content deltas (low-priority). + const knownPartIdsRef = useRef>(new Set()); const fetchMessages = useCallback(async () => { if (!sessionId) return; @@ -122,6 +197,7 @@ export function useSessionMessages(sessionId?: string) { useLayoutEffect(() => { setMessages([]); setError(null); + knownPartIdsRef.current.clear(); if (sessionId) { setLoading(true); } else { @@ -157,6 +233,14 @@ export function useSessionMessages(sessionId?: string) { compacted: messageInfo.compacted ?? existing.compacted, finish: messageInfo.finish ?? existing.finish, }; + // When a message finishes streaming, evict its part IDs from the + // known-parts registry to reclaim memory. + if (messageInfo.finish) { + const parts = updated[existingIndex].parts as any[] | undefined; + parts?.forEach((p: any) => { + if (p?.id) knownPartIdsRef.current.delete(p.id); + }); + } return updated; } @@ -199,83 +283,24 @@ export function useSessionMessages(sessionId?: string) { * 增量更新 message part(用于流式展示) * @param partInfo - part 对象,包含 id, messageID, sessionID, type, text 等 * @param delta - 本次增量文本(如果有的话) - * - * 使用 flushSync 强制同步更新,确保每个 chunk 立即渲染 + * + * 首次出现的 part(结构性变化)立即同步更新,确保"思考中"等指示符 + * 即时显示;已知 part 的内容增量则用 startTransition 降低优先级, + * 允许 React 合批调度以避免高频 SSE chunk 阻塞主线程。 */ updateMessagePart: (partInfo: any, delta?: string) => { - flushSync(() => { - setMessages(prev => { - const messageIndex = prev.findIndex(m => m.id === partInfo.messageID); - - if (messageIndex < 0) { - // Message 不存在,检查是否有任何未完成的assistant message - // 如果有,使用那个message而不是创建新的(避免重复) - let lastAssistantIndex = -1; - for (let i = prev.length - 1; i >= 0; i--) { - if (prev[i].role === 'assistant' && !prev[i].finish) { - lastAssistantIndex = i; - break; - } - } - - if (lastAssistantIndex >= 0) { - // 更新最后一个未完成的assistant message - const updated = [...prev]; - const message = { ...updated[lastAssistantIndex] }; - const parts = [...(message.parts || [])]; - parts.push(partInfo); - message.parts = parts; - updated[lastAssistantIndex] = message; - return updated; - } - - // 没有未完成的assistant message,创建新的占位 message - return [...prev, { - id: partInfo.messageID, - sessionID: partInfo.sessionID, - role: 'assistant' as const, - parts: [partInfo], - timestamp: Date.now(), - }]; - } - - // Message 存在,更新其 parts - const updated = [...prev]; - const message = { ...updated[messageIndex] }; - const parts = [...(message.parts || [])]; - - const partIndex = parts.findIndex((p: any) => p.id === partInfo.id); - - if (partIndex < 0) { - for (let j = parts.length - 1; j >= 0; j--) { - if (String(parts[j].id).startsWith('temp-')) { - parts.splice(j, 1); - } - } - parts.push(partInfo); - } else { - // Part 存在 - if (delta && (partInfo.type === 'text' || partInfo.type === 'reasoning' || partInfo.type === 'thinking')) { - // 有 delta,追加到现有 text(实现逐字效果) - // text、reasoning 和 thinking 类型都支持流式更新 - const existingPart = parts[partIndex]; - parts[partIndex] = { - ...existingPart, - ...partInfo, - // 使用累积的 text(后端已经累积好了) - text: partInfo.text, - }; - } else { - // 无 delta 或其他类型,直接替换整个 part - parts[partIndex] = partInfo; - } - } - - message.parts = parts; - updated[messageIndex] = message; - return updated; + const isNewPart = !knownPartIdsRef.current.has(partInfo.id); + if (isNewPart) { + // Structural change: first appearance of this part — must render immediately + // so that "thinking" / "streaming" indicators show without delay. + knownPartIdsRef.current.add(partInfo.id); + setMessages(prev => applyMessagePartUpdate(prev, partInfo, delta)); + } else { + // Content delta on an existing part — low priority, allow React to batch. + startTransition(() => { + setMessages(prev => applyMessagePartUpdate(prev, partInfo, delta)); }); - }); + } }, }; }