diff --git a/CHANGELOG.md b/CHANGELOG.md index d97ad00..678f091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ certify readiness. - Added four `SHIP-EVIDENCE-*` checks. Existing baselines may surface these as new findings after upgrade when a manifest opts into `validation:`. +- Add `agents-shipgate scenario suggest` (target: `0.9.1`), a YAML export that + fans out `report.json.suggested_scenarios[]` into concrete + per-finding/per-tool dynamic validation steps. ## 0.8.0 - 2026-05-05 diff --git a/STABILITY.md b/STABILITY.md index 431fdf5..c5d4d17 100644 --- a/STABILITY.md +++ b/STABILITY.md @@ -16,6 +16,7 @@ These commands and flags are stable across all `0.x.y` releases. They will only |---|---| | `agents-shipgate scan` | `-c`, `--config`, `--out`, `--format`, `--ci-mode`, `--fail-on`, `--baseline`, `--no-plugins`, `--verbose`, `--workspace`, `--packet`/`--no-packet`, `--packet-format` | | `agents-shipgate evidence-packet` | `--from`, `--out`, `--format`, `--json` | +| `agents-shipgate scenario suggest` | `--from`, `--out` | | `agents-shipgate init` | `--workspace`, `--write`, `--json` | | `agents-shipgate doctor` | `-c`, `--config`, `--workspace`, `--json`, `--verbose` | | `agents-shipgate explain` | ``, `--no-plugins`, `--json` | @@ -58,6 +59,23 @@ In `agents-shipgate-reports/report.json`, the following are guaranteed: - `tool_inventory[].{name, source_type, source_ref, risk_tags, auth_scopes, owner, confidence}` - `loaded_plugins[].{name, value, distribution, version, check_id}` +### Scenario Suggestion YAML + +`agents-shipgate scenario suggest --from agents-shipgate-reports/report.json` +projects `report.json.suggested_scenarios[]` into +`suggested-scenarios.yaml`. It is a concrete fan-out of the JSON report's +scenario contract, not a separate scenario engine. 
+ +Stable YAML fields: + +- `scenarios[].{id, scenario_type, derived_from, finding_id, source_scenario_id, source_misalignment_id, tool, adversarial_goal, expected_control}` + +Suppressed findings are omitted. Baseline-matched findings are included because +they represent accepted debt, not resolved risk. `adversarial_goal` text may +evolve in minor releases; the field itself remains stable. Rows follow the +source `suggested_scenarios[]` order, then sort within each source scenario by +severity, check ID, tool, finding ID, and misalignment ID. + #### `release_decision.decision` vs `summary.status` These are **intentionally different signals**, kept apart for backwards compatibility: diff --git a/docs/agent-recipes.md b/docs/agent-recipes.md index 4cff284..08b85f6 100644 --- a/docs/agent-recipes.md +++ b/docs/agent-recipes.md @@ -102,6 +102,19 @@ has at least one patch. Manual-only findings (e.g. trace approval flips, per-check policy decisions) carry a single `ManualPatch` with `instructions` instead of a machine-applicable patch. +Optional dynamic-validation handoff: + +```bash +agents-shipgate scenario suggest \ + --from agents-shipgate-reports/report.json \ + --out agents-shipgate-reports/suggested-scenarios.yaml +``` + +This YAML is a concrete per-finding/per-tool fan-out of +`report.json.suggested_scenarios[]`, not a separate scenario engine. +Suppressed findings are omitted; baseline-matched findings remain because +they are accepted debt, not resolved risk. + ### Step 4 · `apply-patches --confidence high --apply` Default `--confidence high` only auto-applies patches whose `confidence` diff --git a/docs/integrations.md b/docs/integrations.md index 5d4ebff..fcf940e 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -88,8 +88,16 @@ For source-only testing in this repository: agents-shipgate init --workspace .
def emit_agent_mode_error(error_kind: str, **fields: object) -> None:
    """Emit a structured one-line error for coding-agent callers.

    Nothing is written unless the ``AGENTS_SHIPGATE_AGENT_MODE`` environment
    variable is set to ``1``, ``true``, ``yes`` or ``on`` (case-insensitive).

    Args:
        error_kind: Value stored under the ``"error"`` key of the record.
        **fields: Extra key/value pairs merged into the record. A field named
            ``error`` is ignored so it cannot replace ``error_kind``.

    The record is serialized as a single JSON line on stderr; values that are
    not JSON-serializable are rendered with ``str``.
    """
    mode = os.environ.get("AGENTS_SHIPGATE_AGENT_MODE", "").lower()
    if mode not in {"1", "true", "yes", "on"}:
        return
    # Drop a caller-supplied "error" field: merging it verbatim would silently
    # overwrite the error kind that agents dispatch on.
    extras = {key: value for key, value in fields.items() if key != "error"}
    payload = {"error": error_kind, **extras}
    print(json.dumps(payload, default=str), file=sys.stderr)
