Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ares-cli/src/cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,21 @@ pub(crate) enum OpsCommands {
cmd: SessionsCommands,
},

/// Replay the operation state event log into a point-in-time snapshot
Replay {
/// Operation ID to replay
operation_id: String,
/// Stop applying events whose `recorded_at` exceeds this ISO-8601 timestamp
#[arg(long)]
until: Option<String>,
/// Stop after this many events have been applied
#[arg(long)]
until_count: Option<usize>,
/// Emit the snapshot as JSON instead of a human-readable summary
#[arg(long)]
json: bool,
},

/// Persist token usage from Redis to PostgreSQL for an operation
OffloadCost {
/// Operation ID
Expand Down
7 changes: 7 additions & 0 deletions ares-cli/src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod kill;
mod list;
mod loot;
mod queue;
mod replay;
mod report;
pub(crate) mod resolve;
mod runtime;
Expand Down Expand Up @@ -160,6 +161,12 @@ pub(crate) async fn run_ops(cmd: OpsCommands, redis_url: Option<String>) -> Resu
operation_id,
latest,
} => backfill::ops_offload_cost(redis_url, operation_id, latest).await,
OpsCommands::Replay {
operation_id,
until,
until_count,
json,
} => replay::ops_replay(operation_id, until, until_count, json).await,
OpsCommands::Report {
operation_id,
latest,
Expand Down
70 changes: 70 additions & 0 deletions ares-cli/src/ops/replay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//! `ares ops replay` — rebuild a point-in-time state snapshot from the
//! JetStream `ARES_OPSTATE` event log. Phase 5 forensics tooling.

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};

use ares_core::nats::NatsBroker;

use crate::orchestrator::state::replay::{replay_op_to_snapshot, ReplayCutoff, ReplaySnapshot};

pub(crate) async fn ops_replay(
operation_id: String,
until: Option<String>,
until_count: Option<usize>,
json: bool,
) -> Result<()> {
let until_dt: Option<DateTime<Utc>> = match until.as_deref() {
None => None,
Some(raw) => Some(
DateTime::parse_from_rfc3339(raw)
.with_context(|| format!("--until value '{raw}' is not RFC 3339"))?
.with_timezone(&Utc),
),
};
let cutoff = ReplayCutoff {
until: until_dt,
until_count,
};

let nats = NatsBroker::connect_from_env()
.await
.context("Failed to connect to NATS (set ARES_NATS_URL)")?;

let snapshot = replay_op_to_snapshot(&nats, &operation_id, cutoff)
.await
.context("Replay failed")?;

if json {
let s = serde_json::to_string_pretty(&snapshot).context("serialize snapshot")?;
println!("{s}");
} else {
print_human_summary(&snapshot);
}
Ok(())
}

fn print_human_summary(s: &ReplaySnapshot) {
println!("Replay snapshot for operation: {}", s.operation_id);
println!("Events applied: {}", s.events_applied);
println!("Credentials: {}", s.credentials.len());
println!("Hashes: {}", s.hashes.len());
println!("Hosts: {}", s.hosts.len());
println!(
" owned: {}",
s.hosts.iter().filter(|h| h.owned).count()
);
println!(
" domain controllers: {}",
s.hosts.iter().filter(|h| h.is_dc).count()
);
println!("Users: {}", s.users.len());
println!(
"Discovered vulnerabilities: {}",
s.discovered_vulnerabilities.len()
);
println!(
"Exploited vulnerabilities: {}",
s.exploited_vulnerabilities.len()
);
}
90 changes: 83 additions & 7 deletions ares-cli/src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod recovery;
mod result_processing;
mod results;
mod routing;
mod state;
pub(crate) mod state;
pub(crate) mod strategy;
mod task_queue;
mod throttling;
Expand All @@ -40,7 +40,7 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use tokio::signal;
use tokio::sync::watch;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use self::automation_spawner::spawn_automation_tasks;
use self::bootstrap::{bootstrap_meta, dispatch_initial_recon};
Expand Down Expand Up @@ -131,11 +131,87 @@ async fn run_inner() -> Result<()> {
);
}

let shared_state = SharedState::new(config.operation_id.clone());
shared_state
.load_from_redis(&queue)
.await
.context("Failed to load state from Redis")?;
let mut shared_state = SharedState::new(config.operation_id.clone());

