diff --git a/bd-log-filter/src/lib.rs b/bd-log-filter/src/lib.rs
index 8ff80ed9..5c6a76d9 100644
--- a/bd-log-filter/src/lib.rs
+++ b/bd-log-filter/src/lib.rs
@@ -19,6 +19,7 @@ mod filter_chain_test;
 
 use anyhow::{Context, Result, anyhow};
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{
   FieldsRef,
   LOG_FIELD_NAME_LEVEL,
@@ -87,10 +88,13 @@ impl FilterChain {
   pub fn process(&self, log: &mut Log) {
     for filter in &self.filters {
       let fields_ref = FieldsRef::new(&log.fields, &log.matching_fields);
-      if !filter
-        .matcher
-        .do_match(log.log_level, log.log_type, &log.message, fields_ref, None)
-      {
+      if !filter.matcher.do_match(
+        log.log_level,
+        log.log_type,
+        &log.message,
+        fields_ref,
+        &TinyMap::default(),
+      ) {
         continue;
       }
 
diff --git a/bd-log-matcher/src/legacy_matcher_test.rs b/bd-log-matcher/src/legacy_matcher_test.rs
index 44b88cfc..0ae81834 100644
--- a/bd-log-matcher/src/legacy_matcher_test.rs
+++ b/bd-log-matcher/src/legacy_matcher_test.rs
@@ -9,6 +9,7 @@
 
 use crate::matcher::Tree;
 use assert_matches::assert_matches;
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{
   EMPTY_FIELDS,
   FieldsRef,
@@ -79,7 +80,13 @@ fn match_test_runner(config: LegacyLogMatcher, cases: Vec<(Input<'_>, bool)>) {
 
     assert_eq!(
       should_match,
-      match_tree.do_match(log_level, log_type, &message, fields_ref, None),
+      match_tree.do_match(
+        log_level,
+        log_type,
+        &message,
+        fields_ref,
+        &TinyMap::default()
+      ),
       "{input:?} should result in {should_match} but did not",
     );
   }
diff --git a/bd-log-matcher/src/matcher.rs b/bd-log-matcher/src/matcher.rs
index b51fe782..187ef478 100644
--- a/bd-log-matcher/src/matcher.rs
+++ b/bd-log-matcher/src/matcher.rs
@@ -24,6 +24,7 @@ use base_log_matcher::tag_match::Value_match::{
   SemVerValueMatch,
   StringValueMatch,
 };
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogType};
 use bd_proto::protos::config::v1::config::log_matcher::base_log_matcher::StringMatchType;
 use bd_proto::protos::config::v1::config::log_matcher::{
@@ -46,7 +47,6 @@ use log_matcher::log_matcher::base_log_matcher::{
 use log_matcher::log_matcher::{BaseLogMatcher, Matcher, base_log_matcher};
 use regex::Regex;
 use std::borrow::Cow;
-use std::collections::BTreeMap;
 use std::ops::Deref;
 
 const LOG_LEVEL_KEY: &str = "log_level";
@@ -79,14 +79,11 @@ impl From for ValueOrSavedFieldId {
 }
 
 impl<'a, T: MakeValueOrRef<'a, T>> ValueOrSavedFieldId<T> {
-  fn load(
-    &'a self,
-    extracted_fields: Option<&'a BTreeMap<String, String>>,
-  ) -> Option<ValueOrRef<'a, T>> {
+  fn load(&'a self, extracted_fields: &'a TinyMap<String, String>) -> Option<ValueOrRef<'a, T>> {
     match self {
       Self::Value(v) => Some(ValueOrRef::Ref(v)),
       Self::SaveFieldId(field_id) => extracted_fields
-        .and_then(|extracted_fields| extracted_fields.get(field_id))
+        .get(field_id)
         .and_then(|v| T::make_value_or_ref(v)),
     }
   }
@@ -207,7 +204,7 @@ impl Tree {
     log_type: LogType,
     message: &LogMessage,
     fields: FieldsRef<'_>,
-    extracted_fields: Option<&BTreeMap<String, String>>,
+    extracted_fields: &TinyMap<String, String>,
   ) -> bool {
     match self {
       Self::Base(base_matcher) => match base_matcher {
@@ -268,7 +265,7 @@ impl IntMatch {
     }
   }
 
-  fn evaluate(&self, candidate: i32, extracted_fields: Option<&BTreeMap<String, String>>) -> bool {
+  fn evaluate(&self, candidate: i32, extracted_fields: &TinyMap<String, String>) -> bool {
     let Some(value) = self.value.load(extracted_fields) else {
       return false;
     };
@@ -317,7 +314,7 @@ impl DoubleMatch {
     }
   }
 
-  fn evaluate(&self, candidate: f64, extracted_fields: Option<&BTreeMap<String, String>>) -> bool {
+  fn evaluate(&self, candidate: f64, extracted_fields: &TinyMap<String, String>) -> bool {
     let candidate = NanEqualFloat(candidate);
     let Some(value) = self.value.load(extracted_fields) else {
       return false;
@@ -386,7 +383,7 @@ impl StringMatch {
     })
   }
 
-  fn evaluate(&self, candidate: &str, extracted_fields: Option<&BTreeMap<String, String>>) -> bool {
+  fn evaluate(&self, candidate: &str, extracted_fields: &TinyMap<String, String>) -> bool {
     let Some(value) = self.value.load(extracted_fields) else {
       return false;
     };
diff --git a/bd-log-matcher/src/matcher_test.rs b/bd-log-matcher/src/matcher_test.rs
index 5f4fba58..166a25de 100644
--- a/bd-log-matcher/src/matcher_test.rs
+++ b/bd-log-matcher/src/matcher_test.rs
@@ -7,6 +7,7 @@
 
 use crate::matcher::Tree;
 use crate::matcher::base_log_matcher::tag_match::Value_match::DoubleValueMatch;
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{
   EMPTY_FIELDS,
   FieldsRef,
@@ -33,7 +34,6 @@ use log_matcher::base_log_matcher::tag_match::Value_match::{
 use log_matcher::{BaseLogMatcher, Matcher, MatcherList, base_log_matcher};
 use pretty_assertions::assert_eq;
 use protobuf::MessageField;
-use std::collections::BTreeMap;
 
 type Input<'a> = (LogType, LogLevel, LogMessage, LogFields);
 
@@ -230,7 +230,7 @@ fn test_extracted_string_matcher() {
       (log_tag("keyx", "exact"), false),
       (log_msg("no fields"), false),
     ],
-    None,
+    &TinyMap::default(),
   );
 
   match_test_runner_with_extractions(
@@ -240,7 +240,7 @@ fn test_extracted_string_matcher() {
       (log_tag("keyx", "exact"), false),
       (log_msg("no fields"), false),
     ],
-    Some(&BTreeMap::from([("id1".to_string(), "exact".to_string())])),
+    &[("id1".to_string(), "exact".to_string())].into(),
  );
 }
 
@@ -309,7 +309,7 @@ fn test_extracted_double_matcher() {
       (log_tag("key", "13.0"), false),
       (log_tag("key", "13"), false),
     ],
-    None,
+    &TinyMap::default(),
  );
   match_test_runner_with_extractions(
     config.clone(),
     vec![
       (log_tag("key", "13.0"), false),
       (log_tag("key", "13"), false),
     ],
-    Some(&BTreeMap::from([("id1".to_string(), "bad".to_string())])),
+    &[("id1".to_string(), "bad".to_string())].into(),
   );
   match_test_runner_with_extractions(
     config,
     vec![(log_tag("key", "13.0"), true), (log_tag("key", "13"), true)],
-    Some(&BTreeMap::from([("id1".to_string(), "13".to_string())])),
+    &[("id1".to_string(), "13".to_string())].into(),
   );
 }
 
@@ -419,13 +419,13 @@ fn test_extracted_int_matcher() {
       (log_tag("key", "13"), false),
       (log_tag("key", "13.0"), false),
     ],
-    None,
+    &TinyMap::default(),
   );
 
   match_test_runner_with_extractions(
     config,
     vec![(log_tag("key", "13"), true), (log_tag("key", "13.0"), true)],
-    Some(&BTreeMap::from([("id1".to_string(), "13".to_string())])),
+    &[("id1".to_string(), "13".to_string())].into(),
   );
 }
 
@@ -856,14 +856,14 @@ fn make_message_match(operator: Operator, match_value: &str) -> base_log_matcher
 
 #[allow(clippy::needless_pass_by_value)]
 fn match_test_runner(config: LogMatcher, cases: Vec<(Input<'_>, bool)>) {
-  match_test_runner_with_extractions(config, cases, None);
+  match_test_runner_with_extractions(config, cases, &TinyMap::default());
 }
 
 #[allow(clippy::needless_pass_by_value)]
 fn match_test_runner_with_extractions(
   config: LogMatcher,
   cases: Vec<(Input<'_>, bool)>,
-  extracted_fields: Option<&BTreeMap<String, String>>,
+  extracted_fields: &TinyMap<String, String>,
 ) {
   let match_tree = Tree::new(&config).unwrap();
diff --git a/bd-log-matcher/src/test.rs b/bd-log-matcher/src/test.rs
index 30bb314a..c83b8f7a 100644
--- a/bd-log-matcher/src/test.rs
+++ b/bd-log-matcher/src/test.rs
@@ -6,6 +6,7 @@
 // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
 
 use crate::matcher::Tree;
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{FieldsRef, LogMessage, LogType, StringOrBytes, TypedLogLevel};
 use bd_proto::protos::log_matcher::log_matcher::LogMatcher;
 use std::collections::HashMap;
@@ -41,7 +42,7 @@ impl TestMatcher {
       log_type,
       &message.into(),
       FieldsRef::new(&fields, &matching_fields),
-      None,
+      &TinyMap::default(),
     )
   }
 }
diff --git a/bd-log-primitives/src/lib.rs b/bd-log-primitives/src/lib.rs
index 502accc8..6240c9a5 100644
--- a/bd-log-primitives/src/lib.rs
+++ b/bd-log-primitives/src/lib.rs
@@ -15,6 +15,7 @@
 )]
 
 pub mod size;
+pub mod tiny_set;
 
 use crate::size::MemorySized;
 use ahash::AHashMap;
diff --git a/bd-log-primitives/src/tiny_set.rs b/bd-log-primitives/src/tiny_set.rs
new file mode 100644
index 00000000..744f476c
--- /dev/null
+++ b/bd-log-primitives/src/tiny_set.rs
@@ -0,0 +1,232 @@
+// shared-core - bitdrift's common client/server libraries
+// Copyright Bitdrift, Inc. All rights reserved.
+//
+// Use of this source code is governed by a source available license that can be found in the
+// LICENSE file or at:
+// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
+
+use serde::{Deserialize, Serialize};
+use std::borrow::Borrow;
+
+// These data structures are small map/set-like containers that are efficient for small sizes
+// (up to ~5 items). They use a Vec as the backing store, which is more efficient than
+// HashMap/BTreeMap at such sizes.
+
+//
+// TinySet
+//
+
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+pub struct TinySet<T> {
+  inner: TinyMap<T, ()>,
+}
+
+impl<T: PartialEq> TinySet<T> {
+  pub fn iter(&self) -> impl Iterator<Item = &T> {
+    self.inner.items.iter().map(|(k, ())| k)
+  }
+
+  #[must_use]
+  pub fn first(&self) -> Option<&T> {
+    self.inner.items.first().map(|(k, ())| k)
+  }
+
+  #[must_use]
+  pub fn is_empty(&self) -> bool {
+    self.inner.items.is_empty()
+  }
+
+  pub fn remove(&mut self, value: &T) {
+    self.inner.remove(value);
+  }
+
+  pub fn insert(&mut self, value: T) {
+    self.inner.insert(value, ());
+  }
+
+  pub fn extend<I>(&mut self, iter: I)
+  where
+    I: IntoIterator<Item = T>,
+  {
+    self.inner.extend(iter.into_iter().map(|item| (item, ())));
+  }
+
+  #[must_use]
+  pub fn len(&self) -> usize {
+    self.inner.items.len()
+  }
+
+  pub fn intersection(&self, other: &Self) -> impl Iterator<Item = &T> {
+    self
+      .iter()
+      .filter(move |item| other.inner.get(item).is_some())
+  }
+
+  #[must_use]
+  pub fn is_disjoint(&self, other: &Self) -> bool {
+    self.iter().all(|item| other.inner.get(item).is_none())
+  }
+
+  pub fn contains<Q>(&self, value: &Q) -> bool
+  where
+    T: Borrow<Q>,
+    Q: PartialEq + ?Sized,
+  {
+    self.inner.get(value).is_some()
+  }
+
+  pub fn retain<F>(&mut self, f: F)
+  where
+    F: Fn(&T) -> bool,
+  {
+    self.inner.items.retain(|(k, ())| f(k));
+  }
+}
+
+impl<T> Default for TinySet<T> {
+  fn default() -> Self {
+    Self {
+      inner: TinyMap::default(),
+    }
+  }
+}
+
+impl<T: PartialEq> FromIterator<T> for TinySet<T> {
+  fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
+    Self {
+      inner: iter.into_iter().map(|item| (item, ())).collect(),
+    }
+  }
+}
+
+impl<T: PartialEq, const N: usize> From<[T; N]> for TinySet<T> {
+  fn from(arr: [T; N]) -> Self {
+    Self {
+      inner: TinyMap::from(arr.map(|item| (item, ()))),
+    }
+  }
+}
+
+impl<T> IntoIterator for TinySet<T> {
+  type Item = T;
+  type IntoIter = std::iter::Map<std::vec::IntoIter<(T, ())>, fn((T, ())) -> T>;
+
+  fn into_iter(self) -> Self::IntoIter {
+    fn take_key<T>((k, ()): (T, ())) -> T {
+      k
+    }
+    self.inner.into_iter().map(take_key::<T>)
+  }
+}
+
+//
+// TinyMap
+//
+
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+pub struct TinyMap<K, V> {
+  items: Vec<(K, V)>,
+}
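+
+// A minimal usage sketch (illustrative only; the key/value types here are
+// arbitrary). Lookups are linear scans over the backing `Vec`, and a later
+// `insert` for an equal key overwrites the earlier value:
+//
+//   let mut m: TinyMap<&str, u32> = TinyMap::default();
+//   m.insert("a", 1);
+//   m.insert("a", 2);
+//   assert_eq!(m.get("a"), Some(&2));
+//   assert_eq!(m.remove("a"), Some(2));
+//
+//   let mut s: TinySet<&str> = TinySet::default();
+//   s.insert("a");
+//   s.insert("a");
+//   assert_eq!(s.len(), 1);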
+
+impl<K: PartialEq, V> TinyMap<K, V> {
+  pub fn get<Q>(&self, key: &Q) -> Option<&V>
+  where
+    K: Borrow<Q>,
+    Q: PartialEq + ?Sized,
+  {
+    self
+      .items
+      .iter()
+      .find_map(|(k, v)| if k.borrow() == key { Some(v) } else { None })
+  }
+
+  pub fn get_mut_or_insert_default(&mut self, key: K) -> &mut V
+  where
+    V: Default,
+  {
+    if let Some(pos) = self.items.iter().position(|(k, _)| k == &key) {
+      return &mut self.items[pos].1;
+    }
+
+    debug_assert!(self.items.len() <= 5, "TinyMap should be small");
+    self.items.push((key, V::default()));
+    #[allow(clippy::unwrap_used)]
+    &mut self.items.last_mut().unwrap().1
+  }
+
+  pub fn insert(&mut self, key: K, value: V) {
+    if let Some((_, v)) = self.items.iter_mut().find(|(k, _)| k == &key) {
+      *v = value;
+    } else {
+      debug_assert!(self.items.len() <= 5, "TinyMap should be small");
+      self.items.push((key, value));
+    }
+  }
+
+  pub fn extend<I>(&mut self, iter: I)
+  where
+    I: IntoIterator<Item = (K, V)>,
+  {
+    for (k, v) in iter {
+      self.insert(k, v);
+    }
+  }
+
+  pub fn append(&mut self, other: &mut Self) {
+    for (k, v) in other.items.drain(..) {
+      self.insert(k, v);
+    }
+  }
+
+  pub fn remove<Q>(&mut self, key: &Q) -> Option<V>
+  where
+    K: Borrow<Q>,
+    Q: PartialEq + ?Sized,
+  {
+    if let Some(pos) = self.items.iter().position(|(k, _)| k.borrow() == key) {
+      Some(self.items.swap_remove(pos).1)
+    } else {
+      None
+    }
+  }
+
+  #[must_use]
+  pub fn is_empty(&self) -> bool {
+    self.items.is_empty()
+  }
+
+  pub fn into_values(self) -> impl Iterator<Item = V> {
+    self.items.into_iter().map(|(_, v)| v)
+  }
+}
+
+impl<K, V> IntoIterator for TinyMap<K, V> {
+  type Item = (K, V);
+  type IntoIter = std::vec::IntoIter<(K, V)>;
+
+  fn into_iter(self) -> Self::IntoIter {
+    self.items.into_iter()
+  }
+}
+
+impl<K: PartialEq, V> FromIterator<(K, V)> for TinyMap<K, V> {
+  fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
+    let mut new = Self::default();
+    new.extend(iter);
+    new
+  }
+}
+
+impl<K: PartialEq, V, const N: usize> From<[(K, V); N]> for TinyMap<K, V> {
+  fn from(arr: [(K, V); N]) -> Self {
+    let mut new = Self::default();
+    new.extend(arr);
+    new
+  }
+}
+
+impl<K, V> Default for TinyMap<K, V> {
+  fn default() -> Self {
+    Self { items: Vec::new() }
+  }
+}
diff --git a/bd-logger/src/buffer_selector.rs b/bd-logger/src/buffer_selector.rs
index 1344d992..68bebec0 100644
--- a/bd-logger/src/buffer_selector.rs
+++ b/bd-logger/src/buffer_selector.rs
@@ -6,10 +6,10 @@
 // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
 
 use bd_log_matcher::matcher::Tree;
+use bd_log_primitives::tiny_set::{TinyMap, TinySet};
 use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogType};
 use bd_proto::protos::config::v1::config::BufferConfigList;
 use std::borrow::Cow;
-use std::collections::BTreeSet;
 
 // A single buffer filter, containing the matchers used to determine if logs should be written to
 // the specific buffer.
@@ -60,11 +60,11 @@ impl BufferSelector {
     log_level: LogLevel,
     message: &LogMessage,
     fields: FieldsRef<'_>,
-  ) -> BTreeSet<Cow<'_, str>> {
-    let mut buffers = BTreeSet::new();
+  ) -> TinySet<Cow<'_, str>> {
+    let mut buffers = TinySet::default();
     for buffer in &self.buffer_filters {
       for (_id, matcher) in &buffer.matchers {
-        if matcher.do_match(log_level, log_type, message, fields, None) {
+        if matcher.do_match(log_level, log_type, message, fields, &TinyMap::default()) {
           buffers.insert(Cow::Borrowed(buffer.buffer_id.as_str()));
 
           // No reason to match further.
diff --git a/bd-logger/src/client_config.rs b/bd-logger/src/client_config.rs
index 368be522..d2a14c3d 100644
--- a/bd-logger/src/client_config.rs
+++ b/bd-logger/src/client_config.rs
@@ -21,6 +21,7 @@ use bd_client_common::safe_file_cache::SafeFileCache;
 use bd_client_stats_store::{Counter, Scope};
 use bd_log_filter::FilterChain;
 use bd_log_primitives::LogRef;
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_proto::protos::bdtail::bdtail_config::BdTailConfigurations;
 use bd_proto::protos::client::api::configuration_update::{StateOfTheWorld, Update_type};
 use bd_proto::protos::client::api::configuration_update_ack::Nack;
@@ -406,7 +407,13 @@ impl TailConfigurations {
           matcher
             .as_ref()
             .is_none_or(|matcher| {
-              matcher.do_match(log.log_level, log.log_type, log.message, log.fields, None)
+              matcher.do_match(
+                log.log_level,
+                log.log_type,
+                log.message,
+                log.fields,
+                &TinyMap::default(),
+              )
             })
             .then_some(id.as_str())
         })
diff --git a/bd-logger/src/log_replay.rs b/bd-logger/src/log_replay.rs
index d443da2b..b9f534b9 100644
--- a/bd-logger/src/log_replay.rs
+++ b/bd-logger/src/log_replay.rs
@@ -13,6 +13,7 @@ use bd_buffer::{AbslCode, BuffersWithAck, Error};
 use bd_client_common::fb::make_log;
 use bd_client_stats::FlushTrigger;
 use bd_log_filter::FilterChain;
+use bd_log_primitives::tiny_set::TinySet;
 use bd_log_primitives::{FieldsRef, Log, LogRef, LogType, log_level};
 use bd_runtime::runtime::ConfigLoader;
 use bd_session_replay::CaptureScreenshotHandler;
@@ -21,7 +22,6 @@ use bd_workflows::config::FlushBufferId;
 use bd_workflows::engine::{WorkflowsEngine, WorkflowsEngineConfig};
 use bd_workflows::workflow::WorkflowDebugStateMap;
 use std::borrow::Cow;
-use std::collections::BTreeSet;
 use std::path::Path;
 use time::OffsetDateTime;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -284,7 +284,7 @@ impl ProcessingPipeline {
   async fn finish_blocking_log_processing(
     flush_buffers_tx: tokio::sync::mpsc::Sender,
     flush_stats_trigger: FlushTrigger,
-    matching_buffers: BTreeSet<Cow<'_, str>>,
+    matching_buffers: TinySet<Cow<'_, str>>,
   ) -> anyhow::Result<()> {
     // The processing of a blocking log is about to complete.
     // We make an arbitrary decision to start with the flushing of log buffers to disk first and
@@ -332,7 +332,7 @@ impl ProcessingPipeline {
 
   fn write_to_buffers<'a>(
     buffers: &mut BufferProducers,
-    matching_buffers: &BTreeSet<Cow<'a, str>>,
+    matching_buffers: &TinySet<Cow<'a, str>>,
     log: &LogRef<'_>,
     workflow_flush_buffer_action_ids: impl Iterator,
   ) -> anyhow::Result<()> {
@@ -351,7 +351,7 @@ impl ProcessingPipeline {
       workflow_flush_buffer_action_ids,
       std::iter::empty(),
       |data| {
-        for buffer in matching_buffers {
+        for buffer in matching_buffers.iter() {
          // TODO(snowp): For both logger and buffer lookup we end up doing a map lookup, which
          // seems less than ideal in the logging path. Look into ways to optimize this,
          // possibly via vector indices instead of string keys.
@@ -379,10 +379,10 @@ impl ProcessingPipeline {
   }
 
   fn process_flush_buffers_actions(
-    triggered_flush_buffers_action_ids: &BTreeSet<Cow<'_, FlushBufferId>>,
+    triggered_flush_buffers_action_ids: &TinySet<Cow<'_, FlushBufferId>>,
     buffers: &mut BufferProducers,
-    triggered_flushes_buffer_ids: &BTreeSet<Cow<'_, str>>,
-    written_to_buffers: &BTreeSet<Cow<'_, str>>,
+    triggered_flushes_buffer_ids: &TinySet<Cow<'_, str>>,
+    written_to_buffers: &TinySet<Cow<'_, str>>,
     log: &LogRef<'_>,
   ) -> Option {
     if triggered_flush_buffers_action_ids.is_empty() {
diff --git a/bd-logger/src/logging_state.rs b/bd-logger/src/logging_state.rs
index 5cbd97ab..4966a18b 100644
--- a/bd-logger/src/logging_state.rs
+++ b/bd-logger/src/logging_state.rs
@@ -16,6 +16,7 @@ use bd_client_stats::{FlushTrigger, Stats};
 use bd_client_stats_store::{Counter, Scope};
 use bd_log_filter::FilterChain;
 use bd_log_primitives::size::MemorySized;
+use bd_log_primitives::tiny_set::TinySet;
 use bd_runtime::runtime::ConfigLoader;
 use bd_session_replay::CaptureScreenshotHandler;
 use bd_stats_common::labels;
@@ -23,7 +24,7 @@ use bd_workflows::config::WorkflowsConfiguration;
 use bd_workflows::engine::WorkflowsEngine;
 use flatbuffers::FlatBufferBuilder;
 use std::borrow::Cow;
-use std::collections::{BTreeSet, HashMap};
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -283,8 +284,8 @@ pub struct ConfigUpdate {
 pub struct BufferProducers {
   pub(crate) buffers: HashMap,
   pub(crate) builder: FlatBufferBuilder<'static>,
-  pub(crate) continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
-  pub(crate) trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
+  pub(crate) continuous_buffer_ids: TinySet<Cow<'static, str>>,
+  pub(crate) trigger_buffer_ids: TinySet<Cow<'static, str>>,
 }
 
 impl BufferProducers {
@@ -297,8 +298,8 @@ impl BufferProducers {
       .map(|(id, buffer)| Ok((id.clone(), buffer.1.new_thread_local_producer()?)))
       .collect::>()?;
 
-    let mut continuous_buffer_ids = BTreeSet::new();
-    let mut trigger_buffer_ids = BTreeSet::new();
+    let mut continuous_buffer_ids = TinySet::default();
+    let mut trigger_buffer_ids = TinySet::default();
 
     for (buffer_id, (buffer_type, _)) in buffer_manager.buffers() {
       match buffer_type {
diff --git a/bd-workflows/benches/linux_only/matcher.rs b/bd-workflows/benches/linux_only/matcher.rs
index 10490e2d..c2beedfd 100644
--- a/bd-workflows/benches/linux_only/matcher.rs
+++ b/bd-workflows/benches/linux_only/matcher.rs
@@ -8,6 +8,7 @@
 
 use bd_api::DataUpload;
 use bd_client_stats::Stats;
 use bd_client_stats_store::Collector;
+use bd_log_primitives::tiny_set::TinySet;
 use bd_log_primitives::{FieldsRef, LogFields, LogRef, LogType, log_level};
 use bd_proto::protos::workflow::workflow::Workflow;
 use bd_runtime::runtime::ConfigLoader;
@@ -22,7 +23,6 @@ use gungraun::{
   library_benchmark,
   library_benchmark_group,
 };
-use std::collections::BTreeSet;
 use std::future::Future;
 use std::hint::black_box;
 use time::OffsetDateTime;
@@ -61,8 +61,8 @@ impl Setup {
     engine
       .start(WorkflowsEngineConfig::new(
         WorkflowsConfiguration::new(workflows, vec![]),
-        BTreeSet::default(),
-        BTreeSet::default(),
+        TinySet::default(),
+        TinySet::default(),
       ))
       .await;
 
@@ -133,10 +133,10 @@ fn run_runtime_bench>(engine: impl FnOnce()
       occurred_at: OffsetDateTime::now_utc(),
       capture_session: None,
     };
-    engine.process_log(&log, &BTreeSet::default(), now);
+    engine.process_log(&log, &TinySet::default(), now);
 
     gungraun::client_requests::callgrind::start_instrumentation();
-    engine.process_log(&log, &BTreeSet::default(), now);
+    engine.process_log(&log, &TinySet::default(), now);
     gungraun::client_requests::callgrind::stop_instrumentation();
   });
 }
diff --git a/bd-workflows/src/actions_flush_buffers.rs b/bd-workflows/src/actions_flush_buffers.rs
index 10ee4652..224dceaa 100644
--- a/bd-workflows/src/actions_flush_buffers.rs
+++ b/bd-workflows/src/actions_flush_buffers.rs
@@ -14,6 +14,7 @@ use anyhow::anyhow;
 use bd_api::DataUpload;
 use bd_api::upload::{Intent_type, IntentDecision, TrackedLogUploadIntent, WorkflowActionUpload};
 use bd_client_stats_store::{Counter, Scope};
+use bd_log_primitives::tiny_set::TinySet;
 use bd_proto::protos::client::api::LogUploadIntentRequest;
 use bd_proto::protos::client::api::log_upload_intent_request::ExplicitSessionCapture;
 use bd_stats_common::labels;
@@ -349,8 +350,8 @@ impl Negotiator {
 #[derive(Debug)]
 /// Responsible for orchestrating and managing flush buffer actions.
 pub(crate) struct Resolver {
-  trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-  continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+  trigger_buffer_ids: TinySet<Cow<'static, str>>,
+  continuous_buffer_ids: TinySet<Cow<'static, str>>,
   stats: ResolverStats,
 }
 
@@ -359,8 +360,8 @@ impl Resolver {
   pub(crate) fn new(stats_scope: &Scope) -> Self {
     Self {
       stats: ResolverStats::new(stats_scope),
-      trigger_buffer_ids: BTreeSet::new(),
-      continuous_buffer_ids: BTreeSet::new(),
+      trigger_buffer_ids: TinySet::default(),
+      continuous_buffer_ids: TinySet::default(),
     }
   }
 
@@ -375,14 +376,14 @@ impl Resolver {
   /// buffer actions that require further processing.
   pub(crate) fn process_flush_buffer_actions<'a>(
     &self,
-    actions: BTreeSet<Cow<'a, ActionFlushBuffers>>,
+    actions: TinySet<Cow<'a, ActionFlushBuffers>>,
     session_id: &str,
-    pending_actions: &BTreeSet<PendingFlushBuffersAction>,
+    pending_actions: &TinySet<PendingFlushBuffersAction>,
     streaming_actions: &[StreamingBuffersAction],
   ) -> FlushBuffersActionsProcessingResult<'a> {
-    let mut created_actions = BTreeSet::new();
-    let mut triggered_flush_buffers_action_ids = BTreeSet::new();
-    let mut triggered_flushes_buffer_ids = BTreeSet::new();
+    let mut created_actions = TinySet::default();
+    let mut triggered_flush_buffers_action_ids = TinySet::default();
+    let mut triggered_flushes_buffer_ids = TinySet::default();
 
     for action in actions {
       triggered_flush_buffers_action_ids.insert(match action {
@@ -440,7 +441,7 @@ impl Resolver {
   pub(crate) fn process_streaming_actions<'a>(
     &self,
     mut streaming_actions: Vec<(StreamingBuffersAction, bool)>,
-    log_destination_buffer_ids: &BTreeSet<Cow<'a, str>>,
+    log_destination_buffer_ids: &TinySet<Cow<'a, str>>,
     session_id: &str,
   ) -> StreamingBuffersActionsProcessingResult<'a> {
     let mut has_changed_streaming_actions = false;
@@ -488,9 +489,9 @@ impl Resolver {
     // of this process, the state of all active streaming actions is updated, with a counter of logs
     // streamed for each active streaming rule being incremented accordingly.
-    let mut final_log_destination_buffer_ids: BTreeSet<_> = BTreeSet::new();
+    let mut final_log_destination_buffer_ids: TinySet<_> = TinySet::default();
 
-    let mut not_rerouted_buffer_ids: BTreeSet<_> = log_destination_buffer_ids
+    let mut not_rerouted_buffer_ids: TinySet<_> = log_destination_buffer_ids
       .clone()
       .into_iter()
       .filter(|id|
@@ -502,7 +503,7 @@ impl Resolver {
     let mut has_been_rerouted = false;
 
     for (action, _) in &mut streaming_actions {
-      let intersection: BTreeSet<_> = action
+      let intersection: TinySet<_> = action
        .source_trigger_buffer_ids
        .intersection(log_destination_buffer_ids)
        .collect();
@@ -532,7 +533,7 @@ impl Resolver {
       self.stats.streaming_action_applications.inc();
     }
 
-    final_log_destination_buffer_ids.append(&mut not_rerouted_buffer_ids);
+    final_log_destination_buffer_ids.extend(not_rerouted_buffer_ids);
 
     has_changed_streaming_actions |= has_been_rerouted;
 
@@ -561,8 +562,8 @@ impl Resolver {
   /// after this process, an action is left with no trigger buffer IDs, it is dropped.
   pub(crate) fn standardize_pending_actions(
     &self,
-    pending_actions: BTreeSet<PendingFlushBuffersAction>,
-  ) -> BTreeSet<PendingFlushBuffersAction> {
+    pending_actions: TinySet<PendingFlushBuffersAction>,
+  ) -> TinySet<PendingFlushBuffersAction> {
     pending_actions
       .into_iter()
       .filter_map(|action| {
@@ -594,12 +595,12 @@ impl Resolver {
     streaming_buffers
       .into_iter()
       .filter_map(|action| {
-        let source_trigger_buffer_ids: BTreeSet<_> = action
+        let source_trigger_buffer_ids: TinySet<_> = action
          .source_trigger_buffer_ids
          .intersection(&self.trigger_buffer_ids)
          .cloned()
          .collect();
-        let destination_continuous_buffer_ids: BTreeSet<_> = action
+        let destination_continuous_buffer_ids: TinySet<_> = action
          .destination_continuous_buffer_ids
          .intersection(&self.continuous_buffer_ids)
          .cloned()
          .collect();
@@ -695,14 +696,14 @@ impl ResolverStats {
 
 #[derive(Debug)]
 pub(crate) struct ResolverConfig {
-  trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-  continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+  trigger_buffer_ids: TinySet<Cow<'static, str>>,
+  continuous_buffer_ids: TinySet<Cow<'static, str>>,
 }
 
 impl ResolverConfig {
   pub(crate) const fn new(
-    trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-    continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+    trigger_buffer_ids: TinySet<Cow<'static, str>>,
+    continuous_buffer_ids: TinySet<Cow<'static, str>>,
   ) -> Self {
     Self {
       trigger_buffer_ids,
@@ -714,12 +715,11 @@ impl ResolverConfig {
 //
 // FlushBuffersActionsProcessingResult
 //
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq)]
 pub(crate) struct FlushBuffersActionsProcessingResult<'a> {
-  pub(crate) new_pending_actions_to_add: BTreeSet<PendingFlushBuffersAction>,
-
-  pub(crate) triggered_flush_buffers_action_ids: BTreeSet<Cow<'a, FlushBufferId>>,
-  pub(crate) triggered_flushes_buffer_ids: BTreeSet<Cow<'a, str>>,
+  pub(crate) new_pending_actions_to_add: TinySet<PendingFlushBuffersAction>,
+  pub(crate) triggered_flush_buffers_action_ids: TinySet<Cow<'a, FlushBufferId>>,
+  pub(crate) triggered_flushes_buffer_ids: TinySet<Cow<'a, str>>,
 }
 
 //
@@ -728,7 +728,7 @@ pub(crate) struct FlushBuffersActionsProcessingResult<'a> {
 
 #[derive(Debug, PartialEq)]
 pub(crate) struct StreamingBuffersActionsProcessingResult<'a> {
-  pub(crate) log_destination_buffer_ids: BTreeSet<Cow<'a, str>>,
+  pub(crate) log_destination_buffer_ids: TinySet<Cow<'a, str>>,
   pub(crate) has_changed_streaming_actions: bool,
   pub(crate) updated_streaming_actions: Vec<StreamingBuffersAction>,
 }
 
 //
@@ -740,12 +740,12 @@ pub(crate) struct StreamingBuffersActionsProcessingResult<'a> {
 
 // The action created by a flush buffer workflow action. This tracks the action while upload intent
 // negotiation is performed. At that point, it either transitions into a `StreamingBuffersAction` if
 // streaming was configured for the action.
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Serialize, Deserialize, PartialEq)]
 pub(crate) struct PendingFlushBuffersAction {
   pub(crate) id: FlushBufferId,
 
   session_id: String,
-  trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
+  trigger_buffer_ids: TinySet<Cow<'static, str>>,
 
   streaming: Option<Streaming>,
 }
 
@@ -769,8 +769,8 @@ impl PendingFlushBuffersAction {
   fn new(
     action: ActionFlushBuffers,
     session_id: String,
-    trigger_buffer_ids: &BTreeSet<Cow<'static, str>>,
-    continuous_buffer_ids: &BTreeSet<Cow<'static, str>>,
+    trigger_buffer_ids: &TinySet<Cow<'static, str>>,
+    continuous_buffer_ids: &TinySet<Cow<'static, str>>,
   ) -> Option<Self> {
     log::debug!("evaluating flush buffers action: {action:?}");
 
@@ -786,7 +786,7 @@ impl PendingFlushBuffersAction {
       if streaming_proto.destination_continuous_buffer_ids.is_empty() {
         continuous_buffer_ids
           .first()
-          .map_or(BTreeSet::new(), |id| BTreeSet::from([id.clone()]))
+          .map_or_else(TinySet::default, |id| TinySet::from([id.clone()]))
       } else {
         // Make sure that we dismiss invalid (not existing) continuous buffer IDs.
         continuous_buffer_ids
@@ -811,7 +811,7 @@ impl PendingFlushBuffersAction {
         })
       });
 
-    let trigger_buffer_ids: BTreeSet<_> = if action.buffer_ids.is_empty() {
+    let trigger_buffer_ids: TinySet<_> = if action.buffer_ids.is_empty() {
       // Empty buffer IDs means that the action should be applied to all buffers.
       trigger_buffer_ids.clone()
     } else {
@@ -841,9 +841,9 @@ impl PendingFlushBuffersAction {
 // Streaming
 //
 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub(crate) struct Streaming {
-  destination_continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+  destination_continuous_buffer_ids: TinySet<Cow<'static, str>>,
 
   max_logs_count: Option,
 }
 
@@ -852,15 +852,15 @@ pub(crate) struct Streaming {
 // StreamingBuffersAction
 //
 
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Serialize, Deserialize, PartialEq)]
 // The action created in response to flush buffer actions that were accepted for upload and had a
 // streaming configuration attached to them.
 pub(crate) struct StreamingBuffersAction {
   pub(crate) id: FlushBufferId,
 
   session_id: String,
-  source_trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-  destination_continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+  source_trigger_buffer_ids: TinySet<Cow<'static, str>>,
+  destination_continuous_buffer_ids: TinySet<Cow<'static, str>>,
 
   max_logs_count: Option,
 
@@ -885,7 +885,7 @@ impl std::fmt::Debug for StreamingBuffersAction {
 impl StreamingBuffersAction {
   pub(crate) fn new(
     action: PendingFlushBuffersAction,
-    continuous_buffer_ids: &BTreeSet<Cow<'static, str>>,
+    continuous_buffer_ids: &TinySet<Cow<'static, str>>,
   ) -> Option<Self> {
     let Some(streaming) = action.streaming else {
       log::trace!("buffers streaming not activated: no streaming configuration");
@@ -931,7 +931,7 @@ impl StreamingBuffersAction {
 #[derive(Debug)]
 pub struct BuffersToFlush {
   // Unique IDs of buffers to flush.
-  pub buffer_ids: BTreeSet<Cow<'static, str>>,
+  pub buffer_ids: TinySet<Cow<'static, str>>,
 
   // Channel to notify the caller that the flush has been completed.
   pub response_tx: tokio::sync::oneshot::Sender<()>,
 }
diff --git a/bd-workflows/src/actions_flush_buffers_test.rs b/bd-workflows/src/actions_flush_buffers_test.rs
index a5f716a3..c5e66cad 100644
--- a/bd-workflows/src/actions_flush_buffers_test.rs
+++ b/bd-workflows/src/actions_flush_buffers_test.rs
@@ -21,6 +21,7 @@ use bd_api::DataUpload;
 use bd_api::upload::{IntentDecision, IntentResponse};
 use bd_client_stats_store::Collector;
 use bd_client_stats_store::test::StatsHelper;
+use bd_log_primitives::tiny_set::TinySet;
 use bd_stats_common::labels;
 use pretty_assertions::assert_eq;
 use std::borrow::Cow;
@@ -82,18 +83,18 @@ impl std::ops::Drop for NegotiatorWrapper {
 async fn pending_buffers_standardization_removes_references_to_non_existing_trigger_buffers() {
   let mut resolver = Resolver::new(&Collector::default().scope("test"));
   resolver.update(ResolverConfig::new(
-    BTreeSet::from([
+    TinySet::from([
       "existing_trigger_buffer_id_1".into(),
       "existing_trigger_buffer_id_2".into(),
     ]),
-    BTreeSet::new(),
+    TinySet::default(),
   ));
 
-  let result = resolver.standardize_pending_actions(BTreeSet::from([
+  let result = resolver.standardize_pending_actions(TinySet::from([
     PendingFlushBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
       session_id: "foo_session_id".to_string(),
-      trigger_buffer_ids: BTreeSet::from([
+      trigger_buffer_ids: TinySet::from([
        "existing_trigger_buffer_id_1".into(),
        "unknown_trigger_buffer_id".into(),
       ]),
       streaming: None,
     },
@@ -102,26 +103,26 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig
     PendingFlushBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_2".to_string()),
       session_id: "foo_session_id".to_string(),
-      trigger_buffer_ids: BTreeSet::from(["unknown_trigger_buffer_id".into()]),
+      trigger_buffer_ids: TinySet::from(["unknown_trigger_buffer_id".into()]),
       streaming: None,
     },
     PendingFlushBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_3".to_string()),
       session_id: "bar_session_id".to_string(),
-      trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_2".into()]),
+      trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_2".into()]),
       streaming: Some(Streaming {
-        destination_continuous_buffer_ids: BTreeSet::from(["unknown_continuous_buffer_id".into()]),
+        destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]),
         max_logs_count: Some(10),
       }),
     },
   ]));
 
   assert_eq!(
-    BTreeSet::from([
+    TinySet::from([
       PendingFlushBuffersAction {
         id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
         session_id: "foo_session_id".to_string(),
-        trigger_buffer_ids: BTreeSet::from([
+        trigger_buffer_ids: TinySet::from([
          // The unknown trigger buffer ID present in the original flush buffers action is no
          // longer present.
          "existing_trigger_buffer_id_1".into(),
        ]),
        streaming: None,
       },
@@ -133,15 +134,13 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig
       PendingFlushBuffersAction {
         id: FlushBufferId::WorkflowActionId("action_id_3".to_string()),
         session_id: "bar_session_id".to_string(),
-        trigger_buffer_ids: BTreeSet::from([
+        trigger_buffer_ids: TinySet::from([
          // The unknown continuous buffer ID present in the original flush buffers action is
          // no longer present.
          "existing_trigger_buffer_id_2".into(),
        ]),
        streaming: Some(Streaming {
-          destination_continuous_buffer_ids: BTreeSet::from(
-            ["unknown_continuous_buffer_id".into()]
-          ),
+          destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]),
          max_logs_count: Some(10),
        }),
       },
@@ -154,11 +153,11 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig
 async fn streaming_buffers_standardization_removes_references_to_non_existing_buffers() {
   let mut resolver = Resolver::new(&Collector::default().scope("test"));
   resolver.update(ResolverConfig::new(
-    BTreeSet::from([
+    TinySet::from([
       "existing_trigger_buffer_id_1".into(),
       "existing_trigger_buffer_id_2".into(),
     ]),
-    BTreeSet::from([
+    TinySet::from([
       "existing_continuous_buffer_id_1".into(),
       "existing_continuous_buffer_id_2".into(),
     ]),
@@ -168,8 +167,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into()]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into()]),
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
        "unknown_continuous_buffer_id".into(),
       ]),
@@ -179,8 +178,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_2".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["unknown_trigger_buffer_id".into()]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from(["unknown_trigger_buffer_id".into()]),
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
        "unknown_continuous_buffer_id".into(),
       ]),
@@ -190,11 +189,11 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_3".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from([
        "existing_trigger_buffer_id_1".into(),
        "unknown_trigger_buffer_id".into(),
       ]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
        "unknown_continuous_buffer_id".into(),
       ]),
@@ -204,22 +203,22 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_4".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from([
        "existing_trigger_buffer_id_1".into(),
        "unknown_trigger_buffer_id".into(),
       ]),
-      destination_continuous_buffer_ids: BTreeSet::from(["unknown_continuous_buffer_id".into()]),
+      destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]),
       max_logs_count: Some(10),
       logs_count: 0,
     },
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_5".to_string()),
       session_id: "bar_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from([
        "existing_trigger_buffer_id_1".into(),
        "unknown_trigger_buffer_id".into(),
       ]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
        "unknown_continuous_buffer_id".into(),
       ]),
@@ -233,8 +232,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into()]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into()]),
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
       ]),
       max_logs_count: Some(10),
@@ -243,8 +242,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_3".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into(),]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into(),]),
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
       ]),
       max_logs_count: Some(10),
@@ -253,8 +252,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu
     StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_5".to_string()),
       session_id: "bar_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into(),]),
-      destination_continuous_buffer_ids: BTreeSet::from([
+      source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into(),]),
+      destination_continuous_buffer_ids: TinySet::from([
        "existing_continuous_buffer_id_1".into(),
       ]),
       max_logs_count: Some(10),
@@ -271,11 +270,11 @@ fn process_flush_buffers_actions() {
   let mut resolver = Resolver::new(&collector.scope("test"));
 
   resolver.update(ResolverConfig::new(
-    BTreeSet::from(["existing_trigger_buffer_id".into()]),
-    BTreeSet::new(),
+    TinySet::from(["existing_trigger_buffer_id".into()]),
+    TinySet::default(),
   ));
 
-  let actions = BTreeSet::from([
+  let actions = TinySet::from([
     ActionFlushBuffers {
       id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
       buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".to_string()]),
@@ -301,17 +300,17 @@ fn process_flush_buffers_actions() {
   let result = resolver.process_flush_buffer_actions(
     actions.iter().map(Cow::Borrowed).collect(),
     "foo_session_id",
-    &BTreeSet::from([PendingFlushBuffersAction {
+    &TinySet::from([PendingFlushBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_2".to_string()),
       session_id: "foo_session_id".to_string(),
-      trigger_buffer_ids: BTreeSet::new(),
+      trigger_buffer_ids: TinySet::default(),
       streaming: None,
     }]),
     &[StreamingBuffersAction {
       id: FlushBufferId::WorkflowActionId("action_id_3".to_string()),
       session_id: "foo_session_id".to_string(),
-      source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]),
-      destination_continuous_buffer_ids: BTreeSet::new(),
+      source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]),
+      destination_continuous_buffer_ids: TinySet::default(),
       max_logs_count: Some(10),
       logs_count: 0,
     }],
   );
 
   assert_eq!(
     FlushBuffersActionsProcessingResult {
-      new_pending_actions_to_add: BTreeSet::from([PendingFlushBuffersAction {
+      new_pending_actions_to_add: TinySet::from([PendingFlushBuffersAction {
        id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
        session_id: "foo_session_id".to_string(),
-        trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]),
+        trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),]),
        streaming: None,
       }]),
-      triggered_flush_buffers_action_ids: BTreeSet::from([
+      triggered_flush_buffers_action_ids: TinySet::from([
        Cow::Owned(FlushBufferId::WorkflowActionId("action_id_1".into())),
        Cow::Owned(FlushBufferId::WorkflowActionId("action_id_2".into())),
        Cow::Owned(FlushBufferId::WorkflowActionId("action_id_3".into())),
        Cow::Owned(FlushBufferId::WorkflowActionId("action_id_4".into())),
       ]),
-      triggered_flushes_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),])
+      triggered_flushes_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()])
     },
     result
   );
@@ -364,11 +363,11 @@ fn process_flush_buffer_action_with_no_buffers() {
   let mut resolver = Resolver::new(&collector.scope("test"));
 
   resolver.update(ResolverConfig::new(
-    BTreeSet::from(["existing_trigger_buffer_id".into()]),
-    BTreeSet::from(["existing_continuous_buffer_id".into()]),
+    TinySet::from(["existing_trigger_buffer_id".into()]),
+    TinySet::from(["existing_continuous_buffer_id".into()]),
   ));
 
-  let actions = BTreeSet::from([ActionFlushBuffers {
+  let actions = TinySet::from([ActionFlushBuffers {
     id: FlushBufferId::WorkflowActionId("action_id".to_string()),
     buffer_ids: BTreeSet::new(),
     streaming: Some(crate::config::Streaming {
@@ -380,27 +379,27 @@ fn process_flush_buffer_action_with_no_buffers() {
   let result = resolver.process_flush_buffer_actions(
     actions.iter().map(Cow::Borrowed).collect(),
     "foo_session_id",
-    &BTreeSet::new(),
+    &TinySet::default(),
     &[],
   );
 
   assert_eq!(
     FlushBuffersActionsProcessingResult {
-      new_pending_actions_to_add: BTreeSet::from([PendingFlushBuffersAction {
+      new_pending_actions_to_add: TinySet::from([PendingFlushBuffersAction {
        id: FlushBufferId::WorkflowActionId("action_id".to_string()),
        session_id: "foo_session_id".to_string(),
-        trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]),
+        trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),]),
        streaming: Some(Streaming {
-          destination_continuous_buffer_ids: BTreeSet::from([
-            "existing_continuous_buffer_id".into()
-          ]),
+          destination_continuous_buffer_ids: TinySet::from(
+            ["existing_continuous_buffer_id".into()]
+          ),
          max_logs_count: Some(10),
        }),
       }]),
-      triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned(
+      triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned(
        FlushBufferId::WorkflowActionId("action_id".into())
       )]),
-      triggered_flushes_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),])
+      triggered_flushes_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),])
     },
     result
   );
@@ -412,8 +411,8 @@ fn process_streaming_buffers_actions() {
   let mut resolver = Resolver::new(&collector.scope("test"));
 
   resolver.update(ResolverConfig::new(
-    BTreeSet::from(["existing_trigger_buffer_id".into()]),
-    BTreeSet::from(["existing_continuous_buffer_id".into()]),
+    TinySet::from(["existing_trigger_buffer_id".into()]),
+    TinySet::from(["existing_continuous_buffer_id".into()]),
   ));
 
   let result = resolver.process_streaming_actions(
     vec![
       (
        StreamingBuffersAction {
          id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
          session_id: "foo_session_id".to_string(),
-          source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]),
-          destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]),
+          source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]),
+          destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]),
          max_logs_count: Some(10),
          logs_count: 0,
        },
        false,
       ),
@@ -433,27 +432,27 @@ fn process_streaming_buffers_actions() {
       (
        StreamingBuffersAction {
          id: FlushBufferId::WorkflowActionId("action_id_2".to_string()),
          session_id: "foo_session_id".to_string(),
-          source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]),
-          destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]),
+          source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]),
+          destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]),
          max_logs_count: Some(10),
          logs_count: 10,
        },
        true,
       ),
     ],
-    &BTreeSet::from(["existing_trigger_buffer_id".into()]),
+    &TinySet::from(["existing_trigger_buffer_id".into()]),
     "foo_session_id",
   );
 
   assert_eq!(
     StreamingBuffersActionsProcessingResult {
-      log_destination_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]),
+      log_destination_buffer_ids: TinySet::from(["continuous_buffer_id".into()]),
       has_changed_streaming_actions: true,
       updated_streaming_actions: vec![StreamingBuffersAction {
        id: FlushBufferId::WorkflowActionId("action_id_1".to_string()),
        session_id: "foo_session_id".to_string(),
-        source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]),
-        destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]),
+        source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]),
+        destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]),
        max_logs_count: Some(10),
        logs_count: 1,
       },],
     },
     result
   );
@@ -484,7 +483,7 @@ async fn negotiator_upload_flow() {
   let pending_action = PendingFlushBuffersAction {
     id: FlushBufferId::WorkflowActionId("action_id".to_string()),
     session_id: "session_id".to_string(),
-    trigger_buffer_ids: BTreeSet::new(),
+    trigger_buffer_ids: TinySet::default(),
     streaming: None,
   };
 
@@ -544,7 +543,7 @@ async fn negotiator_drop_flow() {
   let pending_action = PendingFlushBuffersAction {
     id: FlushBufferId::WorkflowActionId("action_id".to_string()),
     session_id: "session_id".to_string(),
-    trigger_buffer_ids: BTreeSet::new(),
+    trigger_buffer_ids: TinySet::default(),
     streaming: None,
   };
 
diff --git a/bd-workflows/src/engine.rs b/bd-workflows/src/engine.rs
index 09b9546f..3d5eaa02 100644
--- a/bd-workflows/src/engine.rs
+++ b/bd-workflows/src/engine.rs
@@ -43,6 +43,7 @@ use bd_client_common::file::{read_compressed, write_compressed};
 use bd_client_stats::Stats;
 use bd_client_stats_store::{Counter, Histogram, Scope};
 use bd_error_reporter::reporter::handle_unexpected;
+use bd_log_primitives::tiny_set::{TinyMap, TinySet};
 use bd_log_primitives::{Log, LogRef};
 use bd_runtime::runtime::workflows::PersistenceWriteIntervalFlag;
 use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, session_capture};
@@ -50,7 +51,7 @@ use bd_stats_common::labels;
 use bd_stats_common::workflow::WorkflowDebugKey;
 use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::{BTreeSet, HashMap};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use time::OffsetDateTime;
@@ -184,7 +185,7 @@ impl WorkflowsEngine {
       self.add_workflows(config.workflows_configuration.workflows, None);
     }
 
-    for action in &self.state.pending_flush_actions {
+    for action in self.state.pending_flush_actions.iter() {
       if let Err(e) = self
        .flush_buffers_negotiator_input_tx
        .try_send(action.clone())
@@ -534,7 +535,7 @@ impl WorkflowsEngine {
   pub fn process_log<'a>(
     &'a mut self,
     log: &LogRef<'_>,
-    log_destination_buffer_ids: &'a BTreeSet<Cow<'static, str>>,
+    log_destination_buffer_ids: &'a TinySet<Cow<'static, str>>,
     now: OffsetDateTime,
   ) -> WorkflowsEngineResult<'a> {
     // Measure duration in here even if the list of workflows is empty.
@@ -583,20 +584,20 @@ impl WorkflowsEngine {
     {
       return WorkflowsEngineResult {
        log_destination_buffer_ids: Cow::Borrowed(log_destination_buffer_ids),
-        triggered_flushes_buffer_ids: BTreeSet::new(),
-        triggered_flush_buffers_action_ids: BTreeSet::new(),
+        triggered_flushes_buffer_ids: TinySet::default(),
+        triggered_flush_buffers_action_ids: TinySet::default(),
        capture_screenshot: false,
-        logs_to_inject: BTreeMap::new(),
        workflow_debug_state: vec![],
        has_debug_workflows: false,
+        logs_to_inject: TinyMap::default(),
       };
     }
 
     let mut actions: Vec> = vec![];
-    let mut logs_to_inject: BTreeMap<&'a str, Log> = BTreeMap::new();
     let mut all_cumulative_workflow_debug_state = vec![];
     let mut all_incremental_workflow_debug_state = vec![];
     let mut has_debug_workflows = false;
+    let mut logs_to_inject: TinyMap<&'a str, Log> = TinyMap::default();
 
     for (index, workflow) in &mut self.state.workflows.iter_mut().enumerate() {
       let Some(config) = self.configs.get(index) else {
        continue;
       };
@@ -792,7 +793,7 @@ impl WorkflowsEngine {
       return PreparedActions::default();
     }
 
-    let flush_buffers_actions: BTreeSet<Cow<'_, ActionFlushBuffers>> = actions
+    let flush_buffers_actions: TinySet<Cow<'_, ActionFlushBuffers>> = actions
       .iter()
       .filter_map(|action| {
        if let TriggeredAction::FlushBuffers(flush_buffers_action) = action {
@@ -803,7 +804,7 @@ impl WorkflowsEngine {
       })
       .collect();
 
-    let emit_metric_actions: BTreeSet<&ActionEmitMetric> = actions
+    let emit_metric_actions: TinySet<&ActionEmitMetric> = actions
       .iter()
       .filter_map(|action| {
        if let TriggeredAction::EmitMetric(emit_metric_action) = action {
@@ -826,7 +827,7 @@ impl WorkflowsEngine {
       })
       .collect();
 
-    let emit_sankey_diagrams_actions: BTreeSet> = actions
+    let emit_sankey_diagrams_actions: TinySet> = actions
       .into_iter()
       .filter_map(|action| {
        if let TriggeredAction::SankeyDiagram(action) = action {
@@ -856,10 +857,10 @@ impl Drop for WorkflowsEngine {
 
 #[derive(Default)]
 struct PreparedActions<'a> {
-  flush_buffers_actions: BTreeSet<Cow<'a, ActionFlushBuffers>>,
-  emit_metric_actions: BTreeSet<&'a ActionEmitMetric>,
-  emit_sankey_diagrams_actions: BTreeSet>,
-  capture_screenshot_actions: BTreeSet<&'a ActionTakeScreenshot>,
+  flush_buffers_actions: TinySet<Cow<'a, ActionFlushBuffers>>,
+  emit_metric_actions: TinySet<&'a ActionEmitMetric>,
+  emit_sankey_diagrams_actions: TinySet>,
+  capture_screenshot_actions: TinySet<&'a ActionTakeScreenshot>,
 }
 
 //
@@ -868,18 +869,18 @@
 #[derive(Debug, PartialEq, Eq)]
 pub struct WorkflowsEngineResult<'a> {
-  pub log_destination_buffer_ids: Cow<'a, BTreeSet<Cow<'static, str>>>,
+  pub log_destination_buffer_ids: Cow<'a, TinySet<Cow<'static, str>>>,
 
   // The identifier of workflow actions that triggered buffers flush(es).
-  pub triggered_flush_buffers_action_ids: BTreeSet<Cow<'a, FlushBufferId>>,
+  pub triggered_flush_buffers_action_ids: TinySet<Cow<'a, FlushBufferId>>,
 
   // The identifier of trigger buffers that should be flushed.
-  pub triggered_flushes_buffer_ids: BTreeSet<Cow<'a, str>>,
+  pub triggered_flushes_buffer_ids: TinySet<Cow<'a, str>>,
 
   // Whether a screenshot should be taken in response to processing the log.
   pub capture_screenshot: bool,
 
   // Logs to be injected back into the workflow engine after field attachment and other processing.
-  pub logs_to_inject: BTreeMap<&'a str, Log>,
+  pub logs_to_inject: TinyMap<&'a str, Log>,
 
   // If any workflows have debugging enabled, this will contain the *cumulative* debug state since
   // debugging started for each workflow. The state is persisted in the workflow state file to
@@ -900,16 +901,16 @@ pub type AllWorkflowsDebugState = Vec<(String, WorkflowDebugStateMap)>;
 
 pub struct WorkflowsEngineConfig {
   pub(crate) workflows_configuration: WorkflowsConfiguration,
 
-  pub(crate) trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-  pub(crate) continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+  pub(crate) trigger_buffer_ids: TinySet<Cow<'static, str>>,
+  pub(crate) continuous_buffer_ids: TinySet<Cow<'static, str>>,
 }
 
 impl WorkflowsEngineConfig {
   #[must_use]
   pub const fn new(
     workflows_configuration: WorkflowsConfiguration,
-    trigger_buffer_ids: BTreeSet<Cow<'static, str>>,
-    continuous_buffer_ids: BTreeSet<Cow<'static, str>>,
+    trigger_buffer_ids: TinySet<Cow<'static, str>>,
+    continuous_buffer_ids: TinySet<Cow<'static, str>>,
   ) -> Self {
     Self {
       workflows_configuration,
@@ -920,11 +921,11 @@ impl WorkflowsEngineConfig {
 
   #[cfg(test)]
   #[must_use]
-  pub const fn new_with_workflow_configurations(workflow_configs: Vec<Workflow>) -> Self {
+  pub fn new_with_workflow_configurations(workflow_configs: Vec<Workflow>) -> Self {
     Self::new(
       WorkflowsConfiguration::new_with_workflow_configurations_for_test(workflow_configs),
-      BTreeSet::new(),
-      BTreeSet::new(),
+      TinySet::default(),
+      TinySet::default(),
     )
   }
 }
@@ -1082,7 +1083,7 @@ pub(crate) struct WorkflowsState {
   session_id: String,
   workflows: Vec<Workflow>,
 
-  pending_flush_actions: BTreeSet<PendingFlushBuffersAction>,
+  pending_flush_actions: TinySet<PendingFlushBuffersAction>,
   pending_sankey_actions: BTreeSet,
   streaming_actions: Vec<StreamingBuffersAction>,
 }
diff --git a/bd-workflows/src/engine_test.rs b/bd-workflows/src/engine_test.rs
index 5a589298..02ced9cf 100644
--- a/bd-workflows/src/engine_test.rs
+++ b/bd-workflows/src/engine_test.rs
@@ -19,6 +19,7 @@ use bd_client_stats::Stats;
 use bd_client_stats_store::Collector;
 use bd_client_stats_store::test::StatsHelper;
 use bd_error_reporter::reporter::{Reporter, UnexpectedErrorHandler};
+use bd_log_primitives::tiny_set::{TinyMap, TinySet};
 use bd_log_primitives::{FieldsRef, Log, LogFields, LogMessage, LogRef, log_level};
 use bd_proto::flatbuffers::buffer_log::bitdrift_public::fbs::logging::v_1::LogType;
 use bd_proto::protos::client::api::log_upload_intent_request::Intent_type::WorkflowActionUpload;
@@ -54,7 +55,7 @@ use bd_time::TimeDurationExt;
 use itertools::Itertools;
 use pretty_assertions::assert_eq;
 use std::borrow::Cow;
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
@@ -219,7 +220,7 @@ struct AnnotatedWorkflowsEngine {
   engine: WorkflowsEngine,
 
   session_id: String,
-  log_destination_buffer_ids: BTreeSet<Cow<'static, str>>,
+  log_destination_buffer_ids: TinySet<Cow<'static, str>>,
 
   hooks: Arc>,
 
@@ -239,7 +240,7 @@ impl AnnotatedWorkflowsEngine {
       engine,
 
       session_id: "foo_session".to_string(),
-      log_destination_buffer_ids: BTreeSet::new(),
+      log_destination_buffer_ids: TinySet::default(),
 
       hooks,
 
@@ -364,7 +365,7 @@ impl AnnotatedWorkflowsEngine {
     })
   }
 
-  fn flushed_buffers(&self) -> Vec<BTreeSet<Cow<'static, str>>> {
+  fn flushed_buffers(&self) -> Vec<TinySet<Cow<'static, str>>> {
     self
       .hooks
       .lock()
@@ -942,8 +943,8 @@ async fn engine_update_after_sdk_update() {
       WorkflowBuilder::new("2", &[&c, &d]).make_config(),
       WorkflowBuilder::new("1", &[&a, &b]).make_config(),
     ]),
-    BTreeSet::from(["trigger_buffer_id".into()]),
-    BTreeSet::from(["continuous_buffer_id".into()]),
TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["continuous_buffer_id".into()]), ); let setup = Setup::new(); @@ -976,8 +977,8 @@ async fn engine_update_after_sdk_update() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b]).make_config(), ]), - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["continuous_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["continuous_buffer_id".into()]), )); assert_eq!(workflows_engine.state.workflows.len(), 1); @@ -1624,7 +1625,7 @@ async fn ignore_persisted_state_if_invalid_dir() { occurred_at: OffsetDateTime::now_utc(), capture_session: None, }, - &BTreeSet::new(), + &TinySet::default(), OffsetDateTime::now_utc(), ); @@ -1698,27 +1699,27 @@ async fn engine_processing_log() { let mut workflows_engine = setup .make_workflows_engine(WorkflowsEngineConfig::new( WorkflowsConfiguration::new_with_workflow_configurations_for_test(workflows), - BTreeSet::from(["foo_buffer_id".into()]), - BTreeSet::new(), + TinySet::from(["foo_buffer_id".into()]), + TinySet::default(), )) .await; // * Two workflows are created in response to a passed workflows config. // * One run is created for each of the created workflows. // * Each workflow run advances from their initial to final state in response to "foo" log. - workflows_engine.log_destination_buffer_ids = BTreeSet::from(["foo_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = TinySet::from(["foo_buffer_id".into()]); let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( WorkflowsEngineResult { - log_destination_buffer_ids: Cow::Owned(BTreeSet::from(["foo_buffer_id".into()])), - triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned( + log_destination_buffer_ids: Cow::Owned(TinySet::from(["foo_buffer_id".into()])), + triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned( FlushBufferId::WorkflowActionId("foo_action_id".into()) ),]), - triggered_flushes_buffer_ids: BTreeSet::from(["foo_buffer_id".into()]), + triggered_flushes_buffer_ids: TinySet::from(["foo_buffer_id".into()]), capture_screenshot: false, - logs_to_inject: BTreeMap::new(), workflow_debug_state: vec![], has_debug_workflows: false, + logs_to_inject: TinyMap::default(), }, result ); @@ -1814,28 +1815,28 @@ async fn log_without_destination() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b]).make_config(), ]), - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["continuous_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["continuous_buffer_id".into()]), ); let setup = Setup::new(); let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await; - workflows_engine.log_destination_buffer_ids = BTreeSet::new(); + workflows_engine.log_destination_buffer_ids = TinySet::default(); let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( WorkflowsEngineResult { - log_destination_buffer_ids: Cow::Owned(BTreeSet::new()), - triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned( + log_destination_buffer_ids: Cow::Owned(TinySet::default()), + triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned( FlushBufferId::WorkflowActionId("action".into()) ),]), - triggered_flushes_buffer_ids: BTreeSet::from(["trigger_buffer_id".into()]), + triggered_flushes_buffer_ids: TinySet::from(["trigger_buffer_id".into()]), capture_screenshot: false, - 
logs_to_inject: BTreeMap::new(), workflow_debug_state: vec![], has_debug_workflows: false, + logs_to_inject: TinyMap::default(), }, result ); @@ -1921,8 +1922,8 @@ async fn logs_streaming() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b, &c, &d, &e, &f, &g, &h]).make_config(), ]), - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from([ + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from([ "continuous_buffer_id_1".into(), "continuous_buffer_id_2".into(), ]), @@ -1933,7 +1934,7 @@ async fn logs_streaming() { let mut workflows_engine = setup .make_workflows_engine(workflows_engine_config.clone()) .await; - workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); // Emit four logs that results in four flushes of the buffer(s). // The logs upload intents for the first two buffer flushes are processed soon immediately after @@ -1951,7 +1952,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_drop")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -1981,7 +1982,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2003,7 +2004,7 @@ async fn logs_streaming() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![BTreeSet::from(["trigger_buffer_id".into()])], + vec![TinySet::from(["trigger_buffer_id".into()])], ); setup.collector.assert_counter_eq( @@ -2022,7 +2023,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2049,8 +2050,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["trigger_buffer_id".into()]) + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]) ], ); @@ -2074,7 +2075,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) ); // The resulting flush buffer action should be ignored as the same flush buffer action was @@ -2082,7 +2083,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) ); // This should trigger a flush of a buffer that's followed by logs 
streaming to continuous log @@ -2092,7 +2093,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) ); // Confirm that the state of the workflows engine is as expected prior to engine's shutdown. @@ -2113,7 +2114,7 @@ async fn logs_streaming() { let setup = Setup::new_with_sdk_directory(&setup.sdk_directory); let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await; - workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); workflows_engine.set_awaiting_logs_upload_intent_decisions(vec![ IntentDecision::UploadImmediately, @@ -2123,7 +2124,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2145,7 +2146,7 @@ async fn logs_streaming() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![BTreeSet::from(["trigger_buffer_id".into()])], + vec![TinySet::from(["trigger_buffer_id".into()])], ); setup.collector.assert_counter_eq( @@ -2157,7 +2158,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2180,8 +2181,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["trigger_buffer_id".into()]) + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]) ], ); @@ -2190,9 +2191,9 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from([ + Cow::Owned(TinySet::from([ + "continuous_buffer_id_2".into(), "continuous_buffer_id_1".into(), - "continuous_buffer_id_2".into() ])) ); @@ -2214,8 +2215,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["trigger_buffer_id".into()]) + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]) ], ); workflows_engine.complete_flushes(); @@ -2232,7 +2233,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); assert!(workflows_engine.state.pending_flush_actions.is_empty()); @@ -2250,8 +2251,8 @@ async fn engine_tracks_new_sessions() { let workflows_engine_config = WorkflowsEngineConfig::new( WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![]), - BTreeSet::from(["trigger_buffer_id".into()]), - 
BTreeSet::from(["continuous_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["continuous_buffer_id".into()]), ); let mut workflows_engine = setup @@ -2293,14 +2294,14 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b, &c]).make_config(), ]), - BTreeSet::from(["trigger_buffer_id".into()]), - BTreeSet::from(["continuous_buffer_id".into()]), + TinySet::from(["trigger_buffer_id".into()]), + TinySet::from(["continuous_buffer_id".into()]), ); let mut workflows_engine = setup .make_workflows_engine(workflows_engine_config.clone()) .await; - workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); // Set up no responses so that the actions continue to wait for the server's response. workflows_engine.set_awaiting_logs_upload_intent_decisions(vec![]); @@ -2309,7 +2310,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); // The log below doesn't trigger a buffer flush, but it's emitted with a new session ID, which @@ -2319,7 +2320,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { let result = workflows_engine.process_log(TestLog::new("not triggering")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) + Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) ); // Confirm that the pending action was not cleaned up. 
@@ -2334,7 +2335,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() {
   let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await;
   workflows_engine.session_id = "new session ID".to_string();
-  workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]);
+  workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]);
   workflows_engine
     .set_awaiting_logs_upload_intent_decisions(vec![IntentDecision::UploadImmediately]);
@@ -2350,14 +2351,14 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() {
   );
   assert_eq!(
     workflows_engine.flushed_buffers(),
-    vec![BTreeSet::from(["trigger_buffer_id".into()])],
+    vec![TinySet::from(["trigger_buffer_id".into()])],
   );
   workflows_engine.complete_flushes();
   let result = workflows_engine.process_log(TestLog::new("not triggering"));
   assert_eq!(
     result.log_destination_buffer_ids,
-    Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()]))
+    Cow::Owned(TinySet::from(["trigger_buffer_id".into()]))
   );
   setup.collector.assert_counter_eq(
@@ -2397,14 +2398,14 @@ async fn engine_continues_to_stream_upload_not_complete() {
     WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![
       WorkflowBuilder::new("1", &[&a, &b, &c]).make_config(),
     ]),
-    BTreeSet::from(["trigger_buffer_id".into()]),
-    BTreeSet::from(["continuous_buffer_id".into()]),
+    TinySet::from(["trigger_buffer_id".into()]),
+    TinySet::from(["continuous_buffer_id".into()]),
   );
   let mut workflows_engine = setup
     .make_workflows_engine(workflows_engine_config.clone())
     .await;
-  workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]);
+  workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]);
   // Allow the intent to go through which should trigger an upload.
   workflows_engine
@@ -2414,7 +2415,7 @@ async fn engine_continues_to_stream_upload_not_complete() {
   let result = workflows_engine.process_log(TestLog::new("foo"));
   assert_eq!(
     result.log_destination_buffer_ids,
-    Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()]))
+    Cow::Owned(TinySet::from(["trigger_buffer_id".into()]))
   );
   log::info!("Running the engine for the first time.");
@@ -2430,14 +2431,14 @@ async fn engine_continues_to_stream_upload_not_complete() {
   );
   assert_eq!(
     workflows_engine.flushed_buffers(),
-    vec![BTreeSet::from(["trigger_buffer_id".into()])],
+    vec![TinySet::from(["trigger_buffer_id".into()])],
   );
   // Verify that we have transitioned to streaming.
   let result = workflows_engine.process_log(TestLog::new("streamed"));
   assert_eq!(
     result.log_destination_buffer_ids,
-    Cow::Owned(BTreeSet::from(["continuous_buffer_id".into()]))
+    Cow::Owned(TinySet::from(["continuous_buffer_id".into()]))
   );
   // Change the session. This would typically cause the engine to stop streaming, but we haven't
@@ -2446,7 +2447,7 @@ async fn engine_continues_to_stream_upload_not_complete() {
   let result = workflows_engine.process_log(TestLog::new("streamed"));
   assert_eq!(
     result.log_destination_buffer_ids,
-    Cow::Owned(BTreeSet::from(["continuous_buffer_id".into()]))
+    Cow::Owned(TinySet::from(["continuous_buffer_id".into()]))
   );
   workflows_engine.complete_flushes();
@@ -2456,7 +2457,7 @@ async fn engine_continues_to_stream_upload_not_complete() {
   let result = workflows_engine.process_log(TestLog::new("not streamed"));
   assert_eq!(
     result.log_destination_buffer_ids,
-    Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()]))
+    Cow::Owned(TinySet::from(["trigger_buffer_id".into()]))
   );
   setup.collector.assert_counter_eq(
diff --git a/bd-workflows/src/generate_log.rs b/bd-workflows/src/generate_log.rs
index 5f9d7677..24910c80 100644
--- a/bd-workflows/src/generate_log.rs
+++ b/bd-workflows/src/generate_log.rs
@@ -55,13 +55,11 @@ fn resolve_reference<'a>(
       .map(StringOrFloat::String),
     Value_reference_type::SavedFieldId(saved_field_id) => extractions
       .fields
-      .as_ref()
-      .and_then(|fields| fields.get(saved_field_id))
+      .get(saved_field_id)
       .map(|field| StringOrFloat::String(field.into())),
     Value_reference_type::SavedTimestampId(saved_timestamp_id) => extractions
       .timestamps
-      .as_ref()
-      .and_then(|timestamps| timestamps.get(saved_timestamp_id))
+      .get(saved_timestamp_id)
       .map(|timestamp| StringOrFloat::Float(fractional_milliseconds_since_epoch(*timestamp))),
     Value_reference_type::Uuid(_) => Some(StringOrFloat::String(Uuid::new_v4().to_string().into())),
   }
diff --git a/bd-workflows/src/generate_log_test.rs b/bd-workflows/src/generate_log_test.rs
index f3d4c19e..53cb541a 100644
--- a/bd-workflows/src/generate_log_test.rs
+++ b/bd-workflows/src/generate_log_test.rs
@@ -70,7 +70,6 @@ impl Helper {
     self
       .extractions
       .timestamps
-      .get_or_insert_default()
       .insert(id.to_string(), timestamp);
   }
@@ -78,7 +77,6 @@ impl Helper {
     self
       .extractions
       .fields
-      .get_or_insert_default()
       .insert(id.to_string(), value.to_string());
   }
diff --git a/bd-workflows/src/metrics.rs b/bd-workflows/src/metrics.rs
index 02099070..e695e35f 100644
--- a/bd-workflows/src/metrics.rs
+++ b/bd-workflows/src/metrics.rs
@@ -13,9 +13,10 @@ use crate::config::{ActionEmitMetric, TagValue};
 use crate::workflow::TriggeredActionEmitSankey;
 use bd_client_stats::Stats;
 use bd_log_primitives::LogRef;
+use bd_log_primitives::tiny_set::TinySet;
 use bd_stats_common::MetricType;
 use std::borrow::Cow;
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeMap;
 use std::sync::Arc;
 //
@@ -32,11 +33,11 @@ impl MetricsCollector {
     Self { stats }
   }
-  pub(crate) fn emit_metrics(&self, actions: &BTreeSet<&ActionEmitMetric>, log: &LogRef<'_>) {
+  pub(crate) fn emit_metrics(&self, actions: &TinySet<&ActionEmitMetric>, log: &LogRef<'_>) {
     // TODO(Augustyniak): We dedupe stats in here too, but only when both their tags and the
     // value of `counter_increment` are identical. Consider deduping metrics even if their
     // `counter_increment` fields have different values.
-    for action in actions {
+    for action in actions.iter() {
       let tags = Self::extract_tags(log, &action.tags);
       #[allow(clippy::cast_precision_loss)]
@@ -76,10 +77,10 @@ impl MetricsCollector {
   pub(crate) fn emit_sankeys(
     &self,
-    actions: &BTreeSet<TriggeredActionEmitSankey<'_>>,
+    actions: &TinySet<TriggeredActionEmitSankey<'_>>,
     log: &LogRef<'_>,
   ) {
-    for action in actions {
+    for action in actions.iter() {
       let mut tags = Self::extract_tags(log, action.action.tags());
       tags.insert("_path_id".to_string(), action.path.path_id.to_string());
diff --git a/bd-workflows/src/workflow.rs b/bd-workflows/src/workflow.rs
index c51b4eab..fa0a85e9 100644
--- a/bd-workflows/src/workflow.rs
+++ b/bd-workflows/src/workflow.rs
@@ -20,13 +20,14 @@ use crate::config::{
   WorkflowDebugMode,
 };
 use crate::generate_log::generate_log_action;
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{FieldsRef, Log, LogRef};
 use bd_stats_common::workflow::{WorkflowDebugStateKey, WorkflowDebugTransitionType};
 use bd_time::OffsetDateTimeExt;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use sha2::Digest;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::time::SystemTime;
 use time::OffsetDateTime;
@@ -338,7 +339,7 @@ impl Workflow {
 #[derive(Debug, Default, PartialEq)]
 pub(crate) struct WorkflowResult<'a> {
   triggered_actions: Vec<TriggeredAction<'a>>,
-  logs_to_inject: BTreeMap<&'a str, Log>,
+  logs_to_inject: TinyMap<&'a str, Log>,
   stats: WorkflowResultStats,
   // Persisted workflow debug state. This is only used for live debugging. Global debugging is
   // persisted as part of stats snapshots.
@@ -352,7 +353,7 @@ impl<'a> WorkflowResult<'a> {
     self,
   ) -> (
     Vec<TriggeredAction<'a>>,
-    BTreeMap<&'a str, Log>,
+    TinyMap<&'a str, Log>,
     OptWorkflowDebugStateMap,
     Vec,
   ) {
@@ -559,7 +560,7 @@ impl Run {
     // the most common situation.
     let mut run_triggered_actions = Vec::<TriggeredAction<'a>>::new();
-    let mut run_logs_to_inject = BTreeMap::<&'a str, Log>::new();
+    let mut run_logs_to_inject = TinyMap::<&'a str, Log>::default();
     let mut run_matched_logs_count = 0;
     let mut run_processed_timeout = false;
     let mut workflow_debug_state = Vec::new();
@@ -597,8 +598,8 @@ impl Run {
           triggered_actions: vec![],
           matched_logs_count: run_matched_logs_count,
           processed_timeout: run_processed_timeout,
-          logs_to_inject: BTreeMap::new(),
           workflow_debug_state,
+          logs_to_inject: TinyMap::default(),
         };
       }
     }
@@ -620,8 +621,8 @@ impl Run {
           triggered_actions: vec![],
           matched_logs_count: run_matched_logs_count,
           processed_timeout: run_processed_timeout,
-          logs_to_inject: BTreeMap::new(),
           workflow_debug_state,
+          logs_to_inject: TinyMap::default(),
         };
       }
     },
@@ -739,7 +740,7 @@ pub(crate) struct RunResult<'a> {
   processed_timeout: bool,
   /// Logs to be injected back into the workflow engine after field attachment and other
   /// processing.
-  logs_to_inject: BTreeMap<&'a str, Log>,
+  logs_to_inject: TinyMap<&'a str, Log>,
   /// Any debug state changes that occurred as a result of processing the traversal.
   workflow_debug_state: Vec,
 }
@@ -759,11 +760,11 @@ impl RunResult<'_> {
 pub(crate) struct TraversalExtractions {
-  /// States of Sankey diagrams. It's a `None` when traversal is initialized and is set
-  /// to `Some` after the first value for a Sankey and a given traversal is extracted.
-  pub(crate) sankey_states: Option<BTreeMap<String, SankeyState>>,
+  /// States of Sankey diagrams. Empty when the traversal is initialized; populated once the
+  /// first value for a Sankey and a given traversal is extracted.
+  pub(crate) sankey_states: TinyMap<String, SankeyState>,
   /// Snapped timestamps, by extraction ID.
-  pub(crate) timestamps: Option<BTreeMap<String, OffsetDateTime>>,
+  pub(crate) timestamps: TinyMap<String, OffsetDateTime>,
   /// Snapped field values, by extraction ID.
-  pub(crate) fields: Option<BTreeMap<String, String>>,
+  pub(crate) fields: TinyMap<String, String>,
 }
 //
@@ -909,7 +910,7 @@ impl Traversal {
         log.log_type,
         log.message,
         log.fields,
-        self.extractions.fields.as_ref(),
+        &self.extractions.fields,
       ) {
         let Some(matched_logs_counts) = self.matched_logs_counts.get_mut(index) else {
           continue;
@@ -1012,9 +1013,7 @@ impl Traversal {
       new_extractions
         .sankey_states
-        .get_or_insert_with(BTreeMap::new)
-        .entry(extraction.sankey_id.clone())
-        .or_default()
+        .get_mut_or_insert_default(extraction.sankey_id.clone())
        .push(
          extracted_value.into_owned(),
          extraction.limit,
@@ -1027,7 +1026,6 @@ impl Traversal {
       log::debug!("extracted timestamp {timestamp} for extraction ID {timestamp_extraction_id}");
       new_extractions
         .timestamps
-        .get_or_insert_with(BTreeMap::new)
         .insert(timestamp_extraction_id.clone(), timestamp);
     }
@@ -1040,7 +1038,6 @@ impl Traversal {
       );
       new_extractions
         .fields
-        .get_or_insert_with(BTreeMap::new)
         .insert(extraction.id.clone(), value.to_string());
     }
   }
@@ -1052,9 +1049,9 @@ impl Traversal {
     actions: &'a [Action],
     extractions: &mut TraversalExtractions,
     current_log_fields: FieldsRef<'_>,
-  ) -> (Vec<TriggeredAction<'a>>, BTreeMap<&'a str, Log>) {
+  ) -> (Vec<TriggeredAction<'a>>, TinyMap<&'a str, Log>) {
     let mut triggered_actions = vec![];
-    let mut logs_to_inject = BTreeMap::new();
+    let mut logs_to_inject = TinyMap::default();
     for action in actions {
       match action {
         Action::FlushBuffers(action) => {
@@ -1064,11 +1061,7 @@ impl Traversal {
           triggered_actions.push(TriggeredAction::EmitMetric(action));
         },
         Action::EmitSankey(action) => {
-          let Some(sankey_states) = &mut extractions.sankey_states else {
-            continue;
-          };
-
-          let Some(sankey_state) = sankey_states.remove(action.id()) else {
+          let Some(sankey_state) = extractions.sankey_states.remove(action.id()) else {
             debug_assert!(
               false,
               "sankey_states for Sankey with {:?} ID should be present",
@@ -1148,7 +1141,7 @@ struct TraversalResult<'a> {
   followed_transitions_count: u32,
   /// Logs to be injected back into the workflow engine after field attachment and other
   /// processing.
-  log_to_inject: BTreeMap<&'a str, Log>,
+  log_to_inject: TinyMap<&'a str, Log>,
   /// Any debug state changes that occurred as a result of processing the traversal.
   workflow_debug_state: Vec,
 }
diff --git a/bd-workflows/src/workflow_test.rs b/bd-workflows/src/workflow_test.rs
index 9162064c..7c249c01 100644
--- a/bd-workflows/src/workflow_test.rs
+++ b/bd-workflows/src/workflow_test.rs
@@ -15,6 +15,7 @@ use crate::config::{
 };
 use crate::test::{MakeConfig, TestLog};
 use crate::workflow::{Run, TriggeredAction, Workflow, WorkflowResult, WorkflowResultStats};
+use bd_log_primitives::tiny_set::TinyMap;
 use bd_log_primitives::{FieldsRef, LogFields, LogMessage, log_level};
 use bd_proto::flatbuffers::buffer_log::bitdrift_public::fbs::logging::v_1::LogType;
 use bd_stats_common::workflow::{WorkflowDebugStateKey, WorkflowDebugTransitionType};
@@ -257,7 +258,7 @@ fn timeout_no_parallel_match() {
       increment: ValueIncrement::Fixed(1),
       metric_type: MetricType::Counter,
     })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
     stats: WorkflowResultStats {
       matched_logs_count: 1,
       processed_timeout: true,
@@ -286,7 +287,7 @@ fn timeout_no_parallel_match() {
       increment: ValueIncrement::Fixed(1),
       metric_type: MetricType::Counter,
     })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
     stats: WorkflowResultStats {
       matched_logs_count: 0,
       processed_timeout: true,
@@ -315,7 +316,7 @@ fn timeout_no_parallel_match() {
       increment: ValueIncrement::Fixed(1),
       metric_type: MetricType::Counter,
     })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
     stats: WorkflowResultStats {
       matched_logs_count: 1,
       processed_timeout: true,
@@ -341,7 +342,7 @@ fn timeout_no_parallel_match() {
      increment: ValueIncrement::Fixed(1),
      metric_type: MetricType::Counter,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: true,
@@ -411,7 +412,7 @@ fn timeout_not_start() {
      increment: ValueIncrement::Fixed(1),
      metric_type: MetricType::Counter,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 0,
      processed_timeout: true,
@@ -441,7 +442,7 @@ fn timeout_not_start() {
      buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -557,7 +558,7 @@ fn timeout_from_start() {
      increment: ValueIncrement::Fixed(1),
      metric_type: MetricType::Counter,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 0,
      processed_timeout: true,
@@ -607,7 +608,7 @@ fn multiple_start_nodes_initial_fork() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 2,
        processed_timeout: false,
@@ -633,7 +634,7 @@ fn multiple_start_nodes_initial_fork() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 2,
        processed_timeout: false,
@@ -663,7 +664,7 @@ fn multiple_start_nodes_initial_fork() {
      buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -708,7 +709,7 @@ fn multiple_start_nodes_initial_branching() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 1,
        processed_timeout: false,
@@ -731,7 +732,7 @@ fn multiple_start_nodes_initial_branching() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 1,
        processed_timeout: false,
@@ -757,7 +758,7 @@ fn multiple_start_nodes_initial_branching() {
      buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -821,7 +822,7 @@ fn basic_exclusive_workflow() {
        metric_type: MetricType::Counter,
      })
    ],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -850,7 +851,7 @@ fn basic_exclusive_workflow() {
      buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -907,8 +908,8 @@ fn exclusive_workflow_matched_logs_count_limit() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 2,
        processed_timeout: false,
@@ -938,7 +939,7 @@ fn exclusive_workflow_matched_logs_count_limit() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 1,
        processed_timeout: false,
@@ -960,7 +961,7 @@ fn exclusive_workflow_matched_logs_count_limit() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 1,
        processed_timeout: false,
@@ -1004,7 +1005,7 @@ fn exclusive_workflow_log_rule_count() {
    result,
    WorkflowResult {
      triggered_actions: vec![],
-      logs_to_inject: BTreeMap::new(),
+      logs_to_inject: TinyMap::default(),
      stats: WorkflowResultStats {
        matched_logs_count: 1,
        processed_timeout: false,
@@ -1027,7 +1028,7 @@ fn exclusive_workflow_log_rule_count() {
      buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -1059,7 +1060,7 @@ fn exclusive_workflow_log_rule_count() {
      buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -1130,7 +1131,7 @@ fn branching_exclusive_workflow() {
      buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -1174,7 +1175,7 @@ fn branching_exclusive_workflow() {
      buffer_ids: BTreeSet::from(["zoo_buffer_id".to_string()]),
      streaming: None,
    })],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 1,
      processed_timeout: false,
@@ -1210,7 +1211,7 @@ fn branching_exclusive_workflow() {
        streaming: None,
      }),
    ],
-    logs_to_inject: BTreeMap::new(),
+    logs_to_inject: TinyMap::default(),
    stats: WorkflowResultStats {
      matched_logs_count: 2,
      processed_timeout: false,
diff --git a/profiling/src/main.rs b/profiling/src/main.rs
index 026119b1..0bf8f588 100644
--- a/profiling/src/main.rs
+++ b/profiling/src/main.rs
@@ -11,6 +11,7 @@ use crate::paths::PATHS;
 use bd_client_common::file::read_compressed_protobuf;
 use bd_client_stats::Stats;
 use bd_client_stats_store::{Collector, Scope};
+use bd_log_primitives::tiny_set::TinySet;
 use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogRef, log_level};
 use bd_logger::LogFields;
 use bd_logger::builder::default_stats_flush_triggers;
@@ -42,7 +43,7 @@ use bd_workflows::engine::{WorkflowsEngine, WorkflowsEngineConfig};
 use protobuf::Message;
 use rand::Rng;
 use sha2::Digest;
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeMap;
 use std::fs::{self};
 use std::os::unix::fs::MetadataExt;
 use std::path::Path;
@@ -111,8 +112,8 @@ impl AnnotatedWorkflowsEngine {
     engine
       .start(WorkflowsEngineConfig::new(
         WorkflowsConfiguration::new(workflow_configurations.configs(), vec![]),
-        BTreeSet::default(),
-        BTreeSet::default(),
+        TinySet::default(),
+        TinySet::default(),
       ))
       .await;
@@ -141,7 +142,7 @@ impl AnnotatedWorkflowsEngine {
         occurred_at: OffsetDateTime::now_utc(),
         capture_session: None,
       },
-      &BTreeSet::new(),
+      &TinySet::default(),
       OffsetDateTime::now_utc(),
     );
   }
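For context, the `TinyMap` and `TinySet` definitions live in `bd_log_primitives::tiny_set` and are not part of this diff. The call sites above pin down the surface the change relies on: `default()`, `From<[_; N]>`, `get`, `insert`, `remove`, `get_mut_or_insert_default`, `iter()`, `is_empty()`, plus `Clone`/`PartialEq` for the `Cow`-wrapped comparisons in the tests. The following is a minimal sketch of what such types could look like, assuming a `Vec`-backed, linear-scan, insertion-ordered design (the usual motivation for replacing `BTreeMap`/`BTreeSet` on small collections); the names and signatures here are assumptions, not the actual implementation:

// Sketch only: the real bd_log_primitives::tiny_set types may differ. Only the
// operations exercised by this diff are shown.
use std::borrow::Borrow;

#[derive(Clone, Debug, PartialEq)]
pub struct TinyMap<K, V> {
  // Entries are kept in insertion order; lookups are linear scans, which tends to
  // beat a BTreeMap for the handful of elements these maps typically hold.
  entries: Vec<(K, V)>,
}

impl<K, V> Default for TinyMap<K, V> {
  fn default() -> Self {
    Self { entries: Vec::new() }
  }
}

impl<K: PartialEq, V> TinyMap<K, V> {
  pub fn get<Q>(&self, key: &Q) -> Option<&V>
  where
    K: Borrow<Q>,
    Q: PartialEq + ?Sized,
  {
    self.entries.iter().find(|(k, _)| k.borrow() == key).map(|(_, v)| v)
  }

  pub fn insert(&mut self, key: K, value: V) {
    if let Some(index) = self.entries.iter().position(|(k, _)| *k == key) {
      self.entries[index].1 = value;
    } else {
      self.entries.push((key, value));
    }
  }

  pub fn remove<Q>(&mut self, key: &Q) -> Option<V>
  where
    K: Borrow<Q>,
    Q: PartialEq + ?Sized,
  {
    let index = self.entries.iter().position(|(k, _)| k.borrow() == key)?;
    Some(self.entries.remove(index).1)
  }

  // Used by the Sankey extraction path above in place of the old
  // `.get_or_insert_with(BTreeMap::new).entry(..).or_default()` chain.
  pub fn get_mut_or_insert_default(&mut self, key: K) -> &mut V
  where
    V: Default,
  {
    if let Some(index) = self.entries.iter().position(|(k, _)| *k == key) {
      return &mut self.entries[index].1;
    }
    self.entries.push((key, V::default()));
    &mut self.entries.last_mut().unwrap().1
  }
}

impl<K: PartialEq, V, const N: usize> From<[(K, V); N]> for TinyMap<K, V> {
  fn from(items: [(K, V); N]) -> Self {
    let mut map = Self::default();
    for (key, value) in items {
      map.insert(key, value);
    }
    map
  }
}

#[derive(Clone, Debug, PartialEq)]
pub struct TinySet<T> {
  items: Vec<T>,
}

impl<T> Default for TinySet<T> {
  fn default() -> Self {
    Self { items: Vec::new() }
  }
}

impl<T: PartialEq> TinySet<T> {
  pub fn insert(&mut self, item: T) -> bool {
    if self.items.contains(&item) {
      return false;
    }
    self.items.push(item);
    true
  }

  pub fn is_empty(&self) -> bool {
    self.items.is_empty()
  }

  pub fn iter(&self) -> std::slice::Iter<'_, T> {
    self.items.iter()
  }
}

impl<T: PartialEq, const N: usize> From<[T; N]> for TinySet<T> {
  fn from(items: [T; N]) -> Self {
    let mut set = Self::default();
    for item in items {
      set.insert(item);
    }
    set
  }
}

Two details in the diff are consistent with this shape: the loops in metrics.rs change from `for action in actions` to `for action in actions.iter()`, which suggests `&TinySet<T>` does not implement `IntoIterator` the way `&BTreeSet<T>` does, and the expected set literal in `logs_streaming` is reordered (`continuous_buffer_id_2` now listed first), which suggests `TinySet` iterates and compares in insertion order rather than in sorted order.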