From c5416f7e1c26685d10738351c0bbf8f1d23c90d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 17 Feb 2024 13:12:45 +0800 Subject: [PATCH 1/2] Change: remove `AsyncRuntime::abort()` Not all of the async-runtime provide an `abort()` primitive, such as [`monoio`](https://crates.io/crates/monoio). In this commit `abort()` is removed in order to allow Openraft to be compatible with `monoio`. `Tick` is the only mod that relies on `abort()` for shutdown. In this commit shutting down is replaced with using `tokio::sync::oneshot::channel`. Refer to: - https://github.com/datafuselabs/openraft/pull/1010#pullrequestreview-1885639263 - The above discussion is part of #1010 --- openraft/src/async_runtime.rs | 8 --- openraft/src/core/tick.rs | 104 ++++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 14 deletions(-) diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index 15ae881fa..5e9c73e2a 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -65,9 +65,6 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// Check if the [`Self::JoinError`] is `panic`. fn is_panic(join_error: &Self::JoinError) -> bool; - /// Abort the task associated with the supplied join handle. - fn abort(join_handle: &Self::JoinHandle); - /// Get the random number generator to use for generating random numbers. /// /// # Note @@ -131,11 +128,6 @@ impl AsyncRuntime for TokioRuntime { join_error.is_panic() } - #[inline] - fn abort(join_handle: &Self::JoinHandle) { - join_handle.abort(); - } - #[inline] fn thread_rng() -> Self::ThreadLocalRng { rand::thread_rng() diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index e25c5af62..7970a746a 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -3,14 +3,19 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; +use futures::future::Either; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tracing::Instrument; use tracing::Level; use tracing::Span; use crate::core::notify::Notify; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::JoinHandleOf; use crate::AsyncRuntime; use crate::Instant; use crate::RaftTypeConfig; @@ -31,7 +36,9 @@ pub(crate) struct TickHandle where C: RaftTypeConfig { enabled: Arc, - join_handle: ::JoinHandle<()>, + cancel: Mutex>>, + #[allow(dead_code)] + join_handle: JoinHandleOf, } impl Tick @@ -44,21 +51,43 @@ where C: RaftTypeConfig enabled: enabled.clone(), tx, }; - let join_handle = C::AsyncRuntime::spawn(this.tick_loop().instrument(tracing::span!( + + let (cancel, cancel_rx) = oneshot::channel(); + + let join_handle = AsyncRuntimeOf::::spawn(this.tick_loop(cancel_rx).instrument(tracing::span!( parent: &Span::current(), Level::DEBUG, "tick" ))); - TickHandle { enabled, join_handle } + TickHandle { + enabled, + cancel: Mutex::new(Some(cancel)), + join_handle, + } } - pub(crate) async fn tick_loop(self) { + pub(crate) async fn tick_loop(self, mut cancel_rx: oneshot::Receiver<()>) { let mut i = 0; + + let mut cancel = std::pin::pin!(cancel_rx); + loop { i += 1; let at = ::Instant::now() + self.interval; - C::AsyncRuntime::sleep_until(at).await; + let mut sleep_fut = AsyncRuntimeOf::::sleep_until(at); + let sleep_fut = std::pin::pin!(sleep_fut); + let cancel_fut = cancel.as_mut(); + + match futures::future::select(cancel_fut, sleep_fut).await { + Either::Left((_canceled, _)) => { + tracing::info!("TickLoop received cancel signal, quit"); + return; + } + Either::Right((_, _)) => { + // sleep done + } + } if !self.enabled.load(Ordering::Relaxed) { i -= 1; @@ -84,6 +113,69 @@ where C: RaftTypeConfig } pub(crate) async fn shutdown(&self) { - C::AsyncRuntime::abort(&self.join_handle); + let got = { + let mut x = self.cancel.lock().unwrap(); + x.take() + }; + + if let Some(cancel) = got { + let send_res = cancel.send(()); + tracing::info!("Timer cancel signal is sent, result is ok: {}", send_res.is_ok()); + } else { + tracing::info!("Timer cancel signal is already sent"); + } + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use tokio::time::Duration; + + use crate::core::Tick; + use crate::type_config::alias::AsyncRuntimeOf; + use crate::AsyncRuntime; + use crate::RaftTypeConfig; + use crate::TokioRuntime; + + #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)] + #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] + pub(crate) struct TickUTConfig {} + impl RaftTypeConfig for TickUTConfig { + type D = (); + type R = (); + type NodeId = u64; + type Node = (); + type Entry = crate::Entry; + type SnapshotData = Cursor>; + type AsyncRuntime = TokioRuntime; + } + + // AsyncRuntime::spawn is `spawn_local` with singlethreaded enabled. + // It will result in a panic: + // `spawn_local` called from outside of a `task::LocalSet`. + #[cfg(not(feature = "singlethreaded"))] + #[tokio::test] + async fn test_shutdown() -> anyhow::Result<()> { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let th = Tick::::spawn(Duration::from_millis(100), tx, true); + + AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + th.shutdown().await; + AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; + + let mut received = vec![]; + while let Some(x) = rx.recv().await { + received.push(x); + } + + assert!( + received.len() < 10, + "no more tick will be received after shutdown: {}", + received.len() + ); + + Ok(()) } } From e43275c18dccc091043de384db8d5cb2cf4cd1b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 18 Feb 2024 14:47:02 +0800 Subject: [PATCH 2/2] Refactor: Tick::shutdown() returns a JoinHandle --- openraft/src/core/tick.rs | 66 ++++++++++++++++++++++++++------------- openraft/src/raft/mod.rs | 4 ++- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 7970a746a..92d89eeca 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -15,6 +15,7 @@ use tracing::Span; use crate::core::notify::Notify; use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; use crate::AsyncRuntime; use crate::Instant; @@ -36,9 +37,17 @@ pub(crate) struct TickHandle where C: RaftTypeConfig { enabled: Arc, - cancel: Mutex>>, - #[allow(dead_code)] - join_handle: JoinHandleOf, + shutdown: Mutex>>, + join_handle: Mutex>>, +} + +impl Drop for TickHandle +where C: RaftTypeConfig +{ + /// Signal the tick loop to stop, without waiting for it to stop. + fn drop(&mut self) { + let _ = self.shutdown(); + } } impl Tick @@ -52,17 +61,20 @@ where C: RaftTypeConfig tx, }; - let (cancel, cancel_rx) = oneshot::channel(); + let (shutdown, shutdown_rx) = oneshot::channel(); - let join_handle = AsyncRuntimeOf::::spawn(this.tick_loop(cancel_rx).instrument(tracing::span!( + let shutdown = Mutex::new(Some(shutdown)); + + let join_handle = AsyncRuntimeOf::::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!( parent: &Span::current(), Level::DEBUG, "tick" ))); + TickHandle { enabled, - cancel: Mutex::new(Some(cancel)), - join_handle, + shutdown, + join_handle: Mutex::new(Some(join_handle)), } } @@ -72,9 +84,7 @@ where C: RaftTypeConfig let mut cancel = std::pin::pin!(cancel_rx); loop { - i += 1; - - let at = ::Instant::now() + self.interval; + let at = InstantOf::::now() + self.interval; let mut sleep_fut = AsyncRuntimeOf::::sleep_until(at); let sleep_fut = std::pin::pin!(sleep_fut); let cancel_fut = cancel.as_mut(); @@ -90,10 +100,11 @@ where C: RaftTypeConfig } if !self.enabled.load(Ordering::Relaxed) { - i -= 1; continue; } + i += 1; + let send_res = self.tx.send(Notify::Tick { i }); if let Err(_e) = send_res { tracing::info!("Stopping tick_loop(), main loop terminated"); @@ -112,18 +123,29 @@ where C: RaftTypeConfig self.enabled.store(enabled, Ordering::Relaxed); } - pub(crate) async fn shutdown(&self) { - let got = { - let mut x = self.cancel.lock().unwrap(); + /// Signal the tick loop to stop. And return a JoinHandle to wait for the loop to stop. + /// + /// If it is called twice, the second call will return None. + pub(crate) fn shutdown(&self) -> Option> { + { + let shutdown = { + let mut x = self.shutdown.lock().unwrap(); + x.take() + }; + + if let Some(shutdown) = shutdown { + let send_res = shutdown.send(()); + tracing::info!("Timer shutdown signal sent: {send_res:?}"); + } else { + tracing::warn!("Double call to Raft::shutdown()"); + } + } + + let jh = { + let mut x = self.join_handle.lock().unwrap(); x.take() }; - - if let Some(cancel) = got { - let send_res = cancel.send(()); - tracing::info!("Timer cancel signal is sent, result is ok: {}", send_res.is_ok()); - } else { - tracing::info!("Timer cancel signal is already sent"); - } + jh } } @@ -162,7 +184,7 @@ mod tests { let th = Tick::::spawn(Duration::from_millis(100), tx, true); AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; - th.shutdown().await; + let _ = th.shutdown().unwrap().await; AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; let mut received = vec![]; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index b495d4784..3a13da743 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -955,7 +955,9 @@ where C: RaftTypeConfig tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res); } self.inner.join_core_task().await; - self.inner.tick_handle.shutdown().await; + if let Some(join_handle) = self.inner.tick_handle.shutdown() { + let _ = join_handle.await; + } // TODO(xp): API change: replace `JoinError` with `Fatal`, // to let the caller know the return value of RaftCore task.