Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flocks/server/routes/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def publish_event(event_type: str, properties: dict = None, directory: str
if event_type == "message.part.updated":
text_len = properties.get("part", {}).get("text", "") if properties else ""
delta = properties.get("delta", "") if properties else ""
log.info("event.publish.part_updated", {
log.debug("event.publish.part_updated", {
"clients": broadcaster.client_count,
"text_length": len(text_len) if text_len else 0,
"delta_length": len(delta) if delta else 0,
Expand Down
6 changes: 3 additions & 3 deletions flocks/session/streaming/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ async def _handle_reasoning_delta(self, event: ReasoningDeltaEvent) -> None:

if should_publish:
self._last_reasoning_event_time[event.id] = current_time
log.info("stream.reasoning.publishing_delta", {
log.debug("stream.reasoning.publishing_delta", {
"delta_len": len(event.text),
"total_len": len(part.text),
"reasoning_id": event.id,
Expand Down Expand Up @@ -1055,7 +1055,7 @@ async def _handle_text_delta(self, event: TextDeltaEvent) -> None:
visible_delta = self._compute_visible_delta(self._last_visible_text, visible_text)

if len(self.current_text_part.text) <= 100 or len(self.current_text_part.text) % 100 < len(event.text):
log.info("stream.text.delta", {
log.debug("stream.text.delta", {
"delta_length": len(event.text),
"total_length": len(self.current_text_part.text),
})
Expand Down Expand Up @@ -1086,7 +1086,7 @@ async def _handle_text_delta(self, event: TextDeltaEvent) -> None:
if should_publish:
self._last_text_event_time = current_time
if len(self.current_text_part.text) <= 100 or len(self.current_text_part.text) % 100 < len(event.text):
log.info("stream.text.publishing_delta", {
log.debug("stream.text.publishing_delta", {
"delta_len": len(visible_delta),
"part_id": self.current_text_part.id,
"throttled": time_since_last < self._text_event_throttle_ms,
Expand Down
75 changes: 36 additions & 39 deletions webui/src/components/common/SessionChat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
* - 消息复制、时间戳等可选功能
*/

import { useState, useCallback, useRef, useEffect, useMemo } from 'react';
import { useState, useCallback, useRef, useEffect, useMemo, memo } from 'react';
import { Send, Loader2, ChevronDown, Square, Copy, User } from 'lucide-react';
import ReactMarkdown from 'react-markdown';
import rehypeHighlight from 'rehype-highlight';
import rehypeRaw from 'rehype-raw';
import remarkGfm from 'remark-gfm';
import 'highlight.js/styles/github-dark.css';
import { StreamingMarkdown } from './StreamingMarkdown';
import { useTranslation } from 'react-i18next';
import LoadingSpinner from './LoadingSpinner';
import { QuestionTool } from './QuestionTool';
Expand Down Expand Up @@ -225,7 +221,7 @@ export default function SessionChat({
const scrollToBottom = useCallback(() => {
if (!isAtBottomRef.current) return;
requestAnimationFrame(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
messagesEndRef.current?.scrollIntoView({ behavior: 'instant' });
});
}, []);

Expand Down Expand Up @@ -1002,7 +998,7 @@ export interface ChatMessageBubbleProps {
compactedMessages?: MergedMessage[];
}

export function ChatMessageBubble({
function ChatMessageBubbleInner({
message,
isActive = false,
pendingQuestions,
Expand Down Expand Up @@ -1112,37 +1108,10 @@ export function ChatMessageBubble({
<span className="text-[9px] text-gray-500 flex-shrink-0">{nodeRefMatch[2]}</span>
</div>
)}
<div className="prose prose-sm max-w-none">
<ReactMarkdown
remarkPlugins={[remarkGfm]}
rehypePlugins={[rehypeRaw, [rehypeHighlight, { detect: false, ignoreMissing: true }]]}
components={{
code({ className, children, ...props }) {
// 检测是否为块级代码(fenced code block):
// 1. 有 language-* 类(有语言标记)
// 2. 有 hljs 类(rehype-highlight 添加的)
// 3. children 以 \n 结尾(react-markdown 对块级代码添加尾随换行)
const isBlock =
/language-/.test(className || '') ||
/\bhljs\b/.test(className || '') ||
String(children ?? '').endsWith('\n');
if (!isBlock) {
return (
<code
className="bg-gray-100 text-gray-800 px-1 py-0.5 rounded text-[0.85em] font-mono"
{...props}
>
{children}
</code>
);
}
return <code className={className} {...props}>{children}</code>;
},
}}
>
{displayText}
</ReactMarkdown>
</div>
<StreamingMarkdown
content={displayText}
isStreaming={isActive && !isUser}
/>
</>
);
})()}
Expand Down Expand Up @@ -1364,3 +1333,31 @@ export function ChatToolPart({ part, pendingQuestion, onAnswer, onReject }: Chat
</details>
);
}

/**
 * Memoized export of ChatMessageBubble.
 *
 * Fast path (O(1) field checks, aligned with Open WebUI's approach):
 * - structural props: isActive, showActions, finish, parts.length
 * - reference props: pendingQuestions, compactedMessages
 * - content probe: last part's text/thinking/state.status
 *
 * Only triggers a re-render when something actually visible has changed,
 * avoiding unnecessary reconciliation during high-frequency streaming.
 *
 * NOTE(review): only the LAST part is content-probed, so an in-place mutation
 * of an earlier part would be missed — assumes parts are append-only during
 * streaming; confirm parts are never edited retroactively.
 */
export const ChatMessageBubble = memo(ChatMessageBubbleInner, (prev, next) => {
  // Cheap scalar checks first; any difference forces a re-render.
  if (prev.isActive !== next.isActive) return false;
  if (prev.showActions !== next.showActions) return false;
  if (prev.message.finish !== next.message.finish) return false;
  const prevParts = prev.message.parts as any[] | undefined;
  const nextParts = next.message.parts as any[] | undefined;
  if ((prevParts?.length ?? 0) !== (nextParts?.length ?? 0)) return false;
  // Reference checks for collection props.
  if (prev.pendingQuestions !== next.pendingQuestions) return false;
  // Fix: compactedMessages was previously ignored by the comparator, so a
  // changed compaction list never re-rendered an already-mounted bubble.
  if (prev.compactedMessages !== next.compactedMessages) return false;
  // O(1) content probe on the last part — covers the streaming delta case
  const prevLast = prevParts?.[prevParts.length - 1];
  const nextLast = nextParts?.[nextParts.length - 1];
  return (
    prevLast?.text === nextLast?.text &&
    prevLast?.thinking === nextLast?.thinking &&
    prevLast?.state?.status === nextLast?.state?.status
  );
});
175 changes: 175 additions & 0 deletions webui/src/components/common/StreamingMarkdown.test.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest';
import { renderHook, act } from '@testing-library/react';

// Extract the hook for isolated testing by re-implementing it here.
// This mirrors the approach in useSessions.test.ts: test pure logic directly.
import { useState, useEffect, useRef } from 'react';

/**
 * Frame-throttled mirror of a streaming string.
 *
 * While `isStreaming` is true, incoming `content` updates are coalesced into at
 * most one state update per animation frame (the frame callback reads the
 * newest value from a ref). When `isStreaming` is false, any queued frame is
 * cancelled and the final content is applied synchronously.
 */
function useStreamingContent(content: string, isStreaming: boolean): string {
  const [shown, setShown] = useState(content);
  const frameRef = useRef<number | null>(null);
  const newestRef = useRef(content);

  useEffect(() => {
    newestRef.current = content;

    if (isStreaming) {
      // Coalesce: never more than one pending frame at a time.
      if (frameRef.current === null) {
        frameRef.current = requestAnimationFrame(() => {
          frameRef.current = null;
          setShown(newestRef.current);
        });
      }
      return;
    }

    // Not streaming — drop any queued frame and sync immediately.
    if (frameRef.current !== null) {
      cancelAnimationFrame(frameRef.current);
      frameRef.current = null;
    }
    setShown(content);
  }, [content, isStreaming]);

  // Cancel an in-flight frame on unmount.
  useEffect(() => {
    return () => {
      if (frameRef.current !== null) {
        cancelAnimationFrame(frameRef.current);
      }
    };
  }, []);

  return shown;
}

// ─── rAF fake setup ──────────────────────────────────────────────────────────

// Signature of a requestAnimationFrame callback (receives a timestamp).
type RafCallback = (time: number) => void;

// Pending (not yet flushed) fake rAF callbacks, in scheduling order.
let rafQueue: RafCallback[] = [];
// Monotonic id handed out by the fake requestAnimationFrame; reset in beforeEach.
let rafIdCounter = 0;

/**
 * Stub requestAnimationFrame / cancelAnimationFrame with deterministic fakes.
 *
 * Scheduled callbacks are appended to the module-level `rafQueue` (drained by
 * `flushRaf`) and each issued id is tracked in a Map so cancellation removes
 * the correct callback. The previous index-based removal (`i !== id - 1`)
 * silently broke once a flush drained the queue while `rafIdCounter` kept
 * growing — ids and queue positions no longer lined up, so cancels became
 * no-ops (or removed the wrong entry).
 */
function setupFakeRaf() {
  const callbackById = new Map<number, RafCallback>();
  vi.stubGlobal('requestAnimationFrame', (cb: RafCallback) => {
    rafIdCounter++;
    callbackById.set(rafIdCounter, cb);
    rafQueue.push(cb);
    return rafIdCounter;
  });
  vi.stubGlobal('cancelAnimationFrame', (id: number) => {
    const cb = callbackById.get(id);
    if (cb === undefined) return; // unknown or already-flushed id — no-op
    callbackById.delete(id);
    const idx = rafQueue.indexOf(cb);
    if (idx !== -1) rafQueue.splice(idx, 1);
  });
}

/** Drain the fake rAF queue, invoking each pending callback exactly once. */
function flushRaf() {
  // Snapshot-then-reset so callbacks that schedule new frames land in the
  // fresh queue instead of being invoked in this same flush.
  const drained = rafQueue;
  rafQueue = [];
  for (const cb of drained) {
    cb(performance.now());
  }
}

// ─── Tests ───────────────────────────────────────────────────────────────────

describe('useStreamingContent', () => {
  // Fresh fake-rAF state before every test so queued frames never leak across cases.
  beforeEach(() => {
    rafQueue = [];
    rafIdCounter = 0;
    setupFakeRaf();
  });

  afterEach(() => {
    vi.unstubAllGlobals();
  });

  it('returns initial content immediately on mount', () => {
    // useState initializer seeds displayContent synchronously — no frame needed.
    const { result } = renderHook(() => useStreamingContent('hello', false));
    expect(result.current).toBe('hello');
  });

  it('non-streaming: updates displayContent synchronously when content changes', () => {
    const { result, rerender } = renderHook(
      ({ content, isStreaming }) => useStreamingContent(content, isStreaming),
      { initialProps: { content: 'a', isStreaming: false } },
    );

    expect(result.current).toBe('a');

    act(() => {
      rerender({ content: 'b', isStreaming: false });
    });

    expect(result.current).toBe('b');
  });

  it('streaming: does not update displayContent until rAF fires', () => {
    const { result, rerender } = renderHook(
      ({ content, isStreaming }) => useStreamingContent(content, isStreaming),
      { initialProps: { content: 'chunk1', isStreaming: true } },
    );

    // Initial value applied immediately (useState initializer)
    expect(result.current).toBe('chunk1');

    // New content arrives while streaming — should NOT update yet
    act(() => {
      rerender({ content: 'chunk1 chunk2', isStreaming: true });
    });
    expect(result.current).toBe('chunk1');

    // After rAF fires, picks up latest content
    act(() => {
      flushRaf();
    });
    expect(result.current).toBe('chunk1 chunk2');
  });

  it('streaming: multiple content updates in same frame only trigger one rAF', () => {
    // Replace the beforeEach stub with a spy so the scheduling count is observable.
    // NOTE: the single expected call fires on mount (isStreaming starts true);
    // the three rerenders are coalesced into that already-pending frame.
    const rafSpy = vi.fn().mockImplementation((cb: RafCallback) => {
      rafQueue.push(cb);
      return ++rafIdCounter;
    });
    vi.stubGlobal('requestAnimationFrame', rafSpy);

    const { rerender } = renderHook(
      ({ content, isStreaming }) => useStreamingContent(content, isStreaming),
      { initialProps: { content: 'a', isStreaming: true } },
    );

    act(() => { rerender({ content: 'ab', isStreaming: true }); });
    act(() => { rerender({ content: 'abc', isStreaming: true }); });
    act(() => { rerender({ content: 'abcd', isStreaming: true }); });

    // Only one rAF should have been scheduled (subsequent calls skipped because pendingRaf != null)
    expect(rafSpy).toHaveBeenCalledTimes(1);
  });

  it('streaming→done: cancels pending rAF and applies final content immediately', () => {
    const cancelSpy = vi.fn();
    vi.stubGlobal('cancelAnimationFrame', cancelSpy);

    const { result, rerender } = renderHook(
      ({ content, isStreaming }) => useStreamingContent(content, isStreaming),
      { initialProps: { content: 'chunk1', isStreaming: true } },
    );

    // Queue a pending rAF by updating content while streaming
    act(() => { rerender({ content: 'chunk1 chunk2', isStreaming: true }); });

    // Now streaming ends with the final content — should cancel rAF and update immediately
    act(() => { rerender({ content: 'chunk1 chunk2 final', isStreaming: false }); });

    expect(cancelSpy).toHaveBeenCalled();
    expect(result.current).toBe('chunk1 chunk2 final');
  });

  it('streaming: after rAF fires it picks up the very latest content ref value', () => {
    const { result, rerender } = renderHook(
      ({ content, isStreaming }) => useStreamingContent(content, isStreaming),
      { initialProps: { content: 'v1', isStreaming: true } },
    );

    // Multiple updates before the frame fires
    act(() => { rerender({ content: 'v2', isStreaming: true }); });
    act(() => { rerender({ content: 'v3', isStreaming: true }); });
    act(() => { rerender({ content: 'v4', isStreaming: true }); });

    // Only one frame scheduled; it should use the latest ref value (v4)
    act(() => { flushRaf(); });
    expect(result.current).toBe('v4');
  });
});
Loading
Loading