Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions bd-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,20 +789,6 @@ pub mod workflows {
"workflows.persistence_write_interval_ms",
1.seconds()
);

// The maximum number of workflow traversals that may be active.
int_feature_flag!(
TraversalsCountLimitFlag,
"workflows.traversals_global_count_limit",
200
);

// The interval at which workflows state persistence attempts to disk are made.
duration_feature_flag!(
StatePeriodicWriteIntervalFlag,
"workflows.state_periodic_write_interval_ms",
5.seconds()
);
}

pub mod platform_events {
Expand Down
215 changes: 6 additions & 209 deletions bd-workflows/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,9 @@ use bd_client_stats::Stats;
use bd_client_stats_store::{Counter, Histogram, Scope};
use bd_log_primitives::{Log, LogRef};
pub use bd_matcher::FieldProvider;
use bd_runtime::runtime::workflows::{
PersistenceWriteIntervalFlag,
StatePeriodicWriteIntervalFlag,
TraversalsCountLimitFlag,
};
use bd_runtime::runtime::workflows::PersistenceWriteIntervalFlag;
use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, session_capture};
use bd_stats_common::labels;
use bd_time::TimeDurationExt as _;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
Expand Down Expand Up @@ -75,15 +70,6 @@ pub struct WorkflowsEngine {

stats: WorkflowsEngineStats,

current_traversals_count: u32,
/// Used to keep track of the current number of traversals.
/// The other way to do this would be to iterate over all workflows
/// and their runs and their traversals every time we want to
/// know how many traversals are there but this could turn out to
/// be expensive for cases when we have a lot of workflows/traversals/runs.
traversals_count_limit: u32,
state_periodic_write_interval: time::Duration,

flush_buffers_actions_resolver: Resolver,
flush_buffers_negotiator_input_tx: Sender<PendingFlushBuffersAction>,
flush_buffers_negotiator_output_rx: Receiver<NegotiatorOutput>,
Expand Down Expand Up @@ -111,10 +97,6 @@ impl WorkflowsEngine {
) -> (Self, Receiver<BuffersToFlush>) {
let scope = scope.scope("workflows");

let traversals_count_limit_flag = TraversalsCountLimitFlag::register(runtime).unwrap();
let state_periodic_write_interval_flag =
StatePeriodicWriteIntervalFlag::register(runtime).unwrap();

// An arbitrary size for the channel that should be sufficient to handle incoming flush buffer
// triggers. It should be acceptable to drop events if the size limit is exceeded.
let (buffers_to_flush_tx, buffers_to_flush_rx) = tokio::sync::mpsc::channel(10);
Expand Down Expand Up @@ -144,9 +126,6 @@ impl WorkflowsEngine {
stats: WorkflowsEngineStats::new(&scope),
state_store: StateStore::new(sdk_directory, &scope, runtime),
needs_state_persistence: false,
current_traversals_count: 0,
traversals_count_limit: *traversals_count_limit_flag.read(),
state_periodic_write_interval: *state_periodic_write_interval_flag.read(),
flush_buffers_actions_resolver,
flush_buffers_negotiator_join_handle,
flush_buffers_negotiator_input_tx: input_tx,
Expand Down Expand Up @@ -213,13 +192,12 @@ impl WorkflowsEngine {

log::debug!(
"started workflows engine with {} workflow(s); {} pending processing action(s); {} pending \
sankey path uploads; {} streaming action(s); session ID: \"{}\"; traversals count limit: {}",
sankey path uploads; {} streaming action(s); session ID: \"{}\"",
self.state.workflows.len(),
self.state.pending_flush_actions.len(),
self.state.pending_sankey_actions.len(),
self.state.streaming_actions.len(),
self.state.session_id,
self.traversals_count_limit,
);
}

Expand Down Expand Up @@ -347,62 +325,14 @@ impl WorkflowsEngine {
}

fn add_workflow(&mut self, workflow: Workflow, config: Config) {
let mut workflow = workflow;

let workflow_traversals_count = workflow.traversals_count();
if workflow_traversals_count + self.current_traversals_count > self.traversals_count_limit {
log::debug!(
"failed to add workflow with {} traversal(s) due to traversals count limit being hit \
({}); current traversals count {}",
workflow_traversals_count,
self.traversals_count_limit,
self.current_traversals_count
);

// Workflow being added has too many traversal for the engine to not exceed
// the configured traversals count limit. Keep removing workflow's run until
// its addition to the engine is possible.
// Process traversals in reversed order as we may end up modifying the array
// starting at `index` in any given iteration of the loop + we want to start
// removing runs starting from the "latest" ones and they are at the end of
// the enumerated array.
let mut remaining_workflow_traversals_count = workflow_traversals_count;
for index in (0 .. workflow.runs().len()).rev() {
let run_traversals_count = workflow.runs()[index].traversals_count();

log::debug!("removing workflow run with {run_traversals_count} traversal(s)");

remaining_workflow_traversals_count -= run_traversals_count;
self.stats.traversals_count_limit_hit_total.inc();
workflow.remove_run(index);

if remaining_workflow_traversals_count + self.current_traversals_count
<= self.traversals_count_limit
{
break;
}
}
}

self.stats.workflow_starts_total.inc();
self
.stats
.run_starts_total
.inc_by(workflow.runs().len() as u64);
self
.stats
.traversal_starts_total
.inc_by(workflow_traversals_count.into());

log::trace!(
"workflow={}: workflow added, runs count {}, traversal count {}",
"workflow={}: workflow added, runs count {}",
workflow.id(),
workflow.runs().len(),
workflow_traversals_count,
);

self.current_traversals_count += workflow.traversals_count();

self.configs.push(config);
self.state.workflows.push(workflow);
}
Expand All @@ -411,20 +341,7 @@ impl WorkflowsEngine {
self.configs.remove(workflow_index);
let workflow = self.state.workflows.remove(workflow_index);

let workflow_traversals_count = workflow.traversals_count();
self.stats.workflow_stops_total.inc();
self
.stats
.run_stops_total
.inc_by(workflow.runs().len() as u64);
self
.stats
.traversal_stops_total
.inc_by(workflow_traversals_count.into());

self.current_traversals_count = self
.current_traversals_count
.saturating_sub(workflow_traversals_count);

log::debug!("workflow={}: workflow removed", workflow.id());

Expand Down Expand Up @@ -459,19 +376,12 @@ impl WorkflowsEngine {

pub async fn run(&mut self) {
loop {
self.run_once(true).await;
self.run_once().await;
}
}

pub(crate) async fn run_once(&mut self, persist_periodically: bool) {
// TODO(Augustyniak): Fix periodic state persistence as it's effectively a no-op now due to the
// way `run` method is being called.
let state_persistence_in = self.state_periodic_write_interval.sleep();

pub(crate) async fn run_once(&mut self) {
tokio::select! {
() = state_persistence_in, if persist_periodically => {
self.maybe_persist(false).await;
},
Some(negotiator_output) = self.flush_buffers_negotiator_output_rx.recv() => {
log::debug!("received flush buffers negotiator output: \"{negotiator_output:?}\"");

Expand Down Expand Up @@ -621,11 +531,6 @@ impl WorkflowsEngine {
&& log.capture_session.is_none()
&& self.state.streaming_actions.is_empty()
{
self
.stats
.active_traversals_total
.observe(self.current_traversals_count.into());

return WorkflowsEngineResult {
log_destination_buffer_ids: Cow::Borrowed(log_destination_buffer_ids),
triggered_flushes_buffer_ids: BTreeSet::new(),
Expand All @@ -639,38 +544,14 @@ impl WorkflowsEngine {
let mut logs_to_inject: BTreeMap<&'a str, Log> = BTreeMap::new();
for (index, workflow) in &mut self.state.workflows.iter_mut().enumerate() {
let was_in_initial_state = workflow.is_in_initial_state();
let result = workflow.process_log(
&self.configs[index],
log,
&mut self.current_traversals_count,
self.traversals_count_limit,
);
let result = workflow.process_log(&self.configs[index], log);

macro_rules! inc_by {
($field:ident, $value:ident) => {
self.stats.$field.inc_by(u64::from(result.stats().$value));
};
}

inc_by!(
exclusive_workflow_resets_total,
reset_exclusive_workflows_count
);
inc_by!(run_starts_total, created_runs_count);
inc_by!(run_advances_total, advanced_runs_count);
inc_by!(run_stops_total, stopped_runs_count);
inc_by!(run_completions_total, completed_runs_count);

inc_by!(traversal_starts_total, created_traversals_count);
inc_by!(traversal_advances_total, advanced_traversals_count);
inc_by!(traversal_stops_total, stopped_traversals_count);
inc_by!(traversal_completions_total, completed_traversals_count);

inc_by!(
traversals_count_limit_hit_total,
traversals_count_limit_hit_count
);

inc_by!(matched_logs_total, matched_logs_count);

// Not every case of a workflow making a progress needs a state persistence.
Expand All @@ -695,22 +576,6 @@ impl WorkflowsEngine {
logs_to_inject.extend(workflow_logs_to_inject);
}

self
.stats
.active_traversals_total
.observe(self.current_traversals_count.into());

debug_assert!(
self
.state
.workflows
.iter()
.map(Workflow::traversals_count)
.sum::<u32>()
== self.current_traversals_count,
"current_traversals_count is not equal to computed traversals count"
);

let PreparedActions {
mut flush_buffers_actions,
emit_metric_actions,
Expand Down Expand Up @@ -824,7 +689,6 @@ impl WorkflowsEngine {
}

fn clean_state(&mut self) {
self.current_traversals_count = 0;
// We clear the ongoing workflows state as opposed to the whole state because:
// * pending actions (uploads) are not affected by the session change, and ongoing logs uploads
// should continue even as the session ID changes.
Expand Down Expand Up @@ -1180,41 +1044,6 @@ struct WorkflowsEngineStats {
/// server that removed them from the list of workflows.
workflow_stops_total: Counter,

/// The number of times the state of an exclusive was reset and the workflow moved back to its
/// initial state.
exclusive_workflow_resets_total: Counter,

/// The number of started runs. Each workflow has at least one and can
/// have significantly more runs.
run_starts_total: Counter,
/// The number of runs who made progress. A progress is defined as a move of one or more
/// of run's traversals from one state to another.
run_advances_total: Counter,
/// A number of runs that completed. Completion means that all of their traversals reached
/// the final state.
run_completions_total: Counter,
/// A number of runs stopped in response to config update from a server or exceeding
/// one of the workflows-related limits controlled with either a workflows config or
/// SDK runtime settings.
run_stops_total: Counter,

/// The number of started traversals. See `Traversal` for more details.
traversal_starts_total: Counter,
/// The number of workflow traversals that made progress. A progress is defined as move from one
/// state to another.
traversal_advances_total: Counter,
/// The number of workflow traversals that reached one of workflow's final state.
traversal_completions_total: Counter,
/// A number of traversals stopped in response to config update from a server or exceeding
/// one of the workflows-related limits controlled with either a workflows config or
/// SDK runtime settings.
traversal_stops_total: Counter,

/// The number of active traversals.
active_traversals_total: Histogram,
/// The number of times the engine prevented a run and/or traversal from being
/// created due to a configured traversals count limit.
traversals_count_limit_hit_total: Counter,
/// The number of matched logs. A single log can be matched multiple times
matched_logs_total: Counter,

Expand All @@ -1241,42 +1070,10 @@ impl WorkflowsEngineStats {
let workflow_stops_total =
scope.counter_with_labels("workflows_total", labels!("operation" => "stop"));

let exclusive_workflow_resets_total = scope.counter("workflow_resets_total");

let run_starts_total = scope.counter_with_labels("runs_total", labels!("operation" => "start"));
let run_advances_total =
scope.counter_with_labels("runs_total", labels!("operation" => "advance"));
let run_completions_total =
scope.counter_with_labels("runs_total", labels!("operation" => "completion"));
let run_stops_total = scope.counter_with_labels("runs_total", labels!("operation" => "stop"));

let traversal_starts_total =
scope.counter_with_labels("traversals_total", labels!("operation" => "start"));
let traversal_advances_total =
scope.counter_with_labels("traversals_total", labels!("operation" => "advance"));
let traversal_completions_total =
scope.counter_with_labels("traversals_total", labels!("operation" => "completion"));
let traversal_stops_total =
scope.counter_with_labels("traversals_total", labels!("operation" => "stop"));

Self {
workflow_starts_total,
workflow_stops_total,

exclusive_workflow_resets_total,

run_starts_total,
run_advances_total,
run_completions_total,
run_stops_total,

traversal_starts_total,
traversal_advances_total,
traversal_completions_total,
traversal_stops_total,

active_traversals_total: scope.histogram("traversal_active_total"),
traversals_count_limit_hit_total: scope.counter("traversals_count_limit_hit_total"),
matched_logs_total: scope.counter("matched_logs_total"),

process_log_duration: scope.histogram("engine_process_log_duration_s"),
Expand Down
Loading
Loading