Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions scripts/group_inconclusive_uncertainty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""Group inconclusive IAMScope findings by uncertainty class for reporting only."""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parents[1]
TEST_OVERRIDE_ENV = "IAMSCOPE_ALLOW_REPO_OUTPUT_FOR_TESTS"


def _load_json(path: Path) -> dict[str, Any]:
with path.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"expected JSON object in {path}")
return payload


def _is_relative_to(path: Path, parent: Path) -> bool:
try:
path.relative_to(parent)
except ValueError:
return False
return True


def _reviewer_actions(expected_groups: Path | None) -> dict[str, str]:
if expected_groups is None:
return {}
payload = _load_json(expected_groups)
actions: dict[str, str] = {}
for group in payload.get("groups", []):
if not isinstance(group, dict):
continue
uncertainty_class = group.get("uncertainty_class")
reviewer_action = group.get("reviewer_action")
if isinstance(uncertainty_class, str) and isinstance(reviewer_action, str):
actions[uncertainty_class] = reviewer_action
return actions


def group_inconclusive_uncertainty(
findings_payload: dict[str, Any],
*,
reviewer_actions: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Return report-only grouping for inconclusive findings.

This function does not mutate findings, change verdicts, infer exploitability,
or make replay-equivalence claims.
"""

actions = reviewer_actions or {}
grouped: OrderedDict[str, list[str]] = OrderedDict()
for finding in findings_payload.get("findings", []):
if not isinstance(finding, dict):
continue
if finding.get("verdict") != "inconclusive":
continue
uncertainty_class = finding.get("uncertainty_class")
finding_id = finding.get("finding_id")
if not isinstance(uncertainty_class, str) or not uncertainty_class:
uncertainty_class = "uncertainty_class_missing"
if not isinstance(finding_id, str) or not finding_id:
continue
grouped.setdefault(uncertainty_class, []).append(finding_id)

group_details: list[dict[str, Any]] = []
counts: dict[str, int] = {}
for uncertainty_class, finding_ids in grouped.items():
detail: dict[str, Any] = {
"uncertainty_class": uncertainty_class,
"count": len(finding_ids),
"finding_ids": finding_ids,
}
if uncertainty_class in actions:
detail["reviewer_action"] = actions[uncertainty_class]
group_details.append(detail)
counts[uncertainty_class] = len(finding_ids)

top_class = None
top_count = 0
if group_details:
top = max(group_details, key=lambda item: item["count"])
top_class = top["uncertainty_class"]
top_count = top["count"]

return {
"fixture_id": findings_payload.get("fixture_id"),
"report_only": True,
"groups": counts,
"group_details": group_details,
"top_uncertainty_class": top_class,
"top_uncertainty_count": top_count,
"non_claims": {
"does_not_mutate_findings": True,
"does_not_change_verdicts": True,
"does_not_infer_exploitability": True,
"does_not_claim_replay_equivalence": True,
"requires_aws_credentials": False,
},
}


def _write_output(output_path: Path, text: str) -> None:
output_abs = output_path.resolve()
repo_abs = REPO_ROOT.resolve()
if _is_relative_to(output_abs, repo_abs) and os.environ.get(TEST_OVERRIDE_ENV) != "1":
raise ValueError(
f"refusing to write uncertainty grouping output inside repository tree: {output_abs}"
)
output_abs.parent.mkdir(parents=True, exist_ok=True)
output_abs.write_text(text, encoding="utf-8")


def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--findings", required=True, help="Path to findings.json")
parser.add_argument(
"--expected-groups",
default=None,
help="Optional expected groups JSON used only to enrich reviewer_action fields",
)
parser.add_argument("--out", default=None, help="Optional output JSON path; stdout if omitted")
args = parser.parse_args(argv)

findings_path = Path(args.findings)
expected_path = Path(args.expected_groups) if args.expected_groups else None
try:
payload = group_inconclusive_uncertainty(
_load_json(findings_path),
reviewer_actions=_reviewer_actions(expected_path),
)
text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
if args.out:
_write_output(Path(args.out), text)
else:
print(text, end="")
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
return 0


if __name__ == "__main__":
raise SystemExit(main())
104 changes: 104 additions & 0 deletions tests/test_group_inconclusive_uncertainty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

