diff --git a/flocks/ingest/syslog/manager.py b/flocks/ingest/syslog/manager.py index 91037db4..0ef9aae6 100644 --- a/flocks/ingest/syslog/manager.py +++ b/flocks/ingest/syslog/manager.py @@ -9,6 +9,8 @@ from flocks.storage.storage import Storage from flocks.utils.log import Log from flocks.workflow.execution_store import ( + compact_history_for_storage, + compact_outputs_for_storage, create_execution_record, record_execution_result, resolve_execution_outcome, @@ -21,6 +23,7 @@ log = Log.create(service="syslog.manager") + # Maximum concurrent workflow executions per workflow to avoid FD exhaustion and SQLite write contention _MAX_CONCURRENT_EXECUTIONS = 8 # Maximum number of buffered syslog messages per workflow; excess messages are dropped with a warning. @@ -468,11 +471,11 @@ async def _trigger_workflow( duration = time.time() - start_time exec_data.update({ "status": status, - "outputResults": result.outputs if isinstance(result.outputs, dict) else {}, + "outputResults": compact_outputs_for_storage(result.outputs), "finishedAt": int(time.time() * 1000), "duration": duration, "errorMessage": error_msg, - "executionLog": list(result.history or []), + "executionLog": compact_history_for_storage(result.history), "currentNodeId": result.last_node_id, "currentPhase": status, "currentStepIndex": result.steps, diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index 8f77ee3f..77ea9482 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -46,6 +46,8 @@ ) from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX from flocks.workflow.execution_store import ( + compact_history_for_storage, + compact_outputs_for_storage, create_execution_record, normalize_execution_status as _normalize_execution_status, record_execution_result as _record_execution_result, @@ -499,7 +501,15 @@ def _on_step_start(_run_id, step_index, node, _inputs): return step_index def _on_step_complete(step_result) -> None: + # 
Compact each step's outputs *before* appending so the running + # ``step_history`` (and every subsequent ``_write_progress`` snapshot + # that ships it to SQLite) stays bounded, even when a workflow node + # returns tens of thousands of alerts that are already persisted to + # JSONL on disk. step_dict = step_result.model_dump(mode="json") + raw_outputs = step_dict.get("outputs") + if isinstance(raw_outputs, dict): + step_dict["outputs"] = compact_outputs_for_storage(raw_outputs) step_history.append(step_dict) _write_progress({ "executionLog": list(step_history), @@ -530,12 +540,17 @@ def _on_step_complete(step_result) -> None: # status write still succeeds rather than blowing up. current_data = {"id": exec_id, "workflowId": workflow_id} status_value, error_message = _resolve_execution_outcome(result) + # ``result.history`` is the engine-side authoritative history (not + # yet compacted), while ``step_history`` was already compacted in + # ``_on_step_complete``. Prefer the former when available, then + # run it through ``compact_history_for_storage`` so the persisted + # row stays small in either branch. 
current_data.update({ - "outputResults": result.outputs, + "outputResults": compact_outputs_for_storage(result.outputs), "status": status_value, "finishedAt": int(time.time() * 1000), "duration": duration, - "executionLog": result.history or list(step_history), + "executionLog": compact_history_for_storage(result.history) or list(step_history), "errorMessage": error_message, "currentNodeId": result.last_node_id, "currentNodeType": current_data.get("currentNodeType"), @@ -1072,8 +1087,15 @@ async def workflow_center_invoke(workflow_id: str, req: WorkflowCenterInvokeRequ raw_status = result.get("status", "SUCCEEDED") if isinstance(result, dict) else "SUCCEEDED" status_value = _normalize_execution_status(raw_status) success = status_value == "success" + # workflow_center_invoke proxies to an external published service; no + # step callbacks run locally so executionLog stays as the empty list + # set by create_execution_record. We still run compact_history here + # as a forward-compatible guard in case a future code path populates it. 
exec_data.update({ - "outputResults": result.get("outputs", {}) if isinstance(result, dict) else {}, + "outputResults": compact_outputs_for_storage( + result.get("outputs", {}) if isinstance(result, dict) else {} + ), + "executionLog": compact_history_for_storage(exec_data.get("executionLog")), "status": status_value, "finishedAt": int(time.time() * 1000), "duration": duration, diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index cf8bc309..c8df3b66 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -5,7 +5,7 @@ import asyncio import time import uuid -from typing import Any, Dict, Optional +from typing import Any, Dict, Iterable, List, Optional from flocks.session.recorder import Recorder from flocks.storage.storage import Storage @@ -14,6 +14,94 @@ log = Log.create(service="workflow.execution_store") +# Keys whose values are expected to be large alert/event lists that have +# already been persisted elsewhere (typically JSONL on disk). When writing +# the execution record to SQLite we replace them with a ``_<key>_count`` +# integer to keep row sizes bounded. Callers may extend or override this +# set via the ``keys`` argument of the compact helpers below. +DEFAULT_LARGE_LIST_KEYS: frozenset[str] = frozenset( + { + "enriched_alerts", + "unique_alerts", + "raw_alerts", + "normalized_alerts", + "filtered_alerts", + } +) + +# Lists with at most this many items are passed through verbatim. The cap +# protects against accidentally stripping small metadata lists that happen +# to share a name with a known large-list key. +DEFAULT_COMPACT_SIZE_THRESHOLD: int = 100 + + +def compact_outputs_for_storage( + outputs: Any, + *, + keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS, + size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD, +) -> Dict[str, Any]: + """Return a copy of *outputs* with large alert lists replaced by counts. 
+ + Only **list or tuple** values whose key is in *keys* AND whose length + exceeds *size_threshold* are compacted to ``_<key>_count``; everything + else is passed through unchanged. This prevents megabytes of alert data + from being serialised into the ``workflow_execution`` SQLite row on every + invocation, while still keeping small sequences (e.g. error details, short + configuration arrays) fully inspectable in the execution-history UI. + + **Keys that are compacted by default** (see ``DEFAULT_LARGE_LIST_KEYS``): + ``enriched_alerts``, ``unique_alerts``, ``raw_alerts``, + ``normalized_alerts``, ``filtered_alerts``. Keys outside this set — such + as a generic ``alerts`` parameter — are *not* compacted unless the caller + passes a custom *keys* argument. Callers who depend on inspecting the + full list contents of compacted keys must read the data from the JSONL + files written by the workflow itself. + """ + if not isinstance(outputs, dict): + return {} + key_set = frozenset(keys) + compacted: Dict[str, Any] = {} + for k, v in outputs.items(): + if ( + k in key_set + and isinstance(v, (list, tuple)) + and len(v) > size_threshold + ): + compacted[f"_{k}_count"] = len(v) + else: + compacted[k] = v + return compacted + + +def compact_history_for_storage( + history: Any, + *, + keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS, + size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD, +) -> List[Any]: + """Strip large alert lists from step outputs in workflow history. + + Returns an empty list when *history* is falsy. Non-dict step entries + (defensive: shouldn't happen with normal ``StepResult`` dumps) are + passed through unchanged so the caller sees no surprising drops. 
+ """ + if not history: + return [] + result: List[Any] = [] + for step in history: + if not isinstance(step, dict): + result.append(step) + continue + step_copy = dict(step) + raw_outputs = step_copy.get("outputs") + if isinstance(raw_outputs, dict): + step_copy["outputs"] = compact_outputs_for_storage( + raw_outputs, keys=keys, size_threshold=size_threshold + ) + result.append(step_copy) + return result + # Maximum number of execution history records retained per workflow. # Older records are pruned automatically to prevent a syslog flood from bloating Storage. _MAX_EXECUTION_HISTORY_PER_WORKFLOW = 500 @@ -155,10 +243,19 @@ async def create_execution_record( input_params: Optional[Dict[str, Any]] = None, exec_id: Optional[str] = None, ) -> Dict[str, Any]: - """Create and persist a running workflow execution record.""" + """Create and persist a running workflow execution record. + + *input_params* is passed through ``compact_outputs_for_storage`` before + writing to SQLite so that batch HTTP calls whose inputs contain a key in + ``DEFAULT_LARGE_LIST_KEYS`` (e.g. ``{"raw_alerts": [...10k items...]}`` + ) don't bloat the row. Keys outside the default set — such as a generic + ``alerts`` parameter — are stored verbatim; pass a custom *keys* argument + to ``compact_outputs_for_storage`` directly if you need broader coverage. 
+ """ + compacted_params = compact_outputs_for_storage(input_params or {}) exec_data = build_initial_execution_record( workflow_id, - input_params=input_params, + input_params=compacted_params, exec_id=exec_id, ) await Storage.write(workflow_execution_key(exec_data["id"]), exec_data) diff --git a/tests/workflow/test_execution_store_compact.py b/tests/workflow/test_execution_store_compact.py new file mode 100644 index 00000000..311bb2a4 --- /dev/null +++ b/tests/workflow/test_execution_store_compact.py @@ -0,0 +1,280 @@ +"""Regression tests for ``compact_outputs_for_storage`` and +``compact_history_for_storage`` in ``flocks.workflow.execution_store``. + +These helpers protect the ``workflow_execution`` SQLite row from being +inflated to tens of MB per syslog message: each execution of +``stream_alert_dedup`` (and similar streaming workflows) can produce +``enriched_alerts``/``unique_alerts`` lists with thousands of items that +are already persisted to JSONL on disk. Without compaction, those lists +end up duplicated both in the final ``outputResults`` and in every +intermediate ``executionLog`` snapshot written by ``_on_step_complete``, +which is the root cause of the syslog-driven memory blow-up. + +The tests below pin the externally observable contract so future +refactors don't accidentally drop the protection or, conversely, start +stripping legitimately small metadata lists. 
+""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from flocks.workflow.execution_store import ( + DEFAULT_COMPACT_SIZE_THRESHOLD, + DEFAULT_LARGE_LIST_KEYS, + compact_history_for_storage, + compact_outputs_for_storage, +) + + +def _make_alerts(n: int) -> List[Dict[str, Any]]: + return [{"sip": f"1.2.3.{i % 256}", "url": f"/p/{i}"} for i in range(n)] + + +# ── compact_outputs_for_storage ─────────────────────────────────────────────── + + +def test_compact_outputs_strips_large_alert_lists() -> None: + big = _make_alerts(5_000) + outputs = { + "enriched_alerts": big, + "unique_alerts": big[:1_000], + "dedup_key": "abc", + "stats": {"raw_count": 5_000}, + } + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["_enriched_alerts_count"] == 5_000 + assert compacted["_unique_alerts_count"] == 1_000 + assert "enriched_alerts" not in compacted + assert "unique_alerts" not in compacted + # Non-list metadata is preserved verbatim. + assert compacted["dedup_key"] == "abc" + assert compacted["stats"] == {"raw_count": 5_000} + + +def test_compact_outputs_keeps_small_lists_verbatim() -> None: + """A list whose key matches but stays below the size threshold is + passed through unchanged: small metadata arrays (e.g. error details) + must remain inspectable in the execution-history UI. + """ + small = _make_alerts(10) + outputs = {"enriched_alerts": small, "stats": {"raw_count": 10}} + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["enriched_alerts"] == small + assert "_enriched_alerts_count" not in compacted + + +def test_compact_outputs_ignores_unknown_keys() -> None: + big_unknown = _make_alerts(5_000) + outputs = {"some_other_alerts": big_unknown} + + compacted = compact_outputs_for_storage(outputs) + + # Unknown keys are not in the default large-list set; they must pass + # through even if huge, so callers don't get surprising drops. 
+ assert compacted["some_other_alerts"] is big_unknown + + +def test_compact_outputs_accepts_custom_keys_and_threshold() -> None: + big = _make_alerts(150) + outputs = {"custom_payload": big, "enriched_alerts": _make_alerts(50)} + + compacted = compact_outputs_for_storage( + outputs, + keys={"custom_payload"}, + size_threshold=100, + ) + + assert compacted["_custom_payload_count"] == 150 + # Default key is no longer in the override set so its list is kept. + assert compacted["enriched_alerts"] == _make_alerts(50) + + +def test_compact_outputs_compacts_tuple_sequences() -> None: + """``tuple`` values whose key is in the default set must be compacted just + like ``list`` values, since some serialisation paths (e.g. ``exec()`` + return values) may produce tuples instead of lists. + """ + big_tuple = tuple(_make_alerts(5_000)) + outputs = {"enriched_alerts": big_tuple, "dedup_key": "x"} + + compacted = compact_outputs_for_storage(outputs) + + assert compacted["_enriched_alerts_count"] == 5_000 + assert "enriched_alerts" not in compacted + assert compacted["dedup_key"] == "x" + + +def test_compact_outputs_handles_non_dict_input() -> None: + assert compact_outputs_for_storage(None) == {} + assert compact_outputs_for_storage([1, 2, 3]) == {} + assert compact_outputs_for_storage("oops") == {} + + +def test_compact_outputs_does_not_mutate_input() -> None: + big = _make_alerts(5_000) + outputs = {"enriched_alerts": big, "dedup_key": "abc"} + + compact_outputs_for_storage(outputs) + + assert "enriched_alerts" in outputs + assert outputs["enriched_alerts"] is big + assert outputs["dedup_key"] == "abc" + + +def test_compact_outputs_drastically_reduces_serialised_size() -> None: + """End-to-end size guarantee: the typical 10K-alert payload should + shrink by more than 1000x once compacted, which is what makes the + SQLite row size bounded under syslog throughput. 
+ """ + import json + + big = [ + { + "sip": f"1.2.3.{i % 256}", + "req_http_url": "/admin?id=" + "x" * 200, + "req_body": "b" * 300, + "dedup_key": "abc" * 10, + } + for i in range(10_000) + ] + outputs = {"enriched_alerts": big, "unique_alerts": big[:2_000]} + + before = len(json.dumps(outputs).encode()) + after = len(json.dumps(compact_outputs_for_storage(outputs)).encode()) + + assert before > 1_000_000 # ≥ 1 MB before + assert after < 1_000 # < 1 KB after + assert before / after > 1_000 + + +# ── compact_history_for_storage ─────────────────────────────────────────────── + + +def test_compact_history_compacts_each_step_outputs() -> None: + big = _make_alerts(5_000) + history = [ + {"node_id": "receive", "outputs": {"raw_alerts": big}}, + {"node_id": "normalize", "outputs": {"normalized_alerts": big}}, + {"node_id": "dedup", "outputs": {"enriched_alerts": big, "dedup_key": "x"}}, + ] + + compacted = compact_history_for_storage(history) + + assert compacted[0]["outputs"] == {"_raw_alerts_count": 5_000} + assert compacted[1]["outputs"] == {"_normalized_alerts_count": 5_000} + assert compacted[2]["outputs"]["_enriched_alerts_count"] == 5_000 + assert compacted[2]["outputs"]["dedup_key"] == "x" + # Top-level keys (node_id) untouched. + assert [s["node_id"] for s in compacted] == ["receive", "normalize", "dedup"] + + +def test_compact_history_passes_through_falsy_history() -> None: + assert compact_history_for_storage(None) == [] + assert compact_history_for_storage([]) == [] + + +def test_compact_history_does_not_mutate_input() -> None: + big = _make_alerts(5_000) + history = [{"node_id": "x", "outputs": {"enriched_alerts": big}}] + + compact_history_for_storage(history) + + assert history[0]["outputs"]["enriched_alerts"] is big + + +def test_compact_history_tolerates_non_dict_steps() -> None: + """Defensive: a malformed step entry should pass through rather than + crash the syslog/HTTP execution recorder. 
+ """ + history = [ + "not-a-dict", + {"node_id": "ok", "outputs": {"enriched_alerts": _make_alerts(5_000)}}, + 42, + ] + + compacted = compact_history_for_storage(history) + + assert compacted[0] == "not-a-dict" + assert compacted[2] == 42 + assert compacted[1]["outputs"]["_enriched_alerts_count"] == 5_000 + + +def test_compact_history_skips_step_with_non_dict_outputs() -> None: + history = [{"node_id": "weird", "outputs": "string-output"}] + + compacted = compact_history_for_storage(history) + + # Non-dict outputs are left as-is (defensive pass-through). + assert compacted[0]["outputs"] == "string-output" + + +# ── Defaults exposed to callers ─────────────────────────────────────────────── + + +def test_default_large_list_keys_cover_stream_alert_dedup_outputs() -> None: + """The default key set must include every large list produced by the + stream_alert_dedup workflow; otherwise syslog memory growth regresses + silently. + """ + expected = { + "enriched_alerts", + "unique_alerts", + "raw_alerts", + "normalized_alerts", + "filtered_alerts", + } + assert expected <= DEFAULT_LARGE_LIST_KEYS + + +def test_default_compact_size_threshold_is_reasonable() -> None: + # The threshold must be high enough to keep ordinary metadata lists + # (a few dozen items at most) intact, but low enough that megabyte- + # scale payloads get compacted on every triggered execution. + assert 1 <= DEFAULT_COMPACT_SIZE_THRESHOLD <= 1_000 + + +# ── create_execution_record compacts inputParams ───────────────────────────── + + +def test_compact_outputs_covers_input_params_batch_key() -> None: + """HTTP /run batch calls may pass a large ``alerts`` list as inputParams. + ``alerts`` is not in ``DEFAULT_LARGE_LIST_KEYS``, so + ``compact_outputs_for_storage`` must pass it through unchanged; only keys + in that set are compacted when ``create_execution_record`` writes to SQLite. 
+ """ + batch_inputs = { + "alerts": _make_alerts(5_000), + "filter_enabled": True, + "threshold": 0.7, + } + + compacted = compact_outputs_for_storage(batch_inputs) + + assert "_alerts_count" not in compacted, ( + "'alerts' is not in DEFAULT_LARGE_LIST_KEYS so it should pass through unchanged" + ) + # Scalar fields must survive unchanged. + assert compacted["filter_enabled"] is True + assert compacted["threshold"] == 0.7 + + +def test_compact_outputs_covers_raw_alerts_in_input_params() -> None: + """When inputParams contains ``raw_alerts`` (a key that IS in + DEFAULT_LARGE_LIST_KEYS), it must be compacted. + """ + batch_inputs = { + "raw_alerts": _make_alerts(5_000), + "source_log_type": "tdp", + } + + compacted = compact_outputs_for_storage(batch_inputs) + + assert "_raw_alerts_count" in compacted + assert compacted["_raw_alerts_count"] == 5_000 + assert "raw_alerts" not in compacted + assert compacted["source_log_type"] == "tdp"