Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions flocks/ingest/syslog/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from flocks.storage.storage import Storage
from flocks.utils.log import Log
from flocks.workflow.execution_store import (
compact_history_for_storage,
compact_outputs_for_storage,
create_execution_record,
record_execution_result,
resolve_execution_outcome,
Expand All @@ -21,6 +23,7 @@

log = Log.create(service="syslog.manager")


# Maximum concurrent workflow executions per workflow to avoid FD exhaustion and SQLite write contention
_MAX_CONCURRENT_EXECUTIONS = 8
# Maximum number of buffered syslog messages per workflow; excess messages are dropped with a warning.
Expand Down Expand Up @@ -468,11 +471,11 @@ async def _trigger_workflow(
duration = time.time() - start_time
exec_data.update({
"status": status,
"outputResults": result.outputs if isinstance(result.outputs, dict) else {},
"outputResults": compact_outputs_for_storage(result.outputs),
"finishedAt": int(time.time() * 1000),
"duration": duration,
"errorMessage": error_msg,
"executionLog": list(result.history or []),
"executionLog": compact_history_for_storage(result.history),
"currentNodeId": result.last_node_id,
"currentPhase": status,
"currentStepIndex": result.steps,
Expand Down
28 changes: 25 additions & 3 deletions flocks/server/routes/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
)
from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX
from flocks.workflow.execution_store import (
compact_history_for_storage,
compact_outputs_for_storage,
create_execution_record,
normalize_execution_status as _normalize_execution_status,
record_execution_result as _record_execution_result,
Expand Down Expand Up @@ -499,7 +501,15 @@ def _on_step_start(_run_id, step_index, node, _inputs):
return step_index

def _on_step_complete(step_result) -> None:
# Compact each step's outputs *before* appending so the running
# ``step_history`` (and every subsequent ``_write_progress`` snapshot
# that ships it to SQLite) stays bounded, even when a workflow node
# returns tens of thousands of alerts that are already persisted to
# JSONL on disk.
step_dict = step_result.model_dump(mode="json")
raw_outputs = step_dict.get("outputs")
if isinstance(raw_outputs, dict):
step_dict["outputs"] = compact_outputs_for_storage(raw_outputs)
step_history.append(step_dict)
_write_progress({
"executionLog": list(step_history),
Expand Down Expand Up @@ -530,12 +540,17 @@ def _on_step_complete(step_result) -> None:
# status write still succeeds rather than blowing up.
current_data = {"id": exec_id, "workflowId": workflow_id}
status_value, error_message = _resolve_execution_outcome(result)
# ``result.history`` is the engine-side authoritative history (not
# yet compacted), while ``step_history`` was already compacted in
# ``_on_step_complete``. Prefer the former when available, then
# run it through ``compact_history_for_storage`` so the persisted
# row stays small in either branch.
current_data.update({
"outputResults": result.outputs,
"outputResults": compact_outputs_for_storage(result.outputs),
"status": status_value,
"finishedAt": int(time.time() * 1000),
"duration": duration,
"executionLog": result.history or list(step_history),
"executionLog": compact_history_for_storage(result.history) or list(step_history),
"errorMessage": error_message,
"currentNodeId": result.last_node_id,
"currentNodeType": current_data.get("currentNodeType"),
Expand Down Expand Up @@ -1072,8 +1087,15 @@ async def workflow_center_invoke(workflow_id: str, req: WorkflowCenterInvokeRequ
raw_status = result.get("status", "SUCCEEDED") if isinstance(result, dict) else "SUCCEEDED"
status_value = _normalize_execution_status(raw_status)
success = status_value == "success"
# workflow_center_invoke proxies to an external published service; no
# step callbacks run locally so executionLog stays as the empty list
# set by create_execution_record. We still run compact_history here
# as a forward-compatible guard in case a future code path populates it.
exec_data.update({
"outputResults": result.get("outputs", {}) if isinstance(result, dict) else {},
"outputResults": compact_outputs_for_storage(
result.get("outputs", {}) if isinstance(result, dict) else {}
),
"executionLog": compact_history_for_storage(exec_data.get("executionLog")),
"status": status_value,
"finishedAt": int(time.time() * 1000),
"duration": duration,
Expand Down
103 changes: 100 additions & 3 deletions flocks/workflow/execution_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
import time
import uuid
from typing import Any, Dict, Optional
from typing import Any, Dict, Iterable, List, Optional

from flocks.session.recorder import Recorder
from flocks.storage.storage import Storage
Expand All @@ -14,6 +14,94 @@

log = Log.create(service="workflow.execution_store")

# Keys whose values are expected to be large alert/event lists that have
# already been persisted elsewhere (typically JSONL on disk). When writing
# the execution record to SQLite we replace them with a ``_<key>_count``
# integer to keep row sizes bounded. Callers may extend or override this
# set via the ``keys`` argument of the compact helpers below.
DEFAULT_LARGE_LIST_KEYS: frozenset[str] = frozenset(
{
"enriched_alerts",
"unique_alerts",
"raw_alerts",
"normalized_alerts",
"filtered_alerts",
}
)

# Lists smaller than this many items are passed through verbatim. The cap
# protects against accidentally stripping small metadata lists that happen
# to share a name with a known large-list key.
DEFAULT_COMPACT_SIZE_THRESHOLD: int = 100


def compact_outputs_for_storage(
outputs: Any,
*,
keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS,
size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD,
) -> Dict[str, Any]:
"""Return a copy of *outputs* with large alert lists replaced by counts.

Only **list or tuple** values whose key is in *keys* AND whose length
exceeds *size_threshold* are compacted to ``_<key>_count``; everything
else is passed through unchanged. This prevents megabytes of alert data
from being serialised into the ``workflow_execution`` SQLite row on every
invocation, while still keeping small sequences (e.g. error details, short
configuration arrays) fully inspectable in the execution-history UI.

**Keys that are compacted by default** (see ``DEFAULT_LARGE_LIST_KEYS``):
``enriched_alerts``, ``unique_alerts``, ``raw_alerts``,
``normalized_alerts``, ``filtered_alerts``. Keys outside this set — such
as a generic ``alerts`` parameter — are *not* compacted unless the caller
passes a custom *keys* argument. Callers who depend on inspecting the
full list contents of compacted keys must read the data from the JSONL
files written by the workflow itself.
"""
if not isinstance(outputs, dict):
return {}
key_set = frozenset(keys)
compacted: Dict[str, Any] = {}
for k, v in outputs.items():
if (
k in key_set
and isinstance(v, (list, tuple))
and len(v) > size_threshold
):
compacted[f"_{k}_count"] = len(v)
else:
compacted[k] = v
return compacted


def compact_history_for_storage(
history: Any,
*,
keys: Iterable[str] = DEFAULT_LARGE_LIST_KEYS,
size_threshold: int = DEFAULT_COMPACT_SIZE_THRESHOLD,
) -> List[Any]:
"""Strip large alert lists from step outputs in workflow history.

Returns an empty list when *history* is falsy. Non-dict step entries
(defensive: shouldn't happen with normal ``StepResult`` dumps) are
passed through unchanged so the caller sees no surprising drops.
"""
if not history:
return []
result: List[Any] = []
for step in history:
if not isinstance(step, dict):
result.append(step)
continue
step_copy = dict(step)
raw_outputs = step_copy.get("outputs")
if isinstance(raw_outputs, dict):
step_copy["outputs"] = compact_outputs_for_storage(
raw_outputs, keys=keys, size_threshold=size_threshold
)
result.append(step_copy)
return result

# Maximum number of execution history records retained per workflow.
# Older records are pruned automatically to prevent a syslog flood from bloating Storage.
_MAX_EXECUTION_HISTORY_PER_WORKFLOW = 500
Expand Down Expand Up @@ -155,10 +243,19 @@ async def create_execution_record(
input_params: Optional[Dict[str, Any]] = None,
exec_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Create and persist a running workflow execution record."""
"""Create and persist a running workflow execution record.

*input_params* is passed through ``compact_outputs_for_storage`` before
writing to SQLite so that batch HTTP calls whose inputs contain a key in
``DEFAULT_LARGE_LIST_KEYS`` (e.g. ``{"raw_alerts": [...10k items...]}``
) don't bloat the row. Keys outside the default set — such as a generic
``alerts`` parameter — are stored verbatim; pass a custom *keys* argument
to ``compact_outputs_for_storage`` directly if you need broader coverage.
"""
compacted_params = compact_outputs_for_storage(input_params or {})
exec_data = build_initial_execution_record(
workflow_id,
input_params=input_params,
input_params=compacted_params,
exec_id=exec_id,
)
await Storage.write(workflow_execution_key(exec_data["id"]), exec_data)
Expand Down
Loading