Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 153 additions & 118 deletions src/audit/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,61 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
..AuditData::default()
};

let filters = artifact_filters.as_deref();
let saw_artifact_auth_error =
fetch_and_record_artifacts(&client, &ctx, &auth, parsed.build_id, filters, &run_dir, &mut audit)
.await?;

if saw_artifact_auth_error && !has_any_local_artifacts(&run_dir).await {
anyhow::bail!(
"failed to download artifacts and no local cache. Use 'az pipelines runs artifact download --run-id {}' to fetch them manually, then re-run.",
parsed.build_id
);
}

run_analyzers(&client, &ctx, &auth, parsed.build_id, filters, &run_dir, &mut audit).await;
populate_performance_metrics(&mut audit);

audit.metrics.error_count = audit.errors.len() as u64;
audit.metrics.warning_count = audit.warnings.len() as u64;
findings::derive_findings(&mut audit);

save_run_summary(
&run_dir,
&RunSummary {
ado_aw_version: env!("CARGO_PKG_VERSION").to_string(),
build_id: parsed.build_id,
processed_at: Utc::now(),
audit_data: audit.clone(),
},
)
.await?;

render_audit(&audit, opts.json)?;
if !opts.json {
eprintln!("✓ Audit complete. Reports in {}", run_dir.display());
}
Ok(())
}

