From d697b40e7921fbdfb762fe953f0a6149d6fc9493 Mon Sep 17 00:00:00 2001 From: Snow Pettersen Date: Tue, 25 Mar 2025 09:24:45 -0700 Subject: [PATCH 1/9] add a timeout to the blocking log write To avoid potentially blocking for a long time, add a fixed timeout for how long we'll block a log for --- bd-logger/src/async_log_buffer.rs | 35 ++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 4bd6b39d..670b6c05 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -45,6 +45,7 @@ use std::collections::VecDeque; use std::mem::size_of_val; use std::sync::Arc; use time::OffsetDateTime; +use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{mpsc, oneshot}; #[derive(Debug)] @@ -291,16 +292,30 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. - if let Some(rx) = log_processing_completed_rx_option { - match rx.blocking_recv() { - Ok(()) => { - log::debug!("enqueue_log: log processing completion received"); - }, - Err(e) => { - log::debug!( - "enqueue_log: received an error when waiting for log processing completion: {e}" - ); - }, + if let Some(mut rx) = log_processing_completed_rx_option { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1); + loop { + if std::time::Instant::now() > deadline { + log::debug!("enqueue_log: timeout waiting for log processing completion"); + break; + } + + match rx.try_recv() { + Ok(()) => { + log::debug!("enqueue_log: log processing completion received"); + break; + }, + Err(TryRecvError::Closed) => { + log::debug!( + "enqueue_log: received an error when waiting for log processing completion: channel \ + closed" + ); + break; + }, + Err(TryRecvError::Empty) => { + std::thread::sleep(std::time::Duration::from_millis(5)); + }, + } } } From 943dba62d6e2c3dc7500c7e6f539fc22d2b7eeee Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Wed, 4 Jun 2025 23:30:44 +0200 Subject: [PATCH 2/9] Trying to fix `logger.flush(true)` Revert "Trying to fix `logger.flush(true)`" This reverts commit e521f5e62e007fa8451a1da0df7fa8cfdc73d976. Revert api changes --- api | 2 +- bd-logger/src/async_log_buffer.rs | 51 ++++++++++++++++++------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/api b/api index 45cf0419..154326bb 160000 --- a/api +++ b/api @@ -1 +1 @@ -Subproject commit 45cf041973d8713f8c122494f01b11b6dcb8cb0a +Subproject commit 154326bb2c4c76a71ac0073bef2836a90446d14a diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 670b6c05..afca257b 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -45,8 +45,8 @@ use std::collections::VecDeque; use std::mem::size_of_val; use std::sync::Arc; use time::OffsetDateTime; -use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot::error::TryRecvError; #[derive(Debug)] pub enum AsyncLogBufferMessage { @@ -345,31 +345,40 @@ impl AsyncLogBuffer { tx: &Sender, blocking: bool, ) -> Result<(), TrySendError> { - let (completion_tx, completion_rx) = if blocking { - let (tx, rx) = bd_completion::Sender::new(); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - - tx.try_send(AsyncLogBufferMessage::FlushState(completion_tx))?; - - // Wait for the processing to be completed only if passed `blocking` argument is equal to - // `true`. - if let Some(completion_rx) = completion_rx { - match &completion_rx.blocking_recv() { - Ok(()) => { - log::debug!("flush state: completion received"); - }, - Err(e) => { - log::debug!("flush state: received an error when waiting for completion: {e}"); - }, - } + let (completion_tx, completion_rx) = bd_completion::Sender::new(); + + tx.try_send(AsyncLogBufferMessage::FlushState(Some(completion_tx)))?; + + // Wait for the processing to be completed only if `blocking` is `true`. + if blocking { + if let Some(completion_rx) = Some(completion_rx) { + let handle = tokio::runtime::Handle::current(); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1); + + loop { + if std::time::Instant::now() > deadline { + log::debug!("flush state: timeout waiting for completion"); + break; + } + + match handle.block_on(completion_rx.recv()) { + Ok(()) => { + log::debug!("flush state: completion received"); + break; + }, + Err(e) => { + log::debug!("flush state: received an error when waiting for completion: {e}"); + break; + }, + } + } + } } Ok(()) } + async fn process_all_logs(&mut self, log: LogLine) -> anyhow::Result<()> { let mut logs = VecDeque::new(); logs.push_back(log); From af7043a1642e999def2b746ab617910a54979b11 Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Wed, 4 Jun 2025 23:35:22 +0200 Subject: [PATCH 3/9] Minor polishes --- api | 2 +- bd-logger/src/async_log_buffer.rs | 8 +- bd-proto/src/protos/client/api.rs | 551 ++++++++++++++++++++---------- 3 files changed, 381 insertions(+), 180 deletions(-) diff --git a/api b/api index 154326bb..663402d7 160000 --- a/api +++ b/api @@ -1 +1 @@ -Subproject commit 154326bb2c4c76a71ac0073bef2836a90446d14a +Subproject commit 663402d7d8c11a9148c236e41878c4ceecbcb054 diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index afca257b..8575f89d 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -293,7 +293,7 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. if let Some(mut rx) = log_processing_completed_rx_option { - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4); loop { if std::time::Instant::now() > deadline { log::debug!("enqueue_log: timeout waiting for log processing completion"); @@ -349,13 +349,14 @@ impl AsyncLogBuffer { tx.try_send(AsyncLogBufferMessage::FlushState(Some(completion_tx)))?; - // Wait for the processing to be completed only if `blocking` is `true`. + // Only wait with a timeout if blocking is enabled if blocking { if let Some(completion_rx) = Some(completion_rx) { let handle = tokio::runtime::Handle::current(); - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1); + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4); loop { + if std::time::Instant::now() > deadline { log::debug!("flush state: timeout waiting for completion"); break; @@ -378,7 +379,6 @@ impl AsyncLogBuffer { Ok(()) } - async fn process_all_logs(&mut self, log: LogLine) -> anyhow::Result<()> { let mut logs = VecDeque::new(); logs.push_back(log); diff --git a/bd-proto/src/protos/client/api.rs b/bd-proto/src/protos/client/api.rs index 8aa30a2b..c9d5472d 100644 --- a/bd-proto/src/protos/client/api.rs +++ b/bd-proto/src/protos/client/api.rs @@ -184,6 +184,8 @@ pub struct HandshakeRequest { pub runtime_version_nonce: ::std::string::String, // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.HandshakeRequest.previous_disconnect_reason) pub previous_disconnect_reason: ::std::string::String, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.HandshakeRequest.sleep_mode) + pub sleep_mode: bool, // special fields // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.client.v1.HandshakeRequest.special_fields) pub special_fields: ::protobuf::SpecialFields, @@ -201,7 +203,7 @@ impl HandshakeRequest { } fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { - let mut fields = ::std::vec::Vec::with_capacity(4); + let mut fields = ::std::vec::Vec::with_capacity(5); let mut oneofs = ::std::vec::Vec::with_capacity(0); fields.push(::protobuf::reflect::rt::v2::make_map_simpler_accessor_new::<_, _>( "static_device_metadata", @@ -223,6 +225,11 @@ impl HandshakeRequest { |m: &HandshakeRequest| { &m.previous_disconnect_reason }, |m: &mut HandshakeRequest| { &mut m.previous_disconnect_reason }, )); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "sleep_mode", + |m: &HandshakeRequest| { &m.sleep_mode }, + |m: &mut HandshakeRequest| { &mut m.sleep_mode }, + )); ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( "HandshakeRequest", fields, @@ -265,6 +272,9 @@ impl ::protobuf::Message for HandshakeRequest { 50 => { self.previous_disconnect_reason = is.read_string()?; }, + 56 => { + self.sleep_mode = is.read_bool()?; + }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; }, @@ -293,6 +303,9 @@ impl ::protobuf::Message for HandshakeRequest { if !self.previous_disconnect_reason.is_empty() { my_size += ::protobuf::rt::string_size(6, &self.previous_disconnect_reason); } + if self.sleep_mode != false { + my_size += 1 + 1; + } my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); self.special_fields.cached_size().set(my_size as u32); my_size @@ -318,6 +331,9 @@ impl ::protobuf::Message for HandshakeRequest { if !self.previous_disconnect_reason.is_empty() { os.write_string(6, &self.previous_disconnect_reason)?; } + if self.sleep_mode != false { + os.write_bool(7, self.sleep_mode)?; + } os.write_unknown_fields(self.special_fields.unknown_fields())?; ::std::result::Result::Ok(()) } @@ -339,6 +355,7 @@ impl ::protobuf::Message for HandshakeRequest { self.configuration_version_nonce.clear(); self.runtime_version_nonce.clear(); self.previous_disconnect_reason.clear(); + self.sleep_mode = false; self.special_fields.clear(); } @@ -1449,6 +1466,9 @@ impl ::protobuf::reflect::ProtobufValue for LogUploadRequest { // @@protoc_insertion_point(message:bitdrift_public.protobuf.client.v1.PingRequest) #[derive(PartialEq,Clone,Default,Debug)] pub struct PingRequest { + // message fields + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.PingRequest.sleep_mode) + pub sleep_mode: bool, // special fields // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.client.v1.PingRequest.special_fields) pub special_fields: ::protobuf::SpecialFields, @@ -1466,8 +1486,13 @@ impl PingRequest { } fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { - let mut fields = ::std::vec::Vec::with_capacity(0); + let mut fields = ::std::vec::Vec::with_capacity(1); let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "sleep_mode", + |m: &PingRequest| { &m.sleep_mode }, + |m: &mut PingRequest| { &mut m.sleep_mode }, + )); ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( "PingRequest", fields, @@ -1486,6 +1511,9 @@ impl ::protobuf::Message for PingRequest { fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { while let Some(tag) = is.read_raw_tag_or_eof()? { match tag { + 8 => { + self.sleep_mode = is.read_bool()?; + }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; }, @@ -1498,12 +1526,18 @@ impl ::protobuf::Message for PingRequest { #[allow(unused_variables)] fn compute_size(&self) -> u64 { let mut my_size = 0; + if self.sleep_mode != false { + my_size += 1 + 1; + } my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); self.special_fields.cached_size().set(my_size as u32); my_size } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if self.sleep_mode != false { + os.write_bool(1, self.sleep_mode)?; + } os.write_unknown_fields(self.special_fields.unknown_fields())?; ::std::result::Result::Ok(()) } @@ -1521,11 +1555,13 @@ impl ::protobuf::Message for PingRequest { } fn clear(&mut self) { + self.sleep_mode = false; self.special_fields.clear(); } fn default_instance() -> &'static PingRequest { static instance: PingRequest = PingRequest { + sleep_mode: false, special_fields: ::protobuf::SpecialFields::new(), }; &instance @@ -4989,6 +5025,8 @@ pub mod stats_upload_request { // message fields // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.metric_id_overflows) pub metric_id_overflows: ::std::collections::HashMap<::std::string::String, u64>, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.internal_metrics) + pub internal_metrics: ::protobuf::MessageField, // message oneof groups pub snapshot_type: ::std::option::Option, pub occurred_at: ::std::option::Option, @@ -5107,7 +5145,7 @@ pub mod stats_upload_request { } pub(in super) fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { - let mut fields = ::std::vec::Vec::with_capacity(3); + let mut fields = ::std::vec::Vec::with_capacity(4); let mut oneofs = ::std::vec::Vec::with_capacity(2); fields.push(::protobuf::reflect::rt::v2::make_oneof_message_has_get_mut_set_accessor::<_, super::super::metric::MetricsList>( "metrics", @@ -5128,6 +5166,11 @@ pub mod stats_upload_request { |m: &Snapshot| { &m.metric_id_overflows }, |m: &mut Snapshot| { &mut m.metric_id_overflows }, )); + fields.push(::protobuf::reflect::rt::v2::make_message_field_accessor::<_, snapshot::InternalMetrics>( + "internal_metrics", + |m: &Snapshot| { &m.internal_metrics }, + |m: &mut Snapshot| { &mut m.internal_metrics }, + )); oneofs.push(snapshot::Snapshot_type::generated_oneof_descriptor_data()); oneofs.push(snapshot::Occurred_at::generated_oneof_descriptor_data()); ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( @@ -5169,6 +5212,9 @@ pub mod stats_upload_request { is.pop_limit(old_limit); self.metric_id_overflows.insert(key, value); }, + 34 => { + ::protobuf::rt::read_singular_message_into_field(is, &mut self.internal_metrics)?; + }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; }, @@ -5187,6 +5233,10 @@ pub mod stats_upload_request { entry_size += ::protobuf::rt::uint64_size(2, *v); my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(entry_size) + entry_size }; + if let Some(v) = self.internal_metrics.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + } if let ::std::option::Option::Some(ref v) = self.snapshot_type { match v { &snapshot::Snapshot_type::Metrics(ref v) => { @@ -5218,6 +5268,9 @@ pub mod stats_upload_request { os.write_string(1, &k)?; os.write_uint64(2, *v)?; }; + if let Some(v) = self.internal_metrics.as_ref() { + ::protobuf::rt::write_message_field_with_cached_size(4, v, os)?; + } if let ::std::option::Option::Some(ref v) = self.snapshot_type { match v { &snapshot::Snapshot_type::Metrics(ref v) => { @@ -5252,6 +5305,7 @@ pub mod stats_upload_request { self.snapshot_type = ::std::option::Option::None; self.occurred_at = ::std::option::Option::None; self.metric_id_overflows.clear(); + self.internal_metrics.clear(); self.special_fields.clear(); } @@ -5326,6 +5380,146 @@ pub mod stats_upload_request { ::protobuf::reflect::GeneratedOneofDescriptorData::new::("occurred_at") } } + // @@protoc_insertion_point(message:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.InternalMetrics) + #[derive(PartialEq,Clone,Default,Debug)] + pub struct InternalMetrics { + // message fields + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.InternalMetrics.sleep_mode_enabled_transitions) + pub sleep_mode_enabled_transitions: u64, + // @@protoc_insertion_point(field:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.InternalMetrics.sleep_mode_disabled_transitions) + pub sleep_mode_disabled_transitions: u64, + // special fields + // @@protoc_insertion_point(special_field:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.InternalMetrics.special_fields) + pub special_fields: ::protobuf::SpecialFields, + } + + impl<'a> ::std::default::Default for &'a InternalMetrics { + fn default() -> &'a InternalMetrics { + ::default_instance() + } + } + + impl InternalMetrics { + pub fn new() -> InternalMetrics { + ::std::default::Default::default() + } + + pub(in super::super) fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(2); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "sleep_mode_enabled_transitions", + |m: &InternalMetrics| { &m.sleep_mode_enabled_transitions }, + |m: &mut InternalMetrics| { &mut m.sleep_mode_enabled_transitions }, + )); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "sleep_mode_disabled_transitions", + |m: &InternalMetrics| { &m.sleep_mode_disabled_transitions }, + |m: &mut InternalMetrics| { &mut m.sleep_mode_disabled_transitions }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "StatsUploadRequest.Snapshot.InternalMetrics", + fields, + oneofs, + ) + } + } + + impl ::protobuf::Message for InternalMetrics { + const NAME: &'static str = "InternalMetrics"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 8 => { + self.sleep_mode_enabled_transitions = is.read_uint64()?; + }, + 16 => { + self.sleep_mode_disabled_transitions = is.read_uint64()?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if self.sleep_mode_enabled_transitions != 0 { + my_size += ::protobuf::rt::uint64_size(1, self.sleep_mode_enabled_transitions); + } + if self.sleep_mode_disabled_transitions != 0 { + my_size += ::protobuf::rt::uint64_size(2, self.sleep_mode_disabled_transitions); + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if self.sleep_mode_enabled_transitions != 0 { + os.write_uint64(1, self.sleep_mode_enabled_transitions)?; + } + if self.sleep_mode_disabled_transitions != 0 { + os.write_uint64(2, self.sleep_mode_disabled_transitions)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> InternalMetrics { + InternalMetrics::new() + } + + fn clear(&mut self) { + self.sleep_mode_enabled_transitions = 0; + self.sleep_mode_disabled_transitions = 0; + self.special_fields.clear(); + } + + fn default_instance() -> &'static InternalMetrics { + static instance: InternalMetrics = InternalMetrics { + sleep_mode_enabled_transitions: 0, + sleep_mode_disabled_transitions: 0, + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } + } + + impl ::protobuf::MessageFull for InternalMetrics { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| super::super::file_descriptor().message_by_package_relative_name("StatsUploadRequest.Snapshot.InternalMetrics").unwrap()).clone() + } + } + + impl ::std::fmt::Display for InternalMetrics { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } + } + + impl ::protobuf::reflect::ProtobufValue for InternalMetrics { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; + } + // @@protoc_insertion_point(message:bitdrift_public.protobuf.client.v1.StatsUploadRequest.Snapshot.Aggregated) #[derive(PartialEq,Clone,Default,Debug)] pub struct Aggregated { @@ -8231,183 +8425,189 @@ static file_descriptor_proto_data: &'static [u8] = b"\ blic/protobuf/logging/v1/payload.proto\x1a\x17validate/validate.proto\"m\ \n\x0eClientKillFile\x12\x20\n\x0capi_key_hash\x18\x01\x20\x01(\x0cR\nap\ iKeyHash\x129\n\nkill_until\x18\x02\x20\x01(\x0b2\x1a.google.protobuf.Ti\ - mestampR\tkillUntil\"\xe0\x03\n\x10HandshakeRequest\x12\x84\x01\n\x16sta\ + mestampR\tkillUntil\"\xff\x03\n\x10HandshakeRequest\x12\x84\x01\n\x16sta\ tic_device_metadata\x18\x01\x20\x03(\x0b2N.bitdrift_public.protobuf.clie\ nt.v1.HandshakeRequest.StaticDeviceMetadataEntryR\x14staticDeviceMetadat\ a\x12>\n\x1bconfiguration_version_nonce\x18\x03\x20\x01(\tR\x19configura\ tionVersionNonce\x122\n\x15runtime_version_nonce\x18\x04\x20\x01(\tR\x13\ runtimeVersionNonce\x12<\n\x1aprevious_disconnect_reason\x18\x06\x20\x01\ - (\tR\x18previousDisconnectReason\x1ar\n\x19StaticDeviceMetadataEntry\x12\ - \x10\n\x03key\x18\x01\x20\x01(\tR\x03key\x12?\n\x05value\x18\x02\x20\x01\ - (\x0b2).bitdrift_public.protobuf.logging.v1.DataR\x05value:\x028\x01J\ - \x04\x08\x02\x10\x03J\x04\x08\x05\x10\x06R\x13fields_for_all_logs\"\x92\ - \x03\n\x16LogUploadIntentRequest\x12\x1b\n\tlog_count\x18\x01\x20\x01(\r\ - R\x08logCount\x12\x1d\n\nbyte_count\x18\x02\x20\x01(\rR\tbyteCount\x12\ - \x1b\n\tbuffer_id\x18\x03\x20\x01(\tR\x08bufferId\x12\x1f\n\x0bintent_uu\ - id\x18\x04\x20\x01(\tR\nintentUuid\x12\x1d\n\nsession_id\x18\x06\x20\x01\ - (\tR\tsessionId\x12\x87\x01\n\x16workflow_action_upload\x18\x05\x20\x01(\ - \x0b2O.bitdrift_public.protobuf.client.v1.LogUploadIntentRequest.Workflo\ - wActionUploadH\0R\x14workflowActionUpload\x1aF\n\x14WorkflowActionUpload\ - \x12.\n\x13workflow_action_ids\x18\x01\x20\x03(\tR\x11workflowActionIdsB\ - \r\n\x0bintent_type\"\xbb\x02\n\x17LogUploadIntentResponse\x12\x1f\n\x0b\ - intent_uuid\x18\x01\x20\x01(\tR\nintentUuid\x12~\n\x12upload_immediately\ - \x18\x02\x20\x01(\x0b2M.bitdrift_public.protobuf.client.v1.LogUploadInte\ - ntResponse.UploadImmediatelyH\0R\x11uploadImmediately\x12V\n\x04drop\x18\ - \x03\x20\x01(\x0b2@.bitdrift_public.protobuf.client.v1.LogUploadIntentRe\ - sponse.DropH\0R\x04drop\x1a\x13\n\x11UploadImmediately\x1a\x06\n\x04Drop\ - B\n\n\x08decision\"\x84\x01\n\x10LogUploadRequest\x12(\n\x0bupload_uuid\ - \x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x1c\n\ - \x04logs\x18\x02\x20\x03(\x0cR\x04logsB\x08\xfaB\x05\x92\x01\x02\x08\x01\ - \x12(\n\x0bbuffer_uuid\x18\x03\x20\x01(\tR\nbufferUuidB\x07\xfaB\x04r\ - \x02\x10\x01\"\r\n\x0bPingRequest\"\xfc\x01\n\x16ConfigurationUpdateAck\ - \x12;\n\x1alast_applied_version_nonce\x18\x01\x20\x01(\tR\x17lastApplied\ - VersionNonce\x12S\n\x04nack\x18\x02\x20\x01(\x0b2?.bitdrift_public.proto\ - buf.client.v1.ConfigurationUpdateAck.NackR\x04nack\x1aP\n\x04Nack\x12#\n\ - \rversion_nonce\x18\x01\x20\x01(\tR\x0cversionNonce\x12#\n\rerror_detail\ - s\x18\x02\x20\x01(\tR\x0cerrorDetails\"\xeb\x08\n\nApiRequest\x12T\n\tha\ - ndshake\x18\x01\x20\x01(\x0b24.bitdrift_public.protobuf.client.v1.Handsh\ - akeRequestH\0R\thandshake\x12h\n\x11log_upload_intent\x18\x07\x20\x01(\ - \x0b2:.bitdrift_public.protobuf.client.v1.LogUploadIntentRequestH\0R\x0f\ - logUploadIntent\x12U\n\nlog_upload\x18\x02\x20\x01(\x0b24.bitdrift_publi\ - c.protobuf.client.v1.LogUploadRequestH\0R\tlogUpload\x12[\n\x0cstats_upl\ - oad\x18\x06\x20\x01(\x0b26.bitdrift_public.protobuf.client.v1.StatsUploa\ - dRequestH\0R\x0bstatsUpload\x12E\n\x04ping\x18\x03\x20\x01(\x0b2/.bitdri\ - ft_public.protobuf.client.v1.PingRequestH\0R\x04ping\x12v\n\x18configura\ - tion_update_ack\x18\x04\x20\x01(\x0b2:.bitdrift_public.protobuf.client.v\ - 1.ConfigurationUpdateAckH\0R\x16configurationUpdateAck\x12j\n\x12runtime\ - _update_ack\x18\x05\x20\x01(\x0b2:.bitdrift_public.protobuf.client.v1.Co\ - nfigurationUpdateAckH\0R\x10runtimeUpdateAck\x12k\n\x12sankey_path_uploa\ - d\x18\n\x20\x01(\x0b2;.bitdrift_public.protobuf.client.v1.SankeyPathUplo\ - adRequestH\0R\x10sankeyPathUpload\x12^\n\rsankey_intent\x18\x0b\x20\x01(\ - \x0b27.bitdrift_public.protobuf.client.v1.SankeyIntentRequestH\0R\x0csan\ - keyIntent\x12d\n\x0fartifact_upload\x18\x0c\x20\x01(\x0b29.bitdrift_publ\ - ic.protobuf.client.v1.UploadArtifactRequestH\0R\x0eartifactUpload\x12j\n\ - \x0fartifact_intent\x18\r\x20\x01(\x0b2?.bitdrift_public.protobuf.client\ - .v1.UploadArtifactIntentRequestH\0R\x0eartifactIntentB\x13\n\x0crequest_\ - type\x12\x03\xf8B\x01J\x04\x08\x08\x10\tJ\x04\x08\t\x10\n\"\x9a\x02\n\ - \x17SankeyPathUploadRequest\x12(\n\x0bupload_uuid\x18\x04\x20\x01(\tR\nu\ - ploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x17\n\x02id\x18\x01\x20\x01(\tR\ - \x02idB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\x07path_id\x18\x02\x20\x01(\ - \tR\x06pathIdB\x07\xfaB\x04r\x02\x10\x01\x12`\n\x05nodes\x18\x03\x20\x03\ - (\x0b2@.bitdrift_public.protobuf.client.v1.SankeyPathUploadRequest.NodeR\ - \x05nodesB\x08\xfaB\x05\x92\x01\x02\x08\x01\x1a8\n\x04Node\x120\n\x0fext\ - racted_value\x18\x01\x20\x01(\tR\x0eextractedValueB\x07\xfaB\x04r\x02\ - \x10\x01\"\x96\x01\n\x13SankeyIntentRequest\x12(\n\x0bintent_uuid\x18\ - \x01\x20\x01(\tR\nintentUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\x07pat\ - h_id\x18\x02\x20\x01(\tR\x06pathIdB\x07\xfaB\x04r\x02\x10\x01\x123\n\x11\ - sankey_diagram_id\x18\x03\x20\x01(\tR\x0fsankeyDiagramIdB\x07\xfaB\x04r\ - \x02\x10\x01\"\xe9\x01\n\x1bUploadArtifactIntentRequest\x12(\n\x0bintent\ - _uuid\x18\x01\x20\x01(\tR\nintentUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\ - \n\x07type_id\x18\x02\x20\x01(\tR\x06typeIdB\x07\xfaB\x04r\x02\x10\x01\ - \x12\x1a\n\x08metadata\x18\x03\x20\x01(\x0cR\x08metadata\x12(\n\x0bartif\ - act_id\x18\x04\x20\x01(\tR\nartifactIdB\x07\xfaB\x04r\x02\x10\x01\x128\n\ - \x04time\x18\x05\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x04timeB\ - \x08\xfaB\x05\x8a\x01\x02\x10\x01\"\xd4\x02\n\x1cUploadArtifactIntentRes\ - ponse\x12(\n\x0bintent_uuid\x18\x01\x20\x01(\tR\nintentUuidB\x07\xfaB\ - \x04r\x02\x10\x01\x12\x83\x01\n\x12upload_immediately\x18\x03\x20\x01(\ - \x0b2R.bitdrift_public.protobuf.client.v1.UploadArtifactIntentResponse.U\ - ploadImmediatelyH\0R\x11uploadImmediately\x12[\n\x04drop\x18\x04\x20\x01\ - (\x0b2E.bitdrift_public.protobuf.client.v1.UploadArtifactIntentResponse.\ - DropH\0R\x04drop\x1a\x13\n\x11UploadImmediately\x1a\x06\n\x04DropB\n\n\ - \x08decision\"\xed\x03\n\x15UploadArtifactRequest\x12(\n\x0bupload_uuid\ - \x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\ - \x07type_id\x18\x02\x20\x01(\tR\x06typeIdB\x07\xfaB\x04r\x02\x10\x01\x12\ - \x1a\n\x08contents\x18\x03\x20\x01(\x0cR\x08contents\x12(\n\x0bartifact_\ - id\x18\x04\x20\x01(\tR\nartifactIdB\x07\xfaB\x04r\x02\x10\x01\x12s\n\x0e\ - state_metadata\x18\x05\x20\x03(\x0b2L.bitdrift_public.protobuf.client.v1\ - .UploadArtifactRequest.StateMetadataEntryR\rstateMetadata\x128\n\x04time\ - \x18\x06\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x04timeB\x08\xfaB\ - \x05\x8a\x01\x02\x10\x01\x12&\n\nsession_id\x18\x07\x20\x01(\tR\tsession\ - IdB\x07\xfaB\x04r\x02\x10\x01\x1ak\n\x12StateMetadataEntry\x12\x10\n\x03\ - key\x18\x01\x20\x01(\tR\x03key\x12?\n\x05value\x18\x02\x20\x01(\x0b2).bi\ - tdrift_public.protobuf.logging.v1.DataR\x05value:\x028\x01\"X\n\x16Uploa\ - dArtifactResponse\x12(\n\x0bupload_uuid\x18\x01\x20\x01(\tR\nuploadUuidB\ - \x07\xfaB\x04r\x02\x10\x01\x12\x14\n\x05error\x18\x02\x20\x01(\tR\x05err\ - or\"\x94\x02\n\x11HandshakeResponse\x12m\n\x0fstream_settings\x18\x01\ - \x20\x01(\x0b2D.bitdrift_public.protobuf.client.v1.HandshakeResponse.Str\ - eamSettingsR\x0estreamSettings\x12>\n\x1bconfiguration_update_status\x18\ - \x02\x20\x01(\rR\x19configurationUpdateStatus\x1aP\n\x0eStreamSettings\ - \x12>\n\rping_interval\x18\x01\x20\x01(\x0b2\x19.google.protobuf.Duratio\ - nR\x0cpingInterval\"\r\n\x0bRateLimited\"\xca\x01\n\x11LogUploadResponse\ - \x12(\n\x0bupload_uuid\x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\ - \x02\x10\x01\x12\x14\n\x05error\x18\x02\x20\x01(\tR\x05error\x12!\n\x0cl\ - ogs_dropped\x18\x03\x20\x01(\rR\x0blogsDropped\x12R\n\x0crate_limited\ - \x18\x04\x20\x01(\x0b2/.bitdrift_public.protobuf.client.v1.RateLimitedR\ - \x0brateLimited\"\xae\x06\n\x12StatsUploadRequest\x12(\n\x0bupload_uuid\ - \x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12e\n\x08sn\ - apshot\x18\x02\x20\x03(\x0b2?.bitdrift_public.protobuf.client.v1.StatsUp\ - loadRequest.SnapshotR\x08snapshotB\x08\xfaB\x05\x92\x01\x02\x08\x01\x123\ - \n\x07sent_at\x18\x03\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x06se\ - ntAt\x1a\xd1\x04\n\x08Snapshot\x12K\n\x07metrics\x18\x01\x20\x01(\x0b2/.\ - bitdrift_public.protobuf.client.v1.MetricsListH\0R\x07metrics\x12l\n\nag\ - gregated\x18\x02\x20\x01(\x0b2J.bitdrift_public.protobuf.client.v1.Stats\ - UploadRequest.Snapshot.AggregatedH\x01R\naggregated\x12\x86\x01\n\x13met\ - ric_id_overflows\x18\x03\x20\x03(\x0b2V.bitdrift_public.protobuf.client.\ - v1.StatsUploadRequest.Snapshot.MetricIdOverflowsEntryR\x11metricIdOverfl\ - ows\x1a\x90\x01\n\nAggregated\x12G\n\x0cperiod_start\x18\x04\x20\x01(\ - \x0b2\x1a.google.protobuf.TimestampR\x0bperiodStartB\x08\xfaB\x05\x8a\ - \x01\x02\x10\x01\x129\n\nperiod_end\x18\x05\x20\x01(\x0b2\x1a.google.pro\ - tobuf.TimestampR\tperiodEnd\x1aD\n\x16MetricIdOverflowsEntry\x12\x10\n\ - \x03key\x18\x01\x20\x01(\tR\x03key\x12\x14\n\x05value\x18\x02\x20\x01(\ - \x04R\x05value:\x028\x01B\x14\n\rsnapshot_type\x12\x03\xf8B\x01B\x12\n\ - \x0boccurred_at\x12\x03\xf8B\x01\"~\n\x13StatsUploadResponse\x12(\n\x0bu\ - pload_uuid\x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\ - \x12\x14\n\x05error\x18\x02\x20\x01(\tR\x05error\x12'\n\x0fmetrics_dropp\ - ed\x18\x03\x20\x01(\rR\x0emetricsDropped\"\x0e\n\x0cPongResponse\"\xba\ - \x05\n\x13ConfigurationUpdate\x12#\n\rversion_nonce\x18\x01\x20\x01(\tR\ - \x0cversionNonce\x12v\n\x12state_of_the_world\x18\x02\x20\x01(\x0b2G.bit\ - drift_public.protobuf.client.v1.ConfigurationUpdate.StateOfTheWorldH\0R\ - \x0fstateOfTheWorld\x1a\xf6\x03\n\x0fStateOfTheWorld\x12b\n\x12buffer_co\ - nfig_list\x18\x03\x20\x01(\x0b24.bitdrift_public.protobuf.config.v1.Buff\ - erConfigListR\x10bufferConfigList\x12u\n\x17workflows_configuration\x18\ - \x04\x20\x01(\x0b2<.bitdrift_public.protobuf.workflow.v1.WorkflowsConfig\ - urationR\x16workflowsConfiguration\x12k\n\x14bdtail_configuration\x18\ - \x06\x20\x01(\x0b28.bitdrift_public.protobuf.bdtail.v1.BdTailConfigurati\ - onsR\x13bdtailConfiguration\x12m\n\x15filters_configuration\x18\x08\x20\ - \x01(\x0b28.bitdrift_public.protobuf.filter.v1.FiltersConfigurationR\x14\ - filtersConfigurationJ\x04\x08\x02\x10\x03J\x04\x08\x07\x10\x08R\x08mll_l\ - istR\x16insights_configurationB\r\n\x0bupdate_type\"{\n\rRuntimeUpdate\ - \x12#\n\rversion_nonce\x18\x01\x20\x01(\tR\x0cversionNonce\x12E\n\x07run\ - time\x18\x02\x20\x01(\x0b2+.bitdrift_public.protobuf.client.v1.RuntimeR\ - \x07runtime\"S\n\rErrorShutdown\x12\x1f\n\x0bgrpc_status\x18\x01\x20\x01\ - (\x05R\ngrpcStatus\x12!\n\x0cgrpc_message\x18\x02\x20\x01(\tR\x0bgrpcMes\ - sage\"4\n\x0cFlushBuffers\x12$\n\x0ebuffer_id_list\x18\x01\x20\x03(\tR\ - \x0cbufferIdList\"Z\n\x18SankeyPathUploadResponse\x12(\n\x0bupload_uuid\ + (\tR\x18previousDisconnectReason\x12\x1d\n\nsleep_mode\x18\x07\x20\x01(\ + \x08R\tsleepMode\x1ar\n\x19StaticDeviceMetadataEntry\x12\x10\n\x03key\ + \x18\x01\x20\x01(\tR\x03key\x12?\n\x05value\x18\x02\x20\x01(\x0b2).bitdr\ + ift_public.protobuf.logging.v1.DataR\x05value:\x028\x01J\x04\x08\x02\x10\ + \x03J\x04\x08\x05\x10\x06R\x13fields_for_all_logs\"\x92\x03\n\x16LogUplo\ + adIntentRequest\x12\x1b\n\tlog_count\x18\x01\x20\x01(\rR\x08logCount\x12\ + \x1d\n\nbyte_count\x18\x02\x20\x01(\rR\tbyteCount\x12\x1b\n\tbuffer_id\ + \x18\x03\x20\x01(\tR\x08bufferId\x12\x1f\n\x0bintent_uuid\x18\x04\x20\ + \x01(\tR\nintentUuid\x12\x1d\n\nsession_id\x18\x06\x20\x01(\tR\tsessionI\ + d\x12\x87\x01\n\x16workflow_action_upload\x18\x05\x20\x01(\x0b2O.bitdrif\ + t_public.protobuf.client.v1.LogUploadIntentRequest.WorkflowActionUploadH\ + \0R\x14workflowActionUpload\x1aF\n\x14WorkflowActionUpload\x12.\n\x13wor\ + kflow_action_ids\x18\x01\x20\x03(\tR\x11workflowActionIdsB\r\n\x0bintent\ + _type\"\xbb\x02\n\x17LogUploadIntentResponse\x12\x1f\n\x0bintent_uuid\ + \x18\x01\x20\x01(\tR\nintentUuid\x12~\n\x12upload_immediately\x18\x02\ + \x20\x01(\x0b2M.bitdrift_public.protobuf.client.v1.LogUploadIntentRespon\ + se.UploadImmediatelyH\0R\x11uploadImmediately\x12V\n\x04drop\x18\x03\x20\ + \x01(\x0b2@.bitdrift_public.protobuf.client.v1.LogUploadIntentResponse.D\ + ropH\0R\x04drop\x1a\x13\n\x11UploadImmediately\x1a\x06\n\x04DropB\n\n\ + \x08decision\"\x84\x01\n\x10LogUploadRequest\x12(\n\x0bupload_uuid\x18\ + \x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x1c\n\x04log\ + s\x18\x02\x20\x03(\x0cR\x04logsB\x08\xfaB\x05\x92\x01\x02\x08\x01\x12(\n\ + \x0bbuffer_uuid\x18\x03\x20\x01(\tR\nbufferUuidB\x07\xfaB\x04r\x02\x10\ + \x01\",\n\x0bPingRequest\x12\x1d\n\nsleep_mode\x18\x01\x20\x01(\x08R\tsl\ + eepMode\"\xfc\x01\n\x16ConfigurationUpdateAck\x12;\n\x1alast_applied_ver\ + sion_nonce\x18\x01\x20\x01(\tR\x17lastAppliedVersionNonce\x12S\n\x04nack\ + \x18\x02\x20\x01(\x0b2?.bitdrift_public.protobuf.client.v1.Configuration\ + UpdateAck.NackR\x04nack\x1aP\n\x04Nack\x12#\n\rversion_nonce\x18\x01\x20\ + \x01(\tR\x0cversionNonce\x12#\n\rerror_details\x18\x02\x20\x01(\tR\x0cer\ + rorDetails\"\xeb\x08\n\nApiRequest\x12T\n\thandshake\x18\x01\x20\x01(\ + \x0b24.bitdrift_public.protobuf.client.v1.HandshakeRequestH\0R\thandshak\ + e\x12h\n\x11log_upload_intent\x18\x07\x20\x01(\x0b2:.bitdrift_public.pro\ + tobuf.client.v1.LogUploadIntentRequestH\0R\x0flogUploadIntent\x12U\n\nlo\ + g_upload\x18\x02\x20\x01(\x0b24.bitdrift_public.protobuf.client.v1.LogUp\ + loadRequestH\0R\tlogUpload\x12[\n\x0cstats_upload\x18\x06\x20\x01(\x0b26\ + .bitdrift_public.protobuf.client.v1.StatsUploadRequestH\0R\x0bstatsUploa\ + d\x12E\n\x04ping\x18\x03\x20\x01(\x0b2/.bitdrift_public.protobuf.client.\ + v1.PingRequestH\0R\x04ping\x12v\n\x18configuration_update_ack\x18\x04\ + \x20\x01(\x0b2:.bitdrift_public.protobuf.client.v1.ConfigurationUpdateAc\ + kH\0R\x16configurationUpdateAck\x12j\n\x12runtime_update_ack\x18\x05\x20\ + \x01(\x0b2:.bitdrift_public.protobuf.client.v1.ConfigurationUpdateAckH\0\ + R\x10runtimeUpdateAck\x12k\n\x12sankey_path_upload\x18\n\x20\x01(\x0b2;.\ + bitdrift_public.protobuf.client.v1.SankeyPathUploadRequestH\0R\x10sankey\ + PathUpload\x12^\n\rsankey_intent\x18\x0b\x20\x01(\x0b27.bitdrift_public.\ + protobuf.client.v1.SankeyIntentRequestH\0R\x0csankeyIntent\x12d\n\x0fart\ + ifact_upload\x18\x0c\x20\x01(\x0b29.bitdrift_public.protobuf.client.v1.U\ + ploadArtifactRequestH\0R\x0eartifactUpload\x12j\n\x0fartifact_intent\x18\ + \r\x20\x01(\x0b2?.bitdrift_public.protobuf.client.v1.UploadArtifactInten\ + tRequestH\0R\x0eartifactIntentB\x13\n\x0crequest_type\x12\x03\xf8B\x01J\ + \x04\x08\x08\x10\tJ\x04\x08\t\x10\n\"\x9a\x02\n\x17SankeyPathUploadReque\ + st\x12(\n\x0bupload_uuid\x18\x04\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\ + \x02\x10\x01\x12\x17\n\x02id\x18\x01\x20\x01(\tR\x02idB\x07\xfaB\x04r\ + \x02\x10\x01\x12\x20\n\x07path_id\x18\x02\x20\x01(\tR\x06pathIdB\x07\xfa\ + B\x04r\x02\x10\x01\x12`\n\x05nodes\x18\x03\x20\x03(\x0b2@.bitdrift_publi\ + c.protobuf.client.v1.SankeyPathUploadRequest.NodeR\x05nodesB\x08\xfaB\ + \x05\x92\x01\x02\x08\x01\x1a8\n\x04Node\x120\n\x0fextracted_value\x18\ + \x01\x20\x01(\tR\x0eextractedValueB\x07\xfaB\x04r\x02\x10\x01\"\x96\x01\ + \n\x13SankeyIntentRequest\x12(\n\x0bintent_uuid\x18\x01\x20\x01(\tR\nint\ + entUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\x07path_id\x18\x02\x20\x01(\ + \tR\x06pathIdB\x07\xfaB\x04r\x02\x10\x01\x123\n\x11sankey_diagram_id\x18\ + \x03\x20\x01(\tR\x0fsankeyDiagramIdB\x07\xfaB\x04r\x02\x10\x01\"\xe9\x01\ + \n\x1bUploadArtifactIntentRequest\x12(\n\x0bintent_uuid\x18\x01\x20\x01(\ + \tR\nintentUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\x07type_id\x18\x02\ + \x20\x01(\tR\x06typeIdB\x07\xfaB\x04r\x02\x10\x01\x12\x1a\n\x08metadata\ + \x18\x03\x20\x01(\x0cR\x08metadata\x12(\n\x0bartifact_id\x18\x04\x20\x01\ + (\tR\nartifactIdB\x07\xfaB\x04r\x02\x10\x01\x128\n\x04time\x18\x05\x20\ + \x01(\x0b2\x1a.google.protobuf.TimestampR\x04timeB\x08\xfaB\x05\x8a\x01\ + \x02\x10\x01\"\xd4\x02\n\x1cUploadArtifactIntentResponse\x12(\n\x0binten\ + t_uuid\x18\x01\x20\x01(\tR\nintentUuidB\x07\xfaB\x04r\x02\x10\x01\x12\ + \x83\x01\n\x12upload_immediately\x18\x03\x20\x01(\x0b2R.bitdrift_public.\ + protobuf.client.v1.UploadArtifactIntentResponse.UploadImmediatelyH\0R\ + \x11uploadImmediately\x12[\n\x04drop\x18\x04\x20\x01(\x0b2E.bitdrift_pub\ + lic.protobuf.client.v1.UploadArtifactIntentResponse.DropH\0R\x04drop\x1a\ + \x13\n\x11UploadImmediately\x1a\x06\n\x04DropB\n\n\x08decision\"\xed\x03\ + \n\x15UploadArtifactRequest\x12(\n\x0bupload_uuid\x18\x01\x20\x01(\tR\nu\ + ploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x20\n\x07type_id\x18\x02\x20\ + \x01(\tR\x06typeIdB\x07\xfaB\x04r\x02\x10\x01\x12\x1a\n\x08contents\x18\ + \x03\x20\x01(\x0cR\x08contents\x12(\n\x0bartifact_id\x18\x04\x20\x01(\tR\ + \nartifactIdB\x07\xfaB\x04r\x02\x10\x01\x12s\n\x0estate_metadata\x18\x05\ + \x20\x03(\x0b2L.bitdrift_public.protobuf.client.v1.UploadArtifactRequest\ + .StateMetadataEntryR\rstateMetadata\x128\n\x04time\x18\x06\x20\x01(\x0b2\ + \x1a.google.protobuf.TimestampR\x04timeB\x08\xfaB\x05\x8a\x01\x02\x10\ + \x01\x12&\n\nsession_id\x18\x07\x20\x01(\tR\tsessionIdB\x07\xfaB\x04r\ + \x02\x10\x01\x1ak\n\x12StateMetadataEntry\x12\x10\n\x03key\x18\x01\x20\ + \x01(\tR\x03key\x12?\n\x05value\x18\x02\x20\x01(\x0b2).bitdrift_public.p\ + rotobuf.logging.v1.DataR\x05value:\x028\x01\"X\n\x16UploadArtifactRespon\ + se\x12(\n\x0bupload_uuid\x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\ + \x02\x10\x01\x12\x14\n\x05error\x18\x02\x20\x01(\tR\x05error\"\x94\x02\n\ + \x11HandshakeResponse\x12m\n\x0fstream_settings\x18\x01\x20\x01(\x0b2D.b\ + itdrift_public.protobuf.client.v1.HandshakeResponse.StreamSettingsR\x0es\ + treamSettings\x12>\n\x1bconfiguration_update_status\x18\x02\x20\x01(\rR\ + \x19configurationUpdateStatus\x1aP\n\x0eStreamSettings\x12>\n\rping_inte\ + rval\x18\x01\x20\x01(\x0b2\x19.google.protobuf.DurationR\x0cpingInterval\ + \"\r\n\x0bRateLimited\"\xca\x01\n\x11LogUploadResponse\x12(\n\x0bupload_\ + uuid\x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x14\ + \n\x05error\x18\x02\x20\x01(\tR\x05error\x12!\n\x0clogs_dropped\x18\x03\ + \x20\x01(\rR\x0blogsDropped\x12R\n\x0crate_limited\x18\x04\x20\x01(\x0b2\ + /.bitdrift_public.protobuf.client.v1.RateLimitedR\x0brateLimited\"\xca\ + \x08\n\x12StatsUploadRequest\x12(\n\x0bupload_uuid\x18\x01\x20\x01(\tR\n\ + uploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12e\n\x08snapshot\x18\x02\x20\x03\ + (\x0b2?.bitdrift_public.protobuf.client.v1.StatsUploadRequest.SnapshotR\ + \x08snapshotB\x08\xfaB\x05\x92\x01\x02\x08\x01\x123\n\x07sent_at\x18\x03\ + \x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x06sentAt\x1a\xed\x06\n\ + \x08Snapshot\x12K\n\x07metrics\x18\x01\x20\x01(\x0b2/.bitdrift_public.pr\ + otobuf.client.v1.MetricsListH\0R\x07metrics\x12l\n\naggregated\x18\x02\ + \x20\x01(\x0b2J.bitdrift_public.protobuf.client.v1.StatsUploadRequest.Sn\ + apshot.AggregatedH\x01R\naggregated\x12\x86\x01\n\x13metric_id_overflows\ + \x18\x03\x20\x03(\x0b2V.bitdrift_public.protobuf.client.v1.StatsUploadRe\ + quest.Snapshot.MetricIdOverflowsEntryR\x11metricIdOverflows\x12z\n\x10in\ + ternal_metrics\x18\x04\x20\x01(\x0b2O.bitdrift_public.protobuf.client.v1\ + .StatsUploadRequest.Snapshot.InternalMetricsR\x0finternalMetrics\x1a\x9d\ + \x01\n\x0fInternalMetrics\x12C\n\x1esleep_mode_enabled_transitions\x18\ + \x01\x20\x01(\x04R\x1bsleepModeEnabledTransitions\x12E\n\x1fsleep_mode_d\ + isabled_transitions\x18\x02\x20\x01(\x04R\x1csleepModeDisabledTransition\ + s\x1a\x90\x01\n\nAggregated\x12G\n\x0cperiod_start\x18\x04\x20\x01(\x0b2\ + \x1a.google.protobuf.TimestampR\x0bperiodStartB\x08\xfaB\x05\x8a\x01\x02\ + \x10\x01\x129\n\nperiod_end\x18\x05\x20\x01(\x0b2\x1a.google.protobuf.Ti\ + mestampR\tperiodEnd\x1aD\n\x16MetricIdOverflowsEntry\x12\x10\n\x03key\ + \x18\x01\x20\x01(\tR\x03key\x12\x14\n\x05value\x18\x02\x20\x01(\x04R\x05\ + value:\x028\x01B\x14\n\rsnapshot_type\x12\x03\xf8B\x01B\x12\n\x0boccurre\ + d_at\x12\x03\xf8B\x01\"~\n\x13StatsUploadResponse\x12(\n\x0bupload_uuid\ \x18\x01\x20\x01(\tR\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x14\n\ - \x05error\x18\x02\x20\x01(\tR\x05error\"\xcb\x02\n\x14SankeyIntentRespon\ - se\x12(\n\x0bintent_uuid\x18\x01\x20\x01(\tR\nintentUuidB\x07\xfaB\x04r\ - \x02\x10\x01\x12{\n\x12upload_immediately\x18\x03\x20\x01(\x0b2J.bitdrif\ - t_public.protobuf.client.v1.SankeyIntentResponse.UploadImmediatelyH\0R\ - \x11uploadImmediately\x12S\n\x04drop\x18\x04\x20\x01(\x0b2=.bitdrift_pub\ - lic.protobuf.client.v1.SankeyIntentResponse.DropH\0R\x04drop\x1a\x13\n\ - \x11UploadImmediately\x1a\x06\n\x04DropB\n\n\x08decisionJ\x04\x08\x02\ - \x10\x03R\x08decision\"\xa8\n\n\x0bApiResponse\x12U\n\thandshake\x18\x01\ - \x20\x01(\x0b25.bitdrift_public.protobuf.client.v1.HandshakeResponseH\0R\ - \thandshake\x12V\n\nlog_upload\x18\x02\x20\x01(\x0b25.bitdrift_public.pr\ - otobuf.client.v1.LogUploadResponseH\0R\tlogUpload\x12i\n\x11log_upload_i\ - ntent\x18\x08\x20\x01(\x0b2;.bitdrift_public.protobuf.client.v1.LogUploa\ - dIntentResponseH\0R\x0flogUploadIntent\x12\\\n\x0cstats_upload\x18\x07\ - \x20\x01(\x0b27.bitdrift_public.protobuf.client.v1.StatsUploadResponseH\ - \0R\x0bstatsUpload\x12F\n\x04pong\x18\x03\x20\x01(\x0b20.bitdrift_public\ - .protobuf.client.v1.PongResponseH\0R\x04pong\x12l\n\x14configuration_upd\ - ate\x18\x04\x20\x01(\x0b27.bitdrift_public.protobuf.client.v1.Configurat\ - ionUpdateH\0R\x13configurationUpdate\x12Z\n\x0eruntime_update\x18\x05\ - \x20\x01(\x0b21.bitdrift_public.protobuf.client.v1.RuntimeUpdateH\0R\rru\ - ntimeUpdate\x12Z\n\x0eerror_shutdown\x18\x06\x20\x01(\x0b21.bitdrift_pub\ - lic.protobuf.client.v1.ErrorShutdownH\0R\rerrorShutdown\x12W\n\rflush_bu\ - ffers\x18\t\x20\x01(\x0b20.bitdrift_public.protobuf.client.v1.FlushBuffe\ - rsH\0R\x0cflushBuffers\x12r\n\x15sankey_diagram_upload\x18\x0c\x20\x01(\ - \x0b2<.bitdrift_public.protobuf.client.v1.SankeyPathUploadResponseH\0R\ - \x13sankeyDiagramUpload\x12p\n\x16sankey_intent_response\x18\r\x20\x01(\ - \x0b28.bitdrift_public.protobuf.client.v1.SankeyIntentResponseH\0R\x14sa\ - nkeyIntentResponse\x12e\n\x0fartifact_upload\x18\x0e\x20\x01(\x0b2:.bitd\ - rift_public.protobuf.client.v1.UploadArtifactResponseH\0R\x0eartifactUpl\ - oad\x12k\n\x0fartifact_intent\x18\x0f\x20\x01(\x0b2@.bitdrift_public.pro\ - tobuf.client.v1.UploadArtifactIntentResponseH\0R\x0eartifactIntentB\x14\ - \n\rresponse_type\x12\x03\xf8B\x01J\x04\x08\n\x10\x0bJ\x04\x08\x0b\x10\ - \x0c2x\n\nApiService\x12j\n\x03Mux\x12..bitdrift_public.protobuf.client.\ - v1.ApiRequest\x1a/.bitdrift_public.protobuf.client.v1.ApiResponse(\x010\ - \x01b\x06proto3\ + \x05error\x18\x02\x20\x01(\tR\x05error\x12'\n\x0fmetrics_dropped\x18\x03\ + \x20\x01(\rR\x0emetricsDropped\"\x0e\n\x0cPongResponse\"\xba\x05\n\x13Co\ + nfigurationUpdate\x12#\n\rversion_nonce\x18\x01\x20\x01(\tR\x0cversionNo\ + nce\x12v\n\x12state_of_the_world\x18\x02\x20\x01(\x0b2G.bitdrift_public.\ + protobuf.client.v1.ConfigurationUpdate.StateOfTheWorldH\0R\x0fstateOfThe\ + World\x1a\xf6\x03\n\x0fStateOfTheWorld\x12b\n\x12buffer_config_list\x18\ + \x03\x20\x01(\x0b24.bitdrift_public.protobuf.config.v1.BufferConfigListR\ + \x10bufferConfigList\x12u\n\x17workflows_configuration\x18\x04\x20\x01(\ + \x0b2<.bitdrift_public.protobuf.workflow.v1.WorkflowsConfigurationR\x16w\ + orkflowsConfiguration\x12k\n\x14bdtail_configuration\x18\x06\x20\x01(\ + \x0b28.bitdrift_public.protobuf.bdtail.v1.BdTailConfigurationsR\x13bdtai\ + lConfiguration\x12m\n\x15filters_configuration\x18\x08\x20\x01(\x0b28.bi\ + tdrift_public.protobuf.filter.v1.FiltersConfigurationR\x14filtersConfigu\ + rationJ\x04\x08\x02\x10\x03J\x04\x08\x07\x10\x08R\x08mll_listR\x16insigh\ + ts_configurationB\r\n\x0bupdate_type\"{\n\rRuntimeUpdate\x12#\n\rversion\ + _nonce\x18\x01\x20\x01(\tR\x0cversionNonce\x12E\n\x07runtime\x18\x02\x20\ + \x01(\x0b2+.bitdrift_public.protobuf.client.v1.RuntimeR\x07runtime\"S\n\ + \rErrorShutdown\x12\x1f\n\x0bgrpc_status\x18\x01\x20\x01(\x05R\ngrpcStat\ + us\x12!\n\x0cgrpc_message\x18\x02\x20\x01(\tR\x0bgrpcMessage\"4\n\x0cFlu\ + shBuffers\x12$\n\x0ebuffer_id_list\x18\x01\x20\x03(\tR\x0cbufferIdList\"\ + Z\n\x18SankeyPathUploadResponse\x12(\n\x0bupload_uuid\x18\x01\x20\x01(\t\ + R\nuploadUuidB\x07\xfaB\x04r\x02\x10\x01\x12\x14\n\x05error\x18\x02\x20\ + \x01(\tR\x05error\"\xcb\x02\n\x14SankeyIntentResponse\x12(\n\x0bintent_u\ + uid\x18\x01\x20\x01(\tR\nintentUuidB\x07\xfaB\x04r\x02\x10\x01\x12{\n\ + \x12upload_immediately\x18\x03\x20\x01(\x0b2J.bitdrift_public.protobuf.c\ + lient.v1.SankeyIntentResponse.UploadImmediatelyH\0R\x11uploadImmediately\ + \x12S\n\x04drop\x18\x04\x20\x01(\x0b2=.bitdrift_public.protobuf.client.v\ + 1.SankeyIntentResponse.DropH\0R\x04drop\x1a\x13\n\x11UploadImmediately\ + \x1a\x06\n\x04DropB\n\n\x08decisionJ\x04\x08\x02\x10\x03R\x08decision\"\ + \xa8\n\n\x0bApiResponse\x12U\n\thandshake\x18\x01\x20\x01(\x0b25.bitdrif\ + t_public.protobuf.client.v1.HandshakeResponseH\0R\thandshake\x12V\n\nlog\ + _upload\x18\x02\x20\x01(\x0b25.bitdrift_public.protobuf.client.v1.LogUpl\ + oadResponseH\0R\tlogUpload\x12i\n\x11log_upload_intent\x18\x08\x20\x01(\ + \x0b2;.bitdrift_public.protobuf.client.v1.LogUploadIntentResponseH\0R\ + \x0flogUploadIntent\x12\\\n\x0cstats_upload\x18\x07\x20\x01(\x0b27.bitdr\ + ift_public.protobuf.client.v1.StatsUploadResponseH\0R\x0bstatsUpload\x12\ + F\n\x04pong\x18\x03\x20\x01(\x0b20.bitdrift_public.protobuf.client.v1.Po\ + ngResponseH\0R\x04pong\x12l\n\x14configuration_update\x18\x04\x20\x01(\ + \x0b27.bitdrift_public.protobuf.client.v1.ConfigurationUpdateH\0R\x13con\ + figurationUpdate\x12Z\n\x0eruntime_update\x18\x05\x20\x01(\x0b21.bitdrif\ + t_public.protobuf.client.v1.RuntimeUpdateH\0R\rruntimeUpdate\x12Z\n\x0ee\ + rror_shutdown\x18\x06\x20\x01(\x0b21.bitdrift_public.protobuf.client.v1.\ + ErrorShutdownH\0R\rerrorShutdown\x12W\n\rflush_buffers\x18\t\x20\x01(\ + \x0b20.bitdrift_public.protobuf.client.v1.FlushBuffersH\0R\x0cflushBuffe\ + rs\x12r\n\x15sankey_diagram_upload\x18\x0c\x20\x01(\x0b2<.bitdrift_publi\ + c.protobuf.client.v1.SankeyPathUploadResponseH\0R\x13sankeyDiagramUpload\ + \x12p\n\x16sankey_intent_response\x18\r\x20\x01(\x0b28.bitdrift_public.p\ + rotobuf.client.v1.SankeyIntentResponseH\0R\x14sankeyIntentResponse\x12e\ + \n\x0fartifact_upload\x18\x0e\x20\x01(\x0b2:.bitdrift_public.protobuf.cl\ + ient.v1.UploadArtifactResponseH\0R\x0eartifactUpload\x12k\n\x0fartifact_\ + intent\x18\x0f\x20\x01(\x0b2@.bitdrift_public.protobuf.client.v1.UploadA\ + rtifactIntentResponseH\0R\x0eartifactIntentB\x14\n\rresponse_type\x12\ + \x03\xf8B\x01J\x04\x08\n\x10\x0bJ\x04\x08\x0b\x10\x0c2x\n\nApiService\ + \x12j\n\x03Mux\x12..bitdrift_public.protobuf.client.v1.ApiRequest\x1a/.b\ + itdrift_public.protobuf.client.v1.ApiResponse(\x010\x01b\x06proto3\ "; /// `FileDescriptorProto` object which was a source for this generated file @@ -8435,7 +8635,7 @@ pub fn file_descriptor() -> &'static ::protobuf::reflect::FileDescriptor { deps.push(super::config::file_descriptor().clone()); deps.push(super::payload::file_descriptor().clone()); deps.push(super::validate::file_descriptor().clone()); - let mut messages = ::std::vec::Vec::with_capacity(40); + let mut messages = ::std::vec::Vec::with_capacity(41); messages.push(ClientKillFile::generated_message_descriptor_data()); messages.push(HandshakeRequest::generated_message_descriptor_data()); messages.push(LogUploadIntentRequest::generated_message_descriptor_data()); @@ -8472,6 +8672,7 @@ pub fn file_descriptor() -> &'static ::protobuf::reflect::FileDescriptor { messages.push(upload_artifact_intent_response::Drop::generated_message_descriptor_data()); messages.push(handshake_response::StreamSettings::generated_message_descriptor_data()); messages.push(stats_upload_request::Snapshot::generated_message_descriptor_data()); + messages.push(stats_upload_request::snapshot::InternalMetrics::generated_message_descriptor_data()); messages.push(stats_upload_request::snapshot::Aggregated::generated_message_descriptor_data()); messages.push(configuration_update::StateOfTheWorld::generated_message_descriptor_data()); messages.push(sankey_intent_response::UploadImmediately::generated_message_descriptor_data()); From 711151348c77d4d68ff6e41853a2389673f11525 Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Wed, 4 Jun 2025 23:45:19 +0200 Subject: [PATCH 4/9] Adding constant for timeout --- bd-logger/src/async_log_buffer.rs | 68 +++++++++++++++++++------------ 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 8575f89d..542312a2 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -44,9 +44,13 @@ use bd_shutdown::{ComponentShutdown, ComponentShutdownTrigger, ComponentShutdown use std::collections::VecDeque; use std::mem::size_of_val; use std::sync::Arc; +use std::time::Duration; use time::OffsetDateTime; -use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot::error::TryRecvError; +use tokio::sync::{mpsc, oneshot}; + +// The blocking flush timeout threshold given that the Fatal ANR threshold on Android 5 seconds +const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(4); #[derive(Debug)] pub enum AsyncLogBufferMessage { @@ -293,7 +297,7 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. if let Some(mut rx) = log_processing_completed_rx_option { - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4); + let deadline = std::time::Instant::now() + BLOCKING_FLUSH_TIMEOUT_SECONDS; loop { if std::time::Instant::now() > deadline { log::debug!("enqueue_log: timeout waiting for log processing completion"); @@ -345,40 +349,50 @@ impl AsyncLogBuffer { tx: &Sender, blocking: bool, ) -> Result<(), TrySendError> { - let (completion_tx, completion_rx) = bd_completion::Sender::new(); + // Create the completion channel only if blocking is enabled. + let (completion_tx, completion_rx) = if blocking { + let (tx, rx) = bd_completion::Sender::new(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; - tx.try_send(AsyncLogBufferMessage::FlushState(Some(completion_tx)))?; + // Send the flush message. If blocking is off, `completion_tx` is None. + tx.try_send(AsyncLogBufferMessage::FlushState(completion_tx))?; - // Only wait with a timeout if blocking is enabled + // Wait for the processing to be completed only if passed `blocking` argument is equal to + // `true`. if blocking { - if let Some(completion_rx) = Some(completion_rx) { - let handle = tokio::runtime::Handle::current(); - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4); - - loop { - - if std::time::Instant::now() > deadline { - log::debug!("flush state: timeout waiting for completion"); - break; - } - - match handle.block_on(completion_rx.recv()) { - Ok(()) => { - log::debug!("flush state: completion received"); - break; - }, - Err(e) => { - log::debug!("flush state: received an error when waiting for completion: {e}"); - break; - }, - } - } + if let Some(rx) = completion_rx { + let handle = tokio::runtime::Handle::current(); + let deadline = std::time::Instant::now() + BLOCKING_FLUSH_TIMEOUT_SECONDS; + + loop { + // Check if the timeout deadline has been reached. + if std::time::Instant::now() > deadline { + log::debug!("flush state: timeout waiting for completion"); + break; + } + + // Block on the async recv() operation. + match handle.block_on(rx.recv()) { + Ok(()) => { + log::debug!("flush state: completion received"); + break; + }, + Err(e) => { + log::debug!("flush state: received an error when waiting for completion: {e}"); + break; + }, + } } + } } Ok(()) } + async fn process_all_logs(&mut self, log: LogLine) -> anyhow::Result<()> { let mut logs = VecDeque::new(); logs.push_back(log); From 48a6df2335f65c0a89341ec6cd3ccd4f13914abc Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Wed, 4 Jun 2025 23:53:02 +0200 Subject: [PATCH 5/9] Remove comments --- bd-logger/src/async_log_buffer.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 542312a2..323c1bcf 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -349,7 +349,6 @@ impl AsyncLogBuffer { tx: &Sender, blocking: bool, ) -> Result<(), TrySendError> { - // Create the completion channel only if blocking is enabled. let (completion_tx, completion_rx) = if blocking { let (tx, rx) = bd_completion::Sender::new(); (Some(tx), Some(rx)) @@ -357,7 +356,6 @@ impl AsyncLogBuffer { (None, None) }; - // Send the flush message. If blocking is off, `completion_tx` is None. tx.try_send(AsyncLogBufferMessage::FlushState(completion_tx))?; // Wait for the processing to be completed only if passed `blocking` argument is equal to @@ -392,7 +390,6 @@ impl AsyncLogBuffer { Ok(()) } - async fn process_all_logs(&mut self, log: LogLine) -> anyhow::Result<()> { let mut logs = VecDeque::new(); logs.push_back(log); From a2ff97d3204a07e8df4c0af3a853a54d8662a28b Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Thu, 5 Jun 2025 00:07:40 +0200 Subject: [PATCH 6/9] Fix loop --- bd-logger/src/async_log_buffer.rs | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 323c1bcf..0fa4f667 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -363,30 +363,20 @@ impl AsyncLogBuffer { if blocking { if let Some(rx) = completion_rx { let handle = tokio::runtime::Handle::current(); - let deadline = std::time::Instant::now() + BLOCKING_FLUSH_TIMEOUT_SECONDS; - - loop { - // Check if the timeout deadline has been reached. - if std::time::Instant::now() > deadline { - log::debug!("flush state: timeout waiting for completion"); - break; - } - - // Block on the async recv() operation. - match handle.block_on(rx.recv()) { - Ok(()) => { - log::debug!("flush state: completion received"); - break; - }, - Err(e) => { - log::debug!("flush state: received an error when waiting for completion: {e}"); - break; - }, - } + let result = handle.block_on(async { + tokio::time::timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS, rx.recv()).await + }); + match result { + Ok(Ok(())) => log::debug!("flush state: completion received"), + Ok(Err(e)) => { + log::debug!("flush state: received an error when waiting for completion: {e}") + }, + Err(_) => log::debug!("flush state: timeout waiting for completion"), } } } + Ok(()) } From eb456635461175b186580de846197c0baeef5ab1 Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Thu, 5 Jun 2025 01:36:39 +0200 Subject: [PATCH 7/9] Adding blocking_wait_with_timeout (pending to update flush_state) --- bd-logger/src/async_log_buffer.rs | 34 ++++++++----------------------- bd-logger/src/lib.rs | 1 + bd-logger/src/timeout.rs | 32 +++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 25 deletions(-) create mode 100644 bd-logger/src/timeout.rs diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 0fa4f667..8fca933e 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -16,6 +16,7 @@ use crate::logging_state::{ConfigUpdate, LoggingState, UninitializedLoggingConte use crate::metadata::MetadataCollector; use crate::network::{NetworkQualityInterceptor, SystemTimeProvider}; use crate::pre_config_buffer::PreConfigBuffer; +use crate::timeout::{blocking_wait_with_timeout, WaitError}; use crate::{internal_report, network}; use anyhow::anyhow; use bd_bounded_buffer::{channel, MemorySized, Receiver, Sender, TrySendError}; @@ -46,7 +47,6 @@ use std::mem::size_of_val; use std::sync::Arc; use std::time::Duration; use time::OffsetDateTime; -use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{mpsc, oneshot}; // The blocking flush timeout threshold given that the Fatal ANR threshold on Android 5 seconds @@ -297,29 +297,14 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. if let Some(mut rx) = log_processing_completed_rx_option { - let deadline = std::time::Instant::now() + BLOCKING_FLUSH_TIMEOUT_SECONDS; - loop { - if std::time::Instant::now() > deadline { - log::debug!("enqueue_log: timeout waiting for log processing completion"); - break; - } - - match rx.try_recv() { - Ok(()) => { - log::debug!("enqueue_log: log processing completion received"); - break; - }, - Err(TryRecvError::Closed) => { - log::debug!( - "enqueue_log: received an error when waiting for log processing completion: channel \ - closed" - ); - break; - }, - Err(TryRecvError::Empty) => { - std::thread::sleep(std::time::Duration::from_millis(5)); - }, - } + match blocking_wait_with_timeout(&mut rx, BLOCKING_FLUSH_TIMEOUT_SECONDS) { + Ok(()) => log::debug!("enqueue_log: log processing completion received"), + Err(WaitError::Timeout) => { + log::debug!("enqueue_log: timeout waiting for log processing completion") + }, + Err(WaitError::ChannelClosed) => { + log::debug!("enqueue_log: channel closed before completion received") + }, } } @@ -376,7 +361,6 @@ impl AsyncLogBuffer { } } - Ok(()) } diff --git a/bd-logger/src/lib.rs b/bd-logger/src/lib.rs index ab4a53b4..ce17c66e 100644 --- a/bd-logger/src/lib.rs +++ b/bd-logger/src/lib.rs @@ -20,6 +20,7 @@ mod metadata; mod network; mod pre_config_buffer; mod service; +mod timeout; #[cfg(test)] mod test; diff --git a/bd-logger/src/timeout.rs b/bd-logger/src/timeout.rs new file mode 100644 index 00000000..80762632 --- /dev/null +++ b/bd-logger/src/timeout.rs @@ -0,0 +1,32 @@ +// shared-core - bitdrift's common client/server libraries +// Copyright Bitdrift, Inc. All rights reserved. +// +// Use of this source code is governed by a source available license that can be found in the +// LICENSE file or at: +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt + +use std::time::{Duration, Instant}; +use tokio::sync::oneshot::error::TryRecvError; + +#[derive(Debug)] +pub enum WaitError { + Timeout, + ChannelClosed, +} + +pub fn blocking_wait_with_timeout( + rx: &mut tokio::sync::oneshot::Receiver<()>, + timeout: Duration, +) -> Result<(), WaitError> { + let deadline = Instant::now() + timeout; + loop { + if Instant::now() > deadline { + return Err(WaitError::Timeout); + } + match rx.try_recv() { + Ok(()) => return Ok(()), + Err(TryRecvError::Closed) => return Err(WaitError::ChannelClosed), + Err(TryRecvError::Empty) => std::thread::sleep(Duration::from_millis(5)), + } + } +} From b9179fab71c846248d37749b1bb37179b6592b4a Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Thu, 5 Jun 2025 01:42:47 +0200 Subject: [PATCH 8/9] format --- bd-logger/src/async_log_buffer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 8fca933e..15b1d375 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -300,10 +300,10 @@ impl AsyncLogBuffer { match blocking_wait_with_timeout(&mut rx, BLOCKING_FLUSH_TIMEOUT_SECONDS) { Ok(()) => log::debug!("enqueue_log: log processing completion received"), Err(WaitError::Timeout) => { - log::debug!("enqueue_log: timeout waiting for log processing completion") + log::debug!("enqueue_log: timeout waiting for log processing completion"); }, Err(WaitError::ChannelClosed) => { - log::debug!("enqueue_log: channel closed before completion received") + log::debug!("enqueue_log: channel closed before completion received"); }, } } @@ -354,7 +354,7 @@ impl AsyncLogBuffer { match result { Ok(Ok(())) => log::debug!("flush state: completion received"), Ok(Err(e)) => { - log::debug!("flush state: received an error when waiting for completion: {e}") + log::debug!("flush state: received an error when waiting for completion: {e}"); }, Err(_) => log::debug!("flush state: timeout waiting for completion"), } From 52911bdc8cc4f399dbe24e89e4b8bd226010d1bb Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Tue, 17 Jun 2025 11:02:38 +0200 Subject: [PATCH 9/9] Update theshold --- bd-logger/src/async_log_buffer.rs | 3 +-- bd-logger/src/timeout.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 15b1d375..ff30b6c4 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -49,8 +49,7 @@ use std::time::Duration; use time::OffsetDateTime; use tokio::sync::{mpsc, oneshot}; -// The blocking flush timeout threshold given that the Fatal ANR threshold on Android 5 seconds -const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(4); +const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(1); #[derive(Debug)] pub enum AsyncLogBufferMessage { diff --git a/bd-logger/src/timeout.rs b/bd-logger/src/timeout.rs index 80762632..b0191bf0 100644 --- a/bd-logger/src/timeout.rs +++ b/bd-logger/src/timeout.rs @@ -15,7 +15,7 @@ pub enum WaitError { } pub fn blocking_wait_with_timeout( - rx: &mut tokio::sync::oneshot::Receiver<()>, + receiver: &mut tokio::sync::oneshot::Receiver<()>, timeout: Duration, ) -> Result<(), WaitError> { let deadline = Instant::now() + timeout; @@ -23,7 +23,7 @@ pub fn blocking_wait_with_timeout( if Instant::now() > deadline { return Err(WaitError::Timeout); } - match rx.try_recv() { + match receiver.try_recv() { Ok(()) => return Ok(()), Err(TryRecvError::Closed) => return Err(WaitError::ChannelClosed), Err(TryRecvError::Empty) => std::thread::sleep(Duration::from_millis(5)),