Conversation
…ry flush

Compaction is the dominant per-step latency spike. This commit addresses
two wall-clock contributors.

Changes:
- ``summarize_chunked`` now dispatches all chunk-level LLM calls
  concurrently via ``asyncio.gather`` bounded by an ``asyncio.Semaphore``
  (default 4, override via ``FLOCKS_COMPACTION_CHUNK_CONCURRENCY``). The
  previous serial loop made total summary time scale linearly with chunk
  count and unfairly shrank the per-chunk timeout when many chunks were in
  play; each chunk now gets the full ``COMPACTION_TIMEOUT_SECONDS`` budget
  independently.
- ``_flush_memory_to_daily`` (the second post-summary LLM call, which
  writes the daily memory file) is no longer awaited inline. The compacted
  session does not depend on it for continuation, so it is scheduled as a
  background ``asyncio.Task`` tracked in a module-level set; failures are
  logged but do not block continuation. Setting
  ``FLOCKS_COMPACTION_FLUSH_BACKGROUND=0`` restores the legacy synchronous
  behaviour for tests / debugging.

Existing test patches on ``SessionCompaction._flush_memory_to_daily`` keep
working because the dispatch helper still invokes the classmethod through
the same descriptor.

Made-with: Cursor
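A minimal sketch of the bounded fan-out this describes, assuming ``summarize_one`` is an async per-chunk LLM call (helper names beyond the ``asyncio`` primitives are illustrative):

```python
import asyncio
import os

CHUNK_CONCURRENCY = int(os.environ.get("FLOCKS_COMPACTION_CHUNK_CONCURRENCY", "4"))

async def _summarize_all_chunks(chunks, summarize_one, timeout_s: float):
    sem = asyncio.Semaphore(CHUNK_CONCURRENCY)

    async def bounded(chunk):
        async with sem:
            # Each chunk gets the full timeout budget, independent of chunk count.
            return await asyncio.wait_for(summarize_one(chunk), timeout=timeout_s)

    # Dispatch all chunks at once; the semaphore caps in-flight provider calls.
    return await asyncio.gather(*(bounded(c) for c in chunks), return_exceptions=True)
```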
Backgrounding ``_flush_memory_to_daily`` in f52857f removed the implicit
serialisation that protected the daily memory file's read-modify-write
update path; it also opened the door to stuck provider calls retaining
``chat_messages`` and provider references indefinitely. This commit closes
both gaps and improves observability.

Changes:
- Per-session serialisation via a lazily-created ``asyncio.Lock`` registry
  (``_session_flush_locks``). Two consecutive compactions on the same
  session now queue their flushes instead of racing on
  ``daily.write_daily(append=True)`` (which is non-atomic and would
  silently lose memory entries). Different sessions remain fully parallel.
- Hard timeout on the background flush via ``asyncio.wait_for``, default
  600s, override via ``FLOCKS_COMPACTION_FLUSH_TIMEOUT``. A wedged
  provider call previously pinned ``chat_messages`` and provider
  references for the lifetime of the process.
- Observability: ``_on_flush_task_done`` and the new wrapper log
  ``session_id``, queue wait time, and execution duration. A timeout emits
  ``compaction.flush.background.timeout`` with the configured budget.
- Expose ``drain_pending_flush_tasks(timeout)`` so callers (graceful
  shutdown, test fixtures) can wait for in-flight flushes before tearing
  down the loop.

Verified by ``tests/session/test_compaction_flush_dispatch.py``, covering
background dispatch, inline fallback, same-session serialisation,
cross-session parallelism, timeout cancellation, and chunk-concurrency
caps.

Refs: f52857f
Made-with: Cursor
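A sketch of the per-session serialisation plus hard timeout described here; the registry name follows the message, the wrapper body is an assumption:

```python
import asyncio

_session_flush_locks: dict[str, asyncio.Lock] = {}

def _get_flush_lock(session_id: str) -> asyncio.Lock:
    # Lazily create one Lock per session; different sessions stay parallel.
    lock = _session_flush_locks.get(session_id)
    if lock is None:
        lock = _session_flush_locks.setdefault(session_id, asyncio.Lock())
    return lock

async def _run_flush(session_id: str, flush_coro, timeout_s: float) -> None:
    # Same-session flushes queue here instead of racing on the daily file.
    async with _get_flush_lock(session_id):
        # Bound a wedged provider call so it cannot pin references forever.
        await asyncio.wait_for(flush_coro, timeout=timeout_s)
```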
Second-pass review of 637d043 found three follow-up issues introduced
together with the per-session flush lock.

Changes:
- ``_session_flush_locks`` only ever grew — every new ``session_id`` added
  a Lock entry that was never removed. In a long-running process this is
  an unbounded leak (session ids are UUIDs, never reused). Each background
  flush now calls ``_release_session_lock_if_idle`` after releasing the
  lock and pops its registry entry when no other in-flight task targets
  the same session. Asyncio's single-threaded scheduler makes the
  current-task exclusion correct without extra synchronisation.
- ``drain_pending_flush_tasks`` previously dropped leftover tasks on
  timeout, which defeated its stated "graceful shutdown" purpose — the
  caller would still hit "Task was destroyed but it is pending!" once the
  loop closed. It now (a) returns the leftover count, (b) logs WARN with
  the task names when tasks survive the timeout, and (c) accepts
  ``cancel_on_timeout=True`` to actively cancel and await
  ``CancelledError`` so the loop can be torn down cleanly. Default
  behaviour is unchanged for non-shutdown callers.
- The 600s default flush timeout was excessive — ``extract_and_save``
  normally completes in 5–30s. Lowered the default to 90s so a wedged
  provider does not block the next compaction's flush queue for ten
  minutes. ``FLOCKS_COMPACTION_FLUSH_TIMEOUT`` still overrides.

Smaller cleanups:
- ``_session_id_from_task`` returns ``Optional[str]`` (was an empty
  string); the done-callback only includes ``session_id`` in log payloads
  when the task name actually encodes one.
- Log levels swapped: WARN for the planned ``wait_for`` timeout (a
  bounded, expected failure), ERROR for unexpected exceptions surfaced
  through ``task.exception()``.
- Test fixture rewritten as ``async`` so it can cancel and await any
  surviving tasks in setup / teardown rather than just clearing the
  bookkeeping set, eliminating cross-test bleed.

Verified by tests added under
``tests/session/test_compaction_flush_dispatch.py`` (now 20 in this file,
all passing): registry cleanup after completion, lock retained while a
same-session task is pending, drain-without-cancel reports leftovers,
drain-with-cancel empties the pending set, and flush-timeout parsing for
empty / zero / negative / non-numeric values.

Refs: 637d043
Made-with: Cursor
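A sketch of the idle-lock cleanup; the registry and pending-set names follow the message, while the task-name encoding is a hypothetical stand-in for whatever ``_session_id_from_task`` actually parses:

```python
import asyncio
from typing import Optional

_session_flush_locks: dict[str, asyncio.Lock] = {}   # registry from the earlier sketch
_pending_flush_tasks: set[asyncio.Task] = set()

def _session_id_from_task(task: asyncio.Task) -> Optional[str]:
    # Hypothetical encoding: assume tasks are named "flush:<session_id>".
    name = task.get_name()
    return name.split(":", 1)[1] if name.startswith("flush:") else None

def _release_session_lock_if_idle(session_id: str) -> None:
    current = asyncio.current_task()
    for t in _pending_flush_tasks:
        if t is current:
            continue  # a task must never count itself as "still pending"
        if _session_id_from_task(t) == session_id:
            return  # another in-flight flush still targets this session
    # Asyncio's single-threaded scheduler makes this check-then-pop safe.
    _session_flush_locks.pop(session_id, None)
```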
Third-pass review of f7936bf surfaced documentation drift and test gaps
around the per-session lock cleanup. This commit closes them without
touching runtime behaviour.

Changes:
- The ``_dispatch_memory_flush`` docstring no longer claims "default
  600s"; the actual default became 90s in f7936bf. It now points at
  ``_DEFAULT_FLUSH_TIMEOUT_SECONDS`` so future changes stay in sync.
- The ``drain_pending_flush_tasks`` docstring spells out that the pending
  set is snapshotted at entry, so callers performing a real shutdown must
  stop new dispatches before draining. Without this note, callers could
  misuse drain in a way that would still hit "Task was destroyed but it is
  pending!".
- The ``_release_session_lock_if_idle`` docstring documents the
  cancellation edge case (two same-session tasks cancelled simultaneously
  can each see the other as "still pending" and both skip cleanup). This
  is a bounded, best-effort property; the recommended path is
  ``cancel_on_timeout=True`` followed by process exit.

Verified by tests added to
``tests/session/test_compaction_flush_dispatch.py`` (now 21 in this file):
- ``test_inline_mode_when_env_disables_background`` asserts the inline
  path does not register an entry in ``_session_flush_locks``, guarding
  against future refactors hoisting ``_get_flush_lock`` above the
  background-mode branch.
- ``test_drain_cancel_on_timeout_clears_pending`` also asserts the
  cancelled task's session lock is gone from the registry, pinning the
  cleanup-under-cancellation property exercised by the runner's
  ``finally`` block.
- ``test_single_task_observes_itself_in_pending_set_during_flush`` pins
  the ``if t is current: continue`` invariant in
  ``_release_session_lock_if_idle``. Removing that line would make a
  single dispatch see itself as "still pending" and never clean up its own
  lock — silently re-introducing the leak fixed in f7936bf.
- Dropped the wall-clock ``assert elapsed < 1.5`` from
  ``test_summarize_chunked_runs_chunks_in_parallel``; the
  ``peak_in_flight >= 2`` semantic check covers parallelism without CI
  flakiness.

Refs: f7936bf
Made-with: Cursor
…plate

Pure cleanup — no behaviour change.

Changes:
- Sync the module-level header comment with the current 90s default for
  ``FLOCKS_COMPACTION_FLUSH_TIMEOUT`` (it was still claiming 600s after
  the earlier reduction).
- ``_flush_timeout_seconds`` now logs
  ``compaction.flush.timeout_non_positive`` when the env var parses to
  ``<= 0``, matching the existing warning for parse errors, so
  misconfigurations no longer fall back silently.
- DRY the flush-dispatch test suite: extract ``_hard_reset()`` (previously
  duplicated across the pre / post fixture phases) and a
  ``_dispatch(session_id)`` helper that hides the seven-arg placeholder
  boilerplate repeated by every test.

Verified by 21 / 21 flush-dispatch tests and 24 / 24 neighbouring
compaction tests.

Made-with: Cursor
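A sketch of the validation described here, using stdlib ``logging`` as a stand-in for the project's logger:

```python
import logging
import os

log = logging.getLogger(__name__)
_DEFAULT_FLUSH_TIMEOUT_SECONDS = 90.0

def _flush_timeout_seconds() -> float:
    raw = os.environ.get("FLOCKS_COMPACTION_FLUSH_TIMEOUT", "").strip()
    if not raw:
        return _DEFAULT_FLUSH_TIMEOUT_SECONDS
    try:
        value = float(raw)
    except ValueError:
        log.warning("compaction.flush.timeout_parse_error raw=%r", raw)
        return _DEFAULT_FLUSH_TIMEOUT_SECONDS
    if value <= 0:
        # New: non-positive values also warn instead of falling back silently.
        log.warning("compaction.flush.timeout_non_positive raw=%r", raw)
        return _DEFAULT_FLUSH_TIMEOUT_SECONDS
    return value
```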
… contexts

Field telemetry on a real session showed ``summarize_single_pass`` taking
66s for a ~10k-char conversation against a slow OpenAI-compatible provider
(minimax via threatbook). The legacy hand-off rule
``total_chars > target_chars * 2`` only kicked in for ~60k+ contexts, so
medium conversations always took the slow serial path even though
``summarize_chunked`` already runs N chunk calls in parallel.

Changes:
- New ``_decide_chunked_strategy`` returns
  ``(use_chunked, chunk_size, decision)`` with three branches:
  ``oversize`` (legacy, must chunk), ``preemptive`` (fits single-pass but
  big enough that parallel beats serial), and ``single_pass`` (small,
  merge tax not worth it).
- ``summarize_chunked`` now takes an optional ``chunk_size`` param so
  callers can request finer splits without enlarging the per-chunk
  truncation cap (``target_chars`` was previously serving both roles).
- Three tunables, all env-overridable for emergency tuning:
  ``FLOCKS_COMPACTION_PREEMPTIVE_CHUNK_RATIO`` (default 0.2),
  ``FLOCKS_COMPACTION_TARGET_PARALLEL_CHUNKS`` (default 3), and
  ``FLOCKS_COMPACTION_MIN_CHUNK_CHARS`` (default 3000).
- A new ``compaction.process.strategy`` log entry surfaces which branch
  was taken so we can correlate with provider latency in production.

Predicted impact for the 66s case: 1×14k LLM call → 3×3.6k parallel calls
+ 1 small merge ≈ 15–20s.

Verified with a standalone harness (8 / 8 strategy + chunk_size assertions
pass; existing parallel / concurrency invariants preserved).

Made-with: Cursor
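A sketch of the three-branch decision; the tunables and branch names follow the message, but exactly how the ratio gates the preemptive branch is an assumption:

```python
import os

# Tunables from the message; how they combine below is an assumption.
_RATIO = float(os.environ.get("FLOCKS_COMPACTION_PREEMPTIVE_CHUNK_RATIO", "0.2"))
_TARGET_CHUNKS = int(os.environ.get("FLOCKS_COMPACTION_TARGET_PARALLEL_CHUNKS", "3"))
_MIN_CHUNK = int(os.environ.get("FLOCKS_COMPACTION_MIN_CHUNK_CHARS", "3000"))

def _decide_chunked_strategy(total_chars: int, target_chars: int) -> tuple[bool, int, str]:
    if total_chars > target_chars * 2:
        return True, target_chars, "oversize"      # legacy rule: must chunk
    chunk_size = max(_MIN_CHUNK, total_chars // _TARGET_CHUNKS)
    if total_chars > target_chars * _RATIO and total_chars // chunk_size >= 2:
        # Fits a single pass, but N parallel chunk calls beat one serial call.
        return True, chunk_size, "preemptive"
    return False, target_chars, "single_pass"      # small: merge tax not worth it
```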
Field log analysis exposed a four-minute compaction stall caused by the
merge LLM call hanging until the upstream gateway returned 504, with no
per-chunk visibility into where time was actually spent.

Changes:
- Cap the merge LLM call at 60s (default, override via
  ``FLOCKS_COMPACTION_MERGE_TIMEOUT``) instead of inheriting the 300s
  per-chunk budget. The merge prompt is small and its fallback
  (concatenated chunk summaries) is already a usable result, so bailing
  out early unblocks the user.
- Emit ``duration_ms`` on every chunk-summary completion / timeout / error
  and on the merge call, plus a ``chunked_summarize.parallel_done`` rollup
  with ``parallel_duration_ms`` and ``merged_chars``. This removes the
  need to subtract clocks across log lines when diagnosing slow
  compactions.
- Truncate gateway HTML error bodies in ``merge_summary_error`` logs to
  200 chars to keep log lines readable.

Made-with: Cursor
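A sketch of the capped merge call with its fallback, using stdlib ``logging`` as a stand-in for the project's logger (``merge_call`` is an illustrative async LLM call):

```python
import asyncio
import logging
import os
import time

log = logging.getLogger(__name__)
_MERGE_TIMEOUT = float(os.environ.get("FLOCKS_COMPACTION_MERGE_TIMEOUT", "60"))

async def _merge_summaries(merge_call, chunk_summaries: list[str]) -> str:
    started = time.monotonic()
    try:
        merged = await asyncio.wait_for(merge_call(chunk_summaries), timeout=_MERGE_TIMEOUT)
    except asyncio.TimeoutError:
        # The fallback (concatenated chunk summaries) is already usable,
        # so bail out early rather than waiting for an upstream 504.
        merged = "\n\n".join(chunk_summaries)
    duration_ms = int((time.monotonic() - started) * 1000)
    log.info("merge done duration_ms=%d merged_chars=%d", duration_ms, len(merged))
    return merged
```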
60s was too tight for slower models and larger merge prompts; 120s still
bails out well before the typical upstream gateway 504 window (≥230s
observed in the field) while giving honest providers room to finish.
Remains tunable via ``FLOCKS_COMPACTION_MERGE_TIMEOUT``.

Refs: 40aec76
Made-with: Cursor
Manual compaction was already wired through ``POST /command`` →
``_run_session_compaction`` → ``run_compaction`` →
``SessionCompaction.process``, but the route hard-rejected any
arguments with "Usage: /compact" and the compaction pipeline had no
way to bias what the summariser kept. Manual compaction was therefore
a blunt tool — users could trigger it but not steer it.
Now ``/compact <free-text focus>`` (e.g. ``/compact 专注于未解决的决策``,
"focus on unresolved decisions") threads ``focus_instruction`` end-to-end.
Changes:
- ``summary._build_focus_block`` renders an explicit ``## User Focus``
  block (see the sketch after this message), isolated by an obvious
  header so user text cannot collide with the structural sections.
  Empty / whitespace-only focus collapses to ``""`` so callers can
  interpolate unconditionally.
- ``summarize_chunked`` injects the block into BOTH the per-chunk
prompt AND the merge prompt. Chunk-stage injection is essential:
without it the chunk summaries discard details the user cares
about and the merge step cannot recover them — focus must steer
information selection, not just final phrasing.
- ``summarize_single_pass`` accepts the same parameter for the
small-context branch.
- ``SessionCompaction.process`` and ``run_compaction`` gain matching
pass-through params; ``process.strategy`` log gains ``has_focus``
so field logs make it obvious whether a manual focus was applied.
- ``send_session_command`` now treats ``request.arguments`` as the
focus text instead of replying "Usage: /compact"; empty arguments
preserve the legacy default-prompt behaviour.
- ``/compact`` dropdown description hints at the new arg form.
Front end requires zero changes — ``SessionChat.sendCommand`` already
forwards ``arguments`` to the existing ``/api/session/{id}/command``.
Made-with: Cursor
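A minimal sketch of ``_build_focus_block`` as described above (the body beyond the ``## User Focus`` header and the empty-collapse rule is an assumption):

```python
def _build_focus_block(focus_instruction: str | None) -> str:
    # Empty / whitespace-only focus collapses to "" so callers can
    # interpolate the result into prompts unconditionally.
    if not focus_instruction or not focus_instruction.strip():
        return ""
    # An explicit header isolates user text from the structural sections.
    return f"## User Focus\n{focus_instruction.strip()}\n"
```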
When ``SessionCompaction.process`` failed (e.g. the provider returns
``Error code: 529 - {'error': {'message': '模型服务暂时不可用'}}``,
i.e. "the model service is temporarily unavailable"), the front-end
toast showed an opaque "Compaction failed" because:
1. ``process()`` swallowed the exception via
``log.error + return "stop"`` to keep the ``session_loop`` contract
simple.
2. ``_run_session_compaction`` raised a fabricated
   ``RuntimeError("Compaction failed")`` in its place.
3. ``_handle_command`` published that message verbatim as the SSE
``session.error`` payload powering the red toast.
The user got no actionable signal — no HTTP code, no upstream text,
not even a hint that it was the model gateway that died.
Changes:
- ``process()`` now stashes the user-facing error via
  ``_record_compaction_error`` (sketched after this message) before
  returning ``"stop"``.
- ``_extract_provider_message`` parses the OpenAI-compatible
``{'error': {'message': '...'}}`` payload out of the SDK error
string and prepends the HTTP code.
- ``_run_session_compaction`` pops that message and raises
``RuntimeError(detail)``, so the toast now shows e.g.
``Error code: 529 — 模型服务暂时不可用``.
- The three legacy "stop" branches (ImportError, provider not found,
generic Exception) all record an appropriate detail so no failure
mode reverts to the opaque text.
The ``session_loop`` path (which only checks ``result == "stop"``) is
untouched — the error breadcrumb is single-shot and only consumed by
the manual ``/compact`` path. Records auto-overwrite on retry so a
stale error from yesterday cannot leak into today's failure.
``_extract_provider_message`` returns ``None`` on any unparseable
payload; callers fall back to ``str(exc)``.
Made-with: Cursor
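A sketch of the single-shot error breadcrumb; the record / pop split is how the message reads, the bodies are assumptions:

```python
_last_compaction_error: dict[str, str] = {}

def _record_compaction_error(session_id: str, detail: str) -> None:
    # Auto-overwrites on retry, so a stale error cannot leak into a
    # later failure's toast.
    _last_compaction_error[session_id] = detail

def _pop_compaction_error(session_id: str) -> str | None:
    # Single-shot: only the manual /compact path consumes the breadcrumb;
    # the session_loop "stop" contract never reads it.
    return _last_compaction_error.pop(session_id, None)
```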
The ``_last_compaction_error`` stash introduced in 754f455 had three
issues uncovered during self-review:
- Unbounded growth: auto-compaction failures write to the cache but never
  read from it (only the manual ``/compact`` SSE path pops). Long-running
  instances would leak on the order of hundreds of bytes per unique
  session id with no upper bound.
- No length cap: provider exceptions can embed large JSON payloads or
  stack traces in ``str(exc)``. The UI surfaces this verbatim, so an
  unbounded toast was possible.
- Fragile parsing: ``text.partition(" - ")`` mis-splits when the delimiter
  appears before the dict payload, e.g.
  ``"500 - upstream error - {'error': {...}}"``. ``literal_eval`` then
  fails and the user sees the raw string.

Changes:
- Switch the cache to an ``OrderedDict`` with LRU eviction (256 entries,
  ``move_to_end`` on every write).
- Truncate stored detail to 500 chars with a trailing ``…`` marker.
- Anchor the dict payload with a regex on the trailing ``{...}`` instead
  of splitting on the first ``" - "``. The full leading context now
  survives into the toast.
- Hoist ``import ast`` and ``import re`` to module level. Broaden the
  ``literal_eval`` ``except`` clause to also catch ``MemoryError`` and
  ``RecursionError``.

Verified with a 13-case isolated harness covering CN/EN provider errors,
embedded separators, missing payloads, blank messages, truncation in both
the raw and parsed paths, LRU eviction at the boundary, and pathological
nested input.

Refs: 754f455
Made-with: Cursor
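A sketch of the hardened cache and payload anchoring; sizes follow the message, regex details are assumptions:

```python
import ast
import re
from collections import OrderedDict

_MAX_ERROR_ENTRIES = 256
_MAX_DETAIL_CHARS = 500
_last_compaction_error: OrderedDict[str, str] = OrderedDict()

def _record_compaction_error(session_id: str, detail: str) -> None:
    # Cap detail length so the UI toast stays bounded.
    if len(detail) > _MAX_DETAIL_CHARS:
        detail = detail[:_MAX_DETAIL_CHARS] + "…"
    _last_compaction_error[session_id] = detail
    _last_compaction_error.move_to_end(session_id)
    # LRU eviction keeps the cache bounded even when auto-compaction
    # failures are written but never popped.
    while len(_last_compaction_error) > _MAX_ERROR_ENTRIES:
        _last_compaction_error.popitem(last=False)

# Anchor on the trailing {...} so embedded " - " separators in the leading
# context cannot mis-split the payload.
_PAYLOAD_RE = re.compile(r"(\{.*\})\s*$", re.DOTALL)

def _parse_payload(text: str):
    match = _PAYLOAD_RE.search(text)
    if not match:
        return None  # callers fall back to str(exc)
    try:
        return ast.literal_eval(match.group(1))
    except (ValueError, SyntaxError, MemoryError, RecursionError):
        return None  # pathological or non-literal payloads degrade gracefully
```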
Compaction is the longest synchronous operation a user can trigger
from the UI: a single ``/compact`` against a real session today
takes 15–60 s but renders only a static "Compacting..." banner the
whole time. Field reports of the spinner sitting silent for 30+ s
make the feature feel hung; users have killed sessions thinking
something broke. The information was actually already there — we
log every phase to ``~/.flocks/logs`` — but the UI had no channel
to consume it.
Pipe the same phase events the logger sees onto a dedicated SSE
event so the front-end can render a live multi-stage panel
(strategy → chunk N/M → merge → summary written → complete).
Changes:
- ``compaction.py``: declare ``ProgressCallback`` and a fault-
  tolerant ``_emit_progress`` (sketched after this message; sink
  errors are logged at WARN and swallowed — progress is
  observability, never a correctness contract).
  ``SessionCompaction.process`` now emits ``load`` / ``strategy`` /
  ``summarize_done`` / ``complete`` and forwards the callback into
  ``summarize_chunked``.
- ``summary.py``: ``summarize_chunked`` emits one ``chunk_done``
per chunk (with ``ok`` + ``duration_ms``, including failure
reasons), plus ``merge_started`` / ``merge_done``. Helper
redeclared locally to keep the existing one-way
``compaction → summary`` import contract.
- ``orchestrator.py``, ``session.py``, ``session_loop.py``:
thread the optional ``progress_callback`` through both the
manual ``/compact`` route and the auto/overflow paths in the
session loop. Both use the same adapter that bridges
``(stage, data)`` onto ``publish_event("session.compaction_progress", ...)``;
introducing a dedicated event type (rather than overloading
``session.status``) keeps the dispatcher explicit and avoids
forcing unrelated consumers to filter on a nested ``stage``.
- ``SessionChat.tsx``: add ``compactionStages`` and
``compactionChunkProgress`` state, dispatch the new SSE event,
and extend the amber banner with a chunk progress bar plus a
scrollable stage list rendered via a typed ``describeCompactionStage``
helper. ``chunk_done`` events arrive in non-deterministic
order under ``asyncio.gather`` so we deduplicate by ``chunk``
index, never by arrival count, which also makes SSE reconnects
idempotent.
- ``session.json`` (zh-CN + en-US): add ``chat.compactionStage.*``
keys for every stage and the chunk progress label.
Verified: ``scripts/verify_compaction_progress.py`` (run via
``uv run python``) drives ``summarize_chunked`` and ``_emit_progress``
in isolation against stub provider modules and asserts:
1. ``_emit_progress(None, ...)`` and a raising sink both no-op.
2. Happy-path 6-chunk run emits exactly 6 ``chunk_done`` events,
followed by one ``merge_started`` and one ``merge_done`` (in
that order).
3. ``chunk_done`` payloads carry ``chunk`` / ``total`` /
``duration_ms`` / ``ok``.
4. Failed chunks still emit ``chunk_done`` with ``ok=False`` +
a ``reason`` so the UI progress bar always advances.
5. A raising progress callback never breaks ``summarize_chunked``.
All 5 checks pass; ReadLints clean across all 8 backend + frontend
files. No production behaviour change when ``progress_callback``
is ``None`` (the legacy contract for any caller that has not been
updated).
Refs: 40aec76, e45e2b0
Made-with: Cursor
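A sketch of the fault-tolerant sink; the ``(stage, data)`` shape follows the message, and stdlib ``logging`` stands in for the project's logger:

```python
import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)

ProgressCallback = Callable[[str, dict[str, Any]], None]

def _emit_progress(cb: Optional[ProgressCallback], stage: str, data: dict[str, Any]) -> None:
    if cb is None:
        return  # legacy contract: callers without a callback see no change
    try:
        cb(stage, data)
    except Exception:
        # Progress is observability, never a correctness contract:
        # a raising sink must not break compaction itself.
        log.warning("compaction progress sink failed at stage=%s", stage)
```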
…unk runs

Self-review of a8cf92a found a stale-closure bug that broke the chunk
progress bar in production for any chunked run.

The new ``session.compaction_progress`` SSE handler maintained two mirror
states for chunk progress:
- ``compactionStages`` — the full event log, updated via a functional
  ``setCompactionStages(prev => ...)`` updater.
- ``compactionChunkProgress`` — ``{done, total}`` for the bar.

The progress updater closed over ``compactionStages`` to derive ``done``
from the stage list, but ``handleSSEEvent`` is a useCallback whose
dependency array does not list ``compactionStages``. React froze
``compactionStages`` at the value captured when the callback was first
created (an empty array), so every ``chunk_done`` arrival saw an empty
re-derivation set. ``done`` therefore equalled exactly 1 forever, making
the progress bar useless for the very case (parallel chunked
summarisation) it was built for. The stage list itself was correct,
because it used the functional updater's ``prev`` rather than reading the
closure.

Fix:
- Drop the ``compactionChunkProgress`` state entirely.
- Derive ``{done, total}`` from ``compactionStages`` via ``useMemo`` so a
  single source of truth (the event log) drives both the stage list and
  the progress bar. ``total`` takes the maximum value seen across
  ``chunk_done`` payloads in case an event arrives with a truncated
  payload.
- Remove the now-orphaned ``setCompactionChunkProgress(null)`` calls from
  the four reset paths (status transition, error, session switch).

Single-chunk runs are visually identical (the bar is hidden when total is
0). Multi-chunk runs now advance 1 → 2 → ... → N as expected.

ReadLints clean. No backend or schema change — purely a UI state refactor;
``scripts/verify_compaction_progress.py`` still passes unchanged because
it covers the SSE producer side, not React.

Refs: a8cf92a
Made-with: Cursor
…d percent

User feedback on dfb4f98: the per-chunk milestones in the compaction panel
were noisy and visually confusing. A 4-chunk run rendered as:

    分块完成进度 4 / 4 · 分 3 块并行压缩 · 分块 4/4 完成 (14.0s) ·
    分块 2/4 完成 (30.2s) · 分块 3/4 完成 (35.5s) · 分块 1/4 完成 (37.2s) ·
    合并 4 段摘要…
    (Chunk completion progress 4 / 4 · compacting in 3 parallel chunks ·
    chunk 4/4 done (14.0s) · chunk 2/4 done (30.2s) · chunk 3/4 done
    (35.5s) · chunk 1/4 done (37.2s) · merging 4 summaries…)

— five "chunk N/4" lines arriving in non-deterministic order under
``asyncio.gather``, then a separate "merging" line, with the bar only
covering the chunk band (so the merge step appeared to add no progress at
all). The user only wants "where am I in the pipeline, 0–100%", with the
merge step absorbed into the same bar.

Changes:
- Replace ``compactionChunkProgress`` with ``compactionPercent``, derived
  in a useMemo from the same ``compactionStages`` source. The weights span
  the full pipeline:
    load → 5–10%
    strategy → 10–20%
    chunk_done × N → 10–70% (linear in done/total)
    merge_started → 75%
    merge_done → 95%
    summarize_done → 95–97%
    complete → 100%
  Single-pass paths (no chunking / no merge) jump straight from
  ``strategy`` (20%) to ``summarize_done`` (95%), since the LLM call is
  the only meaningful wait there. Fixed weights beat timing-based
  interpolation here: chunks finish out of order, so a time-linear bar
  would jitter, and the user only needs a credible sign of life rather
  than an ETA.
- ``describeCompactionStage`` now returns ``null`` for ``chunk_done``, so
  the milestone list collapses to load → strategy → merge_started →
  merge_done → summarize_done → complete. The per-chunk durations are
  still in ``~/.flocks/logs`` for ops / perf debugging.
- Drop the i18n keys ``chunkProgressLabel`` / ``chunkDone`` /
  ``chunkFailed``; add ``overallProgressLabel`` ("会话消息压缩进度" /
  "Conversation compaction progress") used by the unified bar.

ReadLints clean across the three touched files. No backend or SSE-schema
change — the producer still emits ``chunk_done`` (we need the count to
drive the bar) and the underlying log payloads are unchanged for any
external consumer.

Refs: dfb4f98, a8cf92a
Made-with: Cursor
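The actual derivation lives in ``SessionChat.tsx``; as a language-neutral illustration, here is the fixed-weight mapping sketched in Python (boundary values follow the message, interpolation details are assumptions):

```python
def compaction_percent(stages: list[dict]) -> int:
    seen = {s["stage"] for s in stages}
    chunk_events = [s for s in stages if s["stage"] == "chunk_done"]
    # Deduplicate by chunk index (events arrive out of order and may repeat
    # across SSE reconnects); take the max total seen across payloads.
    done = len({s["data"].get("chunk") for s in chunk_events})
    total = max((s["data"].get("total", 0) for s in chunk_events), default=0)
    if "complete" in seen:
        return 100
    if "summarize_done" in seen:
        return 97
    if "merge_done" in seen:
        return 95
    if "merge_started" in seen:
        return 75
    if total > 0:
        return 10 + round(60 * done / total)  # chunk band, linear in done/total
    if "strategy" in seen:
        return 20
    if "load" in seen:
        return 10
    return 0
```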
xiami762 approved these changes on Apr 20, 2026