Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl<'p> CoroutinePool<'p> {
}

/// Attempt to obtain task results with the given `task_name`.
pub fn try_get_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
pub fn try_take_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
self.results.remove(task_name).map(|(_, r)| r)
}

Expand All @@ -251,7 +251,7 @@ impl<'p> CoroutinePool<'p> {
wait_time: Duration,
) -> std::io::Result<Result<Option<usize>, &str>> {
let key = Box::leak(Box::from(task_name));
if let Some(r) = self.try_get_task_result(key) {
if let Some(r) = self.try_take_task_result(key) {
self.notify(key);
drop(self.waits.remove(key));
return Ok(r);
Expand All @@ -260,7 +260,7 @@ impl<'p> CoroutinePool<'p> {
let timeout_time = get_timeout_time(wait_time);
loop {
_ = self.try_run();
if let Some(r) = self.try_get_task_result(key) {
if let Some(r) = self.try_take_task_result(key) {
return Ok(r);
}
if timeout_time.saturating_sub(now()) == 0 {
Expand All @@ -285,7 +285,7 @@ impl<'p> CoroutinePool<'p> {
)
.map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?,
);
if let Some(r) = self.try_get_task_result(key) {
if let Some(r) = self.try_take_task_result(key) {
self.notify(key);
assert!(self.waits.remove(key).is_some());
return Ok(r);
Expand Down Expand Up @@ -436,8 +436,8 @@ impl<'p> CoroutinePool<'p> {
}
}
Self::init_current(self);
let left_time = self.try_timeout_schedule(timeout_time);
let r = self.try_timeout_schedule(timeout_time);
Self::clean_current();
left_time
r.map(|(left_time, _)| left_time)
}
}
9 changes: 9 additions & 0 deletions core/src/net/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use std::time::Duration;
#[derive(Debug)]
pub struct JoinHandle(&'static Arc<EventLoop<'static>>, *const c_char);

impl Drop for JoinHandle {
fn drop(&mut self) {
if let Ok(name) = self.get_name() {
// clean data
_ = self.0.try_take_task_result(name);
}
}
}

impl JoinHandle {
/// create `JoinHandle` instance.
pub(crate) fn err(pool: &'static Arc<EventLoop<'static>>) -> Self {
Expand Down
41 changes: 26 additions & 15 deletions core/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::coroutine::suspender::Suspender;
use crate::coroutine::Coroutine;
use crate::{co, impl_current_for, impl_display_by_debug, impl_for_named};
use dashmap::DashMap;
use std::collections::{BinaryHeap, VecDeque};
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::ffi::c_longlong;
use std::io::{Error, ErrorKind};
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -90,7 +90,6 @@ pub struct Scheduler<'s> {
suspend: BinaryHeap<SuspendItem<'s>>,
syscall: DashMap<&'s str, SchedulableCoroutine<'s>>,
syscall_suspend: BinaryHeap<SyscallSuspendItem<'s>>,
results: DashMap<&'s str, Result<Option<usize>, &'s str>>,
}

impl Default for Scheduler<'_> {
Expand All @@ -107,9 +106,10 @@ impl Drop for Scheduler<'_> {
if std::thread::panicking() {
return;
}
let name = self.name.clone();
_ = self
.try_timed_schedule(Duration::from_secs(30))
.unwrap_or_else(|_| panic!("Failed to stop scheduler {} !", self.name()));
.unwrap_or_else(|e| panic!("Failed to stop scheduler {name} due to {e} !"));
assert!(
self.ready.is_empty(),
"There are still coroutines to be carried out in the ready queue:{:#?} !",
Expand All @@ -134,6 +134,7 @@ impl_current_for!(SCHEDULER, Scheduler<'s>);

impl_display_by_debug!(Scheduler<'s>);

#[allow(clippy::type_complexity)]
impl<'s> Scheduler<'s> {
/// Creates a new scheduler.
#[must_use]
Expand All @@ -149,7 +150,6 @@ impl<'s> Scheduler<'s> {
suspend: BinaryHeap::default(),
syscall: DashMap::default(),
syscall_suspend: BinaryHeap::default(),
results: DashMap::default(),
}
}

Expand Down Expand Up @@ -229,8 +229,9 @@ impl<'s> Scheduler<'s> {
///
/// # Errors
/// see `try_timeout_schedule`.
pub fn try_schedule(&mut self) -> std::io::Result<()> {
self.try_timeout_schedule(u64::MAX).map(|_| ())
pub fn try_schedule(&mut self) -> std::io::Result<HashMap<&str, Result<Option<usize>, &str>>> {
self.try_timeout_schedule(u64::MAX)
.map(|(_, results)| results)
}

/// Try scheduling the coroutines for up to `dur`.
Expand All @@ -240,7 +241,10 @@ impl<'s> Scheduler<'s> {
///
/// # Errors
/// see `try_timeout_schedule`.
pub fn try_timed_schedule(&mut self, dur: Duration) -> std::io::Result<u64> {
pub fn try_timed_schedule(
&mut self,
dur: Duration,
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
self.try_timeout_schedule(get_timeout_time(dur))
}

Expand All @@ -253,18 +257,25 @@ impl<'s> Scheduler<'s> {
///
/// # Errors
/// if change to ready fails.
pub fn try_timeout_schedule(&mut self, timeout_time: u64) -> std::io::Result<u64> {
pub fn try_timeout_schedule(
&mut self,
timeout_time: u64,
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
Self::init_current(self);
let left_time = self.do_schedule(timeout_time);
let r = self.do_schedule(timeout_time);
Self::clean_current();
left_time
r
}

fn do_schedule(&mut self, timeout_time: u64) -> std::io::Result<u64> {
fn do_schedule(
&mut self,
timeout_time: u64,
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
let mut results = HashMap::new();
loop {
let left_time = timeout_time.saturating_sub(now());
if 0 == left_time {
return Ok(0);
return Ok((0, results));
}
self.check_ready()?;
// schedule coroutines
Expand Down Expand Up @@ -295,14 +306,14 @@ impl<'s> Scheduler<'s> {
CoroutineState::Complete(result) => {
let co_name = Box::leak(Box::from(coroutine.name()));
assert!(
self.results.insert(co_name, Ok(result)).is_none(),
results.insert(co_name, Ok(result)).is_none(),
"not consume result"
);
}
CoroutineState::Error(message) => {
let co_name = Box::leak(Box::from(coroutine.name()));
assert!(
self.results.insert(co_name, Err(message)).is_none(),
results.insert(co_name, Err(message)).is_none(),
"not consume result"
);
}
Expand All @@ -315,7 +326,7 @@ impl<'s> Scheduler<'s> {
}
continue;
}
return Ok(left_time);
return Ok((left_time, results));
}
}

Expand Down
15 changes: 10 additions & 5 deletions core/tests/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ fn scheduler_basic() -> std::io::Result<()> {
None,
None,
)?;
scheduler.try_schedule()
scheduler.try_schedule()?;
Ok(())
}

#[cfg(not(all(unix, feature = "preemptive")))]
Expand All @@ -36,7 +37,8 @@ fn scheduler_backtrace() -> std::io::Result<()> {
None,
None,
)?;
scheduler.try_schedule()
scheduler.try_schedule()?;
Ok(())
}

#[test]
Expand All @@ -62,7 +64,8 @@ fn scheduler_suspend() -> std::io::Result<()> {
None,
None,
)?;
scheduler.try_schedule()
scheduler.try_schedule()?;
Ok(())
}

#[test]
Expand All @@ -80,7 +83,8 @@ fn scheduler_delay() -> std::io::Result<()> {
)?;
scheduler.try_schedule()?;
std::thread::sleep(Duration::from_millis(100));
scheduler.try_schedule()
scheduler.try_schedule()?;
Ok(())
}

#[cfg(not(all(unix, feature = "preemptive")))]
Expand Down Expand Up @@ -122,5 +126,6 @@ fn scheduler_listener() -> std::io::Result<()> {
None,
None,
)?;
scheduler.try_schedule()
scheduler.try_schedule()?;
Ok(())
}
Loading