Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions flocks/session/streaming/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,17 @@ async def _handle_text_start(self, event: TextStartEvent) -> None:
metadata=event.metadata or {},
)
self.parts.append(self.current_text_part)

# Persist an empty placeholder immediately so the final stored part
# order matches the stream semantics. Without this, a tool part that
# starts after text streaming begins but before ``text-end`` would be
# stored earlier, and a completion-time refetch would reorder the UI as
# reasoning -> tool -> text instead of reasoning -> text -> tool.
await Message.store_part(
self.session_id,
self.assistant_message.id,
self.current_text_part,
)

# Publish part created event (matches Flocks Session.updatePart)
if self.event_publish_callback:
Expand Down
44 changes: 43 additions & 1 deletion tests/session/test_stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,25 @@ class TestTextAccumulation:
@pytest.mark.asyncio
async def test_text_start_creates_text_part(self):
proc = _make_processor()
with patch.object(proc, 'event_publish_callback', None):
with (
patch.object(proc, 'event_publish_callback', None),
patch("flocks.session.streaming.stream_processor.Message.store_part", new=AsyncMock()),
):
await proc.process_event(TextStartEvent())
assert proc.current_text_part is not None
assert proc.current_text_part.type == "text"

@pytest.mark.asyncio
async def test_text_start_persists_placeholder_immediately(self):
proc = _make_processor()
with patch("flocks.session.streaming.stream_processor.Message.store_part", new=AsyncMock()) as mock_store:
await proc.process_event(TextStartEvent())

mock_store.assert_awaited_once()
stored_part = mock_store.call_args.args[2]
assert stored_part.type == "text"
assert stored_part.text == ""

@pytest.mark.asyncio
async def test_text_delta_accumulates(self):
proc = _make_processor()
Expand Down Expand Up @@ -204,6 +218,34 @@ async def test_text_end_parses_minimax_tool_call_xml(self):
assert proc._text_tool_calls_executed is True
assert proc.get_text_content() == ""

@pytest.mark.asyncio
async def test_text_placeholder_keeps_text_before_tool_in_stored_order(self):
proc = _make_processor()
stored_parts = []

async def fake_store_part(_session_id, _message_id, part):
for idx, existing in enumerate(stored_parts):
if existing.id == part.id:
stored_parts[idx] = part
break
else:
stored_parts.append(part)
return part

with patch(
"flocks.session.streaming.stream_processor.Message.store_part",
new=AsyncMock(side_effect=fake_store_part),
):
await proc.process_event(ReasoningStartEvent(id="r-order"))
await proc.process_event(ReasoningDeltaEvent(id="r-order", text="plan"))
await proc.process_event(ReasoningEndEvent(id="r-order"))
await proc.process_event(TextStartEvent())
await proc.process_event(TextDeltaEvent(text="answer"))
await proc.process_event(ToolInputStartEvent(id="tc-order", tool_name="bash"))
await proc.process_event(TextEndEvent())

assert [part.type for part in stored_parts] == ["reasoning", "text", "tool"]


# ---------------------------------------------------------------------------
# Reasoning accumulation
Expand Down