Skip to content

Commit

Permalink
Destroy the thread pool before the final flush if tearing down
Browse files Browse the repository at this point in the history
  • Loading branch information
SpriteOvO committed Mar 26, 2024
1 parent 586591c commit 049c638
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
9 changes: 6 additions & 3 deletions spdlog/src/sink/async_sink/async_pool_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ impl Sink for AsyncPoolSink {
if crate::IS_TEARING_DOWN.load(Ordering::SeqCst) {
// https://github.com/SpriteOvO/spdlog-rs/issues/64
//
// `crossbeam` uses thread-local internally, which is not supported in `atexit`
// callback. Let's directly flush the sinks on the current thread if the program
// is tearing down.
// If the program is tearing down, this will be the final flush. `crossbeam`
// uses thread-local internally, which is not supported in `atexit` callback.
// This can be bypassed by flushing sinks directly on the current thread, but
// before we do that we have to destroy the thread pool to ensure that any
// pending log tasks are completed.
self.thread_pool.destroy();
self.backend.flush()
} else {
self.assign_task(Task::Flush {
Expand Down
46 changes: 29 additions & 17 deletions spdlog/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use crate::{
/// ```
///
/// [`AsyncPoolSink`]: crate::sink::AsyncPoolSink
pub struct ThreadPool {
pub struct ThreadPool(ArcSwapOption<ThreadPoolInner>);

struct ThreadPoolInner {
threads: Vec<Option<JoinHandle<()>>>,
sender: Option<Sender<Task>>,
}
Expand Down Expand Up @@ -68,7 +70,8 @@ impl ThreadPool {
}

pub(super) fn assign_task(&self, task: Task, overflow_policy: OverflowPolicy) -> Result<()> {
let sender = self.sender.as_ref().unwrap();
let inner = self.0.load();
let sender = inner.as_ref().unwrap().sender.as_ref().unwrap();

match overflow_policy {
OverflowPolicy::Block => sender.send(task).map_err(Error::from_crossbeam_send),
Expand All @@ -77,21 +80,28 @@ impl ThreadPool {
.map_err(Error::from_crossbeam_try_send),
}
}

pub(super) fn destroy(&self) {
if let Some(mut inner) = self.0.swap(None) {
// Or use `Arc::into_inner`, but it requires us to bump MSRV.
let inner = Arc::get_mut(&mut inner).unwrap();

// drop our sender, threads will break the loop after receiving and processing
// the remaining tasks
inner.sender.take();

for thread in &mut inner.threads {
if let Some(thread) = thread.take() {
thread.join().expect("failed to join a thread from pool");
}
}
}
}
}

impl Drop for ThreadPool {
fn drop(&mut self) {
// drop our sender, threads will break the loop after receiving and processing
// the remaining tasks
self.sender.take();

for thread in &mut self.threads {
thread
.take()
.unwrap()
.join()
.expect("failed to join a thread from pool");
}
self.destroy();
}
}

Expand Down Expand Up @@ -179,10 +189,12 @@ impl ThreadPoolBuilder {
}))
});

Ok(ThreadPool {
threads,
sender: Some(sender),
})
Ok(ThreadPool(ArcSwapOption::new(Some(Arc::new(
ThreadPoolInner {
threads,
sender: Some(sender),
},
)))))
}
}

Expand Down
4 changes: 4 additions & 0 deletions spdlog/tests/global_async_pool_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ use spdlog::{
ErrorHandler,
};

static IS_LOGGED: AtomicBool = AtomicBool::new(false);
static IS_FLUSHED: AtomicBool = AtomicBool::new(false);

struct SetFlagSink;

impl Sink for SetFlagSink {
fn log(&self, _record: &spdlog::Record) -> error::Result<()> {
IS_LOGGED.store(true, Ordering::SeqCst);
Ok(())
}

fn flush(&self) -> error::Result<()> {
// Assert that the record has been logged before flushing
assert!(IS_LOGGED.load(Ordering::SeqCst));
IS_FLUSHED.store(true, Ordering::SeqCst);
Ok(())
}
Expand Down

0 comments on commit 049c638

Please sign in to comment.