diff --git a/Cargo.lock b/Cargo.lock index 7ac85bb1..1553cef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -526,6 +526,7 @@ version = "1.0.0" dependencies = [ "anyhow", "log", + "thiserror 2.0.12", "tokio", ] diff --git a/bd-completion/Cargo.toml b/bd-completion/Cargo.toml index 70a6d382..4461ecfb 100644 --- a/bd-completion/Cargo.toml +++ b/bd-completion/Cargo.toml @@ -11,4 +11,5 @@ doctest = false [dependencies] anyhow.workspace = true log.workspace = true +thiserror.workspace = true tokio.workspace = true diff --git a/bd-completion/src/lib.rs b/bd-completion/src/lib.rs index 947bcebb..cbc52f5d 100644 --- a/bd-completion/src/lib.rs +++ b/bd-completion/src/lib.rs @@ -6,6 +6,7 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use std::fmt::Debug; +use std::time::{Duration, Instant}; // // Sender @@ -41,6 +42,12 @@ pub struct Receiver { } impl Receiver { + /// Create a [`bd_completion`] `Receiver` from a raw `tokio::sync::oneshot::Receiver` + #[must_use] + pub fn to_bd_completion_rx(rx: tokio::sync::oneshot::Receiver) -> Self { + Self { rx } + } + pub async fn recv(self) -> anyhow::Result { match self.rx.await { Ok(value) => Ok(value), @@ -51,4 +58,35 @@ impl Receiver { pub fn blocking_recv(self) -> anyhow::Result { Ok(self.rx.blocking_recv()?) } + + pub fn blocking_recv_with_timeout( + mut self, + timeout: Duration, + ) -> Result { + let deadline = Instant::now() + timeout; + + loop { + if Instant::now() > deadline { + return Err(RecvWithTimeoutError::Timeout); + } + + match self.rx.try_recv() { + Ok(value) => return Ok(value), + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + return Err(RecvWithTimeoutError::ChannelClosed) + }, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { + std::thread::sleep(Duration::from_millis(5)); + }, + } + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum RecvWithTimeoutError { + #[error("timeout duration reached")] + Timeout, + #[error("the oneshot channel was closed")] + ChannelClosed, } diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 4bd6b39d..d20ff7b1 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -44,9 +44,12 @@ use bd_shutdown::{ComponentShutdown, ComponentShutdownTrigger, ComponentShutdown use std::collections::VecDeque; use std::mem::size_of_val; use std::sync::Arc; +use std::time::Duration; use time::OffsetDateTime; use tokio::sync::{mpsc, oneshot}; +const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(1); + #[derive(Debug)] pub enum AsyncLogBufferMessage { EmitLog(LogLine), @@ -257,7 +260,8 @@ impl AsyncLogBuffer { // Create a (sender, receiver) pair only if the caller wants to wait on // on the log being pushed through the whole log processing pipeline. let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - (Some(tx), Some(rx)) + let bd_rx = bd_completion::Receiver::to_bd_completion_rx(rx); + (Some(tx), Some(bd_rx)) } else { (None, None) }; @@ -292,7 +296,7 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. if let Some(rx) = log_processing_completed_rx_option { - match rx.blocking_recv() { + match &rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) { Ok(()) => { log::debug!("enqueue_log: log processing completion received"); }, @@ -303,7 +307,6 @@ impl AsyncLogBuffer { }, } } - // Report success even if the `blocking == true` part of the // implementation above failed. Ok(()) @@ -342,7 +345,7 @@ impl AsyncLogBuffer { // Wait for the processing to be completed only if passed `blocking` argument is equal to // `true`. if let Some(completion_rx) = completion_rx { - match &completion_rx.blocking_recv() { + match &completion_rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) { Ok(()) => { log::debug!("flush state: completion received"); }, @@ -351,7 +354,6 @@ impl AsyncLogBuffer { }, } } - Ok(()) }