import json
import subprocess
from pathlib import Path

from scripts.group_inconclusive_uncertainty import group_inconclusive_uncertainty

REPO_ROOT = Path(__file__).resolve().parents[1]
HELPER = REPO_ROOT / "scripts" / "group_inconclusive_uncertainty.py"
FIXTURE_DIR = REPO_ROOT / "tests" / "fixtures" / "demo" / "path_overcounting_shared_uncertainty"
FINDINGS = FIXTURE_DIR / "findings.json"
EXPECTED_GROUPS = FIXTURE_DIR / "expected_uncertainty_groups.json"
EXPECTED_COUNTS = {
"shared_passrole_target_resource_scope_unknown": 8,
"shared_boundary_context_unresolved": 2,
"session_policy_context_missing": 1,
}


def _run_helper(*args: str) -> subprocess.CompletedProcess[str]:
return subprocess.run(
[str(HELPER), *args],
cwd=REPO_ROOT,
check=False,
text=True,
capture_output=True,
)


def _load(path: Path) -> dict:
return json.loads(path.read_text())


def test_groups_only_inconclusive_findings() -> None:
findings = _load(FINDINGS)
result = group_inconclusive_uncertainty(findings)
inconclusive_ids = {
finding["finding_id"] for finding in findings["findings"] if finding["verdict"] == "inconclusive"
}
grouped_ids = {finding_id for group in result["group_details"] for finding_id in group["finding_ids"]}
assert grouped_ids == inconclusive_ids
assert len(grouped_ids) == 11


def test_demo_fixture_group_counts_and_finding_ids() -> None:
result = group_inconclusive_uncertainty(_load(FINDINGS))
expected = _load(EXPECTED_GROUPS)
expected_ids = {group["uncertainty_class"]: group["finding_ids"] for group in expected["groups"]}
actual_ids = {group["uncertainty_class"]: group["finding_ids"] for group in result["group_details"]}
assert result["groups"] == EXPECTED_COUNTS
assert actual_ids == expected_ids
assert result["top_uncertainty_class"] == "shared_passrole_target_resource_scope_unknown"
assert result["top_uncertainty_count"] == 8


def test_expected_groups_enriches_reviewer_actions() -> None:
expected = _load(EXPECTED_GROUPS)
actions = {group["uncertainty_class"]: group["reviewer_action"] for group in expected["groups"]}
result = group_inconclusive_uncertainty(_load(FINDINGS), reviewer_actions=actions)
actual_actions = {group["uncertainty_class"]: group["reviewer_action"] for group in result["group_details"]}
assert actual_actions == actions


def test_helper_does_not_mutate_input_findings() -> None:
before = FINDINGS.read_text()
group_inconclusive_uncertainty(_load(FINDINGS))
assert FINDINGS.read_text() == before


def test_helper_prints_json_to_stdout() -> None:
result = _run_helper("--findings", str(FINDINGS), "--expected-groups", str(EXPECTED_GROUPS))
assert result.returncode == 0, result.stderr
payload = json.loads(result.stdout)
assert payload["groups"] == EXPECTED_COUNTS
assert payload["non_claims"]["does_not_mutate_findings"] is True
assert payload["non_claims"]["does_not_change_verdicts"] is True
assert payload["non_claims"]["does_not_infer_exploitability"] is True
assert payload["non_claims"]["does_not_claim_replay_equivalence"] is True
assert payload["non_claims"]["requires_aws_credentials"] is False


def test_helper_writes_to_temp_output_path(tmp_path: Path) -> None:
output = tmp_path / "uncertainty-groups.json"
result = _run_helper(
"--findings",
str(FINDINGS),
"--expected-groups",
str(EXPECTED_GROUPS),
"--out",
str(output),
)
assert result.returncode == 0, result.stderr
assert json.loads(output.read_text())["groups"] == EXPECTED_COUNTS


def test_helper_refuses_repository_output_path() -> None:
output = REPO_ROOT / "uncertainty-groups.json"
if output.exists():
output.unlink()
result = _run_helper("--findings", str(FINDINGS), "--out", str(output))
assert result.returncode == 1
assert "refusing to write uncertainty grouping output inside repository tree" in result.stderr
assert not output.exists()