// Phase 2 dual-write: install a Nats-backed op-state recorder when NATS is
// available. Redis remains authoritative until Phase 4; emit failures are
// logged (see `emit_op_state`) but never abort the op.
let nats_broker = queue.nats_broker();
if let Some(broker) = nats_broker.clone() {
shared_state.set_recorder(std::sync::Arc::new(
ares_core::op_state_log::OpStateRecorder::nats(std::sync::Arc::new(broker)),
));
info!("Op-state event log enabled (JetStream ARES_OPSTATE)");
} else {
info!("Op-state event log disabled — no NATS broker on TaskQueue");
}

// Phase 3: spawn the Postgres projector consumer when both NATS and a
// database URL are available. The projector tails ARES_OPSTATE and
// upserts each event into PG, replacing the manual `ares ops offload`
// path with an always-current archive.
let _projector_handle: Option<tokio::task::JoinHandle<()>> = match (
nats_broker.clone(),
std::env::var("ARES_DATABASE_URL").ok(),
) {
(Some(broker), Some(database_url)) => {
match ares_core::persistent_store::PersistentStore::connect(&database_url).await {
Ok(store) => {
if let Err(e) = store.migrate().await {
warn!(err = %e, "Postgres projector: schema migration failed; continuing without projection");
None
} else {
let projector =
ares_core::persistent_store::OpStateProjector::new(store, broker);
match projector.spawn().await {
Ok(h) => Some(h),
Err(e) => {
warn!(err = %e, "Failed to spawn Postgres projector — events will accumulate in JetStream without PG sync");
None
}
}
}
}
Err(e) => {
warn!(err = %e, "Postgres projector: PG connect failed; continuing without projection");
None
}
}
}
(None, _) => None,
(Some(_), None) => {
debug!("ARES_DATABASE_URL not set — Postgres projector disabled");
None
}
};

// Phase 4 (opt-in): replay state from the JetStream event log instead of
// loading from Redis. Falls through to Redis on failure or when the env
// var is unset, so the default startup path is unchanged.
let replay_enabled = std::env::var("ARES_USE_EVENT_LOG_REPLAY").as_deref() == Ok("1");
let replayed = match (replay_enabled, nats_broker.as_ref()) {
(true, Some(broker)) => match shared_state.load_from_event_log(broker).await {
Ok(n) => {
info!(events = n, "Loaded state from JetStream event log replay");
true
}
Err(e) => {
warn!(err = %e, "Event log replay failed; falling back to Redis");
false
}
},
(true, None) => {
warn!("ARES_USE_EVENT_LOG_REPLAY=1 but no NATS broker; falling back to Redis");
false
}
(false, _) => false,
};
if !replayed {
shared_state
.load_from_redis(&queue)
.await
.context("Failed to load state from Redis")?;
}

{
let mut state = shared_state.write().await;
Expand Down
33 changes: 33 additions & 0 deletions ares-cli/src/orchestrator/state/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
use anyhow::Result;
use redis::AsyncCommands;

use ares_core::models::OpStateEventPayload;
use ares_core::state;

use redis::aio::ConnectionLike;

use super::publishing::emit_op_state;
use super::SharedState;
use crate::orchestrator::task_queue::TaskQueueCore;

Expand All @@ -31,6 +33,18 @@ impl SharedState {
let _: () = conn.sadd(&key, vuln_id).await?;
let _: () = conn.expire(&key, 86400).await?;

// Phase 2 dual-write: append vuln.exploited event.
emit_op_state(
self.recorder(),
&operation_id,
OpStateEventPayload::VulnExploited {
vuln_id: vuln_id.to_string(),
exploited_by: String::new(),
result: None,
},
)
.await;

let mut state = self.inner.write().await;
state.exploited_vulnerabilities.insert(vuln_id.to_string());
Ok(())
Expand Down Expand Up @@ -150,4 +164,23 @@ mod tests {
.unwrap();
assert!(members.contains("192.168.58.5"));
}

#[tokio::test]
async fn mark_exploited_emits_event_with_capturing_recorder() {
use ares_core::models::OpStateEventPayload;
let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing());
let state = SharedState::with_recorder("op-ex".to_string(), recorder.clone());
let q = mock_queue();

state.mark_exploited(&q, "VULN-007").await.unwrap();

let evs = recorder.captured().await;
assert_eq!(evs.len(), 1);
match &evs[0].payload {
OpStateEventPayload::VulnExploited { vuln_id, .. } => {
assert_eq!(vuln_id, "VULN-007");
}
other => panic!("expected VulnExploited, got {other:?}"),
}
}
}
1 change: 1 addition & 0 deletions ares-cli/src/orchestrator/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod dedup;
mod inner;
mod persistence;
mod publishing;
pub(crate) mod replay;
mod shared;

// Re-export everything that was publicly visible from the old single file.
Expand Down
Loading
Loading