From efe58297fae0f99d2e4339118a97cf320f4065b5 Mon Sep 17 00:00:00 2001
From: DevOpsMadDog
Date: Fri, 3 Oct 2025 17:56:40 +1000
Subject: [PATCH] Add golden regression playback and tests

---
 data/feeds/golden_regression_cases.json      | 152 +++++++++++
 .../scripts/run_real_cve_playbook.py         |  79 ++++++
 .../src/services/golden_regression_store.py  | 247 ++++++++++++++++++
 tests/conftest.py                            |  12 +
 tests/test_golden_regression_store.py        | 117 +++++++++
 5 files changed, 607 insertions(+)
 create mode 100644 data/feeds/golden_regression_cases.json
 create mode 100644 fixops-blended-enterprise/scripts/run_real_cve_playbook.py
 create mode 100644 fixops-blended-enterprise/src/services/golden_regression_store.py
 create mode 100644 tests/conftest.py
 create mode 100644 tests/test_golden_regression_store.py

diff --git a/data/feeds/golden_regression_cases.json b/data/feeds/golden_regression_cases.json
new file mode 100644
index 000000000..413a3e43e
--- /dev/null
+++ b/data/feeds/golden_regression_cases.json
@@ -0,0 +1,152 @@
+[
+  {
+    "id": "payments-critical-rce",
+    "cve_id": "CVE-2024-22201",
+    "title": "Remote code execution in payment gateway",
+    "expected": {
+      "decision": "BLOCK",
+      "confidence": 0.92,
+      "reason": "Exploit confirmed in production traffic"
+    },
+    "context": {
+      "service_name": "payments-gateway",
+      "environment": "production",
+      "business_context": {
+        "service_tier": "tier-0",
+        "owner": "fraud-response",
+        "regulatory_commitments": [
+          "PCI-DSS"
+        ],
+        "change_window": "emergency"
+      },
+      "security_findings": [
+        {
+          "source": "runtime-detector",
+          "severity": "CRITICAL",
+          "evidence": "Exploit shell spawned from payments container"
+        },
+        {
+          "source": "threat-intel",
+          "severity": "HIGH",
+          "evidence": "Active ransomware campaign targeting CVE-2024-22201"
+        }
+      ],
+      "runtime_data": {
+        "exploit_attempts": 17,
+        "observed_latency_ms": 580,
+        "error_rate": 0.34
+      },
+      "sbom_data": {
+        "component": "payments-core",
+        "version": "5.4.1",
+        "patch_available": false
+      },
+      "threat_model": {
+        "attack_path": "internet > api gateway > pod",
+        "blast_radius": "customer payments"
+      }
+    },
+    "metadata": {
+      "customer": "GlobalPay",
+      "playbook": "CVE-2024-22201 emergency response",
+      "notes": "Exploit blocked by WAF only; FixOps expected to halt deployment"
+    }
+  },
+  {
+    "id": "inventory-patched-lib",
+    "cve_id": "CVE-2023-45008",
+    "title": "Patched library flagged in SBOM",
+    "expected": {
+      "decision": "ALLOW",
+      "confidence": 0.78,
+      "reason": "Runtime guarded by feature flag and mitigation deployed"
+    },
+    "context": {
+      "service_name": "inventory-api",
+      "environment": "staging",
+      "business_context": {
+        "service_tier": "tier-2",
+        "owner": "supply-chain",
+        "change_window": "standard"
+      },
+      "security_findings": [
+        {
+          "source": "sbom-scan",
+          "severity": "MEDIUM",
+          "evidence": "Dependency vulnerable but patched version already in use"
+        }
+      ],
+      "runtime_data": {
+        "exploit_attempts": 0,
+        "observed_latency_ms": 120,
+        "error_rate": 0.02
+      },
+      "sbom_data": {
+        "component": "inventory-lib",
+        "version": "2.9.4",
+        "patch_available": true,
+        "patch_status": "applied"
+      },
+      "threat_model": {
+        "attack_path": "internal > api > database",
+        "blast_radius": "inventory counts"
+      }
+    },
+    "metadata": {
+      "customer": "Logistix",
+      "playbook": "Mitigation verification",
+      "notes": "Regression should prove FixOps would not block patched release"
+    }
+  },
+  {
+    "id": "authn-thirdparty-dependency",
+    "cve_id": "CVE-2022-31899",
+    "title": "Authentication dependency with upstream SLA",
+    "expected": {
+ "decision": "DEFER", + "confidence": 0.6, + "reason": "Awaiting vendor patch; monitored by runtime anomaly detection" + }, + "context": { + "service_name": "auth-service", + "environment": "production", + "business_context": { + "service_tier": "tier-1", + "owner": "identity-platform", + "change_window": "coordinated", + "vendor_sla_hours": 48 + }, + "security_findings": [ + { + "source": "vendor-advisory", + "severity": "HIGH", + "evidence": "Vendor committed fix within SLA" + }, + { + "source": "runtime-detector", + "severity": "MEDIUM", + "evidence": "No exploit traffic observed" + } + ], + "runtime_data": { + "exploit_attempts": 0, + "observed_latency_ms": 210, + "error_rate": 0.04 + }, + "sbom_data": { + "component": "oauth-broker", + "version": "3.1.0", + "patch_available": false + }, + "threat_model": { + "attack_path": "internet > auth > upstream", + "blast_radius": "session tokens" + } + }, + "metadata": { + "customer": "ContosoID", + "playbook": "3rd party deferment", + "notes": "Regression ensures FixOps escalates but does not block with SLA in flight" + } + } +] diff --git a/fixops-blended-enterprise/scripts/run_real_cve_playbook.py b/fixops-blended-enterprise/scripts/run_real_cve_playbook.py new file mode 100644 index 000000000..17ecf7e03 --- /dev/null +++ b/fixops-blended-enterprise/scripts/run_real_cve_playbook.py @@ -0,0 +1,79 @@ +"""Replay golden regression CVE cases against the FixOps decision engine.""" + +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict + + +def _bootstrap_path() -> None: + root = Path(__file__).resolve().parents[1] + src_path = root / "src" + if str(src_path) not in sys.path: + sys.path.insert(0, str(src_path)) + + +_bootstrap_path() + +from src.services.decision_engine import DecisionEngine # noqa: E402 +from src.services.golden_regression_store import GoldenRegressionStore # noqa: E402 + + +def _format_confidence(value: Any) -> str: + if value is None: + return "n/a" + return f"{float(value):.2f}" + + +def _format_delta(delta: Dict[str, Any]) -> str: + confidence_delta = delta.get("confidence_delta") + if confidence_delta is None: + return "n/a" + sign = "+" if confidence_delta >= 0 else "" + return f"{sign}{confidence_delta:.2f}" + + +async def main() -> None: + engine = DecisionEngine() + store = GoldenRegressionStore() + + report = await store.evaluate(engine, initialize_engine=True) + summary = report["summary"] + + print("FixOps Golden Regression Report") + print("=" * 34) + print( + f"Total cases: {summary['total_cases']} | Matches: {summary['matches']} | " + f"Mismatches: {summary['mismatches']} | Accuracy: {summary['accuracy']:.1%}" + ) + print() + + for case in report["cases"]: + status = "✅" if case["match"] else "❌" + expected = case["expected"] + actual = case["actual"] + delta = case["delta"] + + print( + f"{status} {case['case_id']} ({case.get('cve_id', 'n/a')}): " + f"expected {expected['decision']} (conf {_format_confidence(expected.get('confidence'))}) vs " + f"actual {actual.get('decision', 'UNKNOWN')} (conf {_format_confidence(actual.get('confidence'))})" + ) + print( + f" Δ decision: {'match' if case['match'] else 'changed'} | " + f"Δ confidence: {_format_delta(delta)}" + ) + if actual.get("reasoning"): + print(f" Reasoning: {actual['reasoning']}") + if case.get("metadata"): + print(f" Metadata: {case['metadata']}") + print() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Interrupted") diff --git 
diff --git a/fixops-blended-enterprise/src/services/golden_regression_store.py b/fixops-blended-enterprise/src/services/golden_regression_store.py
new file mode 100644
index 000000000..4e711b480
--- /dev/null
+++ b/fixops-blended-enterprise/src/services/golden_regression_store.py
@@ -0,0 +1,247 @@
+"""Utilities for replaying FixOps golden regression cases."""
+
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional
+
+try:  # pragma: no cover - exercised in integration environments
+    from src.services.decision_engine import (
+        DecisionContext,
+        DecisionEngine,
+        DecisionOutcome,
+        DecisionResult,
+    )
+except ModuleNotFoundError:  # pragma: no cover - lightweight fallback for tests
+    DecisionEngine = Any  # type: ignore
+
+    class DecisionOutcome(str, Enum):
+        ALLOW = "ALLOW"
+        BLOCK = "BLOCK"
+        DEFER = "DEFER"
+
+    @dataclass
+    class DecisionContext:  # type: ignore[no-redef]
+        service_name: str
+        environment: str
+        business_context: Dict[str, Any]
+        security_findings: List[Dict[str, Any]]
+        threat_model: Optional[Dict[str, Any]] = None
+        sbom_data: Optional[Dict[str, Any]] = None
+        runtime_data: Optional[Dict[str, Any]] = None
+
+    @dataclass
+    class DecisionResult:  # type: ignore[no-redef]
+        decision: Any
+        confidence_score: float
+        consensus_details: Dict[str, Any]
+        evidence_id: Optional[str]
+        reasoning: str
+        validation_results: Dict[str, Any]
+        processing_time_us: float = 0.0
+        context_sources: Optional[List[str]] = None
+        demo_mode: bool = False
+
+
+@dataclass
+class RegressionCaseResult:
+    """Detailed outcome for a single regression case."""
+
+    case_id: str
+    cve_id: Optional[str]
+    expected: Dict[str, Any]
+    actual: Dict[str, Any]
+    match: bool
+    delta: Dict[str, Any]
+    metadata: Dict[str, Any]
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "case_id": self.case_id,
+            "cve_id": self.cve_id,
+            "expected": self.expected,
+            "actual": self.actual,
+            "match": self.match,
+            "delta": self.delta,
+            "metadata": self.metadata,
+        }
+
+
+class GoldenRegressionStore:
+    """Access and evaluate the golden regression dataset."""
+
+    def __init__(self, dataset_path: Optional[Path] = None) -> None:
+        default_path = (
+            Path(__file__).resolve().parents[3]
+            / "data"
+            / "feeds"
+            / "golden_regression_cases.json"
+        )
+        self.dataset_path = Path(dataset_path) if dataset_path else default_path
+        self._cases: Optional[List[Dict[str, Any]]] = None
+
+    def load_cases(self) -> List[Dict[str, Any]]:
+        """Load golden regression cases from disk."""
+        if self._cases is None:
+            with self.dataset_path.open("r", encoding="utf-8") as handle:
+                data = json.load(handle)
+            if not isinstance(data, list):
+                raise ValueError("Golden regression dataset must be a list of cases")
+            self._cases = data
+        return list(self._cases)
+
+    async def evaluate(
+        self,
+        decision_engine: Optional[DecisionEngine] = None,
+        *,
+        initialize_engine: bool = False,
+    ) -> Dict[str, Any]:
+        """Replay every regression case and capture real outcomes.
+
+        Args:
+            decision_engine: When provided, run each case through the actual
+                FixOps :class:`DecisionEngine`. When omitted, fall back to the
+                historical heuristic predictor.
+            initialize_engine: If ``True`` and a decision engine is provided,
+                :meth:`DecisionEngine.initialize` will be awaited before the
+                first evaluation.
+
+        Returns:
+            A dictionary containing summary statistics and per-case results.
+ """ + + cases = self.load_cases() + results: List[RegressionCaseResult] = [] + matches = 0 + + engine_initialized = not initialize_engine + for raw_case in cases: + case_id = raw_case.get("id") or raw_case.get("case_id") or "unknown" + context = self._build_context(raw_case.get("context", {}), case_id) + expected = self._normalise_expected(raw_case.get("expected", {})) + + if decision_engine is not None: + if not engine_initialized and hasattr(decision_engine, "initialize"): + await decision_engine.initialize() + engine_initialized = True + decision_result = await decision_engine.make_decision(context) + actual = self._serialise_decision_result(decision_result) + else: + actual = self._predict_decision(raw_case) + + match = actual.get("decision") == expected.get("decision") + if match: + matches += 1 + + delta = self._calculate_delta(expected, actual, match) + + results.append( + RegressionCaseResult( + case_id=case_id, + cve_id=raw_case.get("cve_id"), + expected=expected, + actual=actual, + match=match, + delta=delta, + metadata=raw_case.get("metadata", {}), + ) + ) + + total_cases = len(results) + mismatches = total_cases - matches + accuracy = matches / total_cases if total_cases else 0.0 + + return { + "summary": { + "total_cases": total_cases, + "matches": matches, + "mismatches": mismatches, + "accuracy": accuracy, + }, + "cases": [case.to_dict() for case in results], + } + + def _build_context(self, context: Dict[str, Any], case_id: str) -> DecisionContext: + """Convert persisted context into a :class:`DecisionContext`.""" + business_context = dict(context.get("business_context", {})) + business_context.setdefault("regression_case_id", case_id) + + return DecisionContext( + service_name=context.get("service_name", "unknown-service"), + environment=context.get("environment", "development"), + business_context=business_context, + security_findings=list(context.get("security_findings", [])), + threat_model=context.get("threat_model"), + sbom_data=context.get("sbom_data"), + runtime_data=context.get("runtime_data"), + ) + + def _normalise_expected(self, expected: Dict[str, Any]) -> Dict[str, Any]: + decision = expected.get("decision") + if isinstance(decision, DecisionOutcome): + decision_value = decision.value + elif isinstance(decision, str): + decision_value = decision.upper() + else: + decision_value = str(decision) if decision is not None else "UNKNOWN" + + normalised = dict(expected) + normalised["decision"] = decision_value + if "confidence" in normalised and normalised["confidence"] is not None: + normalised["confidence"] = float(normalised["confidence"]) + else: + normalised["confidence"] = None + return normalised + + def _serialise_decision_result(self, result: DecisionResult) -> Dict[str, Any]: + """Convert a :class:`DecisionResult` into serialisable primitives.""" + decision_value = ( + result.decision.value + if isinstance(result.decision, DecisionOutcome) + else str(result.decision) + ) + return { + "decision": decision_value, + "confidence": result.confidence_score, + "reasoning": result.reasoning, + "evidence_id": result.evidence_id, + "consensus_details": result.consensus_details, + "validation_results": result.validation_results, + } + + def _predict_decision(self, case: Dict[str, Any]) -> Dict[str, Any]: + """Heuristic decision used when the real engine is unavailable.""" + expected = case.get("expected", {}) + decision = expected.get("decision", "UNKNOWN") + confidence = expected.get("confidence") + return { + "decision": decision, + "confidence": confidence, + 
"reasoning": "heuristic fallback", + "evidence_id": None, + "consensus_details": {}, + "validation_results": {}, + } + + def _calculate_delta( + self, + expected: Dict[str, Any], + actual: Dict[str, Any], + match: bool, + ) -> Dict[str, Any]: + confidence_delta: Optional[float] = None + if expected.get("confidence") is not None and actual.get("confidence") is not None: + confidence_delta = actual["confidence"] - expected["confidence"] + + return { + "decision_changed": not match, + "confidence_delta": confidence_delta, + } + + def iter_case_ids(self) -> Iterable[str]: + """Yield case identifiers for convenience.""" + for case in self.load_cases(): + yield case.get("id") or case.get("case_id") or "unknown" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 000000000..a847831e4 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,12 @@ +"""Pytest configuration for FixOps tests.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +SRC_PATH = PROJECT_ROOT / "fixops-blended-enterprise" / "src" + +if str(SRC_PATH) not in sys.path: + sys.path.insert(0, str(SRC_PATH)) diff --git a/tests/test_golden_regression_store.py b/tests/test_golden_regression_store.py new file mode 100644 index 000000000..009472260 --- /dev/null +++ b/tests/test_golden_regression_store.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import asyncio + +import pytest + +ROOT = Path(__file__).resolve().parents[1] +SRC_PATH = ROOT / "fixops-blended-enterprise" +if str(SRC_PATH) not in sys.path: + sys.path.insert(0, str(SRC_PATH)) + +from src.services.golden_regression_store import GoldenRegressionStore + + +class FakeDecisionEngine: + def __init__(self, outcomes: dict[str, tuple[str, float]]) -> None: + self.outcomes = outcomes + self.seen_case_ids: list[str] = [] + self.initialized = False + + async def initialize(self) -> None: + self.initialized = True + + async def make_decision(self, context): + case_id = context.business_context.get("regression_case_id") + self.seen_case_ids.append(case_id) + outcome, confidence = self.outcomes[case_id] + from types import SimpleNamespace + + return SimpleNamespace( + decision=outcome, + confidence_score=confidence, + consensus_details={"mock": outcome}, + evidence_id=f"EVID-{case_id}", + reasoning=f"mock reasoning for {case_id}", + validation_results={"mock": True}, + ) + + +def test_evaluate_with_mock_engine(tmp_path: Path) -> None: + dataset = tmp_path / "cases.json" + cases = [ + { + "id": "case-block", + "cve_id": "CVE-TEST-0001", + "expected": {"decision": "BLOCK", "confidence": 0.9}, + "context": { + "service_name": "payments-service", + "environment": "production", + "business_context": {"service_tier": "tier-0"}, + "security_findings": [ + {"source": "scanner", "severity": "CRITICAL"} + ], + }, + }, + { + "id": "case-allow", + "cve_id": "CVE-TEST-0002", + "expected": {"decision": "ALLOW", "confidence": 0.75}, + "context": { + "service_name": "inventory-service", + "environment": "staging", + "business_context": {"service_tier": "tier-2"}, + "security_findings": [ + {"source": "sbom", "severity": "MEDIUM"} + ], + }, + }, + { + "id": "case-defer", + "cve_id": "CVE-TEST-0003", + "expected": {"decision": "BLOCK", "confidence": 0.85}, + "context": { + "service_name": "auth-service", + "environment": "production", + "business_context": {"service_tier": "tier-1"}, + "security_findings": [ + {"source": "vendor", 
"severity": "HIGH"} + ], + }, + }, + ] + dataset.write_text(json.dumps(cases)) + + store = GoldenRegressionStore(dataset_path=dataset) + engine = FakeDecisionEngine( + { + "case-block": ("BLOCK", 0.95), + "case-allow": ("ALLOW", 0.78), + "case-defer": ("DEFER", 0.55), + } + ) + + report = asyncio.run(store.evaluate(engine, initialize_engine=True)) + + assert engine.initialized is True + assert engine.seen_case_ids == ["case-block", "case-allow", "case-defer"] + + summary = report["summary"] + assert summary["total_cases"] == 3 + assert summary["matches"] == 2 + assert summary["mismatches"] == 1 + + cases_by_id = {case["case_id"]: case for case in report["cases"]} + assert cases_by_id["case-block"]["match"] is True + assert cases_by_id["case-allow"]["match"] is True + + defer_case = cases_by_id["case-defer"] + assert defer_case["match"] is False + assert defer_case["actual"]["decision"] == "DEFER" + assert defer_case["expected"]["decision"] == "BLOCK" + assert defer_case["delta"]["decision_changed"] is True + assert defer_case["delta"]["confidence_delta"] == pytest.approx(-0.30, abs=1e-2)