From 3b5b1499320bb43893df7eba6fe3301cbecbe2b4 Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Tue, 17 Jun 2025 14:20:36 +0200 Subject: [PATCH 1/2] Adding blocking_recv_with_timeout --- bd-completion/src/lib.rs | 46 +++++++++++++++++++++++++++++++ bd-logger/src/async_log_buffer.rs | 12 ++++---- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/bd-completion/src/lib.rs b/bd-completion/src/lib.rs index 947bcebb..f016c2c8 100644 --- a/bd-completion/src/lib.rs +++ b/bd-completion/src/lib.rs @@ -6,6 +6,7 @@ // https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt use std::fmt::Debug; +use std::time::{Duration, Instant}; // // Sender @@ -41,6 +42,12 @@ pub struct Receiver { } impl Receiver { + /// Create a [`bd_completion`] `Receiver` from a raw `tokio::sync::oneshot::Receiver` + #[must_use] + pub fn to_bd_completion_rx(rx: tokio::sync::oneshot::Receiver) -> Self { + Self { rx } + } + pub async fn recv(self) -> anyhow::Result { match self.rx.await { Ok(value) => Ok(value), @@ -51,4 +58,43 @@ impl Receiver { pub fn blocking_recv(self) -> anyhow::Result { Ok(self.rx.blocking_recv()?) } + + pub fn blocking_recv_with_timeout(mut self, timeout: Duration) -> Result { + let deadline = Instant::now() + timeout; + + loop { + if Instant::now() > deadline { + return Err(WaitError::Timeout); + } + + match self.rx.try_recv() { + Ok(value) => return Ok(value), + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + return Err(WaitError::ChannelClosed) + }, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { + std::thread::sleep(Duration::from_millis(5)); + }, + } + } + } } + +#[derive(Debug)] +pub enum WaitError { + Timeout, + ChannelClosed, +} + +impl std::fmt::Display for WaitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let message = match self { + Self::Timeout => "timeout duration reached", + Self::ChannelClosed => "the oneshot channel was closed", + }; + + write!(f, "{message}") + } +} + +impl std::error::Error for WaitError {} diff --git a/bd-logger/src/async_log_buffer.rs b/bd-logger/src/async_log_buffer.rs index 4bd6b39d..d20ff7b1 100644 --- a/bd-logger/src/async_log_buffer.rs +++ b/bd-logger/src/async_log_buffer.rs @@ -44,9 +44,12 @@ use bd_shutdown::{ComponentShutdown, ComponentShutdownTrigger, ComponentShutdown use std::collections::VecDeque; use std::mem::size_of_val; use std::sync::Arc; +use std::time::Duration; use time::OffsetDateTime; use tokio::sync::{mpsc, oneshot}; +const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(1); + #[derive(Debug)] pub enum AsyncLogBufferMessage { EmitLog(LogLine), @@ -257,7 +260,8 @@ impl AsyncLogBuffer { // Create a (sender, receiver) pair only if the caller wants to wait on // on the log being pushed through the whole log processing pipeline. let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - (Some(tx), Some(rx)) + let bd_rx = bd_completion::Receiver::to_bd_completion_rx(rx); + (Some(tx), Some(bd_rx)) } else { (None, None) }; @@ -292,7 +296,7 @@ impl AsyncLogBuffer { // Wait for log processing to be completed only if passed `blocking` // argument is equal to `true` and we created a relevant one shot Tokio channel. if let Some(rx) = log_processing_completed_rx_option { - match rx.blocking_recv() { + match &rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) { Ok(()) => { log::debug!("enqueue_log: log processing completion received"); }, @@ -303,7 +307,6 @@ impl AsyncLogBuffer { }, } } - // Report success even if the `blocking == true` part of the // implementation above failed. Ok(()) @@ -342,7 +345,7 @@ impl AsyncLogBuffer { // Wait for the processing to be completed only if passed `blocking` argument is equal to // `true`. if let Some(completion_rx) = completion_rx { - match &completion_rx.blocking_recv() { + match &completion_rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) { Ok(()) => { log::debug!("flush state: completion received"); }, @@ -351,7 +354,6 @@ impl AsyncLogBuffer { }, } } - Ok(()) } From dd1ed5475133aac78d6739d4cda6d69bcd2c6768 Mon Sep 17 00:00:00 2001 From: Fran Aguilera Date: Tue, 17 Jun 2025 17:34:58 +0200 Subject: [PATCH 2/2] PR feedback --- Cargo.lock | 1 + bd-completion/Cargo.toml | 1 + bd-completion/src/lib.rs | 28 ++++++++++------------------ 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ac85bb1..1553cef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -526,6 +526,7 @@ version = "1.0.0" dependencies = [ "anyhow", "log", + "thiserror 2.0.12", "tokio", ] diff --git a/bd-completion/Cargo.toml b/bd-completion/Cargo.toml index 70a6d382..4461ecfb 100644 --- a/bd-completion/Cargo.toml +++ b/bd-completion/Cargo.toml @@ -11,4 +11,5 @@ doctest = false [dependencies] anyhow.workspace = true log.workspace = true +thiserror.workspace = true tokio.workspace = true diff --git a/bd-completion/src/lib.rs b/bd-completion/src/lib.rs index f016c2c8..cbc52f5d 100644 --- a/bd-completion/src/lib.rs +++ b/bd-completion/src/lib.rs @@ -59,18 +59,21 @@ impl Receiver { Ok(self.rx.blocking_recv()?) } - pub fn blocking_recv_with_timeout(mut self, timeout: Duration) -> Result { + pub fn blocking_recv_with_timeout( + mut self, + timeout: Duration, + ) -> Result { let deadline = Instant::now() + timeout; loop { if Instant::now() > deadline { - return Err(WaitError::Timeout); + return Err(RecvWithTimeoutError::Timeout); } match self.rx.try_recv() { Ok(value) => return Ok(value), Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - return Err(WaitError::ChannelClosed) + return Err(RecvWithTimeoutError::ChannelClosed) }, Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { std::thread::sleep(Duration::from_millis(5)); @@ -80,21 +83,10 @@ impl Receiver { } } -#[derive(Debug)] -pub enum WaitError { +#[derive(thiserror::Error, Debug)] +pub enum RecvWithTimeoutError { + #[error("timeout duration reached")] Timeout, + #[error("the oneshot channel was closed")] ChannelClosed, } - -impl std::fmt::Display for WaitError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let message = match self { - Self::Timeout => "timeout duration reached", - Self::ChannelClosed => "the oneshot channel was closed", - }; - - write!(f, "{message}") - } -} - -impl std::error::Error for WaitError {}