/// Download all selected artifacts for the build, recording auth errors and
/// non-fatal download failures as warnings rather than hard failures.
/// Returns `true` if at least one artifact download was blocked by an auth error.
async fn fetch_and_record_artifacts(
client: &reqwest::Client,
ctx: &AdoContext,
auth: &crate::ado::AdoAuth,
build_id: u64,
artifact_filters: Option<&[String]>,
run_dir: &Path,
audit: &mut AuditData,
) -> Result<bool> {
let mut saw_artifact_auth_error = false;
match list_build_artifacts(&client, &ctx, &auth, parsed.build_id).await {
match list_build_artifacts(client, ctx, auth, build_id).await {
Ok(artifacts) => {
let selected: Vec<_> = artifacts
.into_iter()
.filter(|artifact| {
artifact_matches_selected(&artifact.name, artifact_filters.as_deref())
})
.filter(|artifact| artifact_matches_selected(&artifact.name, artifact_filters))
.collect();

if selected.is_empty() {
Expand All @@ -84,17 +131,16 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
} else {
"no artifacts were published for this build".to_string()
};
warn_and_record(&mut audit, "audit::artifacts", message);
warn_and_record(audit, "audit::artifacts", message);
}

for artifact in selected {
match download_artifact_preserving_cache(&client, &auth, &artifact, &run_dir).await
{
match download_artifact_preserving_cache(client, auth, &artifact, run_dir).await {
Ok(()) => {}
Err(error) if is_authz_error(&error) => {
saw_artifact_auth_error = true;
warn_and_record(
&mut audit,
audit,
"audit::artifacts",
format!(
"failed to download artifact '{}': {:#}; using any local copy already present",
Expand All @@ -104,12 +150,9 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
}
Err(error) => {
warn_and_record(
&mut audit,
audit,
"audit::artifacts",
format!(
"failed to download artifact '{}': {:#}",
artifact.name, error
),
format!("failed to download artifact '{}': {:#}", artifact.name, error),
);
}
}
Expand All @@ -118,7 +161,7 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
Err(error) if is_authz_error(&error) => {
saw_artifact_auth_error = true;
warn_and_record(
&mut audit,
audit,
"audit::artifacts",
format!(
"failed to list build artifacts: {:#}; using any local cache already present",
Expand All @@ -127,90 +170,38 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
);
}
Err(error) => {
return Err(error).context(format!(
"failed to list artifacts for build {}",
parsed.build_id
));
return Err(error)
.context(format!("failed to list artifacts for build {}", build_id));
}
}
Ok(saw_artifact_auth_error)
}

if saw_artifact_auth_error && !has_any_local_artifacts(&run_dir).await {
anyhow::bail!(
"failed to download artifacts and no local cache. Use 'az pipelines runs artifact download --run-id {}' to fetch them manually, then re-run.",
parsed.build_id
);
}

match collect_downloaded_files(&run_dir, artifact_filters.as_deref()).await {
/// Run all analysis passes over the downloaded artifacts and populate `audit`.
/// Individual analyzer failures are recorded as warnings rather than returned as errors.
async fn run_analyzers(
client: &reqwest::Client,
ctx: &AdoContext,
auth: &crate::ado::AdoAuth,
build_id: u64,
artifact_filters: Option<&[String]>,
run_dir: &Path,
audit: &mut AuditData,
) {
match collect_downloaded_files(run_dir, artifact_filters).await {
Ok(files) => audit.downloaded_files = files,
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::artifacts",
format!("failed to enumerate downloaded files: {:#}", error),
),
}

if let Some(agent_outputs_dir) = find_artifact_dir(&run_dir, "agent_outputs").await {
let firewall_dir = agent_outputs_dir.join("logs").join("firewall");
match firewall::analyze_firewall_logs(&firewall_dir).await {
Ok(result) => audit.firewall_analysis = result,
Err(error) => warn_and_record(
&mut audit,
"audit::firewall",
format!("firewall analysis failed: {:#}", error),
),
}
match policy::analyze_policy(&firewall_dir).await {
Ok(result) => audit.policy_analysis = result,
Err(error) => warn_and_record(
&mut audit,
"audit::policy",
format!("policy analysis failed: {:#}", error),
),
}

let mcpg_dir = agent_outputs_dir.join("logs").join("mcpg");
match mcp::analyze_mcp_tool_usage(&mcpg_dir).await {
Ok(result) => audit.mcp_tool_usage = result,
Err(error) => warn_and_record(
&mut audit,
"audit::mcp",
format!("MCP tool-usage analysis failed: {:#}", error),
),
}
match mcp::analyze_mcp_server_health(&mcpg_dir).await {
Ok(result) => audit.mcp_server_health = result,
Err(error) => warn_and_record(
&mut audit,
"audit::mcp",
format!("MCP server-health analysis failed: {:#}", error),
),
}
match mcp::extract_mcp_failures(&mcpg_dir).await {
Ok(result) => audit.mcp_failures = result,
Err(error) => warn_and_record(
&mut audit,
"audit::mcp",
format!("MCP failure extraction failed: {:#}", error),
),
}

match otel::analyze_otel(&agent_outputs_dir).await {
Ok(result) => {
audit.metrics = result.metrics;
audit.engine_config = result.engine_config;
audit.performance_metrics = result.performance;
audit.overview.aw_info = result.aw_info;
}
Err(error) => warn_and_record(
&mut audit,
"audit::otel",
format!("OTel analysis failed: {:#}", error),
),
}
if let Some(agent_outputs_dir) = find_artifact_dir(run_dir, "agent_outputs").await {
run_agent_output_analyzers(&agent_outputs_dir, audit).await;
}

match safe_outputs::analyze_safe_outputs(&run_dir).await {
match safe_outputs::analyze_safe_outputs(run_dir).await {
Ok(result) => {
audit.safe_output_summary = result.summary;
audit.safe_output_execution = result.execution;
Expand All @@ -219,55 +210,120 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
audit.key_findings.extend(result.findings);
}
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::safe_outputs",
format!("safe-output analysis failed: {:#}", error),
),
}

match detection::analyze_detection(&run_dir).await {
match detection::analyze_detection(run_dir).await {
Ok(result) => audit.detection_analysis = result,
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::detection",
format!("detection analysis failed: {:#}", error),
),
}

match missing::extract_missing_tools(&run_dir).await {
match missing::extract_missing_tools(run_dir).await {
Ok(result) => audit.missing_tools = result,
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::missing_tools",
format!("missing-tool extraction failed: {:#}", error),
),
}
match missing::extract_missing_data(&run_dir).await {
match missing::extract_missing_data(run_dir).await {
Ok(result) => audit.missing_data = result,
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::missing_data",
format!("missing-data extraction failed: {:#}", error),
),
}
match missing::extract_noops(&run_dir).await {
match missing::extract_noops(run_dir).await {
Ok(result) => audit.noops = result,
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::noops",
format!("noop extraction failed: {:#}", error),
),
}

match jobs::fetch_timeline(&client, &ctx, &auth, parsed.build_id).await {
match jobs::fetch_timeline(client, ctx, auth, build_id).await {
Ok(timeline) => audit.jobs = jobs::timeline_to_jobs(&timeline),
Err(error) => warn_and_record(
&mut audit,
audit,
"audit::jobs",
format!("job timeline analysis failed: {:#}", error),
),
}
}

/// Run analyzers that operate on the `agent_outputs` artifact directory.
async fn run_agent_output_analyzers(agent_outputs_dir: &Path, audit: &mut AuditData) {
let firewall_dir = agent_outputs_dir.join("logs").join("firewall");
match firewall::analyze_firewall_logs(&firewall_dir).await {
Ok(result) => audit.firewall_analysis = result,
Err(error) => warn_and_record(
audit,
"audit::firewall",
format!("firewall analysis failed: {:#}", error),
),
}
match policy::analyze_policy(&firewall_dir).await {
Ok(result) => audit.policy_analysis = result,
Err(error) => warn_and_record(
audit,
"audit::policy",
format!("policy analysis failed: {:#}", error),
),
}

let mcpg_dir = agent_outputs_dir.join("logs").join("mcpg");
match mcp::analyze_mcp_tool_usage(&mcpg_dir).await {
Ok(result) => audit.mcp_tool_usage = result,
Err(error) => warn_and_record(
audit,
"audit::mcp",
format!("MCP tool-usage analysis failed: {:#}", error),
),
}
match mcp::analyze_mcp_server_health(&mcpg_dir).await {
Ok(result) => audit.mcp_server_health = result,
Err(error) => warn_and_record(
audit,
"audit::mcp",
format!("MCP server-health analysis failed: {:#}", error),
),
}
match mcp::extract_mcp_failures(&mcpg_dir).await {
Ok(result) => audit.mcp_failures = result,
Err(error) => warn_and_record(
audit,
"audit::mcp",
format!("MCP failure extraction failed: {:#}", error),
),
}

match otel::analyze_otel(agent_outputs_dir).await {
Ok(result) => {
audit.metrics = result.metrics;
audit.engine_config = result.engine_config;
audit.performance_metrics = result.performance;
audit.overview.aw_info = result.aw_info;
}
Err(error) => warn_and_record(
audit,
"audit::otel",
format!("OTel analysis failed: {:#}", error),
),
}
}

/// Backfill performance metric fields that can be derived from other already-populated
/// analysis results (firewall request count, most-used MCP tool).
fn populate_performance_metrics(audit: &mut AuditData) {
if let Some(firewall_analysis) = &audit.firewall_analysis {
let performance = audit.performance_metrics.get_or_insert_default();
if performance.network_requests.is_none() {
Expand All @@ -282,27 +338,6 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
performance.most_used_tool = Some(tool.name.clone());
}
}

audit.metrics.error_count = audit.errors.len() as u64;
audit.metrics.warning_count = audit.warnings.len() as u64;
findings::derive_findings(&mut audit);

save_run_summary(
&run_dir,
&RunSummary {
ado_aw_version: env!("CARGO_PKG_VERSION").to_string(),
build_id: parsed.build_id,
processed_at: Utc::now(),
audit_data: audit.clone(),
},
)
.await?;

render_audit(&audit, opts.json)?;
if !opts.json {
eprintln!("✓ Audit complete. Reports in {}", run_dir.display());
}
Ok(())
}

async fn resolve_audit_context(
Expand Down
Loading