Skip to content

Commit

Permalink
fix(threading): run_with_timeout mem leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon-Becker committed Jun 9, 2024
1 parent 6571d36 commit 044ef49
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions crates/common/src/utils/threading.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use crossbeam_channel::unbounded;
use eyre::Result;
use std::{sync::Arc, thread};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};

/// A simple thread pool implementation that takes a vector of items, splits them into chunks, and
/// processes each chunk in a separate thread. The results are collected and returned.
Expand Down Expand Up @@ -28,7 +35,7 @@ pub fn task_pool<
) -> Vec<R> {
// if items is empty, return empty results
if items.is_empty() {
return Vec::new()
return Vec::new();
}

let (tx, rx) = unbounded();
Expand Down Expand Up @@ -73,20 +80,26 @@ pub fn task_pool<

/// Takes a function and some arguments, and runs the function in a separate thread. If the function
/// doesnt finish within the given timeout, the thread is killed, and the function returns None.
pub fn run_with_timeout<T, F>(f: F, timeout: std::time::Duration) -> Result<T>
pub fn run_with_timeout<T, F>(f: F, timeout: Duration) -> Result<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static, {
F: FnOnce() -> T + Send + 'static,
{
let (tx, rx) = unbounded();
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);

let handle = thread::spawn(move || {
let result = f();
let _ = tx.send(result);
if !flag_clone.load(Ordering::Relaxed) {
let result = f();
let _ = tx.send(result);
}
});

let result = rx.recv_timeout(timeout);
if result.is_err() {
handle.thread().unpark();
return result.map_err(|e| eyre::eyre!(e));
flag.store(true, Ordering::Relaxed);
return Err(eyre::eyre!("timed out"));
}

handle.join().ok();
Expand Down

0 comments on commit 044ef49

Please sign in to comment.