From f4eafec9ac8ee0e3b1ed9c7f6d2c677c4dd39f2a Mon Sep 17 00:00:00 2001 From: Jayson Grace Date: Tue, 12 May 2026 14:09:27 -0600 Subject: [PATCH 1/5] feat: add durable op-state event log with subject hierarchy and publish API **Added:** - Introduced `op_state_event.rs` defining `OpStateEvent` and `OpStateEventPayload` for operation state mutations with subject hierarchy, JSON serialization, and deduplication via event IDs - Added export and module wiring for `OpStateEvent` and `OpStateEventPayload` in `models/mod.rs` - Added new NATS subject prefix and stream constants for operation state events (`OP_STATE_SUBJECT_PREFIX`, `OP_STATE_STREAM`) - Implemented subject builders `op_state_subject` and `op_state_filter_for_op` for granular or wildcard subscription to operation state events - Created `StreamSpec::op_state` for configuring the durable `ARES_OPSTATE` stream with 30-day retention, `Limits` policy, and file storage - Added `NatsBroker::publish_op_state_event` for publishing op-state events with deduplication and optional optimistic concurrency control - Defined `OpStatePublishError` and error classification for publish failures and concurrency conflicts - Added comprehensive tests for subject formatting, stream configuration, and subject hierarchy disjointness in `nats.rs` - Added unit tests for event construction, JSON serialization, and subject suffix logic in `op_state_event.rs` **Changed:** - Updated `VulnerabilityInfo` to derive `PartialEq` for use in event payloads and tests - Refactored `NatsBroker::ensure_streams` to include the new op-state stream --- ares-core/src/models/mod.rs | 2 + ares-core/src/models/op_state_event.rs | 319 +++++++++++++++++++++++++ ares-core/src/models/task.rs | 2 +- ares-core/src/nats.rs | 196 ++++++++++++++- 4 files changed, 516 insertions(+), 3 deletions(-) create mode 100644 ares-core/src/models/op_state_event.rs diff --git a/ares-core/src/models/mod.rs b/ares-core/src/models/mod.rs index ce1d432e..20b2e5e3 100644 --- 
a/ares-core/src/models/mod.rs +++ b/ares-core/src/models/mod.rs @@ -6,6 +6,7 @@ #[cfg(feature = "blue")] mod blue; mod core; +mod op_state_event; mod operation; mod task; mod util; @@ -16,6 +17,7 @@ pub use blue::{ TriageDecision, TriageRecord, }; pub use core::{Credential, Hash, Host, Share, Target, TrustInfo, User}; +pub use op_state_event::{OpStateEvent, OpStateEventPayload}; pub use operation::{AttackChainStep, OperationMeta, SharedRedTeamState}; pub use task::{ AgentInfo, AgentRole, TaskInfo, TaskResult, TaskStatus, TaskStatusRecord, VulnerabilityInfo, diff --git a/ares-core/src/models/op_state_event.rs b/ares-core/src/models/op_state_event.rs new file mode 100644 index 00000000..b3536d8c --- /dev/null +++ b/ares-core/src/models/op_state_event.rs @@ -0,0 +1,319 @@ +//! Operation state event envelope for the JetStream `ARES_OPSTATE` log. +//! +//! Every mutation to live operation state (credentials, hosts, users, +//! vulnerabilities, timeline) is appended to JetStream as an `OpStateEvent`. +//! The stream is the durable source of truth — Redis becomes a read cache and +//! Postgres becomes a projection that the projector consumer keeps current. +//! +//! Subject layout (granular per entity-action; see [`OpStateEventPayload::subject_suffix`]): +//! +//! - `ares.ops.{op_id}.cred.captured` +//! - `ares.ops.{op_id}.hash.captured` +//! - `ares.ops.{op_id}.host.discovered` +//! - `ares.ops.{op_id}.host.owned` +//! - `ares.ops.{op_id}.user.discovered` +//! - `ares.ops.{op_id}.vuln.discovered` +//! - `ares.ops.{op_id}.vuln.exploited` +//! - `ares.ops.{op_id}.timeline` +//! +//! `event_id` is sent as the `Nats-Msg-Id` header so JetStream dedups +//! at-least-once retries. Per-subject optimistic concurrency uses +//! `Nats-Expected-Last-Subject-Sequence`. 
+ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use super::core::{Credential, Hash, Host, User}; +use super::task::VulnerabilityInfo; +use super::util::new_uuid; + +/// Envelope for a single mutation to operation state. +/// +/// Serialized as JSON onto the `ARES_OPSTATE` stream. The `event_id` doubles +/// as the JetStream dedup key (`Nats-Msg-Id` header) so a publisher retrying +/// after a transient error never produces a duplicate event in the log. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct OpStateEvent { + /// Stable identifier for this event. Used as the JetStream dedup key. + #[serde(default = "new_uuid")] + pub event_id: String, + /// Operation that produced the event. + pub op_id: String, + /// Wall-clock time the event was recorded at the publisher. + #[serde(default = "Utc::now")] + pub recorded_at: DateTime, + /// Typed payload — discriminated by `kind` in the JSON form. + #[serde(flatten)] + pub payload: OpStateEventPayload, +} + +impl OpStateEvent { + /// Build a new event with a freshly generated id and current timestamp. + pub fn new(op_id: impl Into, payload: OpStateEventPayload) -> Self { + Self { + event_id: new_uuid(), + op_id: op_id.into(), + recorded_at: Utc::now(), + payload, + } + } + + /// Subject suffix for this event (e.g. `cred.captured`). + pub fn subject_suffix(&self) -> &'static str { + self.payload.subject_suffix() + } +} + +/// Typed payload for [`OpStateEvent`]. +/// +/// Serializes with an internal snake_case `kind` tag (e.g. `host_owned`, which +/// is not the dotted subject suffix `host.owned`); consumers route on subject +/// filter or fall back to payload kind when subscribed to the wildcard. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum OpStateEventPayload { + CredentialCaptured { + credential: Credential, + }, + HashCaptured { + hash: Hash, + }, + HostDiscovered { + host: Host, + }, + HostOwned { + ip: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + hostname: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + owned_by: String, + }, + UserDiscovered { + user: User, + }, + VulnDiscovered { + vuln: VulnerabilityInfo, + }, + VulnExploited { + vuln_id: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + exploited_by: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + result: Option, + }, + TimelineEvent { + event: serde_json::Value, + }, +} + +impl OpStateEventPayload { + /// Subject suffix that identifies this entity-action pair. Stable — + /// changing values here is a wire-format break and requires migration. + pub fn subject_suffix(&self) -> &'static str { + match self { + Self::CredentialCaptured { .. } => "cred.captured", + Self::HashCaptured { .. } => "hash.captured", + Self::HostDiscovered { .. } => "host.discovered", + Self::HostOwned { .. } => "host.owned", + Self::UserDiscovered { .. } => "user.discovered", + Self::VulnDiscovered { .. } => "vuln.discovered", + Self::VulnExploited { .. } => "vuln.exploited", + Self::TimelineEvent { .. 
} => "timeline", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::core::Host; + + fn sample_credential() -> Credential { + Credential { + id: "cred-1".into(), + username: "alice".into(), + password: "P@ssw0rd!".into(), // pragma: allowlist secret + domain: "contoso.local".into(), + source: "secretsdump".into(), + discovered_at: None, + is_admin: false, + parent_id: None, + attack_step: 0, + } + } + + fn sample_host() -> Host { + Host { + ip: "192.168.58.10".into(), + hostname: "dc01.contoso.local".into(), + os: "Windows Server 2019".into(), + roles: vec!["Domain Controller".into()], + services: vec!["88/tcp kerberos".into()], + is_dc: true, + owned: false, + } + } + + #[test] + fn event_new_assigns_id_and_timestamp() { + let ev = OpStateEvent::new( + "op-1", + OpStateEventPayload::CredentialCaptured { + credential: sample_credential(), + }, + ); + assert_eq!(ev.op_id, "op-1"); + assert_eq!(ev.event_id.len(), 36); // UUIDv4 + assert!(ev.recorded_at <= Utc::now()); + } + + #[test] + fn distinct_events_get_distinct_ids() { + let p = OpStateEventPayload::UserDiscovered { + user: User { + username: "bob".into(), + domain: "contoso.local".into(), + description: String::new(), + is_admin: false, + source: "ldap".into(), + }, + }; + let a = OpStateEvent::new("op-1", p.clone()); + let b = OpStateEvent::new("op-1", p); + assert_ne!(a.event_id, b.event_id); + } + + #[test] + fn subject_suffix_matches_each_variant() { + let cases: &[(OpStateEventPayload, &str)] = &[ + ( + OpStateEventPayload::CredentialCaptured { + credential: sample_credential(), + }, + "cred.captured", + ), + ( + OpStateEventPayload::HashCaptured { + hash: Hash { + id: "h1".into(), + username: "krbtgt".into(), + hash_value: "aaaa".into(), + hash_type: "NTLM".into(), + domain: "contoso.local".into(), + cracked_password: None, + source: "secretsdump".into(), + discovered_at: None, + parent_id: None, + attack_step: 0, + aes_key: None, + }, + }, + "hash.captured", + ), + ( + 
OpStateEventPayload::HostDiscovered { + host: sample_host(), + }, + "host.discovered", + ), + ( + OpStateEventPayload::HostOwned { + ip: "192.168.58.10".into(), + hostname: "dc01.contoso.local".into(), + owned_by: "lateral".into(), + }, + "host.owned", + ), + ( + OpStateEventPayload::UserDiscovered { + user: User { + username: "carol".into(), + domain: "contoso.local".into(), + description: String::new(), + is_admin: false, + source: "ldap".into(), + }, + }, + "user.discovered", + ), + ( + OpStateEventPayload::VulnDiscovered { + vuln: VulnerabilityInfo { + vuln_id: "v1".into(), + vuln_type: "ADCS_ESC1".into(), + target: "192.168.58.10".into(), + discovered_by: "recon".into(), + discovered_at: Utc::now(), + details: Default::default(), + recommended_agent: "privesc".into(), + priority: 1, + }, + }, + "vuln.discovered", + ), + ( + OpStateEventPayload::VulnExploited { + vuln_id: "v1".into(), + exploited_by: "privesc".into(), + result: None, + }, + "vuln.exploited", + ), + ( + OpStateEventPayload::TimelineEvent { + event: serde_json::json!({"description": "captured DA"}), + }, + "timeline", + ), + ]; + + for (payload, expected) in cases { + let ev = OpStateEvent::new("op-x", payload.clone()); + assert_eq!(ev.subject_suffix(), *expected); + } + } + + #[test] + fn json_roundtrip_credential_captured() { + let ev = OpStateEvent::new( + "op-42", + OpStateEventPayload::CredentialCaptured { + credential: sample_credential(), + }, + ); + let j = serde_json::to_string(&ev).unwrap(); + let back: OpStateEvent = serde_json::from_str(&j).unwrap(); + assert_eq!(ev, back); + } + + #[test] + fn json_tag_uses_snake_case_kind() { + let ev = OpStateEvent::new( + "op-1", + OpStateEventPayload::HostOwned { + ip: "192.168.58.10".into(), + hostname: String::new(), + owned_by: String::new(), + }, + ); + let v: serde_json::Value = serde_json::to_value(&ev).unwrap(); + assert_eq!(v.get("kind").and_then(|s| s.as_str()), Some("host_owned")); + assert_eq!(v.get("op_id").and_then(|s| s.as_str()), 
Some("op-1")); + } + + #[test] + fn json_roundtrip_timeline_event_carries_arbitrary_payload() { + let ev = OpStateEvent::new( + "op-1", + OpStateEventPayload::TimelineEvent { + event: serde_json::json!({ + "description": "Captured Domain Admin", + "mitre": ["T1003"], + }), + }, + ); + let j = serde_json::to_string(&ev).unwrap(); + let back: OpStateEvent = serde_json::from_str(&j).unwrap(); + assert_eq!(ev, back); + } +} diff --git a/ares-core/src/models/task.rs b/ares-core/src/models/task.rs index a28da11a..5a854f24 100644 --- a/ares-core/src/models/task.rs +++ b/ares-core/src/models/task.rs @@ -115,7 +115,7 @@ pub struct TaskResult { /// /// Matches Python: `class VulnerabilityInfo` dataclass /// Redis serialization: `{"vuln_id","vuln_type","target","discovered_by","discovered_at","details","recommended_agent","priority"}` -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct VulnerabilityInfo { pub vuln_id: String, pub vuln_type: String, diff --git a/ares-core/src/nats.rs b/ares-core/src/nats.rs index 559b2aaa..ace68b7f 100644 --- a/ares-core/src/nats.rs +++ b/ares-core/src/nats.rs @@ -26,13 +26,19 @@ use std::time::Duration; use anyhow::{Context, Result}; +use async_nats::header::{HeaderMap, NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, NATS_MESSAGE_ID}; use async_nats::jetstream::consumer::pull::Config as PullConfig; use async_nats::jetstream::consumer::AckPolicy; -use async_nats::jetstream::stream::{Config as StreamConfig, RetentionPolicy, StorageType}; +use async_nats::jetstream::context::{PublishError, PublishErrorKind}; +use async_nats::jetstream::stream::{ + Config as StreamConfig, DiscardPolicy, RetentionPolicy, StorageType, +}; use async_nats::jetstream::{self, Context as JetStreamContext}; use async_nats::Client; use tracing::{info, warn}; +use crate::models::OpStateEvent; + /// Default NATS URL used when neither `ARES_NATS_URL` nor an explicit URL is provided. 
pub const DEFAULT_NATS_URL: &str = "nats://127.0.0.1:4222"; @@ -52,6 +58,9 @@ pub const DEFERRED_SUBJECT_PREFIX: &str = "ares.deferred"; pub const STATE_UPDATE_SUBJECT_PREFIX: &str = "ares.state.updates"; /// Real-time discovery forwarding. `ares.discoveries.{op}`. pub const DISCOVERY_SUBJECT_PREFIX: &str = "ares.discoveries"; +/// Operation state event log. `ares.ops.{op_id}.{entity}.{action}` (or `.timeline`). +/// Backed by [`OP_STATE_STREAM`]; the durable source of truth for live op state. +pub const OP_STATE_SUBJECT_PREFIX: &str = "ares.ops"; /// Per-task result subject. `ares.tasks.results.{task_id}`. /// Lives on the `ARES_TASKS` stream so results survive orchestrator restart. pub const TASK_RESULT_SUBJECT_PREFIX: &str = "ares.tasks.results"; @@ -70,6 +79,9 @@ pub const BLUE_TASKS_STREAM: &str = "ARES_BLUE_TASKS"; pub const DEFERRED_STREAM: &str = "ARES_DEFERRED"; /// JetStream stream containing real-time discoveries. pub const DISCOVERIES_STREAM: &str = "ARES_DISCOVERIES"; +/// JetStream stream containing the durable operation state event log. +/// Pattern B: the intended source of truth (Redis a derived cache) after Phase 4 cutover. +pub const OP_STATE_STREAM: &str = "ARES_OPSTATE"; // === Subject builders ===================================================== @@ -118,6 +130,20 @@ pub fn discovery_subject(operation_id: &str) -> String { format!("{DISCOVERY_SUBJECT_PREFIX}.{operation_id}") } +/// Build the op-state subject for a given operation and entity-action suffix +/// (e.g. `cred.captured`). Suffix typically comes from +/// [`crate::models::OpStateEvent::subject_suffix`]. +#[inline] +pub fn op_state_subject(operation_id: &str, suffix: &str) -> String { + format!("{OP_STATE_SUBJECT_PREFIX}.{operation_id}.{suffix}") +} + +/// Subject filter that matches every op-state event for a single operation. 
+#[inline] +pub fn op_state_filter_for_op(operation_id: &str) -> String { + format!("{OP_STATE_SUBJECT_PREFIX}.{operation_id}.>") +} + // === Connection =========================================================== /// Shared NATS broker handle. @@ -172,6 +198,7 @@ impl NatsBroker { self.ensure_stream(StreamSpec::blue_tasks()).await?; self.ensure_stream(StreamSpec::deferred()).await?; self.ensure_stream(StreamSpec::discoveries()).await?; + self.ensure_stream(StreamSpec::op_state()).await?; Ok(()) } @@ -192,6 +219,49 @@ impl NatsBroker { } } + /// Publish an [`OpStateEvent`] to the `ARES_OPSTATE` stream. + /// + /// - Subject is `ares.ops.{op_id}.{event.subject_suffix()}` (granular). + /// - `Nats-Msg-Id` header is set to `event.event_id` so JetStream dedups + /// transient at-least-once retries. + /// - If `expected_last_subject_seq` is `Some(n)`, the publish carries + /// `Nats-Expected-Last-Subject-Sequence` for per-subject optimistic + /// concurrency; a mismatch surfaces as [`OpStatePublishError::Conflict`]. + /// + /// Awaits the JetStream ack. Callers in hot agent paths should treat a + /// transient failure as non-fatal and log — Phase 2 dual-write keeps Redis + /// authoritative until Phase 4 cutover. 
+ pub async fn publish_op_state_event( + &self, + event: &OpStateEvent, + expected_last_subject_seq: Option, + ) -> Result { + let subject = op_state_subject(&event.op_id, event.subject_suffix()); + + let payload = serde_json::to_vec(event).map_err(OpStatePublishError::Serialize)?; + + let mut headers = HeaderMap::new(); + headers.insert(NATS_MESSAGE_ID, event.event_id.as_str()); + if let Some(seq) = expected_last_subject_seq { + headers.insert( + NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, + seq.to_string().as_str(), + ); + } + + let ack_future = self + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .await + .map_err(|e| classify_publish_error(&subject, e))?; + + let ack = ack_future + .await + .map_err(|e| classify_publish_error(&subject, e))?; + + Ok(ack.sequence) + } + /// Ensure a durable pull consumer exists on the given stream + filter. /// /// Returns the consumer name. Idempotent on repeated calls with the same @@ -225,12 +295,42 @@ impl NatsBroker { } } +/// Error returned by [`NatsBroker::publish_op_state_event`]. +#[derive(Debug, thiserror::Error)] +pub enum OpStatePublishError { + /// Event serialization failed before any publish was attempted. + #[error("serialize event: {0}")] + Serialize(#[source] serde_json::Error), + /// JetStream rejected the publish because the expected-last-subject-sequence + /// header did not match the server's view — another publisher won the race. + /// Callers can reload state and retry. + #[error("optimistic concurrency conflict on subject {subject}")] + Conflict { subject: String }, + /// Any other publish or ack failure. 
+ #[error("publish to {subject} failed: {message}")] + Publish { subject: String, message: String }, +} + +fn classify_publish_error(subject: &str, err: PublishError) -> OpStatePublishError { + match err.kind() { + PublishErrorKind::WrongLastSequence => OpStatePublishError::Conflict { + subject: subject.to_string(), + }, + _ => OpStatePublishError::Publish { + subject: subject.to_string(), + message: err.to_string(), + }, + } +} + /// Stream definition. One per logical broker workload. pub struct StreamSpec { pub name: &'static str, pub subjects: Vec, pub max_age: Duration, pub storage: StorageType, + pub retention: RetentionPolicy, + pub discard: DiscardPolicy, } impl StreamSpec { @@ -241,6 +341,8 @@ impl StreamSpec { subjects: vec![format!("{TASK_SUBJECT_PREFIX}.>")], max_age: Duration::from_secs(60 * 60 * 24), // 24h storage: StorageType::File, + retention: RetentionPolicy::WorkQueue, + discard: DiscardPolicy::Old, } } @@ -254,6 +356,8 @@ impl StreamSpec { ], max_age: Duration::from_secs(60 * 60 * 24), storage: StorageType::File, + retention: RetentionPolicy::WorkQueue, + discard: DiscardPolicy::Old, } } @@ -266,6 +370,8 @@ impl StreamSpec { subjects: vec![format!("{DEFERRED_SUBJECT_PREFIX}.>")], max_age: Duration::from_secs(60 * 60 * 6), // shorter — deferred tasks are short-lived storage: StorageType::File, + retention: RetentionPolicy::WorkQueue, + discard: DiscardPolicy::Old, } } @@ -276,6 +382,26 @@ impl StreamSpec { subjects: vec![format!("{DISCOVERY_SUBJECT_PREFIX}.>")], max_age: Duration::from_secs(60 * 60 * 12), storage: StorageType::File, + retention: RetentionPolicy::WorkQueue, + discard: DiscardPolicy::Old, + } + } + + /// Operation state event log. Pattern B source-of-truth: + /// every credential / host / user / vuln / timeline mutation is appended + /// here and projected into Postgres + live caches by downstream consumers. + /// + /// `Limits` retention (not `WorkQueue`) so multiple consumers can replay + /// independently. 
30 day window — old enough for forensics, bounded + /// enough for `File` storage to stay reasonable. + pub fn op_state() -> Self { + Self { + name: OP_STATE_STREAM, + subjects: vec![format!("{OP_STATE_SUBJECT_PREFIX}.>")], + max_age: Duration::from_secs(60 * 60 * 24 * 30), // 30 days + storage: StorageType::File, + retention: RetentionPolicy::Limits, + discard: DiscardPolicy::Old, } } @@ -283,7 +409,8 @@ impl StreamSpec { StreamConfig { name: self.name.to_string(), subjects: self.subjects.clone(), - retention: RetentionPolicy::WorkQueue, + retention: self.retention, + discard: self.discard, max_age: self.max_age, storage: self.storage, ..Default::default() @@ -400,6 +527,7 @@ mod tests { DEFERRED_SUBJECT_PREFIX, STATE_UPDATE_SUBJECT_PREFIX, DISCOVERY_SUBJECT_PREFIX, + OP_STATE_SUBJECT_PREFIX, ]; for (i, p1) in prefixes.iter().enumerate() { for p2 in &prefixes[i + 1..] { @@ -476,6 +604,69 @@ mod tests { assert!(s.starts_with(DEFERRED_SUBJECT_PREFIX)); } + #[test] + fn op_state_subject_format() { + assert_eq!( + op_state_subject("op-42", "cred.captured"), + "ares.ops.op-42.cred.captured" + ); + assert_eq!( + op_state_subject("op-42", "host.owned"), + "ares.ops.op-42.host.owned" + ); + } + + #[test] + fn op_state_filter_for_op_uses_wildcard() { + assert_eq!(op_state_filter_for_op("op-42"), "ares.ops.op-42.>"); + } + + #[test] + fn op_state_stream_spec_uses_limits_and_thirty_day_retention() { + let spec = StreamSpec::op_state(); + assert_eq!(spec.name, "ARES_OPSTATE"); + assert_eq!(spec.subjects, vec!["ares.ops.>"]); + assert!(matches!(spec.retention, RetentionPolicy::Limits)); + assert!(matches!(spec.storage, StorageType::File)); + assert!(matches!(spec.discard, DiscardPolicy::Old)); + assert_eq!(spec.max_age, Duration::from_secs(60 * 60 * 24 * 30)); + } + + #[test] + fn op_state_stream_to_config_carries_subjects_and_limits_retention() { + let cfg = StreamSpec::op_state().to_config(); + assert_eq!(cfg.name, "ARES_OPSTATE"); + assert!(cfg.subjects.iter().any(|s| s == 
"ares.ops.>")); + assert!(matches!(cfg.retention, RetentionPolicy::Limits)); + assert!(matches!(cfg.discard, DiscardPolicy::Old)); + } + + #[test] + fn op_state_subject_disjoint_from_other_streams() { + // Catches accidental overlap between the new op-state subject hierarchy + // and any of the existing streams' subjects. + let op_state = StreamSpec::op_state(); + let others = [ + StreamSpec::tasks(), + StreamSpec::blue_tasks(), + StreamSpec::deferred(), + StreamSpec::discoveries(), + ]; + for s in &op_state.subjects { + let s_prefix = s.trim_end_matches(".>"); + for other in &others { + for o in &other.subjects { + let o_prefix = o.trim_end_matches(".>"); + assert!( + !s_prefix.starts_with(o_prefix) && !o_prefix.starts_with(s_prefix), + "op-state subject {s} overlaps with {o} from {}", + other.name + ); + } + } + } + } + #[test] fn stream_names_are_uppercase_and_distinct() { let names = [ @@ -483,6 +674,7 @@ mod tests { BLUE_TASKS_STREAM, DEFERRED_STREAM, DISCOVERIES_STREAM, + OP_STATE_STREAM, ]; for n in &names { assert_eq!(*n, n.to_uppercase(), "stream name {n} must be uppercase"); From 5d47c10994dc40d443da06e150197eb7bc099803 Mon Sep 17 00:00:00 2001 From: Jayson Grace Date: Tue, 12 May 2026 14:20:59 -0600 Subject: [PATCH 2/5] feat: add dual-write op-state event log with NATS and test capturing **Added:** - Introduced `OpStateRecorder` abstraction to emit operation state events to a NATS-backed JetStream log or in-memory buffer for tests (`ares-core/src/op_state_log.rs`) - Implemented `emit_op_state` utility to handle event emission and error logging for all publish sites - Emitted op-state events for credential, hash, user, vulnerability, exploited-vuln, host, and timeline event publishers in orchestrator state modules - Provided capturing test recorders and comprehensive tests verifying event emission, deduplication, and disabled behavior **Changed:** - Updated `SharedState` to hold an `OpStateRecorder` and allow installing or replacing the recorder at 
runtime (`set_recorder`, `with_recorder`) - Modified orchestrator modules to dual-write op-state events when a recorder is active, preserving Redis as authoritative for now - Updated orchestrator state publishing and dedup logic to use new event emission mechanism after successful writes - Extended test coverage to assert correct event emission across all entity publishers **Removed:** - Legacy event logging stubs in orchestrator modules now handled by the new dual-write mechanism --- ares-cli/src/orchestrator/mod.rs | 15 +- ares-cli/src/orchestrator/state/dedup.rs | 33 ++++ .../state/publishing/credentials.rs | 110 ++++++++++- .../orchestrator/state/publishing/entities.rs | 106 ++++++++++- .../orchestrator/state/publishing/hosts.rs | 65 ++++++- .../src/orchestrator/state/publishing/mod.rs | 20 ++ ares-cli/src/orchestrator/state/shared.rs | 32 +++- ares-core/src/lib.rs | 1 + ares-core/src/op_state_log.rs | 178 ++++++++++++++++++ 9 files changed, 549 insertions(+), 11 deletions(-) create mode 100644 ares-core/src/op_state_log.rs diff --git a/ares-cli/src/orchestrator/mod.rs b/ares-cli/src/orchestrator/mod.rs index 2c2be791..b147bd56 100644 --- a/ares-cli/src/orchestrator/mod.rs +++ b/ares-cli/src/orchestrator/mod.rs @@ -131,7 +131,20 @@ async fn run_inner() -> Result<()> { ); } - let shared_state = SharedState::new(config.operation_id.clone()); + let mut shared_state = SharedState::new(config.operation_id.clone()); + + // Phase 2 dual-write: install a Nats-backed op-state recorder when NATS is + // available. Redis remains authoritative until Phase 4; emit failures are + // logged (see `emit_op_state`) but never abort the op. 
+ if let Some(broker) = queue.nats_broker() { + shared_state.set_recorder(std::sync::Arc::new( + ares_core::op_state_log::OpStateRecorder::nats(std::sync::Arc::new(broker)), + )); + info!("Op-state event log enabled (JetStream ARES_OPSTATE)"); + } else { + info!("Op-state event log disabled — no NATS broker on TaskQueue"); + } + shared_state .load_from_redis(&queue) .await diff --git a/ares-cli/src/orchestrator/state/dedup.rs b/ares-cli/src/orchestrator/state/dedup.rs index bf3cd920..9b518835 100644 --- a/ares-cli/src/orchestrator/state/dedup.rs +++ b/ares-cli/src/orchestrator/state/dedup.rs @@ -3,10 +3,12 @@ use anyhow::Result; use redis::AsyncCommands; +use ares_core::models::OpStateEventPayload; use ares_core::state; use redis::aio::ConnectionLike; +use super::publishing::emit_op_state; use super::SharedState; use crate::orchestrator::task_queue::TaskQueueCore; @@ -31,6 +33,18 @@ impl SharedState { let _: () = conn.sadd(&key, vuln_id).await?; let _: () = conn.expire(&key, 86400).await?; + // Phase 2 dual-write: append vuln.exploited event. + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::VulnExploited { + vuln_id: vuln_id.to_string(), + exploited_by: String::new(), + result: None, + }, + ) + .await; + let mut state = self.inner.write().await; state.exploited_vulnerabilities.insert(vuln_id.to_string()); Ok(()) @@ -150,4 +164,23 @@ mod tests { .unwrap(); assert!(members.contains("192.168.58.5")); } + + #[tokio::test] + async fn mark_exploited_emits_event_with_capturing_recorder() { + use ares_core::models::OpStateEventPayload; + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-ex".to_string(), recorder.clone()); + let q = mock_queue(); + + state.mark_exploited(&q, "VULN-007").await.unwrap(); + + let evs = recorder.captured().await; + assert_eq!(evs.len(), 1); + match &evs[0].payload { + OpStateEventPayload::VulnExploited { vuln_id, .. 
} => { + assert_eq!(vuln_id, "VULN-007"); + } + other => panic!("expected VulnExploited, got {other:?}"), + } + } } diff --git a/ares-cli/src/orchestrator/state/publishing/credentials.rs b/ares-cli/src/orchestrator/state/publishing/credentials.rs index a038b24e..b2ef5bb5 100644 --- a/ares-cli/src/orchestrator/state/publishing/credentials.rs +++ b/ares-cli/src/orchestrator/state/publishing/credentials.rs @@ -2,7 +2,7 @@ use anyhow::Result; -use ares_core::models::{Credential, Hash}; +use ares_core::models::{Credential, Hash, OpStateEventPayload}; use ares_core::state::{self, RedisStateReader}; use redis::aio::ConnectionLike; @@ -10,7 +10,7 @@ use redis::aio::ConnectionLike; use crate::orchestrator::state::SharedState; use crate::orchestrator::task_queue::TaskQueueCore; -use super::sanitize_credential; +use super::{emit_op_state, sanitize_credential}; impl SharedState { /// Add a credential to state and Redis (with dedup). @@ -51,6 +51,17 @@ impl SharedState { let mut conn = queue.connection(); let added = reader.add_credential(&mut conn, &cred).await?; if added { + // Phase 2 dual-write: append to the op-state log after Redis confirms + // the credential is new (Redis is the dedup oracle). + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::CredentialCaptured { + credential: cred.clone(), + }, + ) + .await; + // Auto-extract domain from credential (matches Python add_credential) let cred_domain = cred.domain.to_lowercase(); if cred_domain.contains('.') { @@ -99,10 +110,18 @@ impl SharedState { let state = self.inner.read().await; state.operation_id.clone() }; - let reader = RedisStateReader::new(operation_id); + let reader = RedisStateReader::new(operation_id.clone()); let mut conn = queue.connection(); let added = reader.add_hash(&mut conn, &hash).await?; if added { + // Phase 2 dual-write: emit before consuming `hash` into state. 
+ emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::HashCaptured { hash: hash.clone() }, + ) + .await; + let is_krbtgt = hash.username.to_lowercase() == "krbtgt" && hash.hash_type.to_lowercase().contains("ntlm"); let hash_domain = hash.domain.clone(); @@ -292,12 +311,20 @@ mod tests { use super::*; use crate::orchestrator::state::SharedState; use crate::orchestrator::task_queue::TaskQueueCore; + use ares_core::op_state_log::OpStateRecorder; use ares_core::state::mock_redis::MockRedisConnection; + use std::sync::Arc; fn mock_queue() -> TaskQueueCore { TaskQueueCore::from_connection(MockRedisConnection::new()) } + fn capturing_state(op_id: &str) -> (SharedState, Arc) { + let recorder = Arc::new(OpStateRecorder::capturing()); + let state = SharedState::with_recorder(op_id.to_string(), recorder.clone()); + (state, recorder) + } + fn make_cred(username: &str, password: &str, domain: &str) -> Credential { Credential { id: uuid::Uuid::new_v4().to_string(), @@ -490,4 +517,81 @@ mod tests { .unwrap(); assert!(!updated); } + + #[tokio::test] + async fn publish_credential_emits_event_with_capturing_recorder() { + let (state, recorder) = capturing_state("op-emit"); + let q = mock_queue(); + let cred = make_cred("alice", "P@ssw0rd!", "contoso.local"); + assert!(state.publish_credential(&q, cred).await.unwrap()); + + let evs = recorder.captured().await; + assert_eq!(evs.len(), 1, "exactly one event should be emitted"); + assert_eq!(evs[0].op_id, "op-emit"); + match &evs[0].payload { + OpStateEventPayload::CredentialCaptured { credential } => { + assert_eq!(credential.username, "alice"); + assert_eq!(credential.domain, "contoso.local"); + } + other => panic!("expected CredentialCaptured, got {other:?}"), + } + } + + #[tokio::test] + async fn publish_credential_dedup_does_not_emit_duplicate_event() { + let (state, recorder) = capturing_state("op-dedup"); + let q = mock_queue(); + let cred1 = make_cred("alice", "P@ssw0rd!", "contoso.local"); + let cred2 = 
make_cred("alice", "P@ssw0rd!", "contoso.local"); + assert!(state.publish_credential(&q, cred1).await.unwrap()); + assert!(!state.publish_credential(&q, cred2).await.unwrap()); + + let evs = recorder.captured().await; + assert_eq!(evs.len(), 1, "dedup'd insert must not emit a second event"); + } + + #[tokio::test] + async fn publish_credential_rejected_input_does_not_emit() { + // Invalid credential (empty password) is dropped by sanitize_credential + // before any Redis write — must not emit an event either. + let (state, recorder) = capturing_state("op-reject"); + let q = mock_queue(); + let cred = make_cred("alice", "", "contoso.local"); + assert!(!state.publish_credential(&q, cred).await.unwrap()); + assert!(recorder.captured().await.is_empty()); + } + + #[tokio::test] + async fn publish_hash_emits_event_with_capturing_recorder() { + let (state, recorder) = capturing_state("op-h"); + let q = mock_queue(); + let hash = make_hash("admin", "contoso.local", "NTLM", "aabbccdd"); + assert!(state.publish_hash(&q, hash).await.unwrap()); + + let evs = recorder.captured().await; + // krbtgt path emits multiple events (hash + vuln + exploited). Plain + // admin hash only emits hash.captured. + assert_eq!(evs.len(), 1); + match &evs[0].payload { + OpStateEventPayload::HashCaptured { hash } => { + assert_eq!(hash.username, "admin"); + assert_eq!(hash.hash_type, "NTLM"); + } + other => panic!("expected HashCaptured, got {other:?}"), + } + } + + #[tokio::test] + async fn disabled_recorder_emits_nothing() { + // SharedState::new() defaults to OpStateRecorder::Disabled. + let state = SharedState::new("op-noop".to_string()); + let q = mock_queue(); + state + .publish_credential(&q, make_cred("alice", "P@ssw0rd!", "contoso.local")) + .await + .unwrap(); + // No recorder handle to inspect — the assertion here is "no panic and + // no async hang on the no-op record path". Combined with the active + // tests above, this exercises both branches of `is_active`. 
+ } } diff --git a/ares-cli/src/orchestrator/state/publishing/entities.rs b/ares-cli/src/orchestrator/state/publishing/entities.rs index 246468ff..b90641f9 100644 --- a/ares-cli/src/orchestrator/state/publishing/entities.rs +++ b/ares-cli/src/orchestrator/state/publishing/entities.rs @@ -3,11 +3,12 @@ use anyhow::Result; use redis::AsyncCommands; -use ares_core::models::{Share, User, VulnerabilityInfo}; +use ares_core::models::{OpStateEventPayload, Share, User, VulnerabilityInfo}; use ares_core::state::{self, RedisStateReader}; use redis::aio::ConnectionLike; +use super::emit_op_state; use crate::orchestrator::state::{SharedState, KEY_VULN_QUEUE}; use crate::orchestrator::task_queue::TaskQueueCore; @@ -71,10 +72,16 @@ impl SharedState { let state = self.inner.read().await; state.operation_id.clone() }; - let reader = RedisStateReader::new(operation_id); + let reader = RedisStateReader::new(operation_id.clone()); let mut conn = queue.connection(); let added = reader.add_user(&mut conn, &user).await?; if added { + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::UserDiscovered { user: user.clone() }, + ) + .await; let mut state = self.inner.write().await; state.users.push(user); } @@ -123,6 +130,13 @@ impl SharedState { let mut conn = queue.connection(); let added = reader.add_vulnerability(&mut conn, &vuln).await?; if added { + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::VulnDiscovered { vuln: vuln.clone() }, + ) + .await; + // Also add to vuln queue ZSET for exploitation workflow let vuln_queue_key = format!("{}:{}:{}", state::KEY_PREFIX, operation_id, KEY_VULN_QUEUE); @@ -184,7 +198,7 @@ impl SharedState { let state = self.inner.read().await; state.operation_id.clone() }; - let reader = RedisStateReader::new(operation_id); + let reader = RedisStateReader::new(operation_id.clone()); let mut conn = queue.connection(); reader.add_timeline_event(&mut conn, event).await?; @@ -193,6 +207,15 @@ impl SharedState { let 
_ = reader.add_technique(&mut conn, technique).await; } + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::TimelineEvent { + event: event.clone(), + }, + ) + .await; + Ok(()) } @@ -657,4 +680,81 @@ mod tests { assert!(!are_in_same_forest("contoso.local", "fabrikam.local")); assert!(!are_in_same_forest("child.contoso.local", "fabrikam.local")); } + + #[tokio::test] + async fn publish_user_emits_event_with_capturing_recorder() { + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-u".to_string(), recorder.clone()); + let q = mock_queue(); + assert!(state + .publish_user(&q, make_user("alice", "contoso.local")) + .await + .unwrap()); + + let evs = recorder.captured().await; + assert_eq!(evs.len(), 1); + match &evs[0].payload { + OpStateEventPayload::UserDiscovered { user } => { + assert_eq!(user.username, "alice"); + assert_eq!(user.domain, "contoso.local"); + } + other => panic!("expected UserDiscovered, got {other:?}"), + } + } + + #[tokio::test] + async fn publish_user_dedup_does_not_emit_event() { + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-u-dup".to_string(), recorder.clone()); + let q = mock_queue(); + assert!(state + .publish_user(&q, make_user("alice", "contoso.local")) + .await + .unwrap()); + assert!(!state + .publish_user(&q, make_user("alice", "contoso.local")) + .await + .unwrap()); + assert_eq!(recorder.captured().await.len(), 1); + } + + #[tokio::test] + async fn publish_vulnerability_emits_event_with_capturing_recorder() { + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-v".to_string(), recorder.clone()); + let q = mock_queue(); + let vuln = make_vuln("VULN-001", "esc1", "192.168.58.10"); + assert!(state.publish_vulnerability(&q, vuln).await.unwrap()); + + let evs 
= recorder.captured().await; + assert_eq!(evs.len(), 1); + match &evs[0].payload { + OpStateEventPayload::VulnDiscovered { vuln } => { + assert_eq!(vuln.vuln_id, "VULN-001"); + assert_eq!(vuln.vuln_type, "esc1"); + } + other => panic!("expected VulnDiscovered, got {other:?}"), + } + } + + #[tokio::test] + async fn persist_timeline_event_emits_event_with_capturing_recorder() { + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-t".to_string(), recorder.clone()); + let q = mock_queue(); + let ev = serde_json::json!({"description": "smb 445 open"}); + state + .persist_timeline_event(&q, &ev, &["T1135".to_string()]) + .await + .unwrap(); + + let evs = recorder.captured().await; + assert_eq!(evs.len(), 1); + match &evs[0].payload { + OpStateEventPayload::TimelineEvent { event } => { + assert_eq!(event["description"], "smb 445 open"); + } + other => panic!("expected TimelineEvent, got {other:?}"), + } + } } diff --git a/ares-cli/src/orchestrator/state/publishing/hosts.rs b/ares-cli/src/orchestrator/state/publishing/hosts.rs index a3923601..6e75ac8e 100644 --- a/ares-cli/src/orchestrator/state/publishing/hosts.rs +++ b/ares-cli/src/orchestrator/state/publishing/hosts.rs @@ -3,7 +3,7 @@ use anyhow::Result; use redis::AsyncCommands; -use ares_core::models::Host; +use ares_core::models::{Host, OpStateEventPayload}; use ares_core::state::{self, RedisStateReader}; use redis::aio::ConnectionLike; @@ -11,7 +11,7 @@ use redis::aio::ConnectionLike; use crate::orchestrator::state::SharedState; use crate::orchestrator::task_queue::TaskQueueCore; -use super::is_aws_hostname; +use super::{emit_op_state, is_aws_hostname}; impl SharedState { /// Add a host to state and Redis. 
@@ -224,10 +224,19 @@ impl SharedState { let state = self.inner.read().await; state.operation_id.clone() }; - let reader = RedisStateReader::new(operation_id); + let reader = RedisStateReader::new(operation_id.clone()); let mut conn = queue.connection(); reader.add_host(&mut conn, &host).await?; + // Phase 2 dual-write: emit host.discovered for net-new hosts only. + // Merges return earlier; HostUpdated is intentionally not yet a variant. + emit_op_state( + self.recorder(), + &operation_id, + OpStateEventPayload::HostDiscovered { host: host.clone() }, + ) + .await; + // Update DC map and domain list if this is a domain controller if host.is_dc || host.detect_dc() { self.register_dc(queue, &host).await?; @@ -652,4 +661,54 @@ mod tests { let s = state.inner.read().await; assert_eq!(s.hosts[0].os, "Windows Server 2019"); } + + #[tokio::test] + async fn publish_host_emits_event_for_net_new_host() { + let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-host".to_string(), recorder.clone()); + let q = mock_queue(); + + let host = make_host("192.168.58.7", "ws01.contoso.local", false); + state.publish_host(&q, host).await.unwrap(); + + let evs = recorder.captured().await; + assert!(evs.iter().any(|e| matches!( + &e.payload, + OpStateEventPayload::HostDiscovered { host } if host.ip == "192.168.58.7" + ))); + } + + #[tokio::test] + async fn publish_host_merge_does_not_emit_host_discovered() { + // A merge into an existing host returns early before the new-host path, + // so HostDiscovered must not fire a second time. 
+ let recorder = std::sync::Arc::new(ares_core::op_state_log::OpStateRecorder::capturing()); + let state = SharedState::with_recorder("op-merge".to_string(), recorder.clone()); + let q = mock_queue(); + + state + .publish_host(&q, make_host("192.168.58.8", "", false)) + .await + .unwrap(); + let after_first = recorder.captured().await.len(); + + // Second publish with richer data should merge, not emit. + let mut host2 = make_host("192.168.58.8", "srv02.contoso.local", false); + host2.services.push("445/tcp microsoft-ds".to_string()); + state.publish_host(&q, host2).await.unwrap(); + let after_merge = recorder.captured().await.len(); + + let host_events_added = recorder + .captured() + .await + .iter() + .filter(|e| matches!(e.payload, OpStateEventPayload::HostDiscovered { .. })) + .count(); + assert_eq!( + host_events_added, 1, + "merge must not re-emit HostDiscovered" + ); + // The non-host events (e.g. netbios publish doesn't emit) shouldn't grow either. + assert_eq!(after_first, after_merge); + } } diff --git a/ares-cli/src/orchestrator/state/publishing/mod.rs b/ares-cli/src/orchestrator/state/publishing/mod.rs index ffe425ec..b1f1f640 100644 --- a/ares-cli/src/orchestrator/state/publishing/mod.rs +++ b/ares-cli/src/orchestrator/state/publishing/mod.rs @@ -6,9 +6,29 @@ mod entities; mod hosts; mod milestones; +use ares_core::models::{OpStateEvent, OpStateEventPayload}; +use ares_core::op_state_log::OpStateRecorder; use regex::Regex; use std::sync::LazyLock; +/// Emit a single op-state event through the recorder. No-op when the recorder +/// is disabled; otherwise builds an [`OpStateEvent`] and forwards to the +/// recorder. Publish failures are logged at WARN — Phase 2 keeps Redis +/// authoritative so a transient broker outage must not abort the call. 
+pub(super) async fn emit_op_state( + recorder: &OpStateRecorder, + op_id: &str, + payload: OpStateEventPayload, +) { + if !recorder.is_active() { + return; + } + let event = OpStateEvent::new(op_id, payload); + if let Err(e) = recorder.record(event).await { + tracing::warn!(err = %e, "op-state event publish failed"); + } +} + /// Regex matching `Password` (case-insensitive) followed by optional `:` and space. pub(super) static PASSWORD_PREFIX_RE: LazyLock = LazyLock::new(|| Regex::new(r"(?i)^password\s*:\s*").unwrap()); diff --git a/ares-cli/src/orchestrator/state/shared.rs b/ares-cli/src/orchestrator/state/shared.rs index 92b36337..88f8206c 100644 --- a/ares-cli/src/orchestrator/state/shared.rs +++ b/ares-cli/src/orchestrator/state/shared.rs @@ -3,22 +3,52 @@ use std::sync::Arc; use tokio::sync::RwLock; +use ares_core::op_state_log::OpStateRecorder; + use super::inner::StateInner; /// Thread-safe shared state with read/write access. #[derive(Clone)] pub struct SharedState { pub(super) inner: Arc>, + /// Sink for op-state events (Phase 2 dual-write). Defaults to + /// [`OpStateRecorder::Disabled`] so existing call sites stay no-op until + /// the orchestrator installs a Nats-backed recorder. + pub(crate) recorder: Arc, } impl SharedState { - /// Create a new empty state. + /// Create a new empty state. Recorder defaults to [`OpStateRecorder::Disabled`]. pub fn new(operation_id: String) -> Self { Self { inner: Arc::new(RwLock::new(StateInner::new(operation_id))), + recorder: Arc::new(OpStateRecorder::Disabled), + } + } + + /// Create a new empty state with a specific event recorder installed. + /// Production wires `OpStateRecorder::Nats(broker)`; tests use + /// `OpStateRecorder::capturing()` to assert what was emitted. 
+ #[cfg_attr(not(test), allow(dead_code))] + pub fn with_recorder(operation_id: String, recorder: Arc) -> Self { + Self { + inner: Arc::new(RwLock::new(StateInner::new(operation_id))), + recorder, } } + /// Replace the recorder on an existing state — useful when SharedState is + /// built before the orchestrator has a NatsBroker handle ready. + pub fn set_recorder(&mut self, recorder: Arc) { + self.recorder = recorder; + } + + /// Access the installed recorder. Internal — publishing methods call this + /// to emit events after a successful Redis write. + pub(crate) fn recorder(&self) -> &OpStateRecorder { + &self.recorder + } + /// Create a cheap snapshot of state for prompt generation. /// /// Clones the relevant fields so the RwLock is released before LLM calls. diff --git a/ares-core/src/lib.rs b/ares-core/src/lib.rs index da8fdee4..4076ea20 100644 --- a/ares-core/src/lib.rs +++ b/ares-core/src/lib.rs @@ -17,6 +17,7 @@ pub mod detection; pub mod eval; pub mod models; pub mod nats; +pub mod op_state_log; pub mod parsing; pub mod persistent_store; pub mod reports; diff --git a/ares-core/src/op_state_log.rs b/ares-core/src/op_state_log.rs new file mode 100644 index 00000000..952ac2e4 --- /dev/null +++ b/ares-core/src/op_state_log.rs @@ -0,0 +1,178 @@ +//! Recorder abstraction for the operation state event log. +//! +//! `OpStateRecorder` is the sink that [`crate::models::OpStateEvent`] values +//! are pushed into. Production uses the [`OpStateRecorder::Nats`] variant which +//! publishes to the `ARES_OPSTATE` JetStream stream. Tests use +//! [`OpStateRecorder::Capturing`] to assert what was emitted without spinning +//! up a NATS server. Components that have not been wired into the event log +//! yet hold [`OpStateRecorder::Disabled`] and the recorder becomes a no-op. +//! +//! Phase 2 of the JetStream-as-source-of-truth rollout: publish failures are +//! logged at the call site but never abort the operation, because Redis is +//! 
still authoritative until the Phase 4 cutover. + +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::models::OpStateEvent; +use crate::nats::{NatsBroker, OpStatePublishError}; + +/// Sink for [`OpStateEvent`] values emitted by `SharedState` publishers. +/// +/// Cheap to clone — variants either hold an `Arc` (clone-friendly +/// already) or an `Arc>` capture buffer. Default constructed as +/// [`OpStateRecorder::Disabled`] when callers have not opted into the event +/// log yet. +#[derive(Clone, Default)] +pub enum OpStateRecorder { + /// No-op sink. Used in tests and in code paths not yet wired into the log. + #[default] + Disabled, + /// Publishes each event to the `ARES_OPSTATE` JetStream stream. + Nats(Arc), + /// Test sink that appends events to an in-memory buffer. + Capturing(Arc>>), +} + +impl OpStateRecorder { + /// Construct a disabled recorder. Cheap; equivalent to `Self::Disabled`. + pub fn disabled() -> Self { + Self::Disabled + } + + /// Construct a Nats-backed recorder. Phase-2 dual-write entry point. + pub fn nats(broker: Arc) -> Self { + Self::Nats(broker) + } + + /// Construct a fresh capturing recorder for tests. + pub fn capturing() -> Self { + Self::Capturing(Arc::new(Mutex::new(Vec::new()))) + } + + /// `true` when this recorder will actually emit anything. Lets call sites + /// skip serialization work entirely when disabled. + pub fn is_active(&self) -> bool { + !matches!(self, Self::Disabled) + } + + /// Record an event. Returns the JetStream sequence number on success, or + /// the publish error so the caller can decide how to react. + /// + /// For [`Self::Disabled`] this is a successful no-op and returns sequence + /// `0`. For [`Self::Capturing`] the event is pushed onto the buffer and + /// the returned sequence is the buffer index (1-based, matching how + /// JetStream sequences begin at 1). 
+ pub async fn record(&self, event: OpStateEvent) -> Result { + match self { + Self::Disabled => Ok(0), + Self::Nats(broker) => broker.publish_op_state_event(&event, None).await, + Self::Capturing(buf) => { + let mut guard = buf.lock().await; + guard.push(event); + Ok(guard.len() as u64) + } + } + } + + /// Snapshot of all events captured so far. Returns empty for non-capturing + /// variants. Test-only helper. + pub async fn captured(&self) -> Vec { + match self { + Self::Capturing(buf) => buf.lock().await.clone(), + _ => Vec::new(), + } + } +} + +impl std::fmt::Debug for OpStateRecorder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Disabled => f.write_str("OpStateRecorder::Disabled"), + Self::Nats(_) => f.write_str("OpStateRecorder::Nats(..)"), + Self::Capturing(_) => f.write_str("OpStateRecorder::Capturing(..)"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::OpStateEventPayload; + + fn timeline_payload(text: &str) -> OpStateEventPayload { + OpStateEventPayload::TimelineEvent { + event: serde_json::json!({ "description": text }), + } + } + + #[test] + fn disabled_is_default_and_inactive() { + let r = OpStateRecorder::default(); + assert!(!r.is_active()); + assert!(matches!(r, OpStateRecorder::Disabled)); + } + + #[test] + fn capturing_reports_active() { + let r = OpStateRecorder::capturing(); + assert!(r.is_active()); + } + + #[tokio::test] + async fn disabled_record_is_noop() { + let r = OpStateRecorder::Disabled; + let seq = r + .record(OpStateEvent::new("op-1", timeline_payload("hi"))) + .await + .unwrap(); + assert_eq!(seq, 0); + assert!(r.captured().await.is_empty()); + } + + #[tokio::test] + async fn capturing_appends_events_in_order() { + let r = OpStateRecorder::capturing(); + let s1 = r + .record(OpStateEvent::new("op-1", timeline_payload("first"))) + .await + .unwrap(); + let s2 = r + .record(OpStateEvent::new("op-1", timeline_payload("second"))) + .await + .unwrap(); + 
assert_eq!(s1, 1); + assert_eq!(s2, 2); + let evs = r.captured().await; + assert_eq!(evs.len(), 2); + match &evs[0].payload { + OpStateEventPayload::TimelineEvent { event } => { + assert_eq!(event["description"], "first"); + } + _ => panic!("wrong payload variant"), + } + } + + #[tokio::test] + async fn capturing_buffer_is_shared_across_clones() { + let r = OpStateRecorder::capturing(); + let r2 = r.clone(); + r.record(OpStateEvent::new("op-1", timeline_payload("a"))) + .await + .unwrap(); + r2.record(OpStateEvent::new("op-1", timeline_payload("b"))) + .await + .unwrap(); + // Both clones see both events. + assert_eq!(r.captured().await.len(), 2); + assert_eq!(r2.captured().await.len(), 2); + } + + #[test] + fn debug_does_not_leak_event_contents() { + let r = OpStateRecorder::Capturing(Arc::new(Mutex::new(Vec::new()))); + let s = format!("{r:?}"); + assert_eq!(s, "OpStateRecorder::Capturing(..)"); + } +} From 52267882ff3f26bdd6cb3ea134271557ef3919a2 Mon Sep 17 00:00:00 2001 From: Jayson Grace Date: Tue, 12 May 2026 14:29:16 -0600 Subject: [PATCH 3/5] feat: add Postgres projector to sync op-state events from JetStream **Added:** - Introduced `OpStateProjector` for syncing `ARES_OPSTATE` events to Postgres, ensuring the archive stays current by tailing JetStream and upserting events into relevant tables - Added durable JetStream consumer configuration and logic for idempotent event application using existing unique constraints - Implemented error handling, schema migration, and conditional projector startup based on NATS and database availability - Exported `OpStateProjector` and `PROJECTOR_CONSUMER_NAME` from `persistent_store` module - Included unit tests for utility functions and projector consumer stability **Changed:** - Updated orchestrator to spawn the Postgres projector consumer when both NATS and a database URL are available - Added `debug` import to orchestrator for enhanced logging during projector initialization --- ares-cli/src/orchestrator/mod.rs | 44 
+- ares-core/src/persistent_store/mod.rs | 2 + ares-core/src/persistent_store/projector.rs | 471 ++++++++++++++++++++ 3 files changed, 515 insertions(+), 2 deletions(-) create mode 100644 ares-core/src/persistent_store/projector.rs diff --git a/ares-cli/src/orchestrator/mod.rs b/ares-cli/src/orchestrator/mod.rs index b147bd56..b9b90f15 100644 --- a/ares-cli/src/orchestrator/mod.rs +++ b/ares-cli/src/orchestrator/mod.rs @@ -40,7 +40,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use tokio::signal; use tokio::sync::watch; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use self::automation_spawner::spawn_automation_tasks; use self::bootstrap::{bootstrap_meta, dispatch_initial_recon}; @@ -136,7 +136,8 @@ async fn run_inner() -> Result<()> { // Phase 2 dual-write: install a Nats-backed op-state recorder when NATS is // available. Redis remains authoritative until Phase 4; emit failures are // logged (see `emit_op_state`) but never abort the op. - if let Some(broker) = queue.nats_broker() { + let nats_broker = queue.nats_broker(); + if let Some(broker) = nats_broker.clone() { shared_state.set_recorder(std::sync::Arc::new( ares_core::op_state_log::OpStateRecorder::nats(std::sync::Arc::new(broker)), )); @@ -145,6 +146,45 @@ async fn run_inner() -> Result<()> { info!("Op-state event log disabled — no NATS broker on TaskQueue"); } + // Phase 3: spawn the Postgres projector consumer when both NATS and a + // database URL are available. The projector tails ARES_OPSTATE and + // upserts each event into PG, replacing the manual `ares ops offload` + // path with an always-current archive. 
+ let _projector_handle: Option> = match ( + nats_broker.clone(), + std::env::var("ARES_DATABASE_URL").ok(), + ) { + (Some(broker), Some(database_url)) => { + match ares_core::persistent_store::PersistentStore::connect(&database_url).await { + Ok(store) => { + if let Err(e) = store.migrate().await { + warn!(err = %e, "Postgres projector: schema migration failed; continuing without projection"); + None + } else { + let projector = + ares_core::persistent_store::OpStateProjector::new(store, broker); + match projector.spawn().await { + Ok(h) => Some(h), + Err(e) => { + warn!(err = %e, "Failed to spawn Postgres projector — events will accumulate in JetStream without PG sync"); + None + } + } + } + } + Err(e) => { + warn!(err = %e, "Postgres projector: PG connect failed; continuing without projection"); + None + } + } + } + (None, _) => None, + (Some(_), None) => { + debug!("ARES_DATABASE_URL not set — Postgres projector disabled"); + None + } + }; + shared_state .load_from_redis(&queue) .await diff --git a/ares-core/src/persistent_store/mod.rs b/ares-core/src/persistent_store/mod.rs index 9c76c677..c79ef11c 100644 --- a/ares-core/src/persistent_store/mod.rs +++ b/ares-core/src/persistent_store/mod.rs @@ -18,10 +18,12 @@ //! - Retention policy enforcement mod config; +mod projector; mod queries; mod store; pub use config::{PersistentStoreConfig, RetentionConfig}; +pub use projector::{OpStateProjector, PROJECTOR_CONSUMER_NAME}; pub use queries::{ CostRow, CredentialRow, HashRow, HistoricalQueryService, MitreCoverage, OperationRow, OperationSummary, diff --git a/ares-core/src/persistent_store/projector.rs b/ares-core/src/persistent_store/projector.rs new file mode 100644 index 00000000..7467150f --- /dev/null +++ b/ares-core/src/persistent_store/projector.rs @@ -0,0 +1,471 @@ +//! Postgres projector for the operation state event log. +//! +//! Tails the `ARES_OPSTATE` JetStream stream with a durable pull consumer and +//! 
projects each [`OpStateEvent`] into the existing Postgres tables. Replaces +//! the manual batch [`super::PersistentStore::offload_operation`] path at the +//! end of an op — Postgres now stays always-current. +//! +//! Idempotency comes from the existing entity-level UNIQUE constraints +//! (`uq_cred`, `uq_hash`, `uq_user`, `uq_vuln`, `uq_host`). Redelivered events +//! upsert to the same row, so at-least-once delivery is safe. The +//! `operations` row is auto-created on first event for an op_id. + +use std::time::Duration; + +use anyhow::{Context, Result}; +use async_nats::jetstream::consumer::{pull::Config as PullConfig, AckPolicy, Consumer}; +use chrono::Utc; +use futures::StreamExt; +use sha2::{Digest, Sha256}; +use sqlx::PgPool; +use tokio::task::JoinHandle; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use crate::models::{OpStateEvent, OpStateEventPayload}; +use crate::nats::{NatsBroker, OP_STATE_STREAM, OP_STATE_SUBJECT_PREFIX}; +use crate::persistent_store::PersistentStore; + +/// Durable consumer name used by the Postgres projector. Stable — renaming +/// requires a manual `consumer delete` against the JetStream stream. +pub const PROJECTOR_CONSUMER_NAME: &str = "ares-projector-pg"; + +/// Default ack wait for projector messages. PG writes are usually fast, but +/// during a stop-the-world index rebuild we want a generous window. +const ACK_WAIT: Duration = Duration::from_secs(60); + +/// Maximum redelivery attempts before JetStream gives up on a message. +const MAX_DELIVER: i64 = 5; + +/// Projector: connects the JetStream event log to the Postgres archive. +#[derive(Clone)] +pub struct OpStateProjector { + store: PersistentStore, + nats: NatsBroker, +} + +impl OpStateProjector { + /// Build a new projector. Does not start the background task — call + /// [`spawn`](Self::spawn) for that. + pub fn new(store: PersistentStore, nats: NatsBroker) -> Self { + Self { store, nats } + } + + /// Apply a single event to Postgres synchronously. 
Used by the consumer + /// loop and by replay tooling; tests can call this directly to avoid + /// spinning up a NATS server. + pub async fn apply_event(&self, event: &OpStateEvent) -> Result<()> { + let op_uuid = self.ensure_operation_row(&event.op_id).await?; + match &event.payload { + OpStateEventPayload::CredentialCaptured { credential } => { + upsert_credential(self.store.pool(), op_uuid, credential).await?; + } + OpStateEventPayload::HashCaptured { hash } => { + upsert_hash(self.store.pool(), op_uuid, hash).await?; + } + OpStateEventPayload::HostDiscovered { host } => { + upsert_host(self.store.pool(), op_uuid, host).await?; + } + OpStateEventPayload::HostOwned { ip, hostname, .. } => { + mark_host_owned(self.store.pool(), op_uuid, ip, hostname.as_str()).await?; + } + OpStateEventPayload::UserDiscovered { user } => { + upsert_user(self.store.pool(), op_uuid, user).await?; + } + OpStateEventPayload::VulnDiscovered { vuln } => { + upsert_vulnerability(self.store.pool(), op_uuid, vuln, false).await?; + } + OpStateEventPayload::VulnExploited { vuln_id, .. } => { + mark_vulnerability_exploited(self.store.pool(), op_uuid, vuln_id).await?; + } + OpStateEventPayload::TimelineEvent { .. } => { + // Timeline events are written to `timeline_events` in a later + // pass when the red-team timeline schema is wired in (no event_id + // on the current red-team timeline). Tracked in the Phase 4 + // cutover; the projector currently no-ops on these to keep the + // stream draining. + debug!(op_id = %event.op_id, "skipping timeline event projection (schema pending)"); + } + } + Ok(()) + } + + /// Spawn the long-running consumer task that tails `ARES_OPSTATE` and + /// applies each event via [`apply_event`](Self::apply_event). Returns the + /// task handle; aborting it stops the projector. 
+ pub async fn spawn(self) -> Result> { + let consumer = self.ensure_consumer().await?; + let projector = self.clone(); + let handle = tokio::spawn(async move { + projector.run_loop(consumer).await; + }); + info!( + consumer = PROJECTOR_CONSUMER_NAME, + stream = OP_STATE_STREAM, + "Postgres projector spawned" + ); + Ok(handle) + } + + async fn ensure_consumer(&self) -> Result> { + let stream = self + .nats + .jetstream() + .get_stream(OP_STATE_STREAM) + .await + .with_context(|| format!("get_stream({OP_STATE_STREAM})"))?; + + let cfg = PullConfig { + durable_name: Some(PROJECTOR_CONSUMER_NAME.to_string()), + filter_subject: format!("{OP_STATE_SUBJECT_PREFIX}.>"), + ack_policy: AckPolicy::Explicit, + ack_wait: ACK_WAIT, + max_deliver: MAX_DELIVER, + ..Default::default() + }; + let consumer = stream + .get_or_create_consumer(PROJECTOR_CONSUMER_NAME, cfg) + .await + .with_context(|| format!("ensure consumer {PROJECTOR_CONSUMER_NAME}"))?; + Ok(consumer) + } + + async fn run_loop(self, consumer: Consumer) { + let mut messages = match consumer.messages().await { + Ok(m) => m, + Err(e) => { + warn!(error = %e, "projector: consumer.messages() failed at startup"); + return; + } + }; + while let Some(item) = messages.next().await { + let msg = match item { + Ok(m) => m, + Err(e) => { + warn!(error = %e, "projector: stream error"); + continue; + } + }; + let event: OpStateEvent = match serde_json::from_slice(&msg.payload) { + Ok(ev) => ev, + Err(e) => { + warn!( + error = %e, + subject = %msg.subject, + "projector: undecodable event payload; acking to skip", + ); + let _ = msg.ack().await; + continue; + } + }; + match self.apply_event(&event).await { + Ok(()) => { + if let Err(e) = msg.ack().await { + warn!(error = %e, "projector: ack failed"); + } + } + Err(e) => { + // No ack — JetStream will redeliver up to max_deliver. 
+ warn!( + error = %e, + op_id = %event.op_id, + event_id = %event.event_id, + "projector: apply_event failed; allowing redelivery" + ); + } + } + } + warn!("projector: message stream ended"); + } + + /// Ensure an `operations` row exists for the given `operation_id`, returning + /// its UUID. Uses `ON CONFLICT … DO UPDATE` so the RETURNING clause always + /// produces a row, regardless of whether the insert or the conflict path + /// fired. + async fn ensure_operation_row(&self, operation_id: &str) -> Result { + let row: (Uuid,) = sqlx::query_as( + "INSERT INTO operations (operation_id, started_at) + VALUES ($1, NOW()) + ON CONFLICT (operation_id) DO UPDATE SET operation_id = EXCLUDED.operation_id + RETURNING id", + ) + .bind(operation_id) + .fetch_one(self.store.pool()) + .await + .with_context(|| format!("ensure operations row for {operation_id}"))?; + Ok(row.0) + } +} + +// ========================================================================= +// Single-row upserts (no transaction; PG enforces per-row UNIQUE constraints) +// ========================================================================= + +async fn upsert_credential( + pool: &PgPool, + operation_uuid: Uuid, + cred: &crate::models::Credential, +) -> Result<()> { + let password_hash = if cred.password.is_empty() { + None + } else { + Some(sha256_prefix(&cred.password, 16)) + }; + let domain = (!cred.domain.is_empty()).then_some(cred.domain.as_str()); + let source = (!cred.source.is_empty()).then_some(cred.source.as_str()); + + sqlx::query( + "INSERT INTO credentials (operation_id, credential_id, username, domain, + password_hash, is_admin, source, attack_step, discovered_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT ON CONSTRAINT uq_cred DO NOTHING", + ) + .bind(operation_uuid) + .bind(&cred.id) + .bind(&cred.username) + .bind(domain) + .bind(password_hash.as_deref()) + .bind(cred.is_admin) + .bind(source) + .bind(cred.attack_step) + .bind(cred.discovered_at) + .execute(pool) + 
.await + .context("upsert credential")?; + Ok(()) +} + +async fn upsert_hash(pool: &PgPool, operation_uuid: Uuid, h: &crate::models::Hash) -> Result<()> { + let hash_prefix = + (!h.hash_value.is_empty()).then_some(&h.hash_value[..h.hash_value.len().min(64)]); + let cracked_hash = h + .cracked_password + .as_deref() + .filter(|p| !p.is_empty()) + .map(|p| sha256_prefix(p, 16)); + let domain = (!h.domain.is_empty()).then_some(h.domain.as_str()); + let hash_type = (!h.hash_type.is_empty()).then_some(h.hash_type.as_str()); + let source = (!h.source.is_empty()).then_some(h.source.as_str()); + + sqlx::query( + "INSERT INTO hashes (operation_id, hash_id, username, domain, hash_type, + hash_value_prefix, cracked_password_hash, source, + attack_step, discovered_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT ON CONSTRAINT uq_hash DO NOTHING", + ) + .bind(operation_uuid) + .bind(&h.id) + .bind(&h.username) + .bind(domain) + .bind(hash_type) + .bind(hash_prefix) + .bind(cracked_hash.as_deref()) + .bind(source) + .bind(h.attack_step) + .bind(h.discovered_at) + .execute(pool) + .await + .context("upsert hash")?; + Ok(()) +} + +async fn upsert_host( + pool: &PgPool, + operation_uuid: Uuid, + host: &crate::models::Host, +) -> Result<()> { + if host.ip.is_empty() { + warn!("projector: skipping host with empty IP"); + return Ok(()); + } + let hostname = (!host.hostname.is_empty()).then_some(host.hostname.as_str()); + let os = (!host.os.is_empty()).then_some(host.os.as_str()); + let roles: Option<&[String]> = (!host.roles.is_empty()).then_some(host.roles.as_slice()); + let services: Option<&[String]> = + (!host.services.is_empty()).then_some(host.services.as_slice()); + + sqlx::query( + "INSERT INTO hosts (operation_id, ip, hostname, os, is_dc, is_owned, roles, services) + VALUES ($1, $2::inet, $3, $4, $5, $6, $7, $8) + ON CONFLICT ON CONSTRAINT uq_host DO UPDATE SET + hostname = COALESCE(EXCLUDED.hostname, hosts.hostname), + os = COALESCE(EXCLUDED.os, 
hosts.os), + is_dc = hosts.is_dc OR EXCLUDED.is_dc, + is_owned = hosts.is_owned OR EXCLUDED.is_owned, + roles = COALESCE(EXCLUDED.roles, hosts.roles), + services = COALESCE(EXCLUDED.services, hosts.services)", + ) + .bind(operation_uuid) + .bind(&host.ip) + .bind(hostname) + .bind(os) + .bind(host.is_dc) + .bind(host.owned) + .bind(roles) + .bind(services) + .execute(pool) + .await + .context("upsert host")?; + Ok(()) +} + +async fn mark_host_owned( + pool: &PgPool, + operation_uuid: Uuid, + ip: &str, + _hostname: &str, +) -> Result<()> { + sqlx::query( + "UPDATE hosts SET is_owned = TRUE + WHERE operation_id = $1 AND ip = $2::inet", + ) + .bind(operation_uuid) + .bind(ip) + .execute(pool) + .await + .context("mark host owned")?; + Ok(()) +} + +async fn upsert_user( + pool: &PgPool, + operation_uuid: Uuid, + user: &crate::models::User, +) -> Result<()> { + let domain = (!user.domain.is_empty()).then_some(user.domain.as_str()); + let description = (!user.description.is_empty()).then_some(user.description.as_str()); + let source = (!user.source.is_empty()).then_some(user.source.as_str()); + + sqlx::query( + "INSERT INTO users (operation_id, username, domain, description, is_admin, source) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT ON CONSTRAINT uq_user DO NOTHING", + ) + .bind(operation_uuid) + .bind(&user.username) + .bind(domain) + .bind(description) + .bind(user.is_admin) + .bind(source) + .execute(pool) + .await + .context("upsert user")?; + Ok(()) +} + +async fn upsert_vulnerability( + pool: &PgPool, + operation_uuid: Uuid, + vuln: &crate::models::VulnerabilityInfo, + exploited: bool, +) -> Result<()> { + let (target_ip, target_hostname) = if is_ip(&vuln.target) { + (Some(vuln.target.as_str()), None) + } else { + (None, Some(vuln.target.as_str())) + }; + let details = if vuln.details.is_empty() { + None + } else { + Some(serde_json::to_value(&vuln.details)?) 
/// Report whether `value` is a literal IP address (IPv4 or IPv6).
///
/// Parsing is delegated to the standard library's `IpAddr` parser, so signed
/// octets such as `+1.2.3.4`, out-of-range octets, truncated quads, CIDR
/// suffixes, host names, and the empty string all return `false`.
fn is_ip(value: &str) -> bool {
    value.parse::<std::net::IpAddr>().is_ok()
}
is_ip_accepts_dotted_quad() { + assert!(is_ip("192.168.58.10")); + assert!(is_ip("10.0.0.1")); + } + + #[test] + fn is_ip_rejects_hostname_and_short_quad() { + assert!(!is_ip("dc01.contoso.local")); + assert!(!is_ip("192.168.58")); + assert!(!is_ip("")); + assert!(!is_ip("999.999.999.999")); + } + + #[test] + fn projector_consumer_name_stable() { + // Renaming this is a deployment break — the existing durable consumer + // would be abandoned and a new one would start from the next message + // rather than where the old one left off. + assert_eq!(PROJECTOR_CONSUMER_NAME, "ares-projector-pg"); + } +} From a3e0f47eb23485a113e45934878766296cbda595 Mon Sep 17 00:00:00 2001 From: Jayson Grace Date: Tue, 12 May 2026 14:34:27 -0600 Subject: [PATCH 4/5] feat: add JetStream event log replay for operation state reconstruction **Added:** - Introduced `SharedState::load_from_event_log` to replay operation state from JetStream event log as an opt-in startup path, controlled by `ARES_USE_EVENT_LOG_REPLAY` - Added pure function `apply_event_to_state` for mutating state from `OpStateEvent` variants, supporting event log replay and tests - Created `replay.rs` module with implementation and comprehensive tests for event replay logic - Registered `replay` module in orchestrator state mod for inclusion in build **Changed:** - Updated orchestrator startup to conditionally replay from JetStream event log before falling back to Redis, preserving default behavior unless opt-in environment variable is set --- ares-cli/src/orchestrator/mod.rs | 31 +- ares-cli/src/orchestrator/state/mod.rs | 1 + ares-cli/src/orchestrator/state/replay.rs | 412 ++++++++++++++++++++++ 3 files changed, 440 insertions(+), 4 deletions(-) create mode 100644 ares-cli/src/orchestrator/state/replay.rs diff --git a/ares-cli/src/orchestrator/mod.rs b/ares-cli/src/orchestrator/mod.rs index b9b90f15..fb690591 100644 --- a/ares-cli/src/orchestrator/mod.rs +++ b/ares-cli/src/orchestrator/mod.rs @@ -185,10 +185,33 @@ async fn 
run_inner() -> Result<()> { } }; - shared_state - .load_from_redis(&queue) - .await - .context("Failed to load state from Redis")?; + // Phase 4 (opt-in): replay state from the JetStream event log instead of + // loading from Redis. Falls through to Redis on failure or when the env + // var is unset, so the default startup path is unchanged. + let replay_enabled = std::env::var("ARES_USE_EVENT_LOG_REPLAY").as_deref() == Ok("1"); + let replayed = match (replay_enabled, nats_broker.as_ref()) { + (true, Some(broker)) => match shared_state.load_from_event_log(broker).await { + Ok(n) => { + info!(events = n, "Loaded state from JetStream event log replay"); + true + } + Err(e) => { + warn!(err = %e, "Event log replay failed; falling back to Redis"); + false + } + }, + (true, None) => { + warn!("ARES_USE_EVENT_LOG_REPLAY=1 but no NATS broker; falling back to Redis"); + false + } + (false, _) => false, + }; + if !replayed { + shared_state + .load_from_redis(&queue) + .await + .context("Failed to load state from Redis")?; + } { let mut state = shared_state.write().await; diff --git a/ares-cli/src/orchestrator/state/mod.rs b/ares-cli/src/orchestrator/state/mod.rs index 93b8002d..a69fa790 100644 --- a/ares-cli/src/orchestrator/state/mod.rs +++ b/ares-cli/src/orchestrator/state/mod.rs @@ -11,6 +11,7 @@ mod dedup; mod inner; mod persistence; mod publishing; +mod replay; mod shared; // Re-export everything that was publicly visible from the old single file. diff --git a/ares-cli/src/orchestrator/state/replay.rs b/ares-cli/src/orchestrator/state/replay.rs new file mode 100644 index 00000000..46de5ba4 --- /dev/null +++ b/ares-cli/src/orchestrator/state/replay.rs @@ -0,0 +1,412 @@ +//! Replay operation state from the JetStream `ARES_OPSTATE` event log. +//! +//! Phase 4 cutover primitive. The pure +//! [`apply_event_to_state`] function knows how to mutate [`StateInner`] for +//! every [`OpStateEventPayload`] variant. The async +//! 
[`SharedState::load_from_event_log`] driver reads the stream up to the +//! current sequence and applies events in order. +//! +//! Scope limitations (see Phase 4 design doc): +//! - The current event types cover entities only (credentials, hashes, +//! hosts, users, vulns, timeline). They do NOT carry derived state like +//! `has_domain_admin`, `dominated_domains`, `domain_controllers`, or the +//! `domains` list. Replay reconstructs the entity collections; derived +//! state is re-computed by post-replay hooks or by re-running the publish +//! methods (deferred to a follow-up). +//! - Replay is opt-in via `ARES_USE_EVENT_LOG_REPLAY=1`. The default startup +//! path still loads from Redis. + +use std::time::Duration; + +use anyhow::{Context, Result}; +use async_nats::jetstream::consumer::{pull::Config as PullConfig, AckPolicy, DeliverPolicy}; +use futures::StreamExt; +use tracing::{info, warn}; + +use ares_core::models::{OpStateEvent, OpStateEventPayload}; +use ares_core::nats::{op_state_filter_for_op, NatsBroker, OP_STATE_STREAM}; + +use super::inner::StateInner; +use super::SharedState; + +/// How long to wait for the consumer to deliver the next message before +/// declaring the replay caught up. The stream is replayed from start to the +/// current sequence; an idle pause longer than this means we've drained it. +const REPLAY_IDLE_TIMEOUT: Duration = Duration::from_secs(2); + +/// Apply a single [`OpStateEvent`] to [`StateInner`] in-place. +/// +/// Pure function — no I/O. Used by both the live replay loop and by +/// replay-based tests. Idempotent in the sense that re-applying the same +/// event (same `event_id`) is safe: collections may grow with duplicates +/// since deduplication previously lived in Redis HSET-NX and is not yet +/// reproduced in-memory. Callers that need exact reconstruction should drop +/// duplicate event_ids before invoking — JetStream's `Nats-Msg-Id` dedup +/// usually makes this a non-issue. 
+pub fn apply_event_to_state(state: &mut StateInner, event: &OpStateEvent) { + match &event.payload { + OpStateEventPayload::CredentialCaptured { credential } => { + state.credentials.push(credential.clone()); + } + OpStateEventPayload::HashCaptured { hash } => { + state.hashes.push(hash.clone()); + } + OpStateEventPayload::HostDiscovered { host } => { + state.hosts.push(host.clone()); + } + OpStateEventPayload::HostOwned { ip, .. } => { + if let Some(existing) = state.hosts.iter_mut().find(|h| h.ip == *ip) { + existing.owned = true; + } + } + OpStateEventPayload::UserDiscovered { user } => { + state.users.push(user.clone()); + } + OpStateEventPayload::VulnDiscovered { vuln } => { + state + .discovered_vulnerabilities + .insert(vuln.vuln_id.clone(), vuln.clone()); + } + OpStateEventPayload::VulnExploited { vuln_id, .. } => { + state.exploited_vulnerabilities.insert(vuln_id.clone()); + } + OpStateEventPayload::TimelineEvent { .. } => { + // Red-team timeline replay is deferred until the timeline entries + // carry an event_id. The projector skips these for the same + // reason — keep replay symmetric. + } + } +} + +impl SharedState { + /// Replay all events for this operation from the `ARES_OPSTATE` stream + /// into in-memory state. Returns the number of events applied. + /// + /// Uses an ephemeral consumer with `DeliverPolicy::All` so each call + /// starts from the first retained message for the operation. Stops once + /// no new messages arrive within [`REPLAY_IDLE_TIMEOUT`] — the stream is + /// considered drained. + /// + /// Phase 4 entry point. Opt-in: orchestrator checks + /// `ARES_USE_EVENT_LOG_REPLAY=1` before calling. 
+ pub async fn load_from_event_log(&self, nats: &NatsBroker) -> Result { + let op_id = self.operation_id().await; + let filter = op_state_filter_for_op(&op_id); + + let stream = nats + .jetstream() + .get_stream(OP_STATE_STREAM) + .await + .with_context(|| format!("get_stream({OP_STATE_STREAM})"))?; + + // Ephemeral consumer (no durable_name) — gets cleaned up automatically. + // DeliverPolicy::All replays from the first retained message; we stop + // when idle for REPLAY_IDLE_TIMEOUT. + let cfg = PullConfig { + filter_subject: filter.clone(), + ack_policy: AckPolicy::None, + deliver_policy: DeliverPolicy::All, + ..Default::default() + }; + let consumer = stream + .create_consumer(cfg) + .await + .with_context(|| format!("create ephemeral replay consumer for {filter}"))?; + + let mut messages = consumer + .messages() + .await + .context("ephemeral consumer.messages()")?; + + let mut count: usize = 0; + loop { + let next = tokio::time::timeout(REPLAY_IDLE_TIMEOUT, messages.next()).await; + let item = match next { + Err(_) => break, // idle timeout — drained + Ok(None) => break, // stream closed + Ok(Some(item)) => item, + }; + let msg = match item { + Ok(m) => m, + Err(e) => { + warn!(error = %e, "replay: stream error; aborting"); + break; + } + }; + let event: OpStateEvent = match serde_json::from_slice(&msg.payload) { + Ok(ev) => ev, + Err(e) => { + warn!(error = %e, subject = %msg.subject, "replay: undecodable event; skipping"); + continue; + } + }; + // Defensive: filter_subject should already do this, but skip + // cross-operation events if any sneak through. 
+ if event.op_id != op_id { + continue; + } + { + let mut inner = self.write().await; + apply_event_to_state(&mut inner, &event); + } + count += 1; + } + + info!(op_id, events_applied = count, "Replayed op-state event log"); + Ok(count) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ares_core::models::{ + Credential, Hash, Host, OpStateEvent, OpStateEventPayload, User, VulnerabilityInfo, + }; + use chrono::Utc; + use std::collections::HashMap; + + fn cred(username: &str, domain: &str) -> Credential { + Credential { + id: format!("{username}@{domain}"), + username: username.to_string(), + password: "P@ssw0rd!".to_string(), // pragma: allowlist secret + domain: domain.to_string(), + source: "replay-test".to_string(), + discovered_at: None, + is_admin: false, + parent_id: None, + attack_step: 0, + } + } + + fn host(ip: &str, hostname: &str) -> Host { + Host { + ip: ip.to_string(), + hostname: hostname.to_string(), + os: String::new(), + roles: vec![], + services: vec![], + is_dc: false, + owned: false, + } + } + + fn vuln(id: &str, vtype: &str) -> VulnerabilityInfo { + VulnerabilityInfo { + vuln_id: id.to_string(), + vuln_type: vtype.to_string(), + target: "192.168.58.10".to_string(), + discovered_by: "test".to_string(), + discovered_at: Utc::now(), + details: HashMap::new(), + recommended_agent: String::new(), + priority: 5, + } + } + + fn apply(state: &mut StateInner, payload: OpStateEventPayload) { + let ev = OpStateEvent::new(&state.operation_id, payload); + apply_event_to_state(state, &ev); + } + + #[test] + fn credential_captured_pushes_to_credentials_vec() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::CredentialCaptured { + credential: cred("alice", "contoso.local"), + }, + ); + assert_eq!(s.credentials.len(), 1); + assert_eq!(s.credentials[0].username, "alice"); + } + + #[test] + fn hash_captured_pushes_to_hashes_vec() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + 
OpStateEventPayload::HashCaptured { + hash: Hash { + id: "h1".into(), + username: "admin".into(), + hash_value: "deadbeef".into(), + hash_type: "NTLM".into(), + domain: "contoso.local".into(), + cracked_password: None, + source: "test".into(), + discovered_at: None, + parent_id: None, + attack_step: 0, + aes_key: None, + }, + }, + ); + assert_eq!(s.hashes.len(), 1); + assert_eq!(s.hashes[0].username, "admin"); + } + + #[test] + fn host_discovered_then_owned_marks_existing_host() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::HostDiscovered { + host: host("192.168.58.10", "dc01.contoso.local"), + }, + ); + apply( + &mut s, + OpStateEventPayload::HostOwned { + ip: "192.168.58.10".into(), + hostname: "dc01.contoso.local".into(), + owned_by: "lateral".into(), + }, + ); + assert_eq!(s.hosts.len(), 1); + assert!(s.hosts[0].owned); + } + + #[test] + fn host_owned_for_unknown_ip_is_silent() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::HostOwned { + ip: "192.168.58.99".into(), + hostname: String::new(), + owned_by: String::new(), + }, + ); + // No host to flip — state stays empty, no panic. 
+ assert!(s.hosts.is_empty()); + } + + #[test] + fn user_discovered_pushes_to_users_vec() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::UserDiscovered { + user: User { + username: "bob".into(), + domain: "contoso.local".into(), + description: String::new(), + is_admin: false, + source: "ldap".into(), + }, + }, + ); + assert_eq!(s.users.len(), 1); + assert_eq!(s.users[0].username, "bob"); + } + + #[test] + fn vuln_discovered_inserts_into_map_keyed_by_vuln_id() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::VulnDiscovered { + vuln: vuln("V-1", "esc1"), + }, + ); + assert_eq!(s.discovered_vulnerabilities.len(), 1); + assert!(s.discovered_vulnerabilities.contains_key("V-1")); + } + + #[test] + fn vuln_exploited_inserts_into_set() { + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::VulnExploited { + vuln_id: "V-1".into(), + exploited_by: String::new(), + result: None, + }, + ); + assert!(s.exploited_vulnerabilities.contains("V-1")); + } + + #[test] + fn replay_reconstructs_collections_in_order() { + // Apply a sequence of events and assert the final state matches what + // the publishers would have written. This is the load-bearing test + // for Phase 4 — if it diverges, replay is broken. 
+ let mut s = StateInner::new("op-replay".into()); + let events: Vec = vec![ + OpStateEventPayload::HostDiscovered { + host: host("192.168.58.10", "dc01.contoso.local"), + }, + OpStateEventPayload::HostDiscovered { + host: host("192.168.58.20", "ws01.contoso.local"), + }, + OpStateEventPayload::CredentialCaptured { + credential: cred("alice", "contoso.local"), + }, + OpStateEventPayload::CredentialCaptured { + credential: cred("bob", "contoso.local"), + }, + OpStateEventPayload::VulnDiscovered { + vuln: vuln("V-1", "esc1"), + }, + OpStateEventPayload::VulnExploited { + vuln_id: "V-1".into(), + exploited_by: "privesc".into(), + result: None, + }, + OpStateEventPayload::HostOwned { + ip: "192.168.58.10".into(), + hostname: "dc01.contoso.local".into(), + owned_by: "lateral".into(), + }, + ]; + for payload in events { + apply(&mut s, payload); + } + assert_eq!(s.hosts.len(), 2); + assert_eq!(s.credentials.len(), 2); + assert_eq!(s.discovered_vulnerabilities.len(), 1); + assert!(s.exploited_vulnerabilities.contains("V-1")); + // HostOwned applied AFTER HostDiscovered → dc01 is owned + assert!( + s.hosts + .iter() + .find(|h| h.ip == "192.168.58.10") + .unwrap() + .owned + ); + assert!( + !s.hosts + .iter() + .find(|h| h.ip == "192.168.58.20") + .unwrap() + .owned + ); + } + + #[test] + fn replay_does_not_reconstruct_derived_state() { + // Documented limitation: domains / has_domain_admin / domain_controllers + // are derived inside the publish methods and are NOT in the event + // payload. Replay leaves them empty. A future change adds derived + // event types or a publish-replay mode. + let mut s = StateInner::new("op-1".into()); + apply( + &mut s, + OpStateEventPayload::CredentialCaptured { + credential: cred("alice", "contoso.local"), + }, + ); + assert_eq!(s.credentials.len(), 1); + // Derived: domains list stays empty after replay (would be populated + // if we re-ran publish_credential). 
+ assert!( + s.domains.is_empty(), + "domains is derived state, not replayed" + ); + assert!(!s.has_domain_admin); + } +} From 58250878693e9ca80245b2745da5231b34099d7b Mon Sep 17 00:00:00 2001 From: Jayson Grace Date: Tue, 12 May 2026 14:39:11 -0600 Subject: [PATCH 5/5] feat: add ops replay command for point-in-time state reconstruction **Added:** - Introduced `Replay` command to `OpsCommands` for replaying operation state event logs to reconstruct point-in-time snapshots, with options for cutoff by timestamp, event count, and JSON output - Added `ops/replay.rs` implementing the `ops_replay` function to connect to NATS, fetch and apply event logs, and print human or JSON summaries of reconstructed operation state - Added `ReplaySnapshot` and `ReplayCutoff` types to `orchestrator/state/replay.rs` for lightweight, serializable operation state snapshots and flexible replay stopping conditions - Implemented event application and cutoff logic, as well as tests for `ReplaySnapshot` and cutoff behavior **Changed:** - Exposed `state` and `state::replay` modules as public to support replay tooling - Integrated `Replay` command handling into `run_ops` to invoke the new replay functionality when requested **Removed:** - Made internal replay module public, replacing previous private visibility to allow CLI access --- ares-cli/src/cli/ops.rs | 15 ++ ares-cli/src/ops/mod.rs | 7 + ares-cli/src/ops/replay.rs | 70 ++++++ ares-cli/src/orchestrator/mod.rs | 2 +- ares-cli/src/orchestrator/state/mod.rs | 2 +- ares-cli/src/orchestrator/state/replay.rs | 260 +++++++++++++++++++++- 6 files changed, 353 insertions(+), 3 deletions(-) create mode 100644 ares-cli/src/ops/replay.rs diff --git a/ares-cli/src/cli/ops.rs b/ares-cli/src/cli/ops.rs index d0a7f1a9..90a61c97 100644 --- a/ares-cli/src/cli/ops.rs +++ b/ares-cli/src/cli/ops.rs @@ -289,6 +289,21 @@ pub(crate) enum OpsCommands { cmd: SessionsCommands, }, + /// Replay the operation state event log into a point-in-time snapshot + Replay { + 
/// Operation ID to replay + operation_id: String, + /// Stop applying events whose `recorded_at` exceeds this ISO-8601 timestamp + #[arg(long)] + until: Option, + /// Stop after this many events have been applied + #[arg(long)] + until_count: Option, + /// Emit the snapshot as JSON instead of a human-readable summary + #[arg(long)] + json: bool, + }, + /// Persist token usage from Redis to PostgreSQL for an operation OffloadCost { /// Operation ID diff --git a/ares-cli/src/ops/mod.rs b/ares-cli/src/ops/mod.rs index 1766b78e..e57391bc 100644 --- a/ares-cli/src/ops/mod.rs +++ b/ares-cli/src/ops/mod.rs @@ -9,6 +9,7 @@ mod kill; mod list; mod loot; mod queue; +mod replay; mod report; pub(crate) mod resolve; mod runtime; @@ -160,6 +161,12 @@ pub(crate) async fn run_ops(cmd: OpsCommands, redis_url: Option) -> Resu operation_id, latest, } => backfill::ops_offload_cost(redis_url, operation_id, latest).await, + OpsCommands::Replay { + operation_id, + until, + until_count, + json, + } => replay::ops_replay(operation_id, until, until_count, json).await, OpsCommands::Report { operation_id, latest, diff --git a/ares-cli/src/ops/replay.rs b/ares-cli/src/ops/replay.rs new file mode 100644 index 00000000..11f9382e --- /dev/null +++ b/ares-cli/src/ops/replay.rs @@ -0,0 +1,70 @@ +//! `ares ops replay` — rebuild a point-in-time state snapshot from the +//! JetStream `ARES_OPSTATE` event log. Phase 5 forensics tooling. + +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; + +use ares_core::nats::NatsBroker; + +use crate::orchestrator::state::replay::{replay_op_to_snapshot, ReplayCutoff, ReplaySnapshot}; + +pub(crate) async fn ops_replay( + operation_id: String, + until: Option, + until_count: Option, + json: bool, +) -> Result<()> { + let until_dt: Option> = match until.as_deref() { + None => None, + Some(raw) => Some( + DateTime::parse_from_rfc3339(raw) + .with_context(|| format!("--until value '{raw}' is not RFC 3339"))? 
+ .with_timezone(&Utc), + ), + }; + let cutoff = ReplayCutoff { + until: until_dt, + until_count, + }; + + let nats = NatsBroker::connect_from_env() + .await + .context("Failed to connect to NATS (set ARES_NATS_URL)")?; + + let snapshot = replay_op_to_snapshot(&nats, &operation_id, cutoff) + .await + .context("Replay failed")?; + + if json { + let s = serde_json::to_string_pretty(&snapshot).context("serialize snapshot")?; + println!("{s}"); + } else { + print_human_summary(&snapshot); + } + Ok(()) +} + +fn print_human_summary(s: &ReplaySnapshot) { + println!("Replay snapshot for operation: {}", s.operation_id); + println!("Events applied: {}", s.events_applied); + println!("Credentials: {}", s.credentials.len()); + println!("Hashes: {}", s.hashes.len()); + println!("Hosts: {}", s.hosts.len()); + println!( + " owned: {}", + s.hosts.iter().filter(|h| h.owned).count() + ); + println!( + " domain controllers: {}", + s.hosts.iter().filter(|h| h.is_dc).count() + ); + println!("Users: {}", s.users.len()); + println!( + "Discovered vulnerabilities: {}", + s.discovered_vulnerabilities.len() + ); + println!( + "Exploited vulnerabilities: {}", + s.exploited_vulnerabilities.len() + ); +} diff --git a/ares-cli/src/orchestrator/mod.rs b/ares-cli/src/orchestrator/mod.rs index fb690591..521cb405 100644 --- a/ares-cli/src/orchestrator/mod.rs +++ b/ares-cli/src/orchestrator/mod.rs @@ -29,7 +29,7 @@ mod recovery; mod result_processing; mod results; mod routing; -mod state; +pub(crate) mod state; pub(crate) mod strategy; mod task_queue; mod throttling; diff --git a/ares-cli/src/orchestrator/state/mod.rs b/ares-cli/src/orchestrator/state/mod.rs index a69fa790..1717fc1c 100644 --- a/ares-cli/src/orchestrator/state/mod.rs +++ b/ares-cli/src/orchestrator/state/mod.rs @@ -11,7 +11,7 @@ mod dedup; mod inner; mod persistence; mod publishing; -mod replay; +pub(crate) mod replay; mod shared; // Re-export everything that was publicly visible from the old single file. 
diff --git a/ares-cli/src/orchestrator/state/replay.rs b/ares-cli/src/orchestrator/state/replay.rs index 46de5ba4..56ff7b5d 100644 --- a/ares-cli/src/orchestrator/state/replay.rs +++ b/ares-cli/src/orchestrator/state/replay.rs @@ -16,19 +16,175 @@ //! - Replay is opt-in via `ARES_USE_EVENT_LOG_REPLAY=1`. The default startup //! path still loads from Redis. +use std::collections::{HashMap, HashSet}; use std::time::Duration; use anyhow::{Context, Result}; use async_nats::jetstream::consumer::{pull::Config as PullConfig, AckPolicy, DeliverPolicy}; +use chrono::{DateTime, Utc}; use futures::StreamExt; +use serde::Serialize; use tracing::{info, warn}; -use ares_core::models::{OpStateEvent, OpStateEventPayload}; +use ares_core::models::{ + Credential, Hash, Host, OpStateEvent, OpStateEventPayload, User, VulnerabilityInfo, +}; use ares_core::nats::{op_state_filter_for_op, NatsBroker, OP_STATE_STREAM}; use super::inner::StateInner; use super::SharedState; +/// Lightweight, serialisable snapshot of operation state reconstructed from +/// the event log. Used by `ares ops replay` and other Phase 5 tooling. +/// +/// Holds only the entity collections that the event log carries today +/// (no derived state — see Phase 4 limitations). +#[derive(Debug, Default, Serialize)] +pub struct ReplaySnapshot { + pub operation_id: String, + pub events_applied: usize, + pub credentials: Vec, + pub hashes: Vec, + pub hosts: Vec, + pub users: Vec, + pub discovered_vulnerabilities: HashMap, + pub exploited_vulnerabilities: HashSet, +} + +impl ReplaySnapshot { + pub fn new(operation_id: impl Into) -> Self { + Self { + operation_id: operation_id.into(), + ..Default::default() + } + } + + /// Apply a single event to this snapshot. Mirrors + /// [`apply_event_to_state`] but writes to the lightweight struct instead + /// of the full [`StateInner`]. 
+ pub fn apply(&mut self, event: &OpStateEvent) { + match &event.payload { + OpStateEventPayload::CredentialCaptured { credential } => { + self.credentials.push(credential.clone()); + } + OpStateEventPayload::HashCaptured { hash } => { + self.hashes.push(hash.clone()); + } + OpStateEventPayload::HostDiscovered { host } => { + self.hosts.push(host.clone()); + } + OpStateEventPayload::HostOwned { ip, .. } => { + if let Some(existing) = self.hosts.iter_mut().find(|h| h.ip == *ip) { + existing.owned = true; + } + } + OpStateEventPayload::UserDiscovered { user } => { + self.users.push(user.clone()); + } + OpStateEventPayload::VulnDiscovered { vuln } => { + self.discovered_vulnerabilities + .insert(vuln.vuln_id.clone(), vuln.clone()); + } + OpStateEventPayload::VulnExploited { vuln_id, .. } => { + self.exploited_vulnerabilities.insert(vuln_id.clone()); + } + OpStateEventPayload::TimelineEvent { .. } => {} + } + self.events_applied += 1; + } +} + +/// Cutoff for replay-to-snapshot. `None` means "no cutoff". +#[derive(Debug, Clone, Copy, Default)] +pub struct ReplayCutoff { + /// Stop replay once an event's `recorded_at` exceeds this timestamp. + pub until: Option>, + /// Stop replay once this many events have been applied. + pub until_count: Option, +} + +impl ReplayCutoff { + fn should_stop_before(&self, event: &OpStateEvent, applied: usize) -> bool { + if let Some(until) = self.until { + if event.recorded_at > until { + return true; + } + } + if let Some(max) = self.until_count { + if applied >= max { + return true; + } + } + false + } +} + +/// Replay events for a single operation from JetStream into a fresh +/// [`ReplaySnapshot`], honoring the cutoff. Used by `ares ops replay`. +/// +/// Uses an ephemeral consumer with `DeliverPolicy::All`. Returns when the +/// stream idles past [`REPLAY_IDLE_TIMEOUT`] or the cutoff fires. 
+pub async fn replay_op_to_snapshot( + nats: &NatsBroker, + op_id: &str, + cutoff: ReplayCutoff, +) -> Result { + let filter = op_state_filter_for_op(op_id); + let stream = nats + .jetstream() + .get_stream(OP_STATE_STREAM) + .await + .with_context(|| format!("get_stream({OP_STATE_STREAM})"))?; + + let cfg = PullConfig { + filter_subject: filter.clone(), + ack_policy: AckPolicy::None, + deliver_policy: DeliverPolicy::All, + ..Default::default() + }; + let consumer = stream + .create_consumer(cfg) + .await + .with_context(|| format!("create ephemeral replay consumer for {filter}"))?; + + let mut messages = consumer + .messages() + .await + .context("ephemeral consumer.messages()")?; + + let mut snapshot = ReplaySnapshot::new(op_id.to_string()); + loop { + let next = tokio::time::timeout(REPLAY_IDLE_TIMEOUT, messages.next()).await; + let item = match next { + Err(_) => break, + Ok(None) => break, + Ok(Some(item)) => item, + }; + let msg = match item { + Ok(m) => m, + Err(e) => { + warn!(error = %e, "replay: stream error; aborting"); + break; + } + }; + let event: OpStateEvent = match serde_json::from_slice(&msg.payload) { + Ok(ev) => ev, + Err(e) => { + warn!(error = %e, subject = %msg.subject, "replay: undecodable event; skipping"); + continue; + } + }; + if event.op_id != op_id { + continue; + } + if cutoff.should_stop_before(&event, snapshot.events_applied) { + break; + } + snapshot.apply(&event); + } + Ok(snapshot) +} + /// How long to wait for the consumer to deliver the next message before /// declaring the replay caught up. The stream is replayed from start to the /// current sequence; an idle pause longer than this means we've drained it. 
@@ -387,6 +543,108 @@ mod tests { ); } + #[test] + fn snapshot_apply_dispatches_per_variant() { + let mut s = ReplaySnapshot::new("op-snap"); + s.apply(&OpStateEvent::new( + "op-snap", + OpStateEventPayload::CredentialCaptured { + credential: cred("alice", "contoso.local"), + }, + )); + s.apply(&OpStateEvent::new( + "op-snap", + OpStateEventPayload::HostDiscovered { + host: host("192.168.58.10", "dc01.contoso.local"), + }, + )); + s.apply(&OpStateEvent::new( + "op-snap", + OpStateEventPayload::HostOwned { + ip: "192.168.58.10".into(), + hostname: String::new(), + owned_by: String::new(), + }, + )); + s.apply(&OpStateEvent::new( + "op-snap", + OpStateEventPayload::VulnDiscovered { + vuln: vuln("V-1", "esc1"), + }, + )); + s.apply(&OpStateEvent::new( + "op-snap", + OpStateEventPayload::VulnExploited { + vuln_id: "V-1".into(), + exploited_by: String::new(), + result: None, + }, + )); + assert_eq!(s.events_applied, 5); + assert_eq!(s.credentials.len(), 1); + assert_eq!(s.hosts.len(), 1); + assert!(s.hosts[0].owned); + assert_eq!(s.discovered_vulnerabilities.len(), 1); + assert!(s.exploited_vulnerabilities.contains("V-1")); + } + + #[test] + fn cutoff_until_stops_on_time() { + use chrono::TimeZone; + let cutoff = ReplayCutoff { + until: Some(Utc.with_ymd_and_hms(2026, 5, 12, 12, 0, 0).unwrap()), + until_count: None, + }; + let early = OpStateEvent { + event_id: "a".into(), + op_id: "op".into(), + recorded_at: Utc.with_ymd_and_hms(2026, 5, 12, 11, 0, 0).unwrap(), + payload: OpStateEventPayload::TimelineEvent { + event: serde_json::json!({}), + }, + }; + let late = OpStateEvent { + event_id: "b".into(), + op_id: "op".into(), + recorded_at: Utc.with_ymd_and_hms(2026, 5, 12, 13, 0, 0).unwrap(), + payload: OpStateEventPayload::TimelineEvent { + event: serde_json::json!({}), + }, + }; + assert!(!cutoff.should_stop_before(&early, 0)); + assert!(cutoff.should_stop_before(&late, 0)); + } + + #[test] + fn cutoff_until_count_stops_on_count() { + let cutoff = ReplayCutoff { + 
until: None, + until_count: Some(2), + }; + let ev = OpStateEvent::new( + "op", + OpStateEventPayload::TimelineEvent { + event: serde_json::json!({}), + }, + ); + assert!(!cutoff.should_stop_before(&ev, 0)); + assert!(!cutoff.should_stop_before(&ev, 1)); + assert!(cutoff.should_stop_before(&ev, 2)); + } + + #[test] + fn cutoff_default_never_stops() { + let cutoff = ReplayCutoff::default(); + let ev = OpStateEvent::new( + "op", + OpStateEventPayload::TimelineEvent { + event: serde_json::json!({}), + }, + ); + assert!(!cutoff.should_stop_before(&ev, 0)); + assert!(!cutoff.should_stop_before(&ev, 1_000_000)); + } + #[test] fn replay_does_not_reconstruct_derived_state() { // Documented limitation: domains / has_domain_admin / domain_controllers