Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion flocks/storage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,35 @@ async def list_entries(
value = json.loads(value_str)
entries.append((key, value))
return entries


@classmethod
async def list_raw(
cls,
prefix: Optional[str] = None,
) -> List[Tuple[str, str]]:
"""List ``(key, raw_value_str)`` pairs without Python-side JSON parsing.

Unlike :meth:`list_entries`, the value is returned as a plain string
so callers can apply lightweight extraction (e.g. regex) instead of
full ``json.loads``. This is critical for hot trim paths that only
need a couple of scalar fields from otherwise large JSON blobs.
Compatible with all SQLite versions (no ``json_extract`` required).
"""
await cls._ensure_init()

if prefix:
query = "SELECT key, value FROM storage WHERE key LIKE ?"
params: tuple = (f"{prefix}%",)
else:
query = "SELECT key, value FROM storage"
params = ()

async with cls.connect(cls._db_path) as db:
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()

return [(row[0], row[1]) for row in rows]

@classmethod
async def exists(cls, key: str) -> bool:
"""
Expand Down
79 changes: 59 additions & 20 deletions flocks/workflow/execution_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from __future__ import annotations

import asyncio
import re
import time
import uuid
from typing import Any, Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Set

from flocks.session.recorder import Recorder
from flocks.storage.storage import Storage
Expand Down Expand Up @@ -120,6 +121,12 @@ def compact_history_for_storage(
# call per workflow to amortise the cost under high syslog throughput.
_TRIM_CHECK_INTERVAL = 5
_trim_counters: Dict[str, int] = {}
# Workflows with an in-flight trim task. Because trims run as fire-and-forget
# ``asyncio.create_task`` background jobs, a slow trim under high syslog load
# could otherwise spawn many overlapping scans that each materialise table
# state simultaneously — the exact pattern that drove RSS to 20 GB. This
# guard ensures at most one trim per workflow is ever running.
_trim_in_flight: Set[str] = set()

# Per-workflow lock to serialize read-modify-write of stats. Concurrent
# executions of the same workflow (e.g. syslog-triggered runs with
Expand Down Expand Up @@ -341,24 +348,56 @@ async def _record_audit() -> None:
pass


# Regex patterns to extract scalar fields from raw JSON strings without
# calling json.loads. workflowId/startedAt are always serialised near the
# start of the record (set in build_initial_execution_record), so we only
# scan a small prefix of each value string — O(prefix) instead of O(value).
_RE_WORKFLOW_ID = re.compile(r'"workflowId"\s*:\s*"([^"]+)"')
_RE_STARTED_AT = re.compile(r'"startedAt"\s*:\s*(\d+)')


async def _trim_execution_history(workflow_id: str) -> None:
"""Delete the oldest execution records once the per-workflow cap is exceeded."""
all_entries = await Storage.list_entries("workflow_execution/")
wf_entries = [
(key, data)
for key, data in all_entries
if isinstance(data, dict) and data.get("workflowId") == workflow_id
]
if len(wf_entries) <= _MAX_EXECUTION_HISTORY_PER_WORKFLOW:
"""Delete the oldest execution records once the per-workflow cap is exceeded.

Uses ``Storage.list_raw`` + regex instead of ``list_entries`` + ``json.loads``
so that scanning the execution-history table never materialises large JSON
blobs as Python objects. The previous approach caused 100% single-core CPU
usage (``json.raw_decode``) and drove RSS to 20 GB under syslog load.

Also guards against overlapping trim tasks via ``_trim_in_flight``: because
trims run as fire-and-forget background tasks, without the guard a slow trim
would spawn multiple concurrent scans that each load the full table
simultaneously, multiplying the memory spike.
"""
# Coalesce overlapping trims: only one scan per workflow at a time.
if workflow_id in _trim_in_flight:
return
# Sort ascending by startedAt and remove the oldest excess records
wf_entries.sort(key=lambda kd: kd[1].get("startedAt", 0))
excess = len(wf_entries) - _MAX_EXECUTION_HISTORY_PER_WORKFLOW
for key, _ in wf_entries[:excess]:
try:
exec_id = key.rsplit("/", 1)[-1]
await Storage.remove(key)
record_path = Recorder.paths().workflow_dir / f"{exec_id}.jsonl"
await asyncio.to_thread(record_path.unlink, missing_ok=True)
except Exception:
pass
_trim_in_flight.add(workflow_id)
try:
raw_rows = await Storage.list_raw("workflow_execution/")
wf_entries: List[tuple] = []
for key, value_str in raw_rows:
# Scan only the first 400 chars — enough for workflowId + startedAt.
head = value_str[:400]
m_wf = _RE_WORKFLOW_ID.search(head)
if not m_wf or m_wf.group(1) != workflow_id:
continue
m_ts = _RE_STARTED_AT.search(head)
started_at = int(m_ts.group(1)) if m_ts else 0
wf_entries.append((key, started_at))

if len(wf_entries) <= _MAX_EXECUTION_HISTORY_PER_WORKFLOW:
return
# Sort ascending by startedAt and remove the oldest excess records.
wf_entries.sort(key=lambda kd: kd[1])
excess = len(wf_entries) - _MAX_EXECUTION_HISTORY_PER_WORKFLOW
for key, _ in wf_entries[:excess]:
try:
exec_id = key.rsplit("/", 1)[-1]
await Storage.remove(key)
record_path = Recorder.paths().workflow_dir / f"{exec_id}.jsonl"
await asyncio.to_thread(record_path.unlink, missing_ok=True)
except Exception:
pass
finally:
_trim_in_flight.discard(workflow_id)