feat: implement operation state event log with JetStream and Postgres projection#279
Merged
Conversation
…sh API **Added:** - Introduced `op_state_event.rs` defining `OpStateEvent` and `OpStateEventPayload` for operation state mutations with subject hierarchy, JSON serialization, and deduplication via event IDs - Added export and module wiring for `OpStateEvent` and `OpStateEventPayload` in `models/mod.rs` - Added new NATS subject prefix and stream constants for operation state events (`OP_STATE_SUBJECT_PREFIX`, `OP_STATE_STREAM`) - Implemented subject builders `op_state_subject` and `op_state_filter_for_op` for granular or wildcard subscription to operation state events - Created `StreamSpec::op_state` for configuring the durable `ARES_OPSTATE` stream with 30-day retention, `Limits` policy, and file storage - Added `NatsBroker::publish_op_state_event` for publishing op-state events with deduplication and optional optimistic concurrency control - Defined `OpStatePublishError` and error classification for publish failures and concurrency conflicts - Added comprehensive tests for subject formatting, stream configuration, and subject hierarchy disjointness in `nats.rs` - Added unit tests for event construction, JSON serialization, and subject suffix logic in `op_state_event.rs` **Changed:** - Updated `VulnerabilityInfo` to derive `PartialEq` for use in event payloads and tests - Refactored `NatsBroker::ensure_streams` to include the new op-state stream
**Added:** - Introduced `OpStateRecorder` abstraction to emit operation state events to a NATS-backed JetStream log or in-memory buffer for tests (`ares-core/src/op_state_log.rs`) - Implemented `emit_op_state` utility to handle event emission and error logging for all publish sites - Emitted op-state events for credential, hash, user, vulnerability, exploited-vuln, host, and timeline event publishers in orchestrator state modules - Provided capturing test recorders and comprehensive tests verifying event emission, deduplication, and disabled behavior **Changed:** - Updated `SharedState` to hold an `OpStateRecorder` and allow installing or replacing the recorder at runtime (`set_recorder`, `with_recorder`) - Modified orchestrator modules to dual-write op-state events when a recorder is active, preserving Redis as authoritative for now - Updated orchestrator state publishing and dedup logic to use new event emission mechanism after successful writes - Extended test coverage to assert correct event emission across all entity publishers **Removed:** - Legacy event logging stubs in orchestrator modules now handled by the new dual-write mechanism
**Added:** - Introduced `OpStateProjector` for syncing `ARES_OPSTATE` events to Postgres, ensuring the archive stays current by tailing JetStream and upserting events into relevant tables - Added durable JetStream consumer configuration and logic for idempotent event application using existing unique constraints - Implemented error handling, schema migration, and conditional projector startup based on NATS and database availability - Exported `OpStateProjector` and `PROJECTOR_CONSUMER_NAME` from `persistent_store` module - Included unit tests for utility functions and projector consumer stability **Changed:** - Updated orchestrator to spawn the Postgres projector consumer when both NATS and a database URL are available - Added `debug` import to orchestrator for enhanced logging during projector initialization
**Added:** - Introduced `SharedState::load_from_event_log` to replay operation state from JetStream event log as an opt-in startup path, controlled by `ARES_USE_EVENT_LOG_REPLAY` - Added pure function `apply_event_to_state` for mutating state from `OpStateEvent` variants, supporting event log replay and tests - Created `replay.rs` module with implementation and comprehensive tests for event replay logic - Registered `replay` module in orchestrator state mod for inclusion in build **Changed:** - Updated orchestrator startup to conditionally replay from JetStream event log before falling back to Redis, preserving default behavior unless opt-in environment variable is set
**Added:** - Introduced `Replay` command to `OpsCommands` for replaying operation state event logs to reconstruct point-in-time snapshots, with options for cutoff by timestamp, event count, and JSON output - Added `ops/replay.rs` implementing the `ops_replay` function to connect to NATS, fetch and apply event logs, and print human or JSON summaries of reconstructed operation state - Added `ReplaySnapshot` and `ReplayCutoff` types to `orchestrator/state/replay.rs` for lightweight, serializable operation state snapshots and flexible replay stopping conditions - Implemented event application and cutoff logic, as well as tests for `ReplaySnapshot` and cutoff behavior **Changed:** - Exposed `state` and `state::replay` modules as public to support replay tooling - Integrated `Replay` command handling into `run_ops` to invoke the new replay functionality when requested **Removed:** - Made internal replay module public, replacing previous private visibility to allow CLI access
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #279 +/- ##
==========================================
- Coverage 76.06% 75.85% -0.21%
==========================================
Files 387 392 +5
Lines 84347 85859 +1512
==========================================
+ Hits 64157 65131 +974
- Misses 20190 20728 +538
🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Key Changes:
ARES_OPSTATE)Added:
OpStateEventandOpStateEventPayloadtypes to model granular state mutations inares-core/src/models/op_state_event.rsOpStateRecorderabstraction for event log sinks (NATS, capturing, disabled) inares-core/src/op_state_log.rsARES_OPSTATEevent log inares-core/src/nats.rsares-core/src/persistent_store/projector.rsares ops replaycommand and replay logic to build point-in-time state snapshots from the event log (ares-cli/src/cli/ops.rs,ares-cli/src/ops/replay.rs,ares-cli/src/orchestrator/state/replay.rs)Changed:
publish_credential,publish_hash,publish_user,publish_vulnerability,publish_host, etc.) now emit events to the op-state log after successful Redis writes (Phase 2 dual-write)ARES_USE_EVENT_LOG_REPLAY=1)ops offloadbatch jobs