diff --git a/bd-runtime/src/runtime.rs b/bd-runtime/src/runtime.rs index 3892682c..3bd56174 100644 --- a/bd-runtime/src/runtime.rs +++ b/bd-runtime/src/runtime.rs @@ -789,20 +789,6 @@ pub mod workflows { "workflows.persistence_write_interval_ms", 1.seconds() ); - - // The maximum number of workflow traversals that may be active. - int_feature_flag!( - TraversalsCountLimitFlag, - "workflows.traversals_global_count_limit", - 200 - ); - - // The interval at which workflows state persistence attempts to disk are made. - duration_feature_flag!( - StatePeriodicWriteIntervalFlag, - "workflows.state_periodic_write_interval_ms", - 5.seconds() - ); } pub mod platform_events { diff --git a/bd-workflows/src/engine.rs b/bd-workflows/src/engine.rs index 16cfc9ab..18ee5394 100644 --- a/bd-workflows/src/engine.rs +++ b/bd-workflows/src/engine.rs @@ -37,14 +37,9 @@ use bd_client_stats::Stats; use bd_client_stats_store::{Counter, Histogram, Scope}; use bd_log_primitives::{Log, LogRef}; pub use bd_matcher::FieldProvider; -use bd_runtime::runtime::workflows::{ - PersistenceWriteIntervalFlag, - StatePeriodicWriteIntervalFlag, - TraversalsCountLimitFlag, -}; +use bd_runtime::runtime::workflows::PersistenceWriteIntervalFlag; use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, session_capture}; use bd_stats_common::labels; -use bd_time::TimeDurationExt as _; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet, HashMap}; @@ -75,15 +70,6 @@ pub struct WorkflowsEngine { stats: WorkflowsEngineStats, - current_traversals_count: u32, - /// Used to keep track of the current number of traversals. - /// The other way to do this would be to iterate over all workflows - /// and their runs and their traversals every time we want to - /// know how many traversals are there but this could turn out to - /// be expensive for cases when we have a lot of workflows/traversals/runs. - traversals_count_limit: u32, - state_periodic_write_interval: time::Duration, - flush_buffers_actions_resolver: Resolver, flush_buffers_negotiator_input_tx: Sender, flush_buffers_negotiator_output_rx: Receiver, @@ -111,10 +97,6 @@ impl WorkflowsEngine { ) -> (Self, Receiver) { let scope = scope.scope("workflows"); - let traversals_count_limit_flag = TraversalsCountLimitFlag::register(runtime).unwrap(); - let state_periodic_write_interval_flag = - StatePeriodicWriteIntervalFlag::register(runtime).unwrap(); - // An arbitrary size for the channel that should be sufficient to handle incoming flush buffer // triggers. It should be acceptable to drop events if the size limit is exceeded. let (buffers_to_flush_tx, buffers_to_flush_rx) = tokio::sync::mpsc::channel(10); @@ -144,9 +126,6 @@ impl WorkflowsEngine { stats: WorkflowsEngineStats::new(&scope), state_store: StateStore::new(sdk_directory, &scope, runtime), needs_state_persistence: false, - current_traversals_count: 0, - traversals_count_limit: *traversals_count_limit_flag.read(), - state_periodic_write_interval: *state_periodic_write_interval_flag.read(), flush_buffers_actions_resolver, flush_buffers_negotiator_join_handle, flush_buffers_negotiator_input_tx: input_tx, @@ -213,13 +192,12 @@ impl WorkflowsEngine { log::debug!( "started workflows engine with {} workflow(s); {} pending processing action(s); {} pending \ - sankey path uploads; {} streaming action(s); session ID: \"{}\"; traversals count limit: {}", + sankey path uploads; {} streaming action(s); session ID: \"{}\"", self.state.workflows.len(), self.state.pending_flush_actions.len(), self.state.pending_sankey_actions.len(), self.state.streaming_actions.len(), self.state.session_id, - self.traversals_count_limit, ); } @@ -347,62 +325,14 @@ impl WorkflowsEngine { } fn add_workflow(&mut self, workflow: Workflow, config: Config) { - let mut workflow = workflow; - - let workflow_traversals_count = workflow.traversals_count(); - if workflow_traversals_count + self.current_traversals_count > self.traversals_count_limit { - log::debug!( - "failed to add workflow with {} traversal(s) due to traversals count limit being hit \ - ({}); current traversals count {}", - workflow_traversals_count, - self.traversals_count_limit, - self.current_traversals_count - ); - - // Workflow being added has too many traversal for the engine to not exceed - // the configured traversals count limit. Keep removing workflow's run until - // its addition to the engine is possible. - // Process traversals in reversed order as we may end up modifying the array - // starting at `index` in any given iteration of the loop + we want to start - // removing runs starting from the "latest" ones and they are at the end of - // the enumerated array. - let mut remaining_workflow_traversals_count = workflow_traversals_count; - for index in (0 .. workflow.runs().len()).rev() { - let run_traversals_count = workflow.runs()[index].traversals_count(); - - log::debug!("removing workflow run with {run_traversals_count} traversal(s)"); - - remaining_workflow_traversals_count -= run_traversals_count; - self.stats.traversals_count_limit_hit_total.inc(); - workflow.remove_run(index); - - if remaining_workflow_traversals_count + self.current_traversals_count - <= self.traversals_count_limit - { - break; - } - } - } - self.stats.workflow_starts_total.inc(); - self - .stats - .run_starts_total - .inc_by(workflow.runs().len() as u64); - self - .stats - .traversal_starts_total - .inc_by(workflow_traversals_count.into()); log::trace!( - "workflow={}: workflow added, runs count {}, traversal count {}", + "workflow={}: workflow added, runs count {}", workflow.id(), workflow.runs().len(), - workflow_traversals_count, ); - self.current_traversals_count += workflow.traversals_count(); - self.configs.push(config); self.state.workflows.push(workflow); } @@ -411,20 +341,7 @@ impl WorkflowsEngine { self.configs.remove(workflow_index); let workflow = self.state.workflows.remove(workflow_index); - let workflow_traversals_count = workflow.traversals_count(); self.stats.workflow_stops_total.inc(); - self - .stats - .run_stops_total - .inc_by(workflow.runs().len() as u64); - self - .stats - .traversal_stops_total - .inc_by(workflow_traversals_count.into()); - - self.current_traversals_count = self - .current_traversals_count - .saturating_sub(workflow_traversals_count); log::debug!("workflow={}: workflow removed", workflow.id()); @@ -459,19 +376,12 @@ impl WorkflowsEngine { pub async fn run(&mut self) { loop { - self.run_once(true).await; + self.run_once().await; } } - pub(crate) async fn run_once(&mut self, persist_periodically: bool) { - // TODO(Augustyniak): Fix periodic state persistence as it's effectively a no-op now due to the - // way `run` method is being called. - let state_persistence_in = self.state_periodic_write_interval.sleep(); - + pub(crate) async fn run_once(&mut self) { tokio::select! { - () = state_persistence_in, if persist_periodically => { - self.maybe_persist(false).await; - }, Some(negotiator_output) = self.flush_buffers_negotiator_output_rx.recv() => { log::debug!("received flush buffers negotiator output: \"{negotiator_output:?}\""); @@ -621,11 +531,6 @@ impl WorkflowsEngine { && log.capture_session.is_none() && self.state.streaming_actions.is_empty() { - self - .stats - .active_traversals_total - .observe(self.current_traversals_count.into()); - return WorkflowsEngineResult { log_destination_buffer_ids: Cow::Borrowed(log_destination_buffer_ids), triggered_flushes_buffer_ids: BTreeSet::new(), @@ -639,12 +544,7 @@ impl WorkflowsEngine { let mut logs_to_inject: BTreeMap<&'a str, Log> = BTreeMap::new(); for (index, workflow) in &mut self.state.workflows.iter_mut().enumerate() { let was_in_initial_state = workflow.is_in_initial_state(); - let result = workflow.process_log( - &self.configs[index], - log, - &mut self.current_traversals_count, - self.traversals_count_limit, - ); + let result = workflow.process_log(&self.configs[index], log); macro_rules! inc_by { ($field:ident, $value:ident) => { @@ -652,25 +552,6 @@ impl WorkflowsEngine { }; } - inc_by!( - exclusive_workflow_resets_total, - reset_exclusive_workflows_count - ); - inc_by!(run_starts_total, created_runs_count); - inc_by!(run_advances_total, advanced_runs_count); - inc_by!(run_stops_total, stopped_runs_count); - inc_by!(run_completions_total, completed_runs_count); - - inc_by!(traversal_starts_total, created_traversals_count); - inc_by!(traversal_advances_total, advanced_traversals_count); - inc_by!(traversal_stops_total, stopped_traversals_count); - inc_by!(traversal_completions_total, completed_traversals_count); - - inc_by!( - traversals_count_limit_hit_total, - traversals_count_limit_hit_count - ); - inc_by!(matched_logs_total, matched_logs_count); // Not every case of a workflow making a progress needs a state persistence. @@ -695,22 +576,6 @@ impl WorkflowsEngine { logs_to_inject.extend(workflow_logs_to_inject); } - self - .stats - .active_traversals_total - .observe(self.current_traversals_count.into()); - - debug_assert!( - self - .state - .workflows - .iter() - .map(Workflow::traversals_count) - .sum::() - == self.current_traversals_count, - "current_traversals_count is not equal to computed traversals count" - ); - let PreparedActions { mut flush_buffers_actions, emit_metric_actions, @@ -824,7 +689,6 @@ impl WorkflowsEngine { } fn clean_state(&mut self) { - self.current_traversals_count = 0; // We clear the ongoing workflows state as opposed to the whole state because: // * pending actions (uploads) are not affected by the session change, and ongoing logs uploads // should continue even as the session ID changes. @@ -1180,41 +1044,6 @@ struct WorkflowsEngineStats { /// server that removed them from the list of workflows. workflow_stops_total: Counter, - /// The number of times the state of an exclusive was reset and the workflow moved back to its - /// initial state. - exclusive_workflow_resets_total: Counter, - - /// The number of started runs. Each workflow has at least one and can - /// have significantly more runs. - run_starts_total: Counter, - /// The number of runs who made progress. A progress is defined as a move of one or more - /// of run's traversals from one state to another. - run_advances_total: Counter, - /// A number of runs that completed. Completion means that all of their traversals reached - /// the final state. - run_completions_total: Counter, - /// A number of runs stopped in response to config update from a server or exceeding - /// one of the workflows-related limits controlled with either a workflows config or - /// SDK runtime settings. - run_stops_total: Counter, - - /// The number of started traversals. See `Traversal` for more details. - traversal_starts_total: Counter, - /// The number of workflow traversals that made progress. A progress is defined as move from one - /// state to another. - traversal_advances_total: Counter, - /// The number of workflow traversals that reached one of workflow's final state. - traversal_completions_total: Counter, - /// A number of traversals stopped in response to config update from a server or exceeding - /// one of the workflows-related limits controlled with either a workflows config or - /// SDK runtime settings. - traversal_stops_total: Counter, - - /// The number of active traversals. - active_traversals_total: Histogram, - /// The number of times the engine prevented a run and/or traversal from being - /// created due to a configured traversals count limit. - traversals_count_limit_hit_total: Counter, /// The number of matched logs. A single log can be matched multiple times matched_logs_total: Counter, @@ -1241,42 +1070,10 @@ impl WorkflowsEngineStats { let workflow_stops_total = scope.counter_with_labels("workflows_total", labels!("operation" => "stop")); - let exclusive_workflow_resets_total = scope.counter("workflow_resets_total"); - - let run_starts_total = scope.counter_with_labels("runs_total", labels!("operation" => "start")); - let run_advances_total = - scope.counter_with_labels("runs_total", labels!("operation" => "advance")); - let run_completions_total = - scope.counter_with_labels("runs_total", labels!("operation" => "completion")); - let run_stops_total = scope.counter_with_labels("runs_total", labels!("operation" => "stop")); - - let traversal_starts_total = - scope.counter_with_labels("traversals_total", labels!("operation" => "start")); - let traversal_advances_total = - scope.counter_with_labels("traversals_total", labels!("operation" => "advance")); - let traversal_completions_total = - scope.counter_with_labels("traversals_total", labels!("operation" => "completion")); - let traversal_stops_total = - scope.counter_with_labels("traversals_total", labels!("operation" => "stop")); - Self { workflow_starts_total, workflow_stops_total, - exclusive_workflow_resets_total, - - run_starts_total, - run_advances_total, - run_completions_total, - run_stops_total, - - traversal_starts_total, - traversal_advances_total, - traversal_completions_total, - traversal_stops_total, - - active_traversals_total: scope.histogram("traversal_active_total"), - traversals_count_limit_hit_total: scope.counter("traversals_count_limit_hit_total"), matched_logs_total: scope.counter("matched_logs_total"), process_log_duration: scope.histogram("engine_process_log_duration_s"), diff --git a/bd-workflows/src/engine_test.rs b/bd-workflows/src/engine_test.rs index d5fdaa9c..26f79b26 100644 --- a/bd-workflows/src/engine_test.rs +++ b/bd-workflows/src/engine_test.rs @@ -25,9 +25,8 @@ use bd_proto::protos::client::api::{ SankeyPathUploadRequest, log_upload_intent_request, }; -use bd_runtime::runtime::{ConfigLoader, FeatureFlag}; +use bd_runtime::runtime::ConfigLoader; use bd_stats_common::labels; -use bd_test_helpers::runtime::{ValueKind, make_simple_update}; use bd_test_helpers::workflow::macros::{action, any, limit, log_matches, rule, state}; use bd_test_helpers::workflow::{ TestFieldRef, @@ -51,7 +50,6 @@ use time::ext::NumericalDuration; use time::macros::datetime; use tokio::sync::mpsc::Receiver; use tokio::task::JoinHandle; -use tokio::time::timeout; /// A macro that creates a workflow config using provided states. /// See `workflow_proto` macro for more details. @@ -267,8 +265,8 @@ impl AnnotatedWorkflowsEngine { } } - async fn run_once_for_test(&mut self, persist_periodically: bool) { - self.engine.run_once(persist_periodically).await; + async fn run_once_for_test(&mut self) { + self.engine.run_once().await; // Give the task started inside of `run_for_test()` method chance to run before proceeding. 1.milliseconds().sleep().await; } @@ -528,42 +526,6 @@ async fn engine_initialization_and_update() { "workflows:workflows_total", labels! {"operation" => "stop"}, ); - setup - .collector - .assert_counter_eq(0, "workflows:runs_total", labels! {"operation" => "start"}); - setup.collector.assert_counter_eq( - 0, - "workflows:runs_total", - labels! {"operation" => "advance"}, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:runs_total", - labels! {"operation" => "completion"}, - ); - setup - .collector - .assert_counter_eq(0, "workflows:runs_total", labels! {"operation" => "stop"}); - setup.collector.assert_counter_eq( - 0, - "workflows:traversals_total", - labels! {"operation" => "start"}, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:traversals_total", - labels! {"operation" => "advance"}, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:traversals_total", - labels! {"operation" => "completion"}, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:traversals_total", - labels! {"operation" => "stop"}, - ); } #[tokio::test] @@ -780,16 +742,6 @@ async fn persistence_skipped_if_workflow_stays_in_an_initial_state() { setup .collector .assert_counter_eq(1, "workflows:matched_logs_total", labels! {}); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "advance" }, - ); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "completion" }, - ); assert!(!workflows_engine.needs_state_persistence); } @@ -815,22 +767,15 @@ async fn persist_workflows_with_at_least_one_non_initial_state_run_only() { )) .await; - // Workflow "1" matches a log and its run is not initial state anymore. + // Workflow "1" matches a log and its run is not initial state anymore, but is still at state A. engine_process_log!(workflows_engine; "foo"); + assert!(!workflows_engine.engine.state.workflows[0].is_in_initial_state()); + engine_assert_active_runs!(workflows_engine; 0; "A"); + engine_assert_active_runs!(workflows_engine; 1; "C"); setup .collector .assert_counter_eq(1, "workflows:matched_logs_total", labels! {}); - setup.collector.assert_counter_eq( - 0, - "workflows:runs_total", - labels! { "operation" => "advance" }, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:runs_total", - labels! { "operation" => "completion" }, - ); assert!(workflows_engine.needs_state_persistence); workflows_engine.maybe_persist(false).await; @@ -861,30 +806,22 @@ async fn needs_persistence_if_workflow_moves_to_an_initial_state() { // Workflow's run moves to state 'B'. engine_process_log!(workflows_engine; "foo"); + engine_assert_active_runs!(workflows_engine; 0; "B"); setup .collector .assert_counter_eq(1, "workflows:matched_logs_total", labels! {}); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "advance" }, - ); assert!(workflows_engine.needs_state_persistence); // Persist state workflows_engine.maybe_persist(false).await; assert!(!workflows_engine.needs_state_persistence); - // Workflow's run moves to its final state 'C' and completes. + // Workflow's run moves to its final state 'C' and completes, leaving only the initial state run. engine_process_log!(workflows_engine; "bar"); + engine_assert_active_runs!(workflows_engine; 0; "A"); // Workflow needs persistence as its state changed. assert!(workflows_engine.needs_state_persistence); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "completion" }, - ); } #[tokio::test] @@ -971,187 +908,6 @@ async fn persistence_performed_if_match_is_found_without_advancing() { ); } -#[tokio::test] -async fn traversals_count_limit_prevents_creation_of_new_workflows() { - let mut a = state("A"); - let b = state("B"); - - a = a.declare_transition(&b, rule!(log_matches!(message == "foo"); times 100)); - - let workflows = vec![ - workflow!("1"; exclusive with a, b), - workflow!("2"; exclusive with a, b), - workflow!("3"; exclusive with a, b), - workflow!("4"; exclusive with a, b), - ]; - - let setup = Setup::new(); - setup - .runtime - .update_snapshot(&make_simple_update(vec![( - bd_runtime::runtime::workflows::TraversalsCountLimitFlag::path(), - ValueKind::Int(2), - )])) - .await; - - // We try to create 4 workflows (each with 1 run that has 1 traversal) but - // the configured traversals count limit is 2. For this reason, we succeed - // hit the traversals limit twice. - let mut workflows_engine = setup - .make_workflows_engine(WorkflowsEngineConfig::new_with_workflow_configurations( - workflows, - )) - .await; - - // All workflows are added to the engine but some of them have no runs - // to keep the engine below the configured traversals count limit. - assert_eq!(4, workflows_engine.state.workflows.len()); - assert!(workflows_engine.state.workflows[2].runs().is_empty()); - assert!(workflows_engine.state.workflows[3].runs().is_empty()); - - // Process a log to force the engine to create initial runs for all workflows. - engine_process_log!(workflows_engine; "foo"); - - setup - .collector - .assert_counter_eq(2, "workflows:traversals_count_limit_hit_total", labels! {}); - setup.collector.assert_counter_eq( - 4, - "workflows:workflows_total", - labels! {"operation" => "start"}, - ); - - let workflows = vec![ - workflow!("11"; exclusive with a, b), - workflow!("22"; exclusive with a, b), - workflow!("33"; exclusive with a, b), - ]; - - // We replace 2 of the existing workflows (each with 1 run that has 1 traversal) - // with 3 new ones (each with 1 run that has 1 traversal -> we process a log to force - // the engine to create initial state runs). - // We start with 2 traversals, substract 2 and try to add 3. Addition of the third one - // fails as we hit traversals count limit. - workflows_engine.update(WorkflowsEngineConfig::new_with_workflow_configurations( - workflows, - )); - engine_process_log!(workflows_engine; "foo"); - - // All workflows are added to the engine but some of them have no runs - // to keep the engine below the configured traversals count limit. - assert_eq!(3, workflows_engine.state.workflows.len()); - assert!(workflows_engine.state.workflows[2].runs().is_empty()); - - setup - .collector - .assert_counter_eq(3, "workflows:traversals_count_limit_hit_total", labels! {}); - setup.collector.assert_counter_eq( - 7, - "workflows:workflows_total", - labels! {"operation" => "start"}, - ); -} - -#[tokio::test] -async fn traversals_count_limit_prevents_creation_of_new_workflow_runs() { - let mut a = state("A"); - let b = state("B"); - - a = a.declare_transition(&b, rule!(log_matches!(message == "foo"); times 100)); - - let workflows = vec![workflow!(exclusive with a, b)]; - - let setup = Setup::new(); - setup - .runtime - .update_snapshot(&make_simple_update(vec![( - bd_runtime::runtime::workflows::TraversalsCountLimitFlag::path(), - ValueKind::Int(2), - )])) - .await; - - let mut workflows_engine = setup - .make_workflows_engine(WorkflowsEngineConfig::new_with_workflow_configurations( - workflows, - )) - .await; - - // In exclusive mode we will only ever have 1 run with a single traversal attempting to hit the - // total count. - engine_process_log!(workflows_engine; "foo"); - engine_process_log!(workflows_engine; "foo"); - engine_process_log!(workflows_engine; "foo"); - engine_process_log!(workflows_engine; "foo"); - engine_process_log!(workflows_engine; "foo"); - - setup - .collector - .assert_counter_eq(0, "workflows:traversals_count_limit_hit_total", labels! {}); -} - -#[tokio::test] -#[allow(clippy::many_single_char_names)] -async fn traversals_count_limit_causes_run_removal_after_forking() { - let mut a = state("A"); - let mut b = state("B"); - let mut c = state("C"); - let mut d = state("D"); - let e = state("E"); - let f = state("F"); - - a = a.declare_transition(&b, rule!(log_matches!(message == "foo"))); - b = b.declare_transition(&c, rule!(log_matches!(message == "bar"))); - b = b.declare_transition(&d, rule!(log_matches!(message == "bar"))); - c = c.declare_transition(&e, rule!(log_matches!(message == "zar"))); - d = d.declare_transition(&f, rule!(log_matches!(message == "zar"))); - - let workflows = vec![workflow!(exclusive with a, b, c, d, e, f)]; - - let setup = Setup::new(); - - setup - .runtime - .update_snapshot(&make_simple_update(vec![( - bd_runtime::runtime::workflows::TraversalsCountLimitFlag::path(), - ValueKind::Int(2), - )])) - .await; - - let mut workflows_engine = setup - .make_workflows_engine(WorkflowsEngineConfig::new_with_workflow_configurations( - workflows, - )) - .await; - assert!(workflows_engine.state.workflows[0].runs().is_empty()); - - // * A new run is created as workflows has no runs in an initial state. - // * The existing run "A" matches "foo" and moves to B. - // * Workflow has 2 traversals total (one run "B" trasversal and one run "A" traversal). - engine_process_log!(workflows_engine; "foo"); - engine_assert_active_runs!(workflows_engine; 0; "B"); - - // * A second run is created so that a workflow has a run in an initial state. - // * Two outgoing transitions of the run "B" traversal match log "bar". - // * We have 2 traversals and attempt to create 2 more which gives us 4 traversals total. - // * We hit the configured limit of traversals (2). - // * In order to stay below the limit we remove the run that caused us to hit the limit. - // * We are left with run "A". - engine_process_log!(workflows_engine; "bar"); - engine_assert_active_run_traversals!(workflows_engine; 0 => 0; "A"); - - setup - .collector - .assert_counter_eq(1, "workflows:runs_total", labels! {"operation" => "stop"}); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! {"operation" => "stop"}, - ); - setup - .collector - .assert_counter_eq(1, "workflows:traversals_count_limit_hit_total", labels! {}); -} - #[tokio::test(start_paused = true)] #[allow(clippy::many_single_char_names)] async fn persistence_to_disk_is_rate_limited() { @@ -1272,11 +1028,6 @@ async fn runs_in_initial_state_are_not_persisted() { // * Workflow #2: No runs exists as no runs were stored on disk. engine_assert_active_runs!(workflows_engine; 0; "A"); assert!(workflows_engine.state.workflows[1].runs().is_empty()); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); engine_process_log!(workflows_engine; "bar"); // * Workflow #1: A new run in an initial state is created as workflow has a parallel execution @@ -1284,11 +1035,6 @@ async fn runs_in_initial_state_are_not_persisted() { // * Workflow #2: A new run in an initial state is created as workflow had not runs. engine_assert_active_runs!(workflows_engine; 0; "A", "A"); engine_assert_active_runs!(workflows_engine; 1; "B"); - setup.collector.assert_counter_eq( - 3, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); } #[tokio::test] @@ -1469,52 +1215,6 @@ async fn ignore_persisted_state_if_invalid_dir() { ); } -#[tokio::test] -async fn persists_state_on_periodic_basis() { - let mut a = state("A"); - let mut b = state("B"); - let c = state("C"); - - a = a.declare_transition(&b, rule!(log_matches!(message == "foo"); times 100)); - b = b.declare_transition(&c, rule!(log_matches!(message == "bar"))); - - let workflows = vec![workflow!(exclusive with a, b, c)]; - - let setup = Setup::new(); - - // Speed up periodic persistance so that the test can complete in a shorter - // amount of time. - setup - .runtime - .update_snapshot(&make_simple_update(vec![( - bd_runtime::runtime::workflows::StatePeriodicWriteIntervalFlag::path(), - ValueKind::Int(10), - )])) - .await; - - // Engine creation should still succeed but with a default state - let mut workflows_engine = setup - .make_workflows_engine(WorkflowsEngineConfig::new_with_workflow_configurations( - workflows, - )) - .await; - - engine_process_log!(workflows_engine; "foo"); - // Log made the state dirty. - assert!(workflows_engine.needs_state_persistence); - // One of run loop's responsibilities is periodic persistance of state. - // Given enough time it should persist the state to disk and mark - // state as being "clean". - _ = timeout(Duration::from_millis(100), workflows_engine.run()).await; - assert!(!workflows_engine.needs_state_persistence); - - setup.collector.assert_counter_eq( - 1, - "workflows:state_persistences_total", - labels! {"result" => "success"}, - ); -} - #[tokio::test] async fn engine_processing_log() { let mut a = state("A"); @@ -1564,6 +1264,8 @@ async fn engine_processing_log() { }, result ); + assert!(workflows_engine.state.workflows[0].runs().is_empty()); + assert!(workflows_engine.state.workflows[1].runs().is_empty()); workflows_engine .collector @@ -1579,56 +1281,14 @@ async fn engine_processing_log() { "workflows:workflows_total", labels! {"operation" => "stop"}, ); - setup - .collector - .assert_counter_eq(2, "workflows:runs_total", labels! {"operation" => "start"}); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! {"operation" => "advance"}, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! {"operation" => "completion"}, - ); - setup - .collector - .assert_counter_eq(0, "workflows:runs_total", labels! {"operation" => "stop"}); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! {"operation" => "start"}, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! {"operation" => "advance"}, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! {"operation" => "completion"}, - ); - setup.collector.assert_counter_eq( - 0, - "workflows:traversals_total", - labels! {"operation" => "stop"}, - ); setup .collector .assert_counter_eq(2, "workflows:matched_logs_total", labels! {}); // Two new runs are created to ensure that each workflow has one run in an initial state. engine_process_log!(workflows_engine; "not matching"); - setup - .collector - .assert_counter_eq(4, "workflows:runs_total", labels! {"operation" => "start"}); - setup.collector.assert_counter_eq( - 4, - "workflows:traversals_total", - labels! {"operation" => "start"}, - ); + engine_assert_active_runs!(workflows_engine; 0; "A"); + engine_assert_active_runs!(workflows_engine; 1; "C"); } #[tokio::test] @@ -1659,20 +1319,10 @@ async fn exclusive_workflow_duration_limit() { // * The newly created run doesn't match a log. engine_process_log!(workflows_engine; "bar"; with labels!{}; time now); engine_assert_active_runs!(workflows_engine; 0; "A"); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); // * The run matches a log and advances. It leaves its initial state. engine_process_log!(workflows_engine; "foo"; with labels!{}; time now + Duration::from_secs(2)); engine_assert_active_runs!(workflows_engine; 0; "B"); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "advance" }, - ); // * A run in an initial state is created and added to the beginning of runs list. // * The run is not an initial state and has exceeded the maximum duration. @@ -1680,24 +1330,11 @@ async fn exclusive_workflow_duration_limit() { engine_process_log!(workflows_engine; "not matching"; with labels!{}; time now + Duration::from_secs(4)); assert_eq!(workflows_engine.state.workflows[0].runs().len(), 1); engine_assert_active_runs!(workflows_engine; 0; "A"); - setup - .collector - .assert_counter_eq(1, "workflows:runs_total", labels! { "operation" => "stop" }); // * A new run in an initial state is created. // * The new run matches a log and advances. engine_process_log!(workflows_engine; "foo"; with labels!{}; time now + Duration::from_secs(4)); engine_assert_active_runs!(workflows_engine; 0; "B"); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! { "operation" => "advance" }, - ); } #[tokio::test] @@ -1847,7 +1484,7 @@ async fn logs_streaming() { // Allow the engine to perform logs upload intent and process the response to it (upload // immediately). - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![log_upload_intent_request::WorkflowActionUpload { @@ -1878,7 +1515,7 @@ async fn logs_streaming() { // Allow the engine to perform logs upload intent and process the response to it (upload // immediately). - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![ @@ -1919,7 +1556,7 @@ async fn logs_streaming() { // Allow the engine to perform logs upload intent and process the response to it (upload // immediately). - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![ @@ -2022,7 +1659,7 @@ async fn logs_streaming() { // Allow the engine to perform logs upload intent and process the response to it (upload // immediately). - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![ @@ -2056,7 +1693,7 @@ async fn logs_streaming() { // Allow the engine to perform logs upload intent and process the response to it (upload // immediately). - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![ @@ -2234,7 +1871,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { workflows_engine .set_awaiting_logs_upload_intent_decisions(vec![IntentDecision::UploadImmediately]); - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; assert_eq!( vec![log_upload_intent_request::WorkflowActionUpload { @@ -2314,7 +1951,7 @@ async fn engine_continues_to_stream_upload_not_complete() { ); log::info!("Running the engine for the first time."); - workflows_engine.run_once_for_test(false).await; + workflows_engine.run_once_for_test().await; log::info!("after Running the engine for the first time."); assert_eq!( @@ -2395,13 +2032,6 @@ async fn creating_new_runs_after_first_log_processing() { ); let setup = Setup::new(); - setup - .runtime - .update_snapshot(&make_simple_update(vec![( - bd_runtime::runtime::workflows::TraversalsCountLimitFlag::path(), - ValueKind::Int(3), - )])) - .await; // This test assumes that internally `workflows_engine` iterates // over the list of its workflows in order. @@ -2419,22 +2049,11 @@ async fn creating_new_runs_after_first_log_processing() { engine_process_log!(workflows_engine; "bar"); engine_assert_active_runs!(workflows_engine; 0; "D"); engine_assert_active_runs!(workflows_engine; 1; "A"); - setup - .collector - .assert_counter_eq(2, "workflows:runs_total", labels! {"operation" => "start"}); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! {"operation" => "advance"}, - ); // * State "A" matches log but does not advance as its transition requires 100 matches. engine_process_log!(workflows_engine; "foo"); engine_assert_active_runs!(workflows_engine; 0; "C", "D"); engine_assert_active_runs!(workflows_engine; 1; "A"); - setup - .collector - .assert_counter_eq(0, "workflows:traversals_count_limit_hit_total", labels! {}); // * States "D" (workflow #1) and "A" (workflow #2) match a log with (key => value) tag. // * State "D" advances to a final state "E" and the run is completed. The number of traversals @@ -2447,18 +2066,12 @@ async fn creating_new_runs_after_first_log_processing() { engine_process_log!(workflows_engine; "not matching message"; with labels! { "key" => "value" }); engine_assert_active_runs!(workflows_engine; 0; "C"); engine_assert_active_runs!(workflows_engine; 1; "A", "A"); - setup - .collector - .assert_counter_eq(0, "workflows:traversals_count_limit_hit_total", labels! {}); // In exclusive mode we will not make any new runs because both workflows have runs in the // initial state. With no matches and no traversals we stay under the limit. engine_process_log!(workflows_engine; "not matching message"); engine_assert_active_runs!(workflows_engine; 0; "C"); engine_assert_active_runs!(workflows_engine; 1; "A", "A"); - setup - .collector - .assert_counter_eq(0, "workflows:traversals_count_limit_hit_total", labels! {}); } #[tokio::test] @@ -2480,7 +2093,7 @@ async fn workflows_state_is_purged_when_session_id_changes() { // Session ID is empty on first engine initialization. assert!(workflows_engine.state.session_id.is_empty()); // No traversals as no log has been processed yet. - assert_eq!(0, workflows_engine.current_traversals_count); + assert!(workflows_engine.state.workflows[0].runs().is_empty()); workflows_engine.process_log( &LogRef { @@ -2512,8 +2125,6 @@ async fn workflows_state_is_purged_when_session_id_changes() { assert_eq!("foo_session", workflows_engine.state.session_id); // Read saved workflow state from disk. engine_assert_active_runs!(workflows_engine; 0; "A"); - // One traversal trad from disk. - assert_eq!(1, workflows_engine.current_traversals_count); // Process a log with a new session ID. workflows_engine.process_log( @@ -2534,7 +2145,7 @@ async fn workflows_state_is_purged_when_session_id_changes() { // Session ID changed. assert_eq!("bar_session", workflows_engine.state.session_id); - assert_eq!(1, workflows_engine.current_traversals_count); + engine_assert_active_runs!(workflows_engine; 0; "A"); assert!(workflows_engine.needs_state_persistence); workflows_engine.maybe_persist(false).await; @@ -2589,93 +2200,80 @@ async fn test_traversals_count_tracking() { engine_process_log!(engine; "foo"); assert_eq!(1, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "B"); - assert_eq!(1, engine.current_traversals_count); // Log is matched and workflow moves to state "B". engine_process_log!(engine; "foo"); assert_eq!(1, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "B"); - assert_eq!(1, engine.current_traversals_count); // * A new initial state run is created and added to the beginning of runs list. // * Log is matched and workflow moves to state "C". engine_process_log!(engine; "bar"); assert_eq!(2, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "A", "C"); - assert_eq!(2, engine.current_traversals_count); // Log is not matched. engine_process_log!(engine; "dar"); assert_eq!(2, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "A", "C"); - assert_eq!(2, engine.current_traversals_count); // Log is matched and workflow is reset to its initial state. engine_process_log!(engine; "foo"); assert_eq!(1, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "B"); - assert_eq!(1, engine.current_traversals_count); // * A new initial state run is created and added to the beginning of runs list. // * Log is matched by the run that's not in an initial state and the run advances to state `C`. engine_process_log!(engine; "bar"); assert_eq!(2, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "A", "C"); - assert_eq!(2, engine.current_traversals_count); // Log is matched and the more advanced run moves to final state "D" and completes. engine_process_log!(engine; "car"); engine_assert_active_runs!(engine; 0; "A"); - assert_eq!(1, engine.current_traversals_count); // Log is not matched. engine_process_log!(engine; "foo"); assert_eq!(1, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "B"); - assert_eq!(1, engine.current_traversals_count); // Log is matched and workflow moves to state "B". engine_process_log!(engine; "foo"); assert_eq!(1, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "B"); - assert_eq!(1, engine.current_traversals_count); // * A new initial state run is created and added to the beginning of runs list. // * Log is matched and workflow moves to "E" state. engine_process_log!(engine; "dar"); assert_eq!(2, engine.state.workflows[0].runs().len()); engine_assert_active_runs!(engine; 0; "A", "E"); - assert_eq!(2, engine.current_traversals_count); // Log is matched and workflow moves to final state "F" and completes. engine_process_log!(engine; "far"); engine_assert_active_runs!(engine; 0; "A"); - assert_eq!(1, engine.current_traversals_count); // Log is not matched. engine_process_log!(engine; "no match"); engine_assert_active_runs!(engine; 0; "A"); - assert_eq!(1, engine.current_traversals_count); // Checks that the traversal count does not change if we get update // with the same workflow. engine.update(engine_config.clone()); - assert_eq!(1, engine.current_traversals_count); + engine_assert_active_runs!(engine; 0; "A"); // Check that traversals count goes to 0 if empty update happens. engine.update(WorkflowsEngineConfig::new_with_workflow_configurations( vec![], )); - assert_eq!(0, engine.current_traversals_count); + assert!(engine.state.workflows.is_empty()); - // Check that traversals stay at since no log has been processed yet. + // Check that traversals stay zero at since no log has been processed yet. engine.update(engine_config); - assert_eq!(0, engine.current_traversals_count); + assert!(engine.state.workflows[0].runs().is_empty()); // A traversal is created to process an incoming log that's not matched. engine_process_log!(engine; "no match"); engine_assert_active_runs!(engine; 0; "A"); - assert_eq!(1, engine.current_traversals_count); } #[tokio::test] @@ -2703,19 +2301,6 @@ async fn test_exclusive_workflow_state_reset() { // state `B`. engine_process_log!(engine; "foo"); engine_assert_active_runs!(engine; 0; "B"); - setup - .collector - .assert_counter_eq(0, "workflows:workflow_resets_total", labels! {}); - setup.collector.assert_counter_eq( - 1, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); - setup.collector.assert_counter_eq( - 1, - "workflows:traversals_total", - labels! { "operation" => "start" }, - ); // * A new initial state run is created and added to the beginning of runs list so that the // workflow has a run that's in an initial state. @@ -2723,63 +2308,16 @@ async fn test_exclusive_workflow_state_reset() { // state `C`. engine_process_log!(engine; "bar"); engine_assert_active_runs!(engine; 0; "A", "C"); - setup - .collector - .assert_counter_eq(0, "workflows:workflow_resets_total", labels! {}); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! { "operation" => "start" }, - ); // The log is not matched by any of the runs. engine_process_log!(engine; "not matching"); engine_assert_active_runs!(engine; 0; "A", "C"); - setup - .collector - .assert_counter_eq(0, "workflows:workflow_resets_total", labels! {}); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! { "operation" => "start" }, - ); // * The log is not matched by the run that's not in an initial state. // * The log is matched by the run that's in an initial state. That causes the state advancement // of the run and the removal of the other run. engine_process_log!(engine; "foo"); engine_assert_active_runs!(engine; 0; "B"); - setup - .collector - .assert_counter_eq(1, "workflows:workflow_resets_total", labels! {}); - setup.collector.assert_counter_eq( - 2, - "workflows:runs_total", - labels! { "operation" => "start" }, - ); - setup - .collector - .assert_counter_eq(1, "workflows:runs_total", labels! { "operation" => "stop" }); - setup.collector.assert_counter_eq( - 2, - "workflows:traversals_total", - labels! { "operation" => "start" }, - ); - setup.collector.assert_counter_eq( - 1, - "workflows:traversals_total", - labels! { "operation" => "stop" }, - ); } #[tokio::test] diff --git a/bd-workflows/src/workflow.rs b/bd-workflows/src/workflow.rs index 332bdf69..20572985 100644 --- a/bd-workflows/src/workflow.rs +++ b/bd-workflows/src/workflow.rs @@ -79,21 +79,13 @@ impl Workflow { &mut self, config: &'a Config, log: &LogRef<'_>, - current_traversals_count: &mut u32, - traversals_count_limit: u32, ) -> WorkflowResult<'a> { let mut result = WorkflowResult::default(); if self.needs_new_run() { let run = Run::new(config); - if self.maybe_add_run( - run, - current_traversals_count, - traversals_count_limit, - &mut result, - ) { - log::trace!("added a new run for workflow {}", self.id); - } + self.add_run(run); + log::trace!("added a new run for workflow {}", self.id); } let mut did_make_progress = false; @@ -105,43 +97,13 @@ impl Workflow { result.incorporate_run_result(&mut run_result); - *current_traversals_count += run_result.created_traversals_count; - // TODO(Augustyniak): a 'standard' subtracting operation should be sufficient here for - // production code but it's no enough for tests due to the way they track - // `current_traversals_count` (in tests, the processing of any given always starts with - // `current_traversals_count == 0` even if it was != 0 as the result of processing - // previous logs). Rework tests to make it possible to replace `saturating_sub` with `-` - // operator in here. - *current_traversals_count = - current_traversals_count.saturating_sub(run_result.completed_traversals_count); - - let is_over_traversals_count_limit = *current_traversals_count > traversals_count_limit; let run_result_did_make_progress = run_result.did_make_progress(); - match (run_result.state, is_over_traversals_count_limit) { - // If after processing a given log by a workflow run we notice that we end up - // over the limit of allowed traversals we remove the `run` which caused the - // overflow. - (RunState::Stopped, _) | (_, true) => { - result.stats.stopped_runs_count += 1; - - if is_over_traversals_count_limit { - result.stats.traversals_count_limit_hit_count += 1; - log::debug!( - "traversals count ({}) is over the limit ({}); stopping run", - *current_traversals_count, - traversals_count_limit - ); - } - - let stopped_traversals_count = run.traversals_count(); - result.stats.stopped_traversals_count += stopped_traversals_count; - *current_traversals_count = - current_traversals_count.saturating_sub(stopped_traversals_count); - + match run_result.state { + RunState::Stopped => { self.runs.remove(index); }, - (RunState::Completed, _) => { + RunState::Completed => { debug_assert!( self.runs[index].traversals.is_empty(), "completing a run with active traversals" @@ -151,22 +113,12 @@ impl Workflow { log::trace!("completed run, workflow id={:?}", self.id); - result.stats.advanced_runs_count += 1; - result.stats.completed_runs_count += 1; - self.runs.remove(index); }, - (RunState::Running, _) => { + RunState::Running => { if run_result_did_make_progress { did_make_progress = true; } - - // The run is still running. - if run_result.advanced_traversals_count > 0 { - // Since at least one traversal was advanced we - // consider the enclosing run to be advanced too. - result.stats.advanced_runs_count += 1; - } }, } @@ -209,15 +161,7 @@ impl Workflow { // This is safe as `has_active_run == true means that there are at least two runs and // `is_initial_run == true`` means that we are processing run with index == 0. log::trace!("resetting workflow due to initial state transition"); - let run = self.runs.remove(index + 1); - - let removed_traversals_count = run.traversals_count(); - result.stats.stopped_runs_count += 1; - result.stats.stopped_traversals_count += removed_traversals_count; - *current_traversals_count = - current_traversals_count.saturating_sub(removed_traversals_count); - - result.stats.reset_exclusive_workflows_count += 1; + self.runs.remove(index + 1); } else { // The active state run made progress and the next run to be processed (if there is any) // is an initial state run that we do not want to expose to the log. @@ -229,39 +173,9 @@ impl Workflow { result } - fn maybe_add_run( - &mut self, - run: Run, - current_traversals_count: &mut u32, - traversals_count_limit: u32, - result: &mut WorkflowResult<'_>, - ) -> bool { - if run.traversals_count() + *current_traversals_count <= traversals_count_limit { - log::trace!("workflow={}: creating a new run", self.id); - - result.stats.created_runs_count += 1; - result.stats.created_traversals_count += run.traversals_count(); - - *current_traversals_count += run.traversals_count(); - self.runs.insert(0, run); - - true - } else { - result.stats.traversals_count_limit_hit_count += 1; - log::debug!( - "workflow={}: traversals count ({}) is over the limit ({}); preventing a new run from \ - being added", - self.id, - *current_traversals_count, - traversals_count_limit - ); - - false - } - } - - pub(crate) fn traversals_count(&self) -> u32 { - self.runs.iter().map(Run::traversals_count).sum() + fn add_run(&mut self, run: Run) { + log::trace!("workflow={}: creating a new run", self.id); + self.runs.insert(0, run); } pub(crate) fn needs_new_run(&self) -> bool { @@ -317,10 +231,6 @@ impl Workflow { is_in_initial_state } - pub(crate) fn remove_run(&mut self, index: usize) { - self.runs.remove(index); - } - /// Remove all workflow runs. pub(crate) fn remove_all_runs(&mut self) { self.runs.clear(); @@ -380,9 +290,6 @@ impl<'a> WorkflowResult<'a> { .triggered_actions .append(&mut run_result.triggered_actions); self.logs_to_inject.append(&mut run_result.logs_to_inject); - self.stats.created_traversals_count += run_result.created_traversals_count; - self.stats.advanced_traversals_count += run_result.advanced_traversals_count; - self.stats.completed_traversals_count += run_result.completed_traversals_count; self.stats.matched_logs_count += run_result.matched_logs_count; } } @@ -396,37 +303,12 @@ impl<'a> WorkflowResult<'a> { #[derive(Debug, Default, PartialEq, Eq)] #[allow(clippy::struct_field_names)] pub(crate) struct WorkflowResultStats { - pub(crate) created_runs_count: u32, - pub(crate) advanced_runs_count: u32, - pub(crate) stopped_runs_count: u32, - pub(crate) completed_runs_count: u32, - - pub(crate) created_traversals_count: u32, - pub(crate) advanced_traversals_count: u32, - pub(crate) stopped_traversals_count: u32, - pub(crate) completed_traversals_count: u32, - - /// The number of times the engine prevented a run and/or traversal from being - /// created due to a configured traversals count limit. - pub(crate) traversals_count_limit_hit_count: u32, - - pub(crate) reset_exclusive_workflows_count: u32, - pub(crate) matched_logs_count: u32, } impl WorkflowResultStats { pub(crate) const fn did_make_progress(&self) -> bool { - self.created_runs_count > 0 - || self.advanced_runs_count > 0 - || self.stopped_runs_count > 0 - || self.completed_runs_count > 0 - || self.created_traversals_count > 0 - || self.advanced_traversals_count > 0 - || self.stopped_traversals_count > 0 - || self.completed_traversals_count > 0 - || self.matched_logs_count > 0 - || self.reset_exclusive_workflows_count > 0 + self.matched_logs_count > 0 } } @@ -550,10 +432,6 @@ impl Run { let mut run_triggered_actions = Vec::>::new(); let mut run_logs_to_inject = BTreeMap::<&'a str, Log>::new(); - - let mut created_traversals_count = 0; - let mut advanced_traversals_count = 0; - let mut completed_traversals_count = 0; let mut run_matched_logs_count = 0; // Process traversals in reversed order as we may end up modifying the array @@ -583,9 +461,6 @@ impl Run { return RunResult { state: RunState::Stopped, triggered_actions: vec![], - created_traversals_count: 0, - advanced_traversals_count: 0, - completed_traversals_count: 0, matched_logs_count: run_matched_logs_count, logs_to_inject: BTreeMap::new(), }; @@ -606,9 +481,6 @@ impl Run { return RunResult { state: RunState::Stopped, triggered_actions: vec![], - created_traversals_count: 0, - advanced_traversals_count: 0, - completed_traversals_count: 0, matched_logs_count: run_matched_logs_count, logs_to_inject: BTreeMap::new(), }; @@ -638,20 +510,6 @@ impl Run { continue; } - // The currently processed traversal is about to advance. - advanced_traversals_count += 1; - - if traversal_result.output_traversals.is_empty() { - // Traversal has no successors so it's been completed. - completed_traversals_count += 1; - } else { - #[allow(clippy::cast_possible_truncation)] - let output_traversals_count = traversal_result.output_traversals.len() as u32; - // The number of created traversals is the number of output traversals - // minus the input traversal. - created_traversals_count += output_traversals_count - 1; - } - // Replace advanced traversals with their successors. // Each advanced traversal may have 0 or more successors. // Notes: @@ -675,9 +533,6 @@ impl Run { RunResult { state, triggered_actions: run_triggered_actions, - created_traversals_count, - advanced_traversals_count, - completed_traversals_count, matched_logs_count: run_matched_logs_count, logs_to_inject: run_logs_to_inject, } @@ -708,12 +563,6 @@ impl Run { ); self.traversals.iter().all(Traversal::is_in_initial_state) } - - pub(crate) fn traversals_count(&self) -> u32 { - #[allow(clippy::cast_possible_truncation)] - let traversals_count = self.traversals.len() as u32; - traversals_count - } } // @@ -744,13 +593,6 @@ pub(crate) struct RunResult<'a> { state: RunState, /// The list of triggered actions. triggered_actions: Vec>, - /// The number of newly created traversals. - created_traversals_count: u32, - /// The number of advanced traversals. The traversal is considered - /// to be advanced when it transitions to the next state. - advanced_traversals_count: u32, - /// The number of completed traversals. - completed_traversals_count: u32, /// The number of matched logs. A single log can be matched multiple times matched_logs_count: u32, // Logs to be injected back into the workflow engine after field attachment and other processing. diff --git a/bd-workflows/src/workflow_test.rs b/bd-workflows/src/workflow_test.rs index fbb68fc5..f7721b9a 100644 --- a/bd-workflows/src/workflow_test.rs +++ b/bd-workflows/src/workflow_test.rs @@ -61,8 +61,6 @@ macro_rules! workflow_process_log { occurred_at: time::OffsetDateTime::now_utc(), capture_session: None, }, - &mut 0, - 1000, ) }; ($annotated_workflow:expr; $message:expr,with $tags:expr) => { @@ -80,8 +78,6 @@ macro_rules! workflow_process_log { occurred_at: time::OffsetDateTime::now_utc(), capture_session: None, }, - &mut 0, - 1000, ) }; } @@ -273,16 +269,6 @@ fn multiple_start_nodes_initial_fork() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 2, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 2, }, } @@ -296,16 +282,6 @@ fn multiple_start_nodes_initial_fork() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 1, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 1, - created_traversals_count: 2, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 2, - traversals_count_limit_hit_count: 0, matched_logs_count: 2, }, } @@ -323,16 +299,6 @@ fn multiple_start_nodes_initial_fork() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 1, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -372,16 +338,6 @@ fn multiple_start_nodes_initial_branching() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -396,16 +352,6 @@ fn multiple_start_nodes_initial_branching() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 1, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 1, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 1, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -423,16 +369,6 @@ fn multiple_start_nodes_initial_branching() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 1, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 1, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -488,16 +424,6 @@ fn basic_exclusive_workflow() { ], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -518,16 +444,6 @@ fn basic_exclusive_workflow() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 1, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 1, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -574,16 +490,6 @@ fn exclusive_workflow_matched_logs_count_limit() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 2, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 2, }, } @@ -602,16 +508,6 @@ fn exclusive_workflow_matched_logs_count_limit() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 0, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 0, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -631,16 +527,6 @@ fn exclusive_workflow_matched_logs_count_limit() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 0, - advanced_runs_count: 0, - completed_runs_count: 0, - stopped_runs_count: 1, - created_traversals_count: 0, - advanced_traversals_count: 0, - completed_traversals_count: 0, - stopped_traversals_count: 2, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -682,16 +568,6 @@ fn exclusive_workflow_log_rule_count() { triggered_actions: vec![], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 0, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 0, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -712,16 +588,6 @@ fn exclusive_workflow_log_rule_count() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -748,16 +614,6 @@ fn exclusive_workflow_log_rule_count() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 0, - advanced_runs_count: 1, - completed_runs_count: 1, - stopped_runs_count: 0, - created_traversals_count: 0, - advanced_traversals_count: 1, - completed_traversals_count: 1, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -816,16 +672,6 @@ fn branching_exclusive_workflow() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -841,16 +687,6 @@ fn branching_exclusive_workflow() { assert!(result.triggered_actions.is_empty()); assert_eq!( WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 1, - advanced_runs_count: 0, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 0, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 0, }, result.stats @@ -870,16 +706,6 @@ fn branching_exclusive_workflow() { })], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 0, - advanced_runs_count: 1, - completed_runs_count: 1, - stopped_runs_count: 0, - created_traversals_count: 0, - advanced_traversals_count: 1, - completed_traversals_count: 1, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 1, }, } @@ -907,16 +733,6 @@ fn branching_exclusive_workflow() { ], logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { - reset_exclusive_workflows_count: 0, - created_runs_count: 0, - advanced_runs_count: 1, - completed_runs_count: 0, - stopped_runs_count: 0, - created_traversals_count: 1, - advanced_traversals_count: 1, - completed_traversals_count: 0, - stopped_traversals_count: 0, - traversals_count_limit_hit_count: 0, matched_logs_count: 2, }, }