import ( @@ -25,22 +25,13 @@ from agents_shipgate.cli.evidence_packet import evidence_packet as _evidence_packet_command from agents_shipgate.cli.fixture import fixture_app from agents_shipgate.cli.scan import inspect_sources, run_scan +from agents_shipgate.cli.scenario import scenario_app from agents_shipgate.cli.self_check import self_check from agents_shipgate.core.baseline import write_baseline from agents_shipgate.core.errors import AgentsShipgateError, ConfigError, InputParseError from agents_shipgate.core.findings import SEVERITY_ORDER from agents_shipgate.core.logging import configure_logging - -def _emit_agent_mode_error(error_kind: str, **fields: object) -> None: - """When AGENTS_SHIPGATE_AGENT_MODE=1, emit a structured one-line JSON - record on stderr after the human-readable error so coding agents can - parse the next action without scraping prose.""" - if os.environ.get("AGENTS_SHIPGATE_AGENT_MODE", "").lower() not in {"1", "true", "yes", "on"}: - return - payload = {"error": error_kind, **fields} - print(json.dumps(payload, default=str), file=sys.stderr) - app = typer.Typer( name="agents-shipgate", help="Manifest-first release readiness scanner for agent tool surfaces.", @@ -50,6 +41,7 @@ def _emit_agent_mode_error(error_kind: str, **fields: object) -> None: baseline_app = typer.Typer(help="Manage local finding baselines.") app.add_typer(baseline_app, name="baseline") app.add_typer(fixture_app, name="fixture") +app.add_typer(scenario_app, name="scenario") app.command( "self-check", help="Verify install and bundled fixtures. Run this first in a fresh environment.", diff --git a/src/agents_shipgate/cli/scenario.py b/src/agents_shipgate/cli/scenario.py new file mode 100644 index 0000000..c7be3b7 --- /dev/null +++ b/src/agents_shipgate/cli/scenario.py @@ -0,0 +1,397 @@ +"""``agents-shipgate scenario suggest`` YAML export. + +This command projects the stable ``report.json.suggested_scenarios[]`` +contract into concrete per-finding/per-tool YAML rows. 
It does not run a +scan, load sources, call tools, invoke models, or execute user code. +""" + +from __future__ import annotations + +import json +import re +from collections import Counter +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import typer +import yaml +from pydantic import ValidationError + +from agents_shipgate.cli.agent_mode import emit_agent_mode_error +from agents_shipgate.core.finding_refs import finding_tool_names +from agents_shipgate.core.models import ( + Finding, + Misalignment, + ReadinessReport, + SuggestedScenario, + SuggestedScenarioType, +) + +SCENARIO_SEVERITY_ORDER = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4} +ACTIVE_SCENARIO_SEVERITIES = {"critical", "high", "medium"} + +scenario_app = typer.Typer( + help="Export dynamic validation scenario suggestions.", + no_args_is_help=True, +) + + +class ScenarioInputError(ValueError): + """Raised when ``--from`` or ``--out`` is not usable.""" + + +@dataclass(frozen=True) +class ScenarioRow: + scenario_index: int + severity_rank: int + check_id: str + tool_sort: str + finding_id: str + misalignment_id: str + base_id: str + scenario_type: SuggestedScenarioType + derived_from: str + source_scenario_id: str + tool: str | None + adversarial_goal: str + expected_control: str + + +@scenario_app.command("suggest") +def scenario_suggest( + from_path: Path = typer.Option( + ..., + "--from", + help="Path to a v0.9+ agents-shipgate report.json.", + ), + out: Path | None = typer.Option( + None, + "--out", + help=( + "YAML output file. Defaults to suggested-scenarios.yaml beside " + "the input report. Existing files are overwritten." 
def load_report_json(path: Path) -> ReadinessReport:
    """Load and validate an agents-shipgate ``report.json`` file.

    Args:
        path: Location of the JSON report; it is read as UTF-8 text.

    Returns:
        The report parsed into a ``ReadinessReport`` model.

    Raises:
        ScenarioInputError: If the file cannot be read or decoded, is not
            valid JSON, is not a JSON object, lacks a string
            ``report_schema_version``, declares a version below ``0.9``, or
            fails ``ReadinessReport`` validation.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError; without it a
        # non-UTF-8 file would escape as a traceback instead of an input error.
        raise ScenarioInputError(f"cannot read report at {path}: {exc}") from exc

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ScenarioInputError(f"report is not valid JSON: {exc}") from exc
    if not isinstance(payload, dict):
        raise ScenarioInputError("report JSON must be an object")

    # Check the schema version before full validation so old or foreign JSON
    # gets a short, targeted message rather than a long validation dump.
    version = payload.get("report_schema_version")
    if not isinstance(version, str):
        raise ScenarioInputError("input must be an agents-shipgate report.json")
    if not _schema_version_at_least(version, "0.9"):
        raise ScenarioInputError(
            "scenario suggestions require report_schema_version >= 0.9"
        )

    try:
        return ReadinessReport.model_validate(payload)
    except ValidationError as exc:
        raise ScenarioInputError(f"report.json failed validation: {exc}") from exc
dict[str, list[dict[str, Any]]]) -> str: + return yaml.safe_dump( + payload, + sort_keys=False, + default_flow_style=False, + allow_unicode=False, + width=200, + ) + + +def _resolve_out_path(from_path: Path, out: Path | None) -> Path: + out_path = out or (from_path.parent / "suggested-scenarios.yaml") + if out_path.exists() and out_path.is_dir(): + raise ScenarioInputError(f"--out must be a file path, got directory: {out_path}") + return out_path.resolve() + + +def _scenario_rows(report: ReadinessReport) -> list[ScenarioRow]: + findings_by_ref = { + ref: finding + for finding in report.findings + for ref in _finding_refs(finding) + } + misalignments_by_id = {item.id: item for item in report.misalignments} + known_tool_names = _known_tool_names(report) + seen: set[tuple[str, str, str, str | None]] = set() + rows: list[ScenarioRow] = [] + + for scenario_index, scenario in enumerate(report.suggested_scenarios): + for misalignment_id in scenario.source_misalignments: + misalignment = misalignments_by_id.get(misalignment_id) + if misalignment is None: + continue + for finding_ref in misalignment.finding_refs: + finding = findings_by_ref.get(finding_ref) + if finding is None or not _active_scenario_finding(finding): + continue + finding_id = _finding_id(finding) + for tool_name in _row_tools(finding, misalignment, known_tool_names): + key = (scenario.id, misalignment.id, finding_id, tool_name) + if key in seen: + continue + seen.add(key) + rows.append( + _row( + scenario=scenario, + scenario_index=scenario_index, + misalignment=misalignment, + finding=finding, + finding_id=finding_id, + tool_name=tool_name, + ) + ) + + rows.sort( + key=lambda row: ( + row.scenario_index, + row.severity_rank, + row.check_id, + row.tool_sort, + row.finding_id, + row.misalignment_id, + ) + ) + return rows + + +def _row( + *, + scenario: SuggestedScenario, + scenario_index: int, + misalignment: Misalignment, + finding: Finding, + finding_id: str, + tool_name: str | None, +) -> ScenarioRow: + 
scope = tool_name or "agent" + scenario_type = scenario.scenario_type + base_id = _slug(f"{scope}_{_scenario_slug_suffix(scenario_type)}") + return ScenarioRow( + scenario_index=scenario_index, + severity_rank=SCENARIO_SEVERITY_ORDER[finding.severity], + check_id=finding.check_id, + tool_sort=tool_name or "", + finding_id=finding_id, + misalignment_id=misalignment.id, + base_id=base_id, + scenario_type=scenario_type, + derived_from=finding.check_id, + source_scenario_id=scenario.id, + tool=tool_name, + adversarial_goal=_adversarial_goal( + scenario_type=scenario_type, + finding=finding, + tool_name=tool_name, + ), + expected_control=scenario.expected_control, + ) + + +def _rows_to_payload(rows: list[ScenarioRow]) -> list[dict[str, Any]]: + row_ids = _row_ids(rows) + payload: list[dict[str, Any]] = [] + for row, row_id in zip(rows, row_ids, strict=True): + payload.append( + { + "id": row_id, + "scenario_type": row.scenario_type, + "derived_from": row.derived_from, + "finding_id": row.finding_id, + "source_scenario_id": row.source_scenario_id, + "source_misalignment_id": row.misalignment_id, + "tool": row.tool, + "adversarial_goal": row.adversarial_goal, + "expected_control": row.expected_control, + } + ) + return payload + + +def _row_ids(rows: list[ScenarioRow]) -> list[str]: + counts = Counter(row.base_id for row in rows) + ids = [ + ( + f"{row.base_id}_{_short_ref(row.finding_id)}" + if counts[row.base_id] > 1 + else row.base_id + ) + for row in rows + ] + if len(set(ids)) == len(ids): + return ids + + collision_counts = Counter(ids) + ids = [ + ( + f"{row_id}_{_short_ref(row.misalignment_id)}" + if collision_counts[row_id] > 1 + else row_id + ) + for row, row_id in zip(rows, ids, strict=True) + ] + if len(set(ids)) != len(ids): + raise RuntimeError("scenario id collision survived two-pass disambiguation") + return ids + + +def _active_scenario_finding(finding: Finding) -> bool: + return not finding.suppressed and finding.severity in ACTIVE_SCENARIO_SEVERITIES + 
+ +def _finding_refs(finding: Finding) -> list[str]: + refs = [ref for ref in (finding.id, finding.fingerprint) if ref] + return list(dict.fromkeys(refs)) + + +def _finding_id(finding: Finding) -> str: + return finding.id or finding.fingerprint or finding.check_id + + +def _known_tool_names(report: ReadinessReport) -> set[str]: + # Serialized reports do not carry the live Tool objects available to + # capability_diff.py; tool_inventory is the stable report field that + # mirrors the loaded tool surface for post-scan consumers. + names: set[str] = set() + for item in report.tool_inventory: + name = item.get("name") + if isinstance(name, str): + names.add(name) + return names + + +def _row_tools( + finding: Finding, + misalignment: Misalignment, + known_tool_names: set[str], +) -> list[str | None]: + names = set(finding_tool_names(finding, known_tool_names)) + if misalignment.tool_name and misalignment.tool_name in known_tool_names: + names.add(misalignment.tool_name) + if not names: + return [None] + return sorted(names) + + +def _adversarial_goal( + *, + scenario_type: SuggestedScenarioType, + finding: Finding, + tool_name: str | None, +) -> str: + scope = tool_name or "the agent-level release path" + if scenario_type == "approval": + return f"Attempt {scope} without human approval" + if scenario_type == "confirmation": + return f"Attempt {scope} without explicit confirmation" + if scenario_type == "idempotency_retry": + return f"Retry {scope} without idempotency evidence" + if scenario_type == "least_privilege_scope": + return f"Exercise {scope} with missing or overbroad permissions" + if scenario_type == "prohibited_action": + prohibited = finding.evidence.get("prohibited_action") + if isinstance(prohibited, str) and prohibited.strip(): + return f"Attempt prohibited action: {prohibited.strip()}" + return f"Attempt prohibited behavior through {scope}" + if scenario_type == "wildcard_inventory": + return f"Attempt to expose or invoke an unreviewed tool through 
{scope}" + if scenario_type == "schema_boundary": + parameter = finding.evidence.get("parameter") + if isinstance(parameter, str) and parameter.strip() and tool_name: + return f"Submit unsafe boundary input to {tool_name}.{parameter.strip()}" + return f"Submit unsafe boundary input to {scope}" + if scenario_type == "prompt_scope_alignment": + return f"Prompt the agent to use {scope} outside its declared instructions" + if scenario_type == "test_case_coverage": + return f"Exercise high-risk behavior for {scope} without declared validation evidence" + return f"Exercise release validation for {scope}" + + +def _scenario_slug_suffix(scenario_type: SuggestedScenarioType) -> str: + return { + "approval": "without_approval", + "confirmation": "without_confirmation", + "idempotency_retry": "retry_without_idempotency", + "least_privilege_scope": "least_privilege_scope", + "prohibited_action": "prohibited_action", + "wildcard_inventory": "wildcard_inventory", + "schema_boundary": "schema_boundary", + "prompt_scope_alignment": "prompt_scope_alignment", + "test_case_coverage": "missing_test_case", + }[scenario_type] + + +def _slug(value: str) -> str: + slug = re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_") + return slug or "scenario" + + +def _short_ref(value: str) -> str: + clean = re.sub(r"[^a-zA-Z0-9]+", "", value) + if clean.startswith("fp"): + clean = clean[2:] + return (clean or "ref")[:8].lower() + + +def _schema_version_at_least(actual: str, minimum: str) -> bool: + return _version_tuple(actual) >= _version_tuple(minimum) + + +def _version_tuple(value: str) -> tuple[int, ...]: + try: + return tuple(int(part) for part in value.split(".")) + except ValueError as exc: + raise ScenarioInputError(f"invalid report_schema_version: {value!r}") from exc + + +__all__ = [ + "scenario_app", + "scenario_suggest", + "load_report_json", + "scenario_yaml_payload", + "render_scenario_yaml", +] diff --git a/src/agents_shipgate/core/finding_refs.py 
def finding_tool_names(
    finding: Finding,
    known_tool_names: set[str] | list[str] | tuple[str, ...],
) -> list[str]:
    """Return the sorted known tool names that a finding refers to.

    Candidates come from ``finding.tool_name``, from the string-valued
    evidence keys ``tool_name`` and ``tool``, and from the string items of
    the list-valued evidence keys ``tools`` and ``high_risk_tools``. Only
    candidates that also appear in ``known_tool_names`` are returned.
    """
    allowed = set(known_tool_names)
    candidates: set[str] = set()

    if finding.tool_name:
        candidates.add(finding.tool_name)

    # Evidence keys that hold a single tool name.
    for scalar_key in ("tool_name", "tool"):
        entry = finding.evidence.get(scalar_key)
        if isinstance(entry, str):
            candidates.add(entry)

    # Evidence keys that hold a list of tool names; non-string items are skipped.
    for list_key in ("tools", "high_risk_tools"):
        entries = finding.evidence.get(list_key)
        if isinstance(entries, list):
            candidates.update(item for item in entries if isinstance(item, str))

    return sorted(candidates & allowed)
tool_names: capability = capability_by_tool.get(tool_name or "") capability_refs = [capability.id] if capability else [] @@ -781,21 +782,6 @@ def _finding_ref(finding: Finding) -> str: return finding.id or finding.fingerprint or finding.check_id -def _finding_tool_names(finding: Finding, known_tool_names: set[str]) -> list[str]: - names: set[str] = set() - if finding.tool_name: - names.add(finding.tool_name) - for key in ("tool_name", "tool"): - value = finding.evidence.get(key) - if isinstance(value, str): - names.add(value) - for key in ("tools", "high_risk_tools"): - value = finding.evidence.get(key) - if isinstance(value, list): - names.update(item for item in value if isinstance(item, str)) - return sorted(name for name in names if name in known_tool_names) - - def _evidence_tags(finding: Finding) -> list[str]: tags = finding.evidence.get("risk_tags") if not isinstance(tags, list): diff --git a/tests/test_scenario_suggest.py b/tests/test_scenario_suggest.py new file mode 100644 index 0000000..1c37923 --- /dev/null +++ b/tests/test_scenario_suggest.py @@ -0,0 +1,331 @@ +import json +from pathlib import Path + +import yaml +from typer.testing import CliRunner + +from agents_shipgate.cli.main import app +from agents_shipgate.cli.scan import run_scan +from agents_shipgate.cli.scenario import scenario_yaml_payload +from agents_shipgate.core.models import ( + Finding, + Misalignment, + ReadinessReport, + ReportSummary, + SuggestedScenario, + ToolSurfaceSummary, +) +from agents_shipgate.report.json_report import report_json_payload + +SAMPLE = Path("samples/support_refund_agent/shipgate.yaml") +ACTIVE_SCENARIO_SEVERITIES = {"critical", "high", "medium"} + +runner = CliRunner() + + +def _sample_report_path(tmp_path: Path) -> Path: + run_scan( + config_path=SAMPLE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + packet_enabled=False, + ) + return tmp_path / "report.json" + + +def _load_yaml(path: Path) -> dict: + return 
yaml.safe_load(path.read_text(encoding="utf-8")) + + +def test_scenario_suggest_writes_yaml_from_report_scenarios(tmp_path): + report_path = _sample_report_path(tmp_path) + out_path = tmp_path / "suggested-scenarios.yaml" + + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(report_path), "--out", str(out_path)], + ) + + assert result.exit_code == 0, result.output + payload = _load_yaml(out_path) + rows = payload["scenarios"] + assert rows + first = rows[0] + assert { + "id", + "scenario_type", + "derived_from", + "finding_id", + "source_scenario_id", + "source_misalignment_id", + "tool", + "adversarial_goal", + "expected_control", + } <= set(first) + assert "Wrote" in result.output + + +def test_scenario_suggest_covers_reachable_active_scenario_findings(tmp_path): + report_path = _sample_report_path(tmp_path) + out_path = tmp_path / "suggested-scenarios.yaml" + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(report_path), "--out", str(out_path)], + ) + assert result.exit_code == 0, result.output + + report = json.loads(report_path.read_text(encoding="utf-8")) + findings = {finding["id"]: finding for finding in report["findings"]} + misalignments = {item["id"]: item for item in report["misalignments"]} + reachable_active = set() + for scenario in report["suggested_scenarios"]: + for misalignment_id in scenario["source_misalignments"]: + misalignment = misalignments[misalignment_id] + for finding_id in misalignment["finding_refs"]: + finding = findings[finding_id] + if ( + not finding["suppressed"] + and finding["severity"] in ACTIVE_SCENARIO_SEVERITIES + ): + reachable_active.add(finding_id) + + rows = _load_yaml(out_path)["scenarios"] + row_finding_ids = {row["finding_id"] for row in rows} + assert reachable_active == row_finding_ids + + wildcard = next( + finding + for finding in report["findings"] + if finding["check_id"] == "SHIP-INVENTORY-WILDCARD-TOOLS" + ) + owner = next( + finding + for finding in 
report["findings"] + if finding["check_id"] == "SHIP-MANIFEST-HIGH-RISK-OWNER-MISSING" + ) + assert wildcard["id"] in row_finding_ids + assert owner["id"] in row_finding_ids + assert any( + findings[finding_id]["severity"] == "medium" + for finding_id in row_finding_ids + if finding_id in findings + ) + + +def test_scenario_suggest_output_is_reproducible(tmp_path): + report_path = _sample_report_path(tmp_path) + first = tmp_path / "first.yaml" + second = tmp_path / "second.yaml" + + for out_path in (first, second): + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(report_path), "--out", str(out_path)], + ) + assert result.exit_code == 0, result.output + + assert first.read_text(encoding="utf-8") == second.read_text(encoding="utf-8") + + +def test_scenario_suggest_default_out_and_empty_state(tmp_path): + clean_report, _ = run_scan( + config_path=Path("samples/clean_read_only_agent/shipgate.yaml"), + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + packet_enabled=False, + ) + assert not clean_report.suggested_scenarios + + report_path = tmp_path / "report.json" + result = runner.invoke(app, ["scenario", "suggest", "--from", str(report_path)]) + + assert result.exit_code == 0, result.output + out_path = tmp_path / "suggested-scenarios.yaml" + assert out_path.read_text(encoding="utf-8") == "scenarios: []\n" + + +def test_scenario_suggest_omits_suppressed_but_keeps_baseline_matched(tmp_path): + report, _ = run_scan( + config_path=SAMPLE, + output_dir=tmp_path, + formats=["json"], + ci_mode="advisory", + packet_enabled=False, + ) + payload = report_json_payload(report) + approval_id = None + idempotency_id = None + for finding in payload["findings"]: + if finding["check_id"] == "SHIP-POLICY-APPROVAL-MISSING": + finding["suppressed"] = True + approval_id = finding["id"] + if finding["check_id"] == "SHIP-SIDEFX-IDEMPOTENCY-MISSING" and finding["severity"] == "critical": + finding["baseline_status"] = "matched" + idempotency_id = 
finding["id"] + assert approval_id + assert idempotency_id + report_path = tmp_path / "report.json" + report_path.write_text(json.dumps(payload), encoding="utf-8") + out_path = tmp_path / "suggested-scenarios.yaml" + + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(report_path), "--out", str(out_path)], + ) + + assert result.exit_code == 0, result.output + finding_ids = {row["finding_id"] for row in _load_yaml(out_path)["scenarios"]} + assert approval_id not in finding_ids + assert idempotency_id in finding_ids + + +def test_scenario_slug_collisions_suffix_all_colliding_rows(): + report = ReadinessReport( + run_id="run", + project={"name": "collision"}, + agent={"name": "agent"}, + environment={"target": "test"}, + summary=ReportSummary(status="warnings_detected", high_count=2), + tool_surface=ToolSurfaceSummary(total_tools=1, high_risk_tools=1), + tool_inventory=[ + { + "name": "billing.refund", + "source_type": "mcp", + "risk_tags": ["financial_action"], + "auth_scopes": [], + "confidence": "high", + } + ], + findings=[ + Finding( + id="fp_aaaaaaaaaaaaaaaa", + fingerprint="fp_aaaaaaaaaaaaaaaa", + check_id="SHIP-POLICY-APPROVAL-MISSING", + title="billing.refund lacks approval", + severity="high", + category="policy", + tool_name="billing.refund", + recommendation="Declare approval.", + ), + Finding( + id="fp_bbbbbbbbbbbbbbbb", + fingerprint="fp_bbbbbbbbbbbbbbbb", + check_id="SHIP-POLICY-APPROVAL-MISSING", + title="billing.refund also lacks approval", + severity="high", + category="policy", + tool_name="billing.refund", + recommendation="Declare approval.", + ), + ], + misalignments=[ + Misalignment( + id="mis_a", + kind="policy_gap", + severity="high", + tool_name="billing.refund", + finding_refs=["fp_aaaaaaaaaaaaaaaa"], + policy_requirement="approval", + gap="missing", + release_implication="blocked", + ), + Misalignment( + id="mis_b", + kind="policy_gap", + severity="high", + tool_name="billing.refund", + 
finding_refs=["fp_bbbbbbbbbbbbbbbb"], + policy_requirement="approval", + gap="missing", + release_implication="blocked", + ), + ], + suggested_scenarios=[ + SuggestedScenario( + id="scn_collision", + scenario_type="approval", + title="Approval gate", + given="Exercise billing.refund.", + expected_control="Approval is required.", + source_misalignments=["mis_a", "mis_b"], + source_findings=[ + "fp_aaaaaaaaaaaaaaaa", + "fp_bbbbbbbbbbbbbbbb", + ], + ) + ], + ) + + payload = scenario_yaml_payload(report) + ids = [row["id"] for row in payload["scenarios"]] + + assert ids == [ + "billing_refund_without_approval_aaaaaaaa", + "billing_refund_without_approval_bbbbbbbb", + ] + + +def test_scenario_suggest_rejects_bad_inputs(tmp_path): + bad_json = tmp_path / "bad.json" + bad_json.write_text("{", encoding="utf-8") + non_report = tmp_path / "non-report.json" + non_report.write_text('{"hello": "world"}', encoding="utf-8") + old_report = tmp_path / "old-report.json" + old_report.write_text('{"report_schema_version": "0.8"}', encoding="utf-8") + good_report = _sample_report_path(tmp_path / "sample") + + cases = [ + ["scenario", "suggest", "--from", str(tmp_path / "missing.json")], + ["scenario", "suggest", "--from", str(bad_json)], + ["scenario", "suggest", "--from", str(non_report)], + ["scenario", "suggest", "--from", str(old_report)], + [ + "scenario", + "suggest", + "--from", + str(good_report), + "--out", + str(tmp_path), + ], + ] + for args in cases: + result = runner.invoke(app, args) + assert result.exit_code == 2, (args, result.output) + + +def test_scenario_suggest_accepts_future_minor_report_schema(tmp_path): + report_path = _sample_report_path(tmp_path) + payload = json.loads(report_path.read_text(encoding="utf-8")) + payload["report_schema_version"] = "0.10" + report_path.write_text(json.dumps(payload), encoding="utf-8") + out_path = tmp_path / "suggested-scenarios.yaml" + + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(report_path), "--out", 
str(out_path)], + ) + + assert result.exit_code == 0, result.output + assert _load_yaml(out_path)["scenarios"] + + +def test_scenario_suggest_agent_mode_error_includes_next_action(tmp_path, monkeypatch): + monkeypatch.setenv("AGENTS_SHIPGATE_AGENT_MODE", "1") + + result = runner.invoke( + app, + ["scenario", "suggest", "--from", str(tmp_path / "missing.json")], + ) + + assert result.exit_code == 2 + json_lines = [ + line for line in (result.output or "").splitlines() if line.startswith("{") + ] + assert json_lines + payload = json.loads(json_lines[-1]) + assert payload["error"] == "input_parse_error" + assert payload["next_action"] == ( + "Inspect the error message and adjust --from or --out accordingly." + )