diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs index f4b4a238..faa75b0b 100644 --- a/core/src/co_pool/mod.rs +++ b/core/src/co_pool/mod.rs @@ -197,11 +197,14 @@ impl<'p> CoroutinePool<'p> { assert_eq!(PoolState::Running, self.stopping()?); _ = self.try_timed_schedule_task(dur)?; assert_eq!(PoolState::Stopping, self.stopped()?); - Ok(()) } - PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")), - PoolState::Stopped => Ok(()), + PoolState::Stopping => { + _ = self.try_timed_schedule_task(dur)?; + assert_eq!(PoolState::Stopping, self.stopped()?); + } + PoolState::Stopped => {} } + Ok(()) } /// Submit a new task to this pool. diff --git a/core/src/net/event_loop.rs b/core/src/net/event_loop.rs index 21d9324a..c3546a08 100644 --- a/core/src/net/event_loop.rs +++ b/core/src/net/event_loop.rs @@ -58,6 +58,21 @@ pub(crate) struct EventLoop<'e> { phantom_data: PhantomData<&'e EventLoop<'e>>, } +impl Drop for EventLoop<'_> { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + self.stop_sync(Duration::from_secs(30)) + .unwrap_or_else(|e| panic!("Failed to stop event-loop {} due to {e} !", self.name())); + assert_eq!( + PoolState::Stopped, + self.state(), + "The event-loop is not stopped !" + ); + } +} + impl<'e> Deref for EventLoop<'e> { type Target = CoroutinePool<'e>; @@ -399,49 +414,62 @@ impl<'e> EventLoop<'e> { match self.state() { PoolState::Running => { assert_eq!(PoolState::Running, self.stopping()?); - let timeout_time = crate::common::get_timeout_time(wait_time); - loop { - let left_time = timeout_time.saturating_sub(crate::common::now()); - if 0 == left_time { - return Err(Error::new(ErrorKind::TimedOut, "stop timeout !")); - } - self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?; - if self.is_empty() && self.get_running_size() == 0 { - assert_eq!(PoolState::Stopping, self.stopped()?); - return Ok(()); - } - } + self.do_stop_sync(wait_time) } - PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")), + PoolState::Stopping => self.do_stop_sync(wait_time), PoolState::Stopped => Ok(()), } } + fn do_stop_sync(&mut self, wait_time: Duration) -> std::io::Result<()> { + let timeout_time = crate::common::get_timeout_time(wait_time); + loop { + let left_time = timeout_time.saturating_sub(crate::common::now()); + if 0 == left_time { + return Err(Error::new(ErrorKind::TimedOut, "stop timeout !")); + } + self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?; + if self.is_empty() && self.get_running_size() == 0 { + assert_eq!(PoolState::Stopping, self.stopped()?); + return Ok(()); + } + } + } + pub(super) fn stop(&self, wait_time: Duration) -> std::io::Result<()> { match self.state() { PoolState::Running => { if BeanFactory::remove_bean::>(&self.get_thread_name()).is_some() { assert_eq!(PoolState::Running, self.stopping()?); - //开启了单独的线程 - let (lock, cvar) = &*self.stop; - let result = cvar - .wait_timeout_while( - lock.lock().expect("lock failed"), - wait_time, - |&mut pending| pending, - ) - .expect("lock failed"); - if result.1.timed_out() { - return Err(Error::new(ErrorKind::TimedOut, "stop timeout !")); - } - assert_eq!(PoolState::Stopping, self.stopped()?); + return self.do_stop(wait_time); } - Ok(()) + Err(Error::new( + ErrorKind::Unsupported, + "use EventLoop::stop_sync instead !", + )) } - PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")), + PoolState::Stopping => self.do_stop(wait_time), PoolState::Stopped => Ok(()), } } + + fn do_stop(&self, wait_time: Duration) -> std::io::Result<()> { + //开启了单独的线程 + let (lock, cvar) = &*self.stop; + let result = cvar + .wait_timeout_while( + lock.lock().expect("lock failed"), + wait_time, + |&mut pending| pending, + ) + .expect("lock failed"); + if result.1.timed_out() { + return Err(Error::new(ErrorKind::TimedOut, "stop timeout !")); + } + assert_eq!(PoolState::Stopping, self.stopped()?); + assert!(BeanFactory::remove_bean::(self.name()).is_some()); + Ok(()) + } } impl_current_for!(EVENT_LOOP, EventLoop<'e>);