From 4a78c33f52bcd64fdbe5755429f0083a7d7d6786 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Wed, 22 Jan 2025 14:49:51 +0800 Subject: [PATCH] clean expired result --- core/src/co_pool/mod.rs | 12 ++++++------ core/src/net/join.rs | 9 +++++++++ core/src/scheduler.rs | 41 ++++++++++++++++++++++++++--------------- core/tests/scheduler.rs | 15 ++++++++++----- 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs index 79a552ff..5a784c82 100644 --- a/core/src/co_pool/mod.rs +++ b/core/src/co_pool/mod.rs @@ -236,7 +236,7 @@ impl<'p> CoroutinePool<'p> { } /// Attempt to obtain task results with the given `task_name`. - pub fn try_get_task_result(&self, task_name: &str) -> Option, &'p str>> { + pub fn try_take_task_result(&self, task_name: &str) -> Option, &'p str>> { self.results.remove(task_name).map(|(_, r)| r) } @@ -251,7 +251,7 @@ impl<'p> CoroutinePool<'p> { wait_time: Duration, ) -> std::io::Result, &str>> { let key = Box::leak(Box::from(task_name)); - if let Some(r) = self.try_get_task_result(key) { + if let Some(r) = self.try_take_task_result(key) { self.notify(key); drop(self.waits.remove(key)); return Ok(r); @@ -260,7 +260,7 @@ impl<'p> CoroutinePool<'p> { let timeout_time = get_timeout_time(wait_time); loop { _ = self.try_run(); - if let Some(r) = self.try_get_task_result(key) { + if let Some(r) = self.try_take_task_result(key) { return Ok(r); } if timeout_time.saturating_sub(now()) == 0 { @@ -285,7 +285,7 @@ impl<'p> CoroutinePool<'p> { ) .map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?, ); - if let Some(r) = self.try_get_task_result(key) { + if let Some(r) = self.try_take_task_result(key) { self.notify(key); assert!(self.waits.remove(key).is_some()); return Ok(r); @@ -436,8 +436,8 @@ impl<'p> CoroutinePool<'p> { } } Self::init_current(self); - let left_time = self.try_timeout_schedule(timeout_time); + let r = self.try_timeout_schedule(timeout_time); Self::clean_current(); - left_time + r.map(|(left_time, _)| left_time) } } diff --git a/core/src/net/join.rs b/core/src/net/join.rs index 5a46a4b6..2323e65f 100644 --- a/core/src/net/join.rs +++ b/core/src/net/join.rs @@ -9,6 +9,15 @@ use std::time::Duration; #[derive(Debug)] pub struct JoinHandle(&'static Arc>, *const c_char); +impl Drop for JoinHandle { + fn drop(&mut self) { + if let Ok(name) = self.get_name() { + // clean data + _ = self.0.try_take_task_result(name); + } + } +} + impl JoinHandle { /// create `JoinHandle` instance. pub(crate) fn err(pool: &'static Arc>) -> Self { diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs index 2970c34e..a5edc24c 100644 --- a/core/src/scheduler.rs +++ b/core/src/scheduler.rs @@ -7,7 +7,7 @@ use crate::coroutine::suspender::Suspender; use crate::coroutine::Coroutine; use crate::{co, impl_current_for, impl_display_by_debug, impl_for_named}; use dashmap::DashMap; -use std::collections::{BinaryHeap, VecDeque}; +use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::ffi::c_longlong; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -90,7 +90,6 @@ pub struct Scheduler<'s> { suspend: BinaryHeap>, syscall: DashMap<&'s str, SchedulableCoroutine<'s>>, syscall_suspend: BinaryHeap>, - results: DashMap<&'s str, Result, &'s str>>, } impl Default for Scheduler<'_> { @@ -107,9 +106,10 @@ impl Drop for Scheduler<'_> { if std::thread::panicking() { return; } + let name = self.name.clone(); _ = self .try_timed_schedule(Duration::from_secs(30)) - .unwrap_or_else(|_| panic!("Failed to stop scheduler {} !", self.name())); + .unwrap_or_else(|e| panic!("Failed to stop scheduler {name} due to {e} !")); assert!( self.ready.is_empty(), "There are still coroutines to be carried out in the ready queue:{:#?} !", @@ -134,6 +134,7 @@ impl_current_for!(SCHEDULER, Scheduler<'s>); impl_display_by_debug!(Scheduler<'s>); +#[allow(clippy::type_complexity)] impl<'s> Scheduler<'s> { /// Creates a new scheduler. #[must_use] @@ -149,7 +150,6 @@ impl<'s> Scheduler<'s> { suspend: BinaryHeap::default(), syscall: DashMap::default(), syscall_suspend: BinaryHeap::default(), - results: DashMap::default(), } } @@ -229,8 +229,9 @@ impl<'s> Scheduler<'s> { /// /// # Errors /// see `try_timeout_schedule`. - pub fn try_schedule(&mut self) -> std::io::Result<()> { - self.try_timeout_schedule(u64::MAX).map(|_| ()) + pub fn try_schedule(&mut self) -> std::io::Result, &str>>> { + self.try_timeout_schedule(u64::MAX) + .map(|(_, results)| results) } /// Try scheduling the coroutines for up to `dur`. @@ -240,7 +241,10 @@ impl<'s> Scheduler<'s> { /// /// # Errors /// see `try_timeout_schedule`. - pub fn try_timed_schedule(&mut self, dur: Duration) -> std::io::Result { + pub fn try_timed_schedule( + &mut self, + dur: Duration, + ) -> std::io::Result<(u64, HashMap<&str, Result, &str>>)> { self.try_timeout_schedule(get_timeout_time(dur)) } @@ -253,18 +257,25 @@ impl<'s> Scheduler<'s> { /// /// # Errors /// if change to ready fails. - pub fn try_timeout_schedule(&mut self, timeout_time: u64) -> std::io::Result { + pub fn try_timeout_schedule( + &mut self, + timeout_time: u64, + ) -> std::io::Result<(u64, HashMap<&str, Result, &str>>)> { Self::init_current(self); - let left_time = self.do_schedule(timeout_time); + let r = self.do_schedule(timeout_time); Self::clean_current(); - left_time + r } - fn do_schedule(&mut self, timeout_time: u64) -> std::io::Result { + fn do_schedule( + &mut self, + timeout_time: u64, + ) -> std::io::Result<(u64, HashMap<&str, Result, &str>>)> { + let mut results = HashMap::new(); loop { let left_time = timeout_time.saturating_sub(now()); if 0 == left_time { - return Ok(0); + return Ok((0, results)); } self.check_ready()?; // schedule coroutines @@ -295,14 +306,14 @@ impl<'s> Scheduler<'s> { CoroutineState::Complete(result) => { let co_name = Box::leak(Box::from(coroutine.name())); assert!( - self.results.insert(co_name, Ok(result)).is_none(), + results.insert(co_name, Ok(result)).is_none(), "not consume result" ); } CoroutineState::Error(message) => { let co_name = Box::leak(Box::from(coroutine.name())); assert!( - self.results.insert(co_name, Err(message)).is_none(), + results.insert(co_name, Err(message)).is_none(), "not consume result" ); } @@ -315,7 +326,7 @@ impl<'s> Scheduler<'s> { } continue; } - return Ok(left_time); + return Ok((left_time, results)); } } diff --git a/core/tests/scheduler.rs b/core/tests/scheduler.rs index 3299b20a..178205cf 100644 --- a/core/tests/scheduler.rs +++ b/core/tests/scheduler.rs @@ -20,7 +20,8 @@ fn scheduler_basic() -> std::io::Result<()> { None, None, )?; - scheduler.try_schedule() + scheduler.try_schedule()?; + Ok(()) } #[cfg(not(all(unix, feature = "preemptive")))] @@ -36,7 +37,8 @@ fn scheduler_backtrace() -> std::io::Result<()> { None, None, )?; - scheduler.try_schedule() + scheduler.try_schedule()?; + Ok(()) } #[test] @@ -62,7 +64,8 @@ fn scheduler_suspend() -> std::io::Result<()> { None, None, )?; - scheduler.try_schedule() + scheduler.try_schedule()?; + Ok(()) } #[test] @@ -80,7 +83,8 @@ fn scheduler_delay() -> std::io::Result<()> { )?; scheduler.try_schedule()?; std::thread::sleep(Duration::from_millis(100)); - scheduler.try_schedule() + scheduler.try_schedule()?; + Ok(()) } #[cfg(not(all(unix, feature = "preemptive")))] @@ -122,5 +126,6 @@ fn scheduler_listener() -> std::io::Result<()> { None, None, )?; - scheduler.try_schedule() + scheduler.try_schedule()?; + Ok(()) }