diff --git a/bd-log-filter/src/lib.rs b/bd-log-filter/src/lib.rs index 5c6a76d9..8ff80ed9 100644 --- a/bd-log-filter/src/lib.rs +++ b/bd-log-filter/src/lib.rs @@ -19,7 +19,6 @@ mod filter_chain_test; use anyhow::{Context, Result, anyhow}; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{ FieldsRef, LOG_FIELD_NAME_LEVEL, @@ -88,13 +87,10 @@ impl FilterChain { pub fn process(&self, log: &mut Log) { for filter in &self.filters { let fields_ref = FieldsRef::new(&log.fields, &log.matching_fields); - if !filter.matcher.do_match( - log.log_level, - log.log_type, - &log.message, - fields_ref, - &TinyMap::default(), - ) { + if !filter + .matcher + .do_match(log.log_level, log.log_type, &log.message, fields_ref, None) + { continue; } diff --git a/bd-log-matcher/src/legacy_matcher_test.rs b/bd-log-matcher/src/legacy_matcher_test.rs index 0ae81834..44b88cfc 100644 --- a/bd-log-matcher/src/legacy_matcher_test.rs +++ b/bd-log-matcher/src/legacy_matcher_test.rs @@ -9,7 +9,6 @@ use crate::matcher::Tree; use assert_matches::assert_matches; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{ EMPTY_FIELDS, FieldsRef, @@ -80,13 +79,7 @@ fn match_test_runner(config: LegacyLogMatcher, cases: Vec<(Input<'_>, bool)>) { assert_eq!( should_match, - match_tree.do_match( - log_level, - log_type, - &message, - fields_ref, - &TinyMap::default() - ), + match_tree.do_match(log_level, log_type, &message, fields_ref, None), "{input:?} should result in {should_match} but did not", ); } diff --git a/bd-log-matcher/src/matcher.rs b/bd-log-matcher/src/matcher.rs index 187ef478..b51fe782 100644 --- a/bd-log-matcher/src/matcher.rs +++ b/bd-log-matcher/src/matcher.rs @@ -24,7 +24,6 @@ use base_log_matcher::tag_match::Value_match::{ SemVerValueMatch, StringValueMatch, }; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogType}; use bd_proto::protos::config::v1::config::log_matcher::base_log_matcher::StringMatchType; use bd_proto::protos::config::v1::config::log_matcher::{ @@ -47,6 +46,7 @@ use log_matcher::log_matcher::base_log_matcher::{ use log_matcher::log_matcher::{BaseLogMatcher, Matcher, base_log_matcher}; use regex::Regex; use std::borrow::Cow; +use std::collections::BTreeMap; use std::ops::Deref; const LOG_LEVEL_KEY: &str = "log_level"; @@ -79,11 +79,14 @@ impl From for ValueOrSavedFieldId { } impl<'a, T: MakeValueOrRef<'a, T>> ValueOrSavedFieldId { - fn load(&'a self, extracted_fields: &'a TinyMap) -> Option> { + fn load( + &'a self, + extracted_fields: Option<&'a BTreeMap>, + ) -> Option> { match self { Self::Value(v) => Some(ValueOrRef::Ref(v)), Self::SaveFieldId(field_id) => extracted_fields - .get(field_id) + .and_then(|extracted_fields| extracted_fields.get(field_id)) .and_then(|v| T::make_value_or_ref(v)), } } @@ -204,7 +207,7 @@ impl Tree { log_type: LogType, message: &LogMessage, fields: FieldsRef<'_>, - extracted_fields: &TinyMap, + extracted_fields: Option<&BTreeMap>, ) -> bool { match self { Self::Base(base_matcher) => match base_matcher { @@ -265,7 +268,7 @@ impl IntMatch { } } - fn evaluate(&self, candidate: i32, extracted_fields: &TinyMap) -> bool { + fn evaluate(&self, candidate: i32, extracted_fields: Option<&BTreeMap>) -> bool { let Some(value) = self.value.load(extracted_fields) else { return false; }; @@ -314,7 +317,7 @@ impl DoubleMatch { } } - fn evaluate(&self, candidate: f64, extracted_fields: &TinyMap) -> bool { + fn evaluate(&self, candidate: f64, extracted_fields: Option<&BTreeMap>) -> bool { let candidate = 
NanEqualFloat(candidate); let Some(value) = self.value.load(extracted_fields) else { return false; @@ -383,7 +386,7 @@ impl StringMatch { }) } - fn evaluate(&self, candidate: &str, extracted_fields: &TinyMap) -> bool { + fn evaluate(&self, candidate: &str, extracted_fields: Option<&BTreeMap>) -> bool { let Some(value) = self.value.load(extracted_fields) else { return false; }; diff --git a/bd-log-matcher/src/matcher_test.rs b/bd-log-matcher/src/matcher_test.rs index 166a25de..5f4fba58 100644 --- a/bd-log-matcher/src/matcher_test.rs +++ b/bd-log-matcher/src/matcher_test.rs @@ -7,7 +7,6 @@ use crate::matcher::Tree; use crate::matcher::base_log_matcher::tag_match::Value_match::DoubleValueMatch; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{ EMPTY_FIELDS, FieldsRef, @@ -34,6 +33,7 @@ use log_matcher::base_log_matcher::tag_match::Value_match::{ use log_matcher::{BaseLogMatcher, Matcher, MatcherList, base_log_matcher}; use pretty_assertions::assert_eq; use protobuf::MessageField; +use std::collections::BTreeMap; type Input<'a> = (LogType, LogLevel, LogMessage, LogFields); @@ -230,7 +230,7 @@ fn test_extracted_string_matcher() { (log_tag("keyx", "exact"), false), (log_msg("no fields"), false), ], - &TinyMap::default(), + None, ); match_test_runner_with_extractions( @@ -240,7 +240,7 @@ fn test_extracted_string_matcher() { (log_tag("keyx", "exact"), false), (log_msg("no fields"), false), ], - &[("id1".to_string(), "exact".to_string())].into(), + Some(&BTreeMap::from([("id1".to_string(), "exact".to_string())])), ); } @@ -309,7 +309,7 @@ fn test_extracted_double_matcher() { (log_tag("key", "13.0"), false), (log_tag("key", "13"), false), ], - &TinyMap::default(), + None, ); match_test_runner_with_extractions( config.clone(), @@ -317,12 +317,12 @@ fn test_extracted_double_matcher() { (log_tag("key", "13.0"), false), (log_tag("key", "13"), false), ], - &[("id1".to_string(), "bad".to_string())].into(), + Some(&BTreeMap::from([("id1".to_string(), "bad".to_string())])), ); match_test_runner_with_extractions( config, vec![(log_tag("key", "13.0"), true), (log_tag("key", "13"), true)], - &[("id1".to_string(), "13".to_string())].into(), + Some(&BTreeMap::from([("id1".to_string(), "13".to_string())])), ); } @@ -419,13 +419,13 @@ fn test_extracted_int_matcher() { (log_tag("key", "13"), false), (log_tag("key", "13.0"), false), ], - &TinyMap::default(), + None, ); match_test_runner_with_extractions( config, vec![(log_tag("key", "13"), true), (log_tag("key", "13.0"), true)], - &[("id1".to_string(), "13".to_string())].into(), + Some(&BTreeMap::from([("id1".to_string(), "13".to_string())])), ); } @@ -856,14 +856,14 @@ fn make_message_match(operator: Operator, match_value: &str) -> base_log_matcher #[allow(clippy::needless_pass_by_value)] fn match_test_runner(config: LogMatcher, cases: Vec<(Input<'_>, bool)>) { - match_test_runner_with_extractions(config, cases, &TinyMap::default()); + match_test_runner_with_extractions(config, cases, None); } #[allow(clippy::needless_pass_by_value)] fn match_test_runner_with_extractions( config: LogMatcher, cases: Vec<(Input<'_>, bool)>, - extracted_fields: &TinyMap, + extracted_fields: Option<&BTreeMap>, ) { let match_tree = Tree::new(&config).unwrap(); diff --git a/bd-log-matcher/src/test.rs b/bd-log-matcher/src/test.rs index c83b8f7a..30bb314a 100644 --- a/bd-log-matcher/src/test.rs +++ b/bd-log-matcher/src/test.rs @@ -6,7 +6,6 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use crate::matcher::Tree; -use 
bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{FieldsRef, LogMessage, LogType, StringOrBytes, TypedLogLevel}; use bd_proto::protos::log_matcher::log_matcher::LogMatcher; use std::collections::HashMap; @@ -42,7 +41,7 @@ impl TestMatcher { log_type, &message.into(), FieldsRef::new(&fields, &matching_fields), - &TinyMap::default(), + None, ) } } diff --git a/bd-log-primitives/src/lib.rs b/bd-log-primitives/src/lib.rs index 6240c9a5..502accc8 100644 --- a/bd-log-primitives/src/lib.rs +++ b/bd-log-primitives/src/lib.rs @@ -15,7 +15,6 @@ )] pub mod size; -pub mod tiny_set; use crate::size::MemorySized; use ahash::AHashMap; diff --git a/bd-log-primitives/src/tiny_set.rs b/bd-log-primitives/src/tiny_set.rs deleted file mode 100644 index 744f476c..00000000 --- a/bd-log-primitives/src/tiny_set.rs +++ /dev/null @@ -1,232 +0,0 @@ -// shared-core - bitdrift's common client/server libraries -// Copyright Bitdrift, Inc. All rights reserved. -// -// Use of this source code is governed by a source available license that can be found in the -// LICENSE file or at: -// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt - -use serde::{Deserialize, Serialize}; -use std::borrow::Borrow; - -// The purpose of these data structures are to have a small map/set like structures which are -// efficient for small sizes (up to ~5 items) and use Vec as a backing store which is more -// efficient than HashMap/BTreeMap for small sizes. - -// -// TinySet -// - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] -pub struct TinySet { - inner: TinyMap, -} - -impl TinySet { - pub fn iter(&self) -> impl Iterator { - self.inner.items.iter().map(|(k, ())| k) - } - - #[must_use] - pub fn first(&self) -> Option<&T> { - self.inner.items.first().map(|(k, ())| k) - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.inner.items.is_empty() - } - - pub fn remove(&mut self, value: &T) { - self.inner.remove(value); - } - - pub fn insert(&mut self, value: T) { - self.inner.insert(value, ()); - } - - pub fn extend(&mut self, iter: I) - where - I: IntoIterator, - { - self.inner.extend(iter.into_iter().map(|item| (item, ()))); - } - - #[must_use] - pub fn len(&self) -> usize { - self.inner.items.len() - } - - pub fn intersection(&self, other: &Self) -> impl Iterator { - self - .iter() - .filter(move |item| other.inner.get(item).is_some()) - } - - #[must_use] - pub fn is_disjoint(&self, other: &Self) -> bool { - self.iter().all(|item| other.inner.get(item).is_none()) - } - - pub fn contains(&self, value: &Q) -> bool - where - T: Borrow, - Q: PartialEq + ?Sized, - { - self.inner.get(value).is_some() - } - - pub fn retain(&mut self, f: F) - where - F: Fn(&T) -> bool, - { - self.inner.items.retain(|(k, ())| f(k)); - } -} - -impl Default for TinySet { - fn default() -> Self { - Self { - inner: TinyMap::default(), - } - } -} - -impl FromIterator for TinySet { - fn from_iter>(iter: I) -> Self { - Self { - inner: iter.into_iter().map(|item| (item, ())).collect(), - } - } -} - -impl From<[T; N]> for TinySet { - fn from(arr: [T; N]) -> Self { - Self { - inner: TinyMap::from(arr.map(|item| (item, ()))), - } - } -} - -impl IntoIterator for TinySet { - type Item = T; - type IntoIter = std::iter::Map, fn((T, ())) -> T>; - - fn into_iter(self) -> Self::IntoIter { - fn take_key((k, ()): (T, ())) -> T { - k - } - self.inner.into_iter().map(take_key::) - } -} - -// -// TinyMap -// - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] -pub struct TinyMap { - items: Vec<(K, V)>, -} - 
-impl TinyMap { - pub fn get(&self, key: &Q) -> Option<&V> - where - K: Borrow, - Q: PartialEq + ?Sized, - { - self - .items - .iter() - .find_map(|(k, v)| if k.borrow() == key { Some(v) } else { None }) - } - - pub fn get_mut_or_insert_default(&mut self, key: K) -> &mut V - where - V: Default, - { - if let Some(pos) = self.items.iter().position(|(k, _)| k == &key) { - return &mut self.items[pos].1; - } - - debug_assert!(self.items.len() <= 5, "TinyMap should be small"); - self.items.push((key, V::default())); - #[allow(clippy::unwrap_used)] - &mut self.items.last_mut().unwrap().1 - } - - pub fn insert(&mut self, key: K, value: V) { - if let Some((_, v)) = self.items.iter_mut().find(|(k, _)| k == &key) { - *v = value; - } else { - debug_assert!(self.items.len() <= 5, "TinyMap should be small"); - self.items.push((key, value)); - } - } - - pub fn extend(&mut self, iter: I) - where - I: IntoIterator, - { - for (k, v) in iter { - self.insert(k, v); - } - } - - pub fn append(&mut self, other: &mut Self) { - for (k, v) in other.items.drain(..) { - self.insert(k, v); - } - } - - pub fn remove(&mut self, key: &Q) -> Option - where - K: Borrow, - Q: PartialEq + ?Sized, - { - if let Some(pos) = self.items.iter().position(|(k, _)| k.borrow() == key) { - Some(self.items.swap_remove(pos).1) - } else { - None - } - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.items.is_empty() - } - - pub fn into_values(self) -> impl Iterator { - self.items.into_iter().map(|(_, v)| v) - } -} - -impl IntoIterator for TinyMap { - type Item = (K, V); - type IntoIter = std::vec::IntoIter<(K, V)>; - - fn into_iter(self) -> Self::IntoIter { - self.items.into_iter() - } -} - -impl FromIterator<(K, V)> for TinyMap { - fn from_iter>(iter: I) -> Self { - let mut new = Self::default(); - new.extend(iter); - new - } -} - -impl From<[(K, V); N]> for TinyMap { - fn from(arr: [(K, V); N]) -> Self { - let mut new = Self::default(); - new.extend(arr); - new - } -} - -impl Default for TinyMap { - fn default() -> Self { - Self { items: Vec::new() } - } -} diff --git a/bd-logger/src/buffer_selector.rs b/bd-logger/src/buffer_selector.rs index 68bebec0..1344d992 100644 --- a/bd-logger/src/buffer_selector.rs +++ b/bd-logger/src/buffer_selector.rs @@ -6,10 +6,10 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use bd_log_matcher::matcher::Tree; -use bd_log_primitives::tiny_set::{TinyMap, TinySet}; use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogType}; use bd_proto::protos::config::v1::config::BufferConfigList; use std::borrow::Cow; +use std::collections::BTreeSet; // A single buffer filter, containing the matchers used to determine if logs should be written to // the specific buffer. @@ -60,11 +60,11 @@ impl BufferSelector { log_level: LogLevel, message: &LogMessage, fields: FieldsRef<'_>, - ) -> TinySet> { - let mut buffers = TinySet::default(); + ) -> BTreeSet> { + let mut buffers = BTreeSet::new(); for buffer in &self.buffer_filters { for (_id, matcher) in &buffer.matchers { - if matcher.do_match(log_level, log_type, message, fields, &TinyMap::default()) { + if matcher.do_match(log_level, log_type, message, fields, None) { buffers.insert(Cow::Borrowed(buffer.buffer_id.as_str())); // No reason to match further. 
diff --git a/bd-logger/src/client_config.rs b/bd-logger/src/client_config.rs index d2a14c3d..368be522 100644 --- a/bd-logger/src/client_config.rs +++ b/bd-logger/src/client_config.rs @@ -21,7 +21,6 @@ use bd_client_common::safe_file_cache::SafeFileCache; use bd_client_stats_store::{Counter, Scope}; use bd_log_filter::FilterChain; use bd_log_primitives::LogRef; -use bd_log_primitives::tiny_set::TinyMap; use bd_proto::protos::bdtail::bdtail_config::BdTailConfigurations; use bd_proto::protos::client::api::configuration_update::{StateOfTheWorld, Update_type}; use bd_proto::protos::client::api::configuration_update_ack::Nack; @@ -407,13 +406,7 @@ impl TailConfigurations { matcher .as_ref() .is_none_or(|matcher| { - matcher.do_match( - log.log_level, - log.log_type, - log.message, - log.fields, - &TinyMap::default(), - ) + matcher.do_match(log.log_level, log.log_type, log.message, log.fields, None) }) .then_some(id.as_str()) }) diff --git a/bd-logger/src/log_replay.rs b/bd-logger/src/log_replay.rs index b9f534b9..d443da2b 100644 --- a/bd-logger/src/log_replay.rs +++ b/bd-logger/src/log_replay.rs @@ -13,7 +13,6 @@ use bd_buffer::{AbslCode, BuffersWithAck, Error}; use bd_client_common::fb::make_log; use bd_client_stats::FlushTrigger; use bd_log_filter::FilterChain; -use bd_log_primitives::tiny_set::TinySet; use bd_log_primitives::{FieldsRef, Log, LogRef, LogType, log_level}; use bd_runtime::runtime::ConfigLoader; use bd_session_replay::CaptureScreenshotHandler; @@ -22,6 +21,7 @@ use bd_workflows::config::FlushBufferId; use bd_workflows::engine::{WorkflowsEngine, WorkflowsEngineConfig}; use bd_workflows::workflow::WorkflowDebugStateMap; use std::borrow::Cow; +use std::collections::BTreeSet; use std::path::Path; use time::OffsetDateTime; use tokio::sync::mpsc::{Receiver, Sender}; @@ -284,7 +284,7 @@ impl ProcessingPipeline { async fn finish_blocking_log_processing( flush_buffers_tx: tokio::sync::mpsc::Sender, flush_stats_trigger: FlushTrigger, - matching_buffers: TinySet>, + matching_buffers: BTreeSet>, ) -> anyhow::Result<()> { // The processing of a blocking log is about to complete. // We make an arbitrary decision to start with the flushing of log buffers to disk first and @@ -332,7 +332,7 @@ impl ProcessingPipeline { fn write_to_buffers<'a>( buffers: &mut BufferProducers, - matching_buffers: &TinySet>, + matching_buffers: &BTreeSet>, log: &LogRef<'_>, workflow_flush_buffer_action_ids: impl Iterator, ) -> anyhow::Result<()> { @@ -351,7 +351,7 @@ impl ProcessingPipeline { workflow_flush_buffer_action_ids, std::iter::empty(), |data| { - for buffer in matching_buffers.iter() { + for buffer in matching_buffers { // TODO(snowp): For both logger and buffer lookup we end up doing a map lookup, which // seems less than ideal in the logging path. Look into ways to optimize this, // possibly via vector indices instead of string keys. 
@@ -379,10 +379,10 @@ impl ProcessingPipeline { } fn process_flush_buffers_actions( - triggered_flush_buffers_action_ids: &TinySet>, + triggered_flush_buffers_action_ids: &BTreeSet>, buffers: &mut BufferProducers, - triggered_flushes_buffer_ids: &TinySet>, - written_to_buffers: &TinySet>, + triggered_flushes_buffer_ids: &BTreeSet>, + written_to_buffers: &BTreeSet>, log: &LogRef<'_>, ) -> Option { if triggered_flush_buffers_action_ids.is_empty() { diff --git a/bd-logger/src/logging_state.rs b/bd-logger/src/logging_state.rs index 4966a18b..5cbd97ab 100644 --- a/bd-logger/src/logging_state.rs +++ b/bd-logger/src/logging_state.rs @@ -16,7 +16,6 @@ use bd_client_stats::{FlushTrigger, Stats}; use bd_client_stats_store::{Counter, Scope}; use bd_log_filter::FilterChain; use bd_log_primitives::size::MemorySized; -use bd_log_primitives::tiny_set::TinySet; use bd_runtime::runtime::ConfigLoader; use bd_session_replay::CaptureScreenshotHandler; use bd_stats_common::labels; @@ -24,7 +23,7 @@ use bd_workflows::config::WorkflowsConfiguration; use bd_workflows::engine::WorkflowsEngine; use flatbuffers::FlatBufferBuilder; use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -284,8 +283,8 @@ pub struct ConfigUpdate { pub struct BufferProducers { pub(crate) buffers: HashMap, pub(crate) builder: FlatBufferBuilder<'static>, - pub(crate) continuous_buffer_ids: TinySet>, - pub(crate) trigger_buffer_ids: TinySet>, + pub(crate) continuous_buffer_ids: BTreeSet>, + pub(crate) trigger_buffer_ids: BTreeSet>, } impl BufferProducers { @@ -298,8 +297,8 @@ impl BufferProducers { .map(|(id, buffer)| Ok((id.clone(), buffer.1.new_thread_local_producer()?))) .collect::>()?; - let mut continuous_buffer_ids = TinySet::default(); - let mut trigger_buffer_ids = TinySet::default(); + let mut continuous_buffer_ids = BTreeSet::new(); + let mut trigger_buffer_ids = BTreeSet::new(); for (buffer_id, (buffer_type, _)) in buffer_manager.buffers() { match buffer_type { diff --git a/bd-workflows/benches/linux_only/matcher.rs b/bd-workflows/benches/linux_only/matcher.rs index c2beedfd..10490e2d 100644 --- a/bd-workflows/benches/linux_only/matcher.rs +++ b/bd-workflows/benches/linux_only/matcher.rs @@ -8,7 +8,6 @@ use bd_api::DataUpload; use bd_client_stats::Stats; use bd_client_stats_store::Collector; -use bd_log_primitives::tiny_set::TinySet; use bd_log_primitives::{FieldsRef, LogFields, LogRef, LogType, log_level}; use bd_proto::protos::workflow::workflow::Workflow; use bd_runtime::runtime::ConfigLoader; @@ -23,6 +22,7 @@ use gungraun::{ library_benchmark, library_benchmark_group, }; +use std::collections::BTreeSet; use std::future::Future; use std::hint::black_box; use time::OffsetDateTime; @@ -61,8 +61,8 @@ impl Setup { engine .start(WorkflowsEngineConfig::new( WorkflowsConfiguration::new(workflows, vec![]), - TinySet::default(), - TinySet::default(), + BTreeSet::default(), + BTreeSet::default(), )) .await; @@ -133,10 +133,10 @@ fn run_runtime_bench>(engine: impl FnOnce() occurred_at: OffsetDateTime::now_utc(), capture_session: None, }; - engine.process_log(&log, &TinySet::default(), now); + engine.process_log(&log, &BTreeSet::default(), now); gungraun::client_requests::callgrind::start_instrumentation(); - engine.process_log(&log, &TinySet::default(), now); + engine.process_log(&log, &BTreeSet::default(), now); gungraun::client_requests::callgrind::stop_instrumentation(); }); } diff --git 
a/bd-workflows/src/actions_flush_buffers.rs b/bd-workflows/src/actions_flush_buffers.rs index 224dceaa..10ee4652 100644 --- a/bd-workflows/src/actions_flush_buffers.rs +++ b/bd-workflows/src/actions_flush_buffers.rs @@ -14,7 +14,6 @@ use anyhow::anyhow; use bd_api::DataUpload; use bd_api::upload::{Intent_type, IntentDecision, TrackedLogUploadIntent, WorkflowActionUpload}; use bd_client_stats_store::{Counter, Scope}; -use bd_log_primitives::tiny_set::TinySet; use bd_proto::protos::client::api::LogUploadIntentRequest; use bd_proto::protos::client::api::log_upload_intent_request::ExplicitSessionCapture; use bd_stats_common::labels; @@ -350,8 +349,8 @@ impl Negotiator { #[derive(Debug)] /// Responsible for orchestrating and managing flush buffer actions. pub(crate) struct Resolver { - trigger_buffer_ids: TinySet>, - continuous_buffer_ids: TinySet>, + trigger_buffer_ids: BTreeSet>, + continuous_buffer_ids: BTreeSet>, stats: ResolverStats, } @@ -360,8 +359,8 @@ impl Resolver { pub(crate) fn new(stats_scope: &Scope) -> Self { Self { stats: ResolverStats::new(stats_scope), - trigger_buffer_ids: TinySet::default(), - continuous_buffer_ids: TinySet::default(), + trigger_buffer_ids: BTreeSet::new(), + continuous_buffer_ids: BTreeSet::new(), } } @@ -376,14 +375,14 @@ impl Resolver { /// buffer actions that require further processing. pub(crate) fn process_flush_buffer_actions<'a>( &self, - actions: TinySet>, + actions: BTreeSet>, session_id: &str, - pending_actions: &TinySet, + pending_actions: &BTreeSet, streaming_actions: &[StreamingBuffersAction], ) -> FlushBuffersActionsProcessingResult<'a> { - let mut created_actions = TinySet::default(); - let mut triggered_flush_buffers_action_ids = TinySet::default(); - let mut triggered_flushes_buffer_ids = TinySet::default(); + let mut created_actions = BTreeSet::new(); + let mut triggered_flush_buffers_action_ids = BTreeSet::new(); + let mut triggered_flushes_buffer_ids = BTreeSet::new(); for action in actions { triggered_flush_buffers_action_ids.insert(match action { @@ -441,7 +440,7 @@ impl Resolver { pub(crate) fn process_streaming_actions<'a>( &self, mut streaming_actions: Vec<(StreamingBuffersAction, bool)>, - log_destination_buffer_ids: &TinySet>, + log_destination_buffer_ids: &BTreeSet>, session_id: &str, ) -> StreamingBuffersActionsProcessingResult<'a> { let mut has_changed_streaming_actions = false; @@ -489,9 +488,9 @@ impl Resolver { // of this process, the state of all active streaming actions is updated, with a counter of logs // streamed for each active streaming rule being incremented accordingly. 
- let mut final_log_destination_buffer_ids: TinySet<_> = TinySet::default(); + let mut final_log_destination_buffer_ids: BTreeSet<_> = BTreeSet::new(); - let mut not_rerouted_buffer_ids: TinySet<_> = log_destination_buffer_ids + let mut not_rerouted_buffer_ids: BTreeSet<_> = log_destination_buffer_ids .clone() .into_iter() .filter(|id| @@ -503,7 +502,7 @@ impl Resolver { let mut has_been_rerouted = false; for (action, _) in &mut streaming_actions { - let intersection: TinySet<_> = action + let intersection: BTreeSet<_> = action .source_trigger_buffer_ids .intersection(log_destination_buffer_ids) .collect(); @@ -533,7 +532,7 @@ impl Resolver { self.stats.streaming_action_applications.inc(); } - final_log_destination_buffer_ids.extend(not_rerouted_buffer_ids); + final_log_destination_buffer_ids.append(&mut not_rerouted_buffer_ids); has_changed_streaming_actions |= has_been_rerouted; @@ -562,8 +561,8 @@ impl Resolver { /// after this process, an action is left with no trigger buffer IDs, it is dropped. pub(crate) fn standardize_pending_actions( &self, - pending_actions: TinySet, - ) -> TinySet { + pending_actions: BTreeSet, + ) -> BTreeSet { pending_actions .into_iter() .filter_map(|action| { @@ -595,12 +594,12 @@ impl Resolver { streaming_buffers .into_iter() .filter_map(|action| { - let source_trigger_buffer_ids: TinySet<_> = action + let source_trigger_buffer_ids: BTreeSet<_> = action .source_trigger_buffer_ids .intersection(&self.trigger_buffer_ids) .cloned() .collect(); - let destination_continuous_buffer_ids: TinySet<_> = action + let destination_continuous_buffer_ids: BTreeSet<_> = action .destination_continuous_buffer_ids .intersection(&self.continuous_buffer_ids) .cloned() @@ -696,14 +695,14 @@ impl ResolverStats { #[derive(Debug)] pub(crate) struct ResolverConfig { - trigger_buffer_ids: TinySet>, - continuous_buffer_ids: TinySet>, + trigger_buffer_ids: BTreeSet>, + continuous_buffer_ids: BTreeSet>, } impl ResolverConfig { pub(crate) const fn new( - trigger_buffer_ids: TinySet>, - continuous_buffer_ids: TinySet>, + trigger_buffer_ids: BTreeSet>, + continuous_buffer_ids: BTreeSet>, ) -> Self { Self { trigger_buffer_ids, @@ -715,11 +714,12 @@ impl ResolverConfig { // // FlushBuffersActionsProcessingResult // -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub(crate) struct FlushBuffersActionsProcessingResult<'a> { - pub(crate) new_pending_actions_to_add: TinySet, - pub(crate) triggered_flush_buffers_action_ids: TinySet>, - pub(crate) triggered_flushes_buffer_ids: TinySet>, + pub(crate) new_pending_actions_to_add: BTreeSet, + + pub(crate) triggered_flush_buffers_action_ids: BTreeSet>, + pub(crate) triggered_flushes_buffer_ids: BTreeSet>, } // @@ -728,7 +728,7 @@ pub(crate) struct FlushBuffersActionsProcessingResult<'a> { #[derive(Debug, PartialEq)] pub(crate) struct StreamingBuffersActionsProcessingResult<'a> { - pub(crate) log_destination_buffer_ids: TinySet>, + pub(crate) log_destination_buffer_ids: BTreeSet>, pub(crate) has_changed_streaming_actions: bool, pub(crate) updated_streaming_actions: Vec, } @@ -740,12 +740,12 @@ pub(crate) struct StreamingBuffersActionsProcessingResult<'a> { // The action created by a flush buffer workflow action. This tracks the action while upload intent // negotiation is performed. At that point, it either transitions into a `StreamingBuffersAction` if // streaming was configured for the action. 
-#[derive(Clone, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub(crate) struct PendingFlushBuffersAction { pub(crate) id: FlushBufferId, session_id: String, - trigger_buffer_ids: TinySet>, + trigger_buffer_ids: BTreeSet>, streaming: Option, } @@ -769,8 +769,8 @@ impl PendingFlushBuffersAction { fn new( action: ActionFlushBuffers, session_id: String, - trigger_buffer_ids: &TinySet>, - continuous_buffer_ids: &TinySet>, + trigger_buffer_ids: &BTreeSet>, + continuous_buffer_ids: &BTreeSet>, ) -> Option { log::debug!("evaluating flush buffers action: {action:?}"); @@ -786,7 +786,7 @@ impl PendingFlushBuffersAction { if streaming_proto.destination_continuous_buffer_ids.is_empty() { continuous_buffer_ids .first() - .map_or_else(TinySet::default, |id| TinySet::from([id.clone()])) + .map_or(BTreeSet::new(), |id| BTreeSet::from([id.clone()])) } else { // Make sure that we dismiss invalid (not existing) continuous buffer IDs. continuous_buffer_ids @@ -811,7 +811,7 @@ impl PendingFlushBuffersAction { }) }); - let trigger_buffer_ids: TinySet<_> = if action.buffer_ids.is_empty() { + let trigger_buffer_ids: BTreeSet<_> = if action.buffer_ids.is_empty() { // Empty buffer IDs means that the action should be applied to all buffers. trigger_buffer_ids.clone() } else { @@ -841,9 +841,9 @@ impl PendingFlushBuffersAction { // Streaming // -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub(crate) struct Streaming { - destination_continuous_buffer_ids: TinySet>, + destination_continuous_buffer_ids: BTreeSet>, max_logs_count: Option, } @@ -852,15 +852,15 @@ pub(crate) struct Streaming { // StreamingBuffersAction // -#[derive(Clone, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] // The action created in response to flush buffer actions that were accepted for upload and had a // streaming configuration attached to them. pub(crate) struct StreamingBuffersAction { pub(crate) id: FlushBufferId, session_id: String, - source_trigger_buffer_ids: TinySet>, - destination_continuous_buffer_ids: TinySet>, + source_trigger_buffer_ids: BTreeSet>, + destination_continuous_buffer_ids: BTreeSet>, max_logs_count: Option, @@ -885,7 +885,7 @@ impl std::fmt::Debug for StreamingBuffersAction { impl StreamingBuffersAction { pub(crate) fn new( action: PendingFlushBuffersAction, - continuous_buffer_ids: &TinySet>, + continuous_buffer_ids: &BTreeSet>, ) -> Option { let Some(streaming) = action.streaming else { log::trace!("buffers streaming not activated: no streaming configuration"); @@ -931,7 +931,7 @@ impl StreamingBuffersAction { #[derive(Debug)] pub struct BuffersToFlush { // Unique IDs of buffers to flush. - pub buffer_ids: TinySet>, + pub buffer_ids: BTreeSet>, // Channel to notify the caller that the flush has been completed. 
pub response_tx: tokio::sync::oneshot::Sender<()>, } diff --git a/bd-workflows/src/actions_flush_buffers_test.rs b/bd-workflows/src/actions_flush_buffers_test.rs index c5e66cad..a5f716a3 100644 --- a/bd-workflows/src/actions_flush_buffers_test.rs +++ b/bd-workflows/src/actions_flush_buffers_test.rs @@ -21,7 +21,6 @@ use bd_api::DataUpload; use bd_api::upload::{IntentDecision, IntentResponse}; use bd_client_stats_store::Collector; use bd_client_stats_store::test::StatsHelper; -use bd_log_primitives::tiny_set::TinySet; use bd_stats_common::labels; use pretty_assertions::assert_eq; use std::borrow::Cow; @@ -83,18 +82,18 @@ impl std::ops::Drop for NegotiatorWrapper { async fn pending_buffers_standardization_removes_references_to_non_existing_trigger_buffers() { let mut resolver = Resolver::new(&Collector::default().scope("test")); resolver.update(ResolverConfig::new( - TinySet::from([ + BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "existing_trigger_buffer_id_2".into(), ]), - TinySet::default(), + BTreeSet::new(), )); - let result = resolver.standardize_pending_actions(TinySet::from([ + let result = resolver.standardize_pending_actions(BTreeSet::from([ PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::from([ + trigger_buffer_ids: BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "unknown_trigger_buffer_id".into(), ]), @@ -103,26 +102,26 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_2".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::from(["unknown_trigger_buffer_id".into()]), + trigger_buffer_ids: BTreeSet::from(["unknown_trigger_buffer_id".into()]), streaming: None, }, PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_3".to_string()), session_id: "bar_session_id".to_string(), - trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_2".into()]), + trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_2".into()]), streaming: Some(Streaming { - destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from(["unknown_continuous_buffer_id".into()]), max_logs_count: Some(10), }), }, ])); assert_eq!( - TinySet::from([ + BTreeSet::from([ PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::from([ + trigger_buffer_ids: BTreeSet::from([ // The unknown trigger buffer ID present in the original flush buffers action is no // longer present. "existing_trigger_buffer_id_1".into(), @@ -134,13 +133,15 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_3".to_string()), session_id: "bar_session_id".to_string(), - trigger_buffer_ids: TinySet::from([ + trigger_buffer_ids: BTreeSet::from([ // The unknown continuous buffer ID present in the original flush buffers action is // no longer present. 
"existing_trigger_buffer_id_2".into(), ]), streaming: Some(Streaming { - destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from( + ["unknown_continuous_buffer_id".into()] + ), max_logs_count: Some(10), }), }, @@ -153,11 +154,11 @@ async fn pending_buffers_standardization_removes_references_to_non_existing_trig async fn streaming_buffers_standardization_removes_references_to_non_existing_buffers() { let mut resolver = Resolver::new(&Collector::default().scope("test")); resolver.update(ResolverConfig::new( - TinySet::from([ + BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "existing_trigger_buffer_id_2".into(), ]), - TinySet::from([ + BTreeSet::from([ "existing_continuous_buffer_id_1".into(), "existing_continuous_buffer_id_2".into(), ]), @@ -167,8 +168,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into()]), - destination_continuous_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into()]), + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), "unknown_continuous_buffer_id".into(), ]), @@ -178,8 +179,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_2".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["unknown_trigger_buffer_id".into()]), - destination_continuous_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from(["unknown_trigger_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), "unknown_continuous_buffer_id".into(), ]), @@ -189,11 +190,11 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_3".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "unknown_trigger_buffer_id".into(), ]), - destination_continuous_buffer_ids: TinySet::from([ + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), "unknown_continuous_buffer_id".into(), ]), @@ -203,22 +204,22 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_4".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "unknown_trigger_buffer_id".into(), ]), - destination_continuous_buffer_ids: TinySet::from(["unknown_continuous_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from(["unknown_continuous_buffer_id".into()]), max_logs_count: Some(10), logs_count: 0, }, StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_5".to_string()), session_id: "bar_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from([ "existing_trigger_buffer_id_1".into(), "unknown_trigger_buffer_id".into(), 
]), - destination_continuous_buffer_ids: TinySet::from([ + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), "unknown_continuous_buffer_id".into(), ]), @@ -232,8 +233,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into()]), - destination_continuous_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into()]), + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), ]), max_logs_count: Some(10), @@ -242,8 +243,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_3".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into(),]), - destination_continuous_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into(),]), + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), ]), max_logs_count: Some(10), @@ -252,8 +253,8 @@ async fn streaming_buffers_standardization_removes_references_to_non_existing_bu StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_5".to_string()), session_id: "bar_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id_1".into(),]), - destination_continuous_buffer_ids: TinySet::from([ + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id_1".into(),]), + destination_continuous_buffer_ids: BTreeSet::from([ "existing_continuous_buffer_id_1".into(), ]), max_logs_count: Some(10), @@ -270,11 +271,11 @@ fn process_flush_buffers_actions() { let mut resolver = Resolver::new(&collector.scope("test")); resolver.update(ResolverConfig::new( - TinySet::from(["existing_trigger_buffer_id".into()]), - TinySet::default(), + BTreeSet::from(["existing_trigger_buffer_id".into()]), + BTreeSet::new(), )); - let actions = TinySet::from([ + let actions = BTreeSet::from([ ActionFlushBuffers { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".to_string()]), @@ -300,17 +301,17 @@ fn process_flush_buffers_actions() { let result = resolver.process_flush_buffer_actions( actions.iter().map(Cow::Borrowed).collect(), "foo_session_id", - &TinySet::from([PendingFlushBuffersAction { + &BTreeSet::from([PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_2".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::default(), + trigger_buffer_ids: BTreeSet::new(), streaming: None, }]), &[StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_3".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]), - destination_continuous_buffer_ids: TinySet::default(), + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::new(), max_logs_count: Some(10), logs_count: 0, }], @@ -318,19 +319,19 @@ fn process_flush_buffers_actions() { assert_eq!( FlushBuffersActionsProcessingResult { - new_pending_actions_to_add: 
TinySet::from([PendingFlushBuffersAction { + new_pending_actions_to_add: BTreeSet::from([PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),]), + trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]), streaming: None, }]), - triggered_flush_buffers_action_ids: TinySet::from([ + triggered_flush_buffers_action_ids: BTreeSet::from([ Cow::Owned(FlushBufferId::WorkflowActionId("action_id_1".into())), Cow::Owned(FlushBufferId::WorkflowActionId("action_id_2".into())), Cow::Owned(FlushBufferId::WorkflowActionId("action_id_3".into())), Cow::Owned(FlushBufferId::WorkflowActionId("action_id_4".into())), ]), - triggered_flushes_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]) + triggered_flushes_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]) }, result ); @@ -363,11 +364,11 @@ fn process_flush_buffer_action_with_no_buffers() { let mut resolver = Resolver::new(&collector.scope("test")); resolver.update(ResolverConfig::new( - TinySet::from(["existing_trigger_buffer_id".into()]), - TinySet::from(["existing_continuous_buffer_id".into()]), + BTreeSet::from(["existing_trigger_buffer_id".into()]), + BTreeSet::from(["existing_continuous_buffer_id".into()]), )); - let actions = TinySet::from([ActionFlushBuffers { + let actions = BTreeSet::from([ActionFlushBuffers { id: FlushBufferId::WorkflowActionId("action_id".to_string()), buffer_ids: BTreeSet::new(), streaming: Some(crate::config::Streaming { @@ -379,27 +380,27 @@ fn process_flush_buffer_action_with_no_buffers() { let result = resolver.process_flush_buffer_actions( actions.iter().map(Cow::Borrowed).collect(), "foo_session_id", - &TinySet::default(), + &BTreeSet::new(), &[], ); assert_eq!( FlushBuffersActionsProcessingResult { - new_pending_actions_to_add: TinySet::from([PendingFlushBuffersAction { + new_pending_actions_to_add: BTreeSet::from([PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id".to_string()), session_id: "foo_session_id".to_string(), - trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),]), + trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]), streaming: Some(Streaming { - destination_continuous_buffer_ids: TinySet::from( - ["existing_continuous_buffer_id".into()] - ), + destination_continuous_buffer_ids: BTreeSet::from([ + "existing_continuous_buffer_id".into() + ]), max_logs_count: Some(10), }), }]), - triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned( + triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned( FlushBufferId::WorkflowActionId("action_id".into()) )]), - triggered_flushes_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into(),]) + triggered_flushes_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into(),]) }, result ); @@ -411,8 +412,8 @@ fn process_streaming_buffers_actions() { let mut resolver = Resolver::new(&collector.scope("test")); resolver.update(ResolverConfig::new( - TinySet::from(["existing_trigger_buffer_id".into()]), - TinySet::from(["existing_continuous_buffer_id".into()]), + BTreeSet::from(["existing_trigger_buffer_id".into()]), + BTreeSet::from(["existing_continuous_buffer_id".into()]), )); let result = resolver.process_streaming_actions( @@ -421,8 +422,8 @@ fn process_streaming_buffers_actions() { StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: 
"foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]), - destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]), + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]), max_logs_count: Some(10), logs_count: 0, }, @@ -432,27 +433,27 @@ fn process_streaming_buffers_actions() { StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_2".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]), - destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]), + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]), max_logs_count: Some(10), logs_count: 10, }, true, ), ], - &TinySet::from(["existing_trigger_buffer_id".into()]), + &BTreeSet::from(["existing_trigger_buffer_id".into()]), "foo_session_id", ); assert_eq!( StreamingBuffersActionsProcessingResult { - log_destination_buffer_ids: TinySet::from(["continuous_buffer_id".into()]), + log_destination_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]), has_changed_streaming_actions: true, updated_streaming_actions: vec![StreamingBuffersAction { id: FlushBufferId::WorkflowActionId("action_id_1".to_string()), session_id: "foo_session_id".to_string(), - source_trigger_buffer_ids: TinySet::from(["existing_trigger_buffer_id".into()]), - destination_continuous_buffer_ids: TinySet::from(["continuous_buffer_id".into()]), + source_trigger_buffer_ids: BTreeSet::from(["existing_trigger_buffer_id".into()]), + destination_continuous_buffer_ids: BTreeSet::from(["continuous_buffer_id".into()]), max_logs_count: Some(10), logs_count: 1, },], @@ -483,7 +484,7 @@ async fn negotiator_upload_flow() { let pending_action = PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id".to_string()), session_id: "session_id".to_string(), - trigger_buffer_ids: TinySet::default(), + trigger_buffer_ids: BTreeSet::new(), streaming: None, }; @@ -543,7 +544,7 @@ async fn negotiator_drop_flow() { let pending_action = PendingFlushBuffersAction { id: FlushBufferId::WorkflowActionId("action_id".to_string()), session_id: "session_id".to_string(), - trigger_buffer_ids: TinySet::default(), + trigger_buffer_ids: BTreeSet::new(), streaming: None, }; diff --git a/bd-workflows/src/engine.rs b/bd-workflows/src/engine.rs index 3d5eaa02..09b9546f 100644 --- a/bd-workflows/src/engine.rs +++ b/bd-workflows/src/engine.rs @@ -43,7 +43,6 @@ use bd_client_common::file::{read_compressed, write_compressed}; use bd_client_stats::Stats; use bd_client_stats_store::{Counter, Histogram, Scope}; use bd_error_reporter::reporter::handle_unexpected; -use bd_log_primitives::tiny_set::{TinyMap, TinySet}; use bd_log_primitives::{Log, LogRef}; use bd_runtime::runtime::workflows::PersistenceWriteIntervalFlag; use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, session_capture}; @@ -51,7 +50,7 @@ use bd_stats_common::labels; use bd_stats_common::workflow::WorkflowDebugKey; use serde::{Deserialize, Serialize}; use std::borrow::Cow; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::path::{Path, PathBuf}; use std::sync::Arc; use time::OffsetDateTime; @@ -185,7 +184,7 @@ impl WorkflowsEngine { 
self.add_workflows(config.workflows_configuration.workflows, None); } - for action in self.state.pending_flush_actions.iter() { + for action in &self.state.pending_flush_actions { if let Err(e) = self .flush_buffers_negotiator_input_tx .try_send(action.clone()) @@ -535,7 +534,7 @@ impl WorkflowsEngine { pub fn process_log<'a>( &'a mut self, log: &LogRef<'_>, - log_destination_buffer_ids: &'a TinySet>, + log_destination_buffer_ids: &'a BTreeSet>, now: OffsetDateTime, ) -> WorkflowsEngineResult<'a> { // Measure duration in here even if the list of workflows is empty. @@ -584,20 +583,20 @@ impl WorkflowsEngine { { return WorkflowsEngineResult { log_destination_buffer_ids: Cow::Borrowed(log_destination_buffer_ids), - triggered_flushes_buffer_ids: TinySet::default(), - triggered_flush_buffers_action_ids: TinySet::default(), + triggered_flushes_buffer_ids: BTreeSet::new(), + triggered_flush_buffers_action_ids: BTreeSet::new(), capture_screenshot: false, + logs_to_inject: BTreeMap::new(), workflow_debug_state: vec![], has_debug_workflows: false, - logs_to_inject: TinyMap::default(), }; } let mut actions: Vec> = vec![]; + let mut logs_to_inject: BTreeMap<&'a str, Log> = BTreeMap::new(); let mut all_cumulative_workflow_debug_state = vec![]; let mut all_incremental_workflow_debug_state = vec![]; let mut has_debug_workflows = false; - let mut logs_to_inject: TinyMap<&'a str, Log> = TinyMap::default(); for (index, workflow) in &mut self.state.workflows.iter_mut().enumerate() { let Some(config) = self.configs.get(index) else { continue; @@ -793,7 +792,7 @@ impl WorkflowsEngine { return PreparedActions::default(); } - let flush_buffers_actions: TinySet> = actions + let flush_buffers_actions: BTreeSet> = actions .iter() .filter_map(|action| { if let TriggeredAction::FlushBuffers(flush_buffers_action) = action { @@ -804,7 +803,7 @@ impl WorkflowsEngine { }) .collect(); - let emit_metric_actions: TinySet<&ActionEmitMetric> = actions + let emit_metric_actions: BTreeSet<&ActionEmitMetric> = actions .iter() .filter_map(|action| { if let TriggeredAction::EmitMetric(emit_metric_action) = action { @@ -827,7 +826,7 @@ impl WorkflowsEngine { }) .collect(); - let emit_sankey_diagrams_actions: TinySet> = actions + let emit_sankey_diagrams_actions: BTreeSet> = actions .into_iter() .filter_map(|action| { if let TriggeredAction::SankeyDiagram(action) = action { @@ -857,10 +856,10 @@ impl Drop for WorkflowsEngine { #[derive(Default)] struct PreparedActions<'a> { - flush_buffers_actions: TinySet>, - emit_metric_actions: TinySet<&'a ActionEmitMetric>, - emit_sankey_diagrams_actions: TinySet>, - capture_screenshot_actions: TinySet<&'a ActionTakeScreenshot>, + flush_buffers_actions: BTreeSet>, + emit_metric_actions: BTreeSet<&'a ActionEmitMetric>, + emit_sankey_diagrams_actions: BTreeSet>, + capture_screenshot_actions: BTreeSet<&'a ActionTakeScreenshot>, } // @@ -869,18 +868,18 @@ struct PreparedActions<'a> { #[derive(Debug, PartialEq, Eq)] pub struct WorkflowsEngineResult<'a> { - pub log_destination_buffer_ids: Cow<'a, TinySet>>, + pub log_destination_buffer_ids: Cow<'a, BTreeSet>>, // The identifier of workflow actions that triggered buffers flush(es). - pub triggered_flush_buffers_action_ids: TinySet>, + pub triggered_flush_buffers_action_ids: BTreeSet>, // The identifier of trigger buffers that should be flushed. - pub triggered_flushes_buffer_ids: TinySet>, + pub triggered_flushes_buffer_ids: BTreeSet>, // Whether a screenshot should be taken in response to processing the log. 
pub capture_screenshot: bool, // Logs to be injected back into the workflow engine after field attachment and other processing. - pub logs_to_inject: TinyMap<&'a str, Log>, + pub logs_to_inject: BTreeMap<&'a str, Log>, // If any workflows have debugging enabled, this will contain the *cumulative* debug state since // debugging started for each workflow. The state is persisted in the workflow state file to @@ -901,16 +900,16 @@ pub type AllWorkflowsDebugState = Vec<(String, WorkflowDebugStateMap)>; pub struct WorkflowsEngineConfig { pub(crate) workflows_configuration: WorkflowsConfiguration, - pub(crate) trigger_buffer_ids: TinySet>, - pub(crate) continuous_buffer_ids: TinySet>, + pub(crate) trigger_buffer_ids: BTreeSet>, + pub(crate) continuous_buffer_ids: BTreeSet>, } impl WorkflowsEngineConfig { #[must_use] pub const fn new( workflows_configuration: WorkflowsConfiguration, - trigger_buffer_ids: TinySet>, - continuous_buffer_ids: TinySet>, + trigger_buffer_ids: BTreeSet>, + continuous_buffer_ids: BTreeSet>, ) -> Self { Self { workflows_configuration, @@ -921,11 +920,11 @@ impl WorkflowsEngineConfig { #[cfg(test)] #[must_use] - pub fn new_with_workflow_configurations(workflow_configs: Vec) -> Self { + pub const fn new_with_workflow_configurations(workflow_configs: Vec) -> Self { Self::new( WorkflowsConfiguration::new_with_workflow_configurations_for_test(workflow_configs), - TinySet::default(), - TinySet::default(), + BTreeSet::new(), + BTreeSet::new(), ) } } @@ -1083,7 +1082,7 @@ pub(crate) struct WorkflowsState { session_id: String, workflows: Vec, - pending_flush_actions: TinySet, + pending_flush_actions: BTreeSet, pending_sankey_actions: BTreeSet, streaming_actions: Vec, } diff --git a/bd-workflows/src/engine_test.rs b/bd-workflows/src/engine_test.rs index 02ced9cf..5a589298 100644 --- a/bd-workflows/src/engine_test.rs +++ b/bd-workflows/src/engine_test.rs @@ -19,7 +19,6 @@ use bd_client_stats::Stats; use bd_client_stats_store::Collector; use bd_client_stats_store::test::StatsHelper; use bd_error_reporter::reporter::{Reporter, UnexpectedErrorHandler}; -use bd_log_primitives::tiny_set::{TinyMap, TinySet}; use bd_log_primitives::{FieldsRef, Log, LogFields, LogMessage, LogRef, log_level}; use bd_proto::flatbuffers::buffer_log::bitdrift_public::fbs::logging::v_1::LogType; use bd_proto::protos::client::api::log_upload_intent_request::Intent_type::WorkflowActionUpload; @@ -55,7 +54,7 @@ use bd_time::TimeDurationExt; use itertools::Itertools; use pretty_assertions::assert_eq; use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -220,7 +219,7 @@ struct AnnotatedWorkflowsEngine { engine: WorkflowsEngine, session_id: String, - log_destination_buffer_ids: TinySet>, + log_destination_buffer_ids: BTreeSet>, hooks: Arc>, @@ -240,7 +239,7 @@ impl AnnotatedWorkflowsEngine { engine, session_id: "foo_session".to_string(), - log_destination_buffer_ids: TinySet::default(), + log_destination_buffer_ids: BTreeSet::new(), hooks, @@ -365,7 +364,7 @@ impl AnnotatedWorkflowsEngine { }) } - fn flushed_buffers(&self) -> Vec>> { + fn flushed_buffers(&self) -> Vec>> { self .hooks .lock() @@ -943,8 +942,8 @@ async fn engine_update_after_sdk_update() { WorkflowBuilder::new("2", &[&c, &d]).make_config(), WorkflowBuilder::new("1", &[&a, &b]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["continuous_buffer_id".into()]), + 
BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), ); let setup = Setup::new(); @@ -977,8 +976,8 @@ async fn engine_update_after_sdk_update() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["continuous_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), )); assert_eq!(workflows_engine.state.workflows.len(), 1); @@ -1625,7 +1624,7 @@ async fn ignore_persisted_state_if_invalid_dir() { occurred_at: OffsetDateTime::now_utc(), capture_session: None, }, - &TinySet::default(), + &BTreeSet::new(), OffsetDateTime::now_utc(), ); @@ -1699,27 +1698,27 @@ async fn engine_processing_log() { let mut workflows_engine = setup .make_workflows_engine(WorkflowsEngineConfig::new( WorkflowsConfiguration::new_with_workflow_configurations_for_test(workflows), - TinySet::from(["foo_buffer_id".into()]), - TinySet::default(), + BTreeSet::from(["foo_buffer_id".into()]), + BTreeSet::new(), )) .await; // * Two workflows are created in response to a passed workflows config. // * One run is created for each of the created workflows. // * Each workflow run advances from their initial to final state in response to "foo" log. - workflows_engine.log_destination_buffer_ids = TinySet::from(["foo_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["foo_buffer_id".into()]); let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( WorkflowsEngineResult { - log_destination_buffer_ids: Cow::Owned(TinySet::from(["foo_buffer_id".into()])), - triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned( + log_destination_buffer_ids: Cow::Owned(BTreeSet::from(["foo_buffer_id".into()])), + triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned( FlushBufferId::WorkflowActionId("foo_action_id".into()) ),]), - triggered_flushes_buffer_ids: TinySet::from(["foo_buffer_id".into()]), + triggered_flushes_buffer_ids: BTreeSet::from(["foo_buffer_id".into()]), capture_screenshot: false, + logs_to_inject: BTreeMap::new(), workflow_debug_state: vec![], has_debug_workflows: false, - logs_to_inject: TinyMap::default(), }, result ); @@ -1815,28 +1814,28 @@ async fn log_without_destination() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["continuous_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), ); let setup = Setup::new(); let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await; - workflows_engine.log_destination_buffer_ids = TinySet::default(); + workflows_engine.log_destination_buffer_ids = BTreeSet::new(); let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( WorkflowsEngineResult { - log_destination_buffer_ids: Cow::Owned(TinySet::default()), - triggered_flush_buffers_action_ids: TinySet::from([Cow::Owned( + log_destination_buffer_ids: Cow::Owned(BTreeSet::new()), + triggered_flush_buffers_action_ids: BTreeSet::from([Cow::Owned( FlushBufferId::WorkflowActionId("action".into()) ),]), - triggered_flushes_buffer_ids: TinySet::from(["trigger_buffer_id".into()]), + triggered_flushes_buffer_ids: BTreeSet::from(["trigger_buffer_id".into()]), capture_screenshot: false, + 
logs_to_inject: BTreeMap::new(), workflow_debug_state: vec![], has_debug_workflows: false, - logs_to_inject: TinyMap::default(), }, result ); @@ -1922,8 +1921,8 @@ async fn logs_streaming() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b, &c, &d, &e, &f, &g, &h]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from([ + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from([ "continuous_buffer_id_1".into(), "continuous_buffer_id_2".into(), ]), @@ -1934,7 +1933,7 @@ async fn logs_streaming() { let mut workflows_engine = setup .make_workflows_engine(workflows_engine_config.clone()) .await; - workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); // Emit four logs that results in four flushes of the buffer(s). // The logs upload intents for the first two buffer flushes are processed soon immediately after @@ -1952,7 +1951,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_drop")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -1982,7 +1981,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2004,7 +2003,7 @@ async fn logs_streaming() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![TinySet::from(["trigger_buffer_id".into()])], + vec![BTreeSet::from(["trigger_buffer_id".into()])], ); setup.collector.assert_counter_eq( @@ -2023,7 +2022,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("immediate_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2050,8 +2049,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["trigger_buffer_id".into()]) + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]) ], ); @@ -2075,7 +2074,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) ); // The resulting flush buffer action should be ignored as the same flush buffer action was @@ -2083,7 +2082,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_no_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) ); // This should trigger a flush of a buffer that's followed by logs 
streaming to continuous log @@ -2093,7 +2092,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) ); // Confirm that the state of the workflows engine is as expected prior to engine's shutdown. @@ -2114,7 +2113,7 @@ async fn logs_streaming() { let setup = Setup::new_with_sdk_directory(&setup.sdk_directory); let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await; - workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); workflows_engine.set_awaiting_logs_upload_intent_decisions(vec![ IntentDecision::UploadImmediately, @@ -2124,7 +2123,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2146,7 +2145,7 @@ async fn logs_streaming() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![TinySet::from(["trigger_buffer_id".into()])], + vec![BTreeSet::from(["trigger_buffer_id".into()])], ); setup.collector.assert_counter_eq( @@ -2158,7 +2157,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id_2".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id_2".into()])) ); // Allow the engine to perform logs upload intent and process the response to it (upload @@ -2181,8 +2180,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["trigger_buffer_id".into()]) + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]) ], ); @@ -2191,9 +2190,9 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("relaunch_upload_streaming")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from([ - "continuous_buffer_id_2".into(), + Cow::Owned(BTreeSet::from([ "continuous_buffer_id_1".into(), + "continuous_buffer_id_2".into() ])) ); @@ -2215,8 +2214,8 @@ async fn logs_streaming() { assert_eq!( workflows_engine.flushed_buffers(), vec![ - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["trigger_buffer_id".into()]) + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]) ], ); workflows_engine.complete_flushes(); @@ -2233,7 +2232,7 @@ async fn logs_streaming() { let result = workflows_engine.process_log(TestLog::new("test log")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); assert!(workflows_engine.state.pending_flush_actions.is_empty()); @@ -2251,8 +2250,8 @@ async fn engine_tracks_new_sessions() { let workflows_engine_config = WorkflowsEngineConfig::new( WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![]), - TinySet::from(["trigger_buffer_id".into()]), - 
TinySet::from(["continuous_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), ); let mut workflows_engine = setup @@ -2294,14 +2293,14 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b, &c]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["continuous_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), ); let mut workflows_engine = setup .make_workflows_engine(workflows_engine_config.clone()) .await; - workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); // Set up no responses so that the actions continue to wait for the server's response. workflows_engine.set_awaiting_logs_upload_intent_decisions(vec![]); @@ -2310,7 +2309,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); // The log below doesn't trigger a buffer flush, but it's emitted with a new session ID, which @@ -2320,7 +2319,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { let result = workflows_engine.process_log(TestLog::new("not triggering")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); // Confirm that the pending action was not cleaned up. 
@@ -2335,7 +2334,7 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { let mut workflows_engine = setup.make_workflows_engine(workflows_engine_config).await; workflows_engine.session_id = "new session ID".to_string(); - workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); workflows_engine .set_awaiting_logs_upload_intent_decisions(vec![IntentDecision::UploadImmediately]); @@ -2351,14 +2350,14 @@ async fn engine_does_not_purge_pending_actions_on_session_id_change() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![TinySet::from(["trigger_buffer_id".into()])], + vec![BTreeSet::from(["trigger_buffer_id".into()])], ); workflows_engine.complete_flushes(); let result = workflows_engine.process_log(TestLog::new("not triggering")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); setup.collector.assert_counter_eq( @@ -2398,14 +2397,14 @@ async fn engine_continues_to_stream_upload_not_complete() { WorkflowsConfiguration::new_with_workflow_configurations_for_test(vec![ WorkflowBuilder::new("1", &[&a, &b, &c]).make_config(), ]), - TinySet::from(["trigger_buffer_id".into()]), - TinySet::from(["continuous_buffer_id".into()]), + BTreeSet::from(["trigger_buffer_id".into()]), + BTreeSet::from(["continuous_buffer_id".into()]), ); let mut workflows_engine = setup .make_workflows_engine(workflows_engine_config.clone()) .await; - workflows_engine.log_destination_buffer_ids = TinySet::from(["trigger_buffer_id".into()]); + workflows_engine.log_destination_buffer_ids = BTreeSet::from(["trigger_buffer_id".into()]); // Allow the intent to go through which should trigger an upload. workflows_engine @@ -2415,7 +2414,7 @@ async fn engine_continues_to_stream_upload_not_complete() { let result = workflows_engine.process_log(TestLog::new("foo")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); log::info!("Running the engine for the first time."); @@ -2431,14 +2430,14 @@ async fn engine_continues_to_stream_upload_not_complete() { ); assert_eq!( workflows_engine.flushed_buffers(), - vec![TinySet::from(["trigger_buffer_id".into()])], + vec![BTreeSet::from(["trigger_buffer_id".into()])], ); // Verify that we have transitioned to streaming. let result = workflows_engine.process_log(TestLog::new("streamed")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id".into()])) ); // Change the session. 
This would typically cause the engine to stop streaming, but we haven't @@ -2447,7 +2446,7 @@ async fn engine_continues_to_stream_upload_not_complete() { let result = workflows_engine.process_log(TestLog::new("streamed")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["continuous_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["continuous_buffer_id".into()])) ); workflows_engine.complete_flushes(); @@ -2457,7 +2456,7 @@ async fn engine_continues_to_stream_upload_not_complete() { let result = workflows_engine.process_log(TestLog::new("not streamed")); assert_eq!( result.log_destination_buffer_ids, - Cow::Owned(TinySet::from(["trigger_buffer_id".into()])) + Cow::Owned(BTreeSet::from(["trigger_buffer_id".into()])) ); setup.collector.assert_counter_eq( diff --git a/bd-workflows/src/generate_log.rs b/bd-workflows/src/generate_log.rs index 24910c80..5f9d7677 100644 --- a/bd-workflows/src/generate_log.rs +++ b/bd-workflows/src/generate_log.rs @@ -55,11 +55,13 @@ fn resolve_reference<'a>( .map(StringOrFloat::String), Value_reference_type::SavedFieldId(saved_field_id) => extractions .fields - .get(saved_field_id) + .as_ref() + .and_then(|fields| fields.get(saved_field_id)) .map(|field| StringOrFloat::String(field.into())), Value_reference_type::SavedTimestampId(saved_timestamp_id) => extractions .timestamps - .get(saved_timestamp_id) + .as_ref() + .and_then(|timestamps| timestamps.get(saved_timestamp_id)) .map(|timestamp| StringOrFloat::Float(fractional_milliseconds_since_epoch(*timestamp))), Value_reference_type::Uuid(_) => Some(StringOrFloat::String(Uuid::new_v4().to_string().into())), } diff --git a/bd-workflows/src/generate_log_test.rs b/bd-workflows/src/generate_log_test.rs index 53cb541a..f3d4c19e 100644 --- a/bd-workflows/src/generate_log_test.rs +++ b/bd-workflows/src/generate_log_test.rs @@ -70,6 +70,7 @@ impl Helper { self .extractions .timestamps + .get_or_insert_default() .insert(id.to_string(), timestamp); } @@ -77,6 +78,7 @@ impl Helper { self .extractions .fields + .get_or_insert_default() .insert(id.to_string(), value.to_string()); }
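The `resolve_reference` and test-helper hunks above are the two halves of the same change: the extraction maps are now `Option<BTreeMap<_, _>>`, written through `get_or_insert_default()` / `get_or_insert_with(BTreeMap::new)` and read through `.as_ref().and_then(...)`. A self-contained sketch of the pattern, with assumed `String`-keyed types (the real key and value types aren't fully visible in these hunks):

```rust
use std::collections::BTreeMap;

// Lazy-map pattern: the map starts as None and is only allocated on the
// first insert, so traversals that never extract anything pay nothing.
#[derive(Default)]
struct Extractions {
    fields: Option<BTreeMap<String, String>>,
}

impl Extractions {
    fn insert_field(&mut self, id: &str, value: &str) {
        self.fields
            .get_or_insert_with(BTreeMap::new)
            .insert(id.to_string(), value.to_string());
    }

    // Reads go through `.as_ref().and_then(...)`, mirroring the
    // `resolve_reference` hunk above.
    fn get_field(&self, id: &str) -> Option<&str> {
        self.fields
            .as_ref()
            .and_then(|fields| fields.get(id))
            .map(String::as_str)
    }
}

fn main() {
    let mut extractions = Extractions::default();
    assert_eq!(extractions.get_field("id1"), None); // no allocation yet
    extractions.insert_field("id1", "exact");
    assert_eq!(extractions.get_field("id1"), Some("exact"));
}
```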
diff --git a/bd-workflows/src/metrics.rs b/bd-workflows/src/metrics.rs index e695e35f..02099070 100644 --- a/bd-workflows/src/metrics.rs +++ b/bd-workflows/src/metrics.rs @@ -13,10 +13,9 @@ use crate::config::{ActionEmitMetric, TagValue}; use crate::workflow::TriggeredActionEmitSankey; use bd_client_stats::Stats; use bd_log_primitives::LogRef; -use bd_log_primitives::tiny_set::TinySet; use bd_stats_common::MetricType; use std::borrow::Cow; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; // @@ -33,11 +32,11 @@ impl MetricsCollector { Self { stats } } - pub(crate) fn emit_metrics(&self, actions: &TinySet<&ActionEmitMetric>, log: &LogRef<'_>) { + pub(crate) fn emit_metrics(&self, actions: &BTreeSet<&ActionEmitMetric>, log: &LogRef<'_>) { // TODO(Augustyniak): Stats are deduped in here only when both their tags and the value of // `counter_increment` are identical. Consider deduping metrics even if their // `counter_increment` fields have different values. - for action in actions.iter() { + for action in actions { let tags = Self::extract_tags(log, &action.tags); #[allow(clippy::cast_precision_loss)] @@ -77,10 +76,10 @@ impl MetricsCollector { pub(crate) fn emit_sankeys( &self, - actions: &TinySet<TriggeredActionEmitSankey<'_>>, + actions: &BTreeSet<TriggeredActionEmitSankey<'_>>, log: &LogRef<'_>, ) { - for action in actions.iter() { + for action in actions { let mut tags = Self::extract_tags(log, action.action.tags()); tags.insert("_path_id".to_string(), action.path.path_id.to_string());
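The `.iter()` removals in `emit_metrics` and `emit_sankeys` work because `&BTreeSet<T>` itself implements `IntoIterator`. A small sketch with a hypothetical `EmitMetric` stand-in for `ActionEmitMetric`:

```rust
use std::collections::BTreeSet;

// Minimal stand-in; only the iteration pattern matters here.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct EmitMetric {
    id: &'static str,
}

fn emit_metrics(actions: &BTreeSet<&EmitMetric>) {
    // `&BTreeSet<T>` implements IntoIterator, so the explicit `.iter()`
    // was redundant; `for action in actions` yields the same items, in
    // sorted order.
    for action in actions {
        println!("emitting {}", action.id);
    }
}

fn main() {
    let a = EmitMetric { id: "a" };
    let b = EmitMetric { id: "b" };
    emit_metrics(&BTreeSet::from([&a, &b]));
}
```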
diff --git a/bd-workflows/src/workflow.rs b/bd-workflows/src/workflow.rs index fa0a85e9..c51b4eab 100644 --- a/bd-workflows/src/workflow.rs +++ b/bd-workflows/src/workflow.rs @@ -20,14 +20,13 @@ use crate::config::{ WorkflowDebugMode, }; use crate::generate_log::generate_log_action; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{FieldsRef, Log, LogRef}; use bd_stats_common::workflow::{WorkflowDebugStateKey, WorkflowDebugTransitionType}; use bd_time::OffsetDateTimeExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use sha2::Digest; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::time::SystemTime; use time::OffsetDateTime; @@ -339,7 +338,7 @@ impl Workflow { #[derive(Debug, Default, PartialEq)] pub(crate) struct WorkflowResult<'a> { triggered_actions: Vec<TriggeredAction<'a>>, - logs_to_inject: TinyMap<&'a str, Log>, + logs_to_inject: BTreeMap<&'a str, Log>, stats: WorkflowResultStats, // Persisted workflow debug state. This is only used for live debugging. Global debugging is // persisted as part of stats snapshots. @@ -353,7 +352,7 @@ impl<'a> WorkflowResult<'a> { self, ) -> ( Vec<TriggeredAction<'a>>, - TinyMap<&'a str, Log>, + BTreeMap<&'a str, Log>, OptWorkflowDebugStateMap, Vec, ) { @@ -560,7 +559,7 @@ impl Run { // the most common situation. let mut run_triggered_actions = Vec::<TriggeredAction<'a>>::new(); - let mut run_logs_to_inject = TinyMap::<&'a str, Log>::default(); + let mut run_logs_to_inject = BTreeMap::<&'a str, Log>::new(); let mut run_matched_logs_count = 0; let mut run_processed_timeout = false; let mut workflow_debug_state = Vec::new(); @@ -598,8 +597,8 @@ impl Run { triggered_actions: vec![], matched_logs_count: run_matched_logs_count, processed_timeout: run_processed_timeout, + logs_to_inject: BTreeMap::new(), workflow_debug_state, - logs_to_inject: TinyMap::default(), }; } } @@ -621,8 +620,8 @@ impl Run { triggered_actions: vec![], matched_logs_count: run_matched_logs_count, processed_timeout: run_processed_timeout, + logs_to_inject: BTreeMap::new(), workflow_debug_state, - logs_to_inject: TinyMap::default(), }; } }, @@ -740,7 +739,7 @@ pub(crate) struct RunResult<'a> { processed_timeout: bool, /// Logs to be injected back into the workflow engine after field attachment and other /// processing. - logs_to_inject: TinyMap<&'a str, Log>, + logs_to_inject: BTreeMap<&'a str, Log>, /// Any debug state changes that occurred as a result of processing the traversal. workflow_debug_state: Vec, } @@ -760,11 +759,11 @@ impl RunResult<'_> { pub(crate) struct TraversalExtractions { /// States of Sankey diagrams. It's `None` when the traversal is initialized and is set /// to `Some` after the first value for a Sankey and a given traversal is extracted. - pub(crate) sankey_states: TinyMap, + pub(crate) sankey_states: Option>, /// Snapped timestamps, by extraction ID. - pub(crate) timestamps: TinyMap<String, OffsetDateTime>, + pub(crate) timestamps: Option<BTreeMap<String, OffsetDateTime>>, /// Snapped field values, by extraction ID. - pub(crate) fields: TinyMap<String, String>, + pub(crate) fields: Option<BTreeMap<String, String>>, } // @@ -910,7 +909,7 @@ impl Traversal { log.log_type, log.message, log.fields, - &self.extractions.fields, + self.extractions.fields.as_ref(), ) { let Some(matched_logs_counts) = self.matched_logs_counts.get_mut(index) else { continue; @@ -1013,7 +1012,9 @@ impl Traversal { new_extractions .sankey_states - .get_mut_or_insert_default(extraction.sankey_id.clone()) + .get_or_insert_with(BTreeMap::new) + .entry(extraction.sankey_id.clone()) + .or_default() .push( extracted_value.into_owned(), extraction.limit, @@ -1026,6 +1027,7 @@ impl Traversal { log::debug!("extracted timestamp {timestamp} for extraction ID {timestamp_extraction_id}"); new_extractions .timestamps + .get_or_insert_with(BTreeMap::new) .insert(timestamp_extraction_id.clone(), timestamp); } @@ -1038,6 +1040,7 @@ ); new_extractions .fields + .get_or_insert_with(BTreeMap::new) .insert(extraction.id.clone(), value.to_string()); } } @@ -1049,9 +1052,9 @@ impl Traversal { actions: &'a [Action], extractions: &mut TraversalExtractions, current_log_fields: FieldsRef<'_>, - ) -> (Vec<TriggeredAction<'a>>, TinyMap<&'a str, Log>) { + ) -> (Vec<TriggeredAction<'a>>, BTreeMap<&'a str, Log>) { let mut triggered_actions = vec![]; - let mut logs_to_inject = TinyMap::default(); + let mut logs_to_inject = BTreeMap::new(); for action in actions { match action { Action::FlushBuffers(action) => { @@ -1061,7 +1064,11 @@ impl Traversal { triggered_actions.push(TriggeredAction::EmitMetric(action)); }, Action::EmitSankey(action) => { - let Some(sankey_state) = extractions.sankey_states.remove(action.id()) else { + let Some(sankey_states) = &mut extractions.sankey_states else { + continue; + }; + + let Some(sankey_state) = sankey_states.remove(action.id()) else { debug_assert!( false, "sankey_states for Sankey with {:?} ID should be present", @@ -1141,7 +1148,7 @@ struct TraversalResult<'a> { followed_transitions_count: u32, /// Logs to be injected back into the workflow engine after field attachment and other /// processing. - log_to_inject: TinyMap<&'a str, Log>, + log_to_inject: BTreeMap<&'a str, Log>, /// Any debug state changes that occurred as a result of processing the traversal.
workflow_debug_state: Vec, } diff --git a/bd-workflows/src/workflow_test.rs b/bd-workflows/src/workflow_test.rs index 7c249c01..9162064c 100644 --- a/bd-workflows/src/workflow_test.rs +++ b/bd-workflows/src/workflow_test.rs @@ -15,7 +15,6 @@ use crate::config::{ }; use crate::test::{MakeConfig, TestLog}; use crate::workflow::{Run, TriggeredAction, Workflow, WorkflowResult, WorkflowResultStats}; -use bd_log_primitives::tiny_set::TinyMap; use bd_log_primitives::{FieldsRef, LogFields, LogMessage, log_level}; use bd_proto::flatbuffers::buffer_log::bitdrift_public::fbs::logging::v_1::LogType; use bd_stats_common::workflow::{WorkflowDebugStateKey, WorkflowDebugTransitionType}; @@ -258,7 +257,7 @@ fn timeout_no_parallel_match() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: true, @@ -287,7 +286,7 @@ fn timeout_no_parallel_match() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 0, processed_timeout: true, @@ -316,7 +315,7 @@ fn timeout_no_parallel_match() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: true, @@ -342,7 +341,7 @@ fn timeout_no_parallel_match() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: true, @@ -412,7 +411,7 @@ fn timeout_not_start() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 0, processed_timeout: true, @@ -442,7 +441,7 @@ fn timeout_not_start() { buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -558,7 +557,7 @@ fn timeout_from_start() { increment: ValueIncrement::Fixed(1), metric_type: MetricType::Counter, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 0, processed_timeout: true, @@ -608,7 +607,7 @@ fn multiple_start_nodes_initial_fork() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 2, processed_timeout: false, @@ -634,7 +633,7 @@ fn multiple_start_nodes_initial_fork() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 2, processed_timeout: false, @@ -664,7 +663,7 @@ fn multiple_start_nodes_initial_fork() { buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -709,7 +708,7 @@ fn multiple_start_nodes_initial_branching() { result, WorkflowResult { triggered_actions: vec![], 
- logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -732,7 +731,7 @@ fn multiple_start_nodes_initial_branching() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -758,7 +757,7 @@ fn multiple_start_nodes_initial_branching() { buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -822,7 +821,7 @@ fn basic_exclusive_workflow() { metric_type: MetricType::Counter, }) ], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -851,7 +850,7 @@ fn basic_exclusive_workflow() { buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -908,7 +907,7 @@ fn exclusive_workflow_matched_logs_count_limit() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 2, processed_timeout: false, @@ -939,7 +938,7 @@ fn exclusive_workflow_matched_logs_count_limit() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -961,7 +960,7 @@ fn exclusive_workflow_matched_logs_count_limit() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1005,7 +1004,7 @@ fn exclusive_workflow_log_rule_count() { result, WorkflowResult { triggered_actions: vec![], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1028,7 +1027,7 @@ fn exclusive_workflow_log_rule_count() { buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1060,7 +1059,7 @@ fn exclusive_workflow_log_rule_count() { buffer_ids: BTreeSet::from(["bar_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1131,7 +1130,7 @@ fn branching_exclusive_workflow() { buffer_ids: BTreeSet::from(["foo_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1175,7 +1174,7 @@ fn branching_exclusive_workflow() { buffer_ids: BTreeSet::from(["zoo_buffer_id".to_string()]), streaming: None, })], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 1, processed_timeout: false, @@ -1211,7 +1210,7 @@ fn 
branching_exclusive_workflow() { streaming: None, }), ], - logs_to_inject: TinyMap::default(), + logs_to_inject: BTreeMap::new(), stats: WorkflowResultStats { matched_logs_count: 2, processed_timeout: false, diff --git a/profiling/src/main.rs b/profiling/src/main.rs index 0bf8f588..026119b1 100644 --- a/profiling/src/main.rs +++ b/profiling/src/main.rs @@ -11,7 +11,6 @@ use crate::paths::PATHS; use bd_client_common::file::read_compressed_protobuf; use bd_client_stats::Stats; use bd_client_stats_store::{Collector, Scope}; -use bd_log_primitives::tiny_set::TinySet; use bd_log_primitives::{FieldsRef, LogLevel, LogMessage, LogRef, log_level}; use bd_logger::LogFields; use bd_logger::builder::default_stats_flush_triggers; @@ -43,7 +42,7 @@ use bd_workflows::engine::{WorkflowsEngine, WorkflowsEngineConfig}; use protobuf::Message; use rand::Rng; use sha2::Digest; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fs::{self}; use std::os::unix::fs::MetadataExt; use std::path::Path; @@ -112,8 +111,8 @@ impl AnnotatedWorkflowsEngine { engine .start(WorkflowsEngineConfig::new( WorkflowsConfiguration::new(workflow_configurations.configs(), vec![]), - TinySet::default(), - TinySet::default(), + BTreeSet::default(), + BTreeSet::default(), )) .await; @@ -142,7 +141,7 @@ impl AnnotatedWorkflowsEngine { occurred_at: OffsetDateTime::now_utc(), capture_session: None, }, - &TinySet::default(), + &BTreeSet::new(), OffsetDateTime::now_utc(), ); }
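Finally, the trickiest mechanical translation in this diff is in `workflow.rs`: TinyMap's `get_mut_or_insert_default(key)` becomes `get_or_insert_with(BTreeMap::new).entry(key).or_default()`, and the `EmitSankey` consumer now has to unwrap the outer `Option` before it can `remove` a per-key state. A sketch under the assumption that the per-key state is a small push-limited collection; `PathState` is a hypothetical stand-in, since the real Sankey state type isn't shown in the hunks:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the crate's per-Sankey state; only `push` matters.
#[derive(Default, Debug)]
struct PathState {
    values: Vec<String>,
}

impl PathState {
    fn push(&mut self, value: String, limit: usize) {
        if self.values.len() < limit {
            self.values.push(value);
        }
    }
}

fn main() {
    // TinyMap's `get_mut_or_insert_default(key)` collapses into the std
    // idiom used by the diff: lazily create the outer map, then lazily
    // create the per-key entry.
    let mut sankey_states: Option<BTreeMap<String, PathState>> = None;

    sankey_states
        .get_or_insert_with(BTreeMap::new)
        .entry("sankey_id".to_string())
        .or_default()
        .push("node_a".to_string(), 10);

    // Consuming a state mirrors the EmitSankey action: bail out if the
    // outer map was never created, then remove the per-key entry.
    let Some(states) = &mut sankey_states else { return };
    assert!(states.remove("sankey_id").is_some());
}
```

The two-step `let ... else` in the real hunk keeps the old `debug_assert!` on a missing per-key entry while treating a never-populated outer map as a silent no-op, which matches the `Option`'s documented "nothing extracted yet" meaning.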