Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bd-completion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ doctest = false
[dependencies]
anyhow.workspace = true
log.workspace = true
thiserror.workspace = true
tokio.workspace = true
38 changes: 38 additions & 0 deletions bd-completion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

use std::fmt::Debug;
use std::time::{Duration, Instant};

//
// Sender
Expand Down Expand Up @@ -41,6 +42,12 @@ pub struct Receiver<T: Debug> {
}

impl<T: Debug> Receiver<T> {
/// Create a [`bd_completion`] `Receiver<T>` from a raw `tokio::sync::oneshot::Receiver<T>`
#[must_use]
pub fn to_bd_completion_rx(rx: tokio::sync::oneshot::Receiver<T>) -> Self {
Self { rx }
}

pub async fn recv(self) -> anyhow::Result<T> {
match self.rx.await {
Ok(value) => Ok(value),
Expand All @@ -51,4 +58,35 @@ impl<T: Debug> Receiver<T> {
pub fn blocking_recv(self) -> anyhow::Result<T> {
Ok(self.rx.blocking_recv()?)
}

pub fn blocking_recv_with_timeout(
mut self,
timeout: Duration,
) -> Result<T, RecvWithTimeoutError> {
let deadline = Instant::now() + timeout;

loop {
if Instant::now() > deadline {
return Err(RecvWithTimeoutError::Timeout);
}

match self.rx.try_recv() {
Ok(value) => return Ok(value),
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
return Err(RecvWithTimeoutError::ChannelClosed)
},
Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
std::thread::sleep(Duration::from_millis(5));
},
}
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum RecvWithTimeoutError {
#[error("timeout duration reached")]
Timeout,
#[error("the oneshot channel was closed")]
ChannelClosed,
}
12 changes: 7 additions & 5 deletions bd-logger/src/async_log_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ use bd_shutdown::{ComponentShutdown, ComponentShutdownTrigger, ComponentShutdown
use std::collections::VecDeque;
use std::mem::size_of_val;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot};

const BLOCKING_FLUSH_TIMEOUT_SECONDS: Duration = Duration::from_secs(1);

#[derive(Debug)]
pub enum AsyncLogBufferMessage {
EmitLog(LogLine),
Expand Down Expand Up @@ -257,7 +260,8 @@ impl<R: LogReplay + Send + 'static> AsyncLogBuffer<R> {
// Create a (sender, receiver) pair only if the caller wants to wait on
// on the log being pushed through the whole log processing pipeline.
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
(Some(tx), Some(rx))
let bd_rx = bd_completion::Receiver::to_bd_completion_rx(rx);
Copy link
Contributor Author

@FranAguilera FranAguilera Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit nasty but couldn't find a better way to handle proper receiver type without further refactors given LogLine is pub log_processing_completed_tx: Option<oneshot::Sender<()>>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just change the type to use bd_completion in LogLine as well, seems like an appropriate usage of this type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, will give it a try

Copy link
Contributor Author

@FranAguilera FranAguilera Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I gave this a try and requires major changes, functions like write_log, replay_log, process_log, etc will required to be updated as well. It seems a bit risky to do in this one. I can create a follow up task to consolidate all this tokio::sync::oneshot with bd_completion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added ticket here BIT-5624

(Some(tx), Some(bd_rx))
} else {
(None, None)
};
Expand Down Expand Up @@ -292,7 +296,7 @@ impl<R: LogReplay + Send + 'static> AsyncLogBuffer<R> {
// Wait for log processing to be completed only if passed `blocking`
// argument is equal to `true` and we created a relevant one shot Tokio channel.
if let Some(rx) = log_processing_completed_rx_option {
match rx.blocking_recv() {
match &rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) {
Ok(()) => {
log::debug!("enqueue_log: log processing completion received");
},
Expand All @@ -303,7 +307,6 @@ impl<R: LogReplay + Send + 'static> AsyncLogBuffer<R> {
},
}
}

// Report success even if the `blocking == true` part of the
// implementation above failed.
Ok(())
Expand Down Expand Up @@ -342,7 +345,7 @@ impl<R: LogReplay + Send + 'static> AsyncLogBuffer<R> {
// Wait for the processing to be completed only if passed `blocking` argument is equal to
// `true`.
if let Some(completion_rx) = completion_rx {
match &completion_rx.blocking_recv() {
match &completion_rx.blocking_recv_with_timeout(BLOCKING_FLUSH_TIMEOUT_SECONDS) {
Ok(()) => {
log::debug!("flush state: completion received");
},
Expand All @@ -351,7 +354,6 @@ impl<R: LogReplay + Send + 'static> AsyncLogBuffer<R> {
},
}
}

Ok(())
}

Expand Down
Loading