From 3ea5bf5b0a09d40309e39e54f4f6dd2db3af4b45 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Sun, 16 Jun 2024 15:32:57 +0900 Subject: [PATCH 1/2] Use the true current-thread tokio runtime by default --- rust_crate/Cargo.toml | 4 +- rust_crate/src/interface_os.rs | 120 ++++++++++++++++++++++++++------- 2 files changed, 99 insertions(+), 25 deletions(-) diff --git a/rust_crate/Cargo.toml b/rust_crate/Cargo.toml index b03aef31..520a0f49 100644 --- a/rust_crate/Cargo.toml +++ b/rust_crate/Cargo.toml @@ -9,7 +9,7 @@ documentation = "https://rinf.cunarist.com" rust-version = "1.70" [features] -multi-worker = [] +multi-worker = ["tokio/rt-multi-thread"] [target.'cfg(not(target_family = "wasm"))'.dependencies] os-thread-local = "0.1.3" @@ -18,7 +18,7 @@ protoc-prebuilt = "0.3.0" home = "0.5.9" which = "6.0.0" allo-isolate = "0.1.25" -tokio = { version = "1", features = ["rt-multi-thread"] } +tokio = { version = "1", features = ["rt"] } [target.'cfg(target_family = "wasm")'.dependencies] js-sys = "0.3.69" diff --git a/rust_crate/src/interface_os.rs b/rust_crate/src/interface_os.rs index 11851af7..c5818968 100644 --- a/rust_crate/src/interface_os.rs +++ b/rust_crate/src/interface_os.rs @@ -3,8 +3,12 @@ use allo_isolate::{IntoDart, Isolate, ZeroCopyBuffer}; use os_thread_local::ThreadLocal; use std::cell::RefCell; use std::future::Future; -use std::sync::{Mutex, OnceLock}; -use tokio::runtime::{Builder, Runtime}; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; +use std::task::{Context, Poll, Waker}; +use std::thread; +use tokio::runtime::Builder; static DART_ISOLATE: Mutex> = Mutex::new(None); @@ -23,11 +27,12 @@ pub extern "C" fn prepare_isolate_extern(port: i64) { // We use `os_thread_local` so that when the program fails // and the main thread exits unexpectedly, -// the whole async tokio runtime can disappear as well. +// the whole async tokio runtime can shut down as well +// by receiving a signal via the shutdown channel. // Without this solution, zombie threads inside the tokio runtime // might outlive the app. -type TokioRuntime = OnceLock>>>; -static TOKIO_RUNTIME: TokioRuntime = OnceLock::new(); +type ShutdownSenderLock = OnceLock>>>; +static SHUTDOWN_SENDER: ShutdownSenderLock = OnceLock::new(); pub fn start_rust_logic_real(main_future: F) -> Result<()> where @@ -42,28 +47,49 @@ where })); } + // Prepare the channel that will notify tokio runtime to shutdown + // after the main Dart thread has gone. + let (shutdown_sender, shutdown_receiver) = shutdown_channel(); + let shutdown_sender_lock = + SHUTDOWN_SENDER.get_or_init(move || ThreadLocal::new(|| RefCell::new(None))); + shutdown_sender_lock.with(|cell| cell.replace(Some(shutdown_sender))); + // Build the tokio runtime. #[cfg(not(feature = "multi-worker"))] - let tokio_runtime = Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .build()?; + { + let tokio_runtime = Builder::new_current_thread().enable_all().build()?; + thread::spawn(move || { + tokio_runtime.spawn(async { + main_future.await; + }); + tokio_runtime.block_on(shutdown_receiver); + // Dropping the tokio runtime makes it shut down. + drop(tokio_runtime); + }); + } #[cfg(feature = "multi-worker")] - let tokio_runtime = Builder::new_multi_thread().enable_all().build()?; - - // Run the main function. - tokio_runtime.spawn(async { - main_future.await; - }); - TOKIO_RUNTIME - .get_or_init(|| ThreadLocal::new(|| RefCell::new(None))) - .with(move |cell| { - // If there was already a tokio runtime previously, - // most likely due to Dart's hot restart, - // its tasks as well as itself will be terminated, - // being replaced with the new one. - cell.replace(Some(tokio_runtime)); + { + static TOKIO_RUNTIME: Mutex> = Mutex::new(None); + let tokio_runtime = Builder::new_multi_thread().enable_all().build()?; + tokio_runtime.spawn(async { + main_future.await; + }); + tokio_runtime.spawn(async { + shutdown_receiver.await; + thread::spawn(|| { + if let Ok(mut guard) = TOKIO_RUNTIME.lock() { + let runtime_option = guard.take(); + if let Some(runtime) = runtime_option { + // Dropping the tokio runtime makes it shut down. + drop(runtime); + } + } + }) }); + if let Ok(mut guard) = TOKIO_RUNTIME.lock() { + guard.replace(tokio_runtime); + } + } Ok(()) } @@ -105,3 +131,51 @@ pub fn send_rust_signal_real( Ok(()) } + +struct ShutdownSender { + is_sent: Arc, + waker: Arc>>, +} + +impl Drop for ShutdownSender { + fn drop(&mut self) { + self.is_sent.store(true, Ordering::SeqCst); + if let Ok(mut guard) = self.waker.lock() { + if let Some(waker) = guard.take() { + waker.wake(); + } + } + } +} + +struct ShutdownReceiver { + is_sent: Arc, + waker: Arc>>, +} + +impl Future for ShutdownReceiver { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.is_sent.load(Ordering::SeqCst) { + Poll::Ready(()) + } else { + if let Ok(mut guard) = self.waker.lock() { + guard.replace(cx.waker().clone()); + } + Poll::Pending + } + } +} + +fn shutdown_channel() -> (ShutdownSender, ShutdownReceiver) { + let is_sent = Arc::new(AtomicBool::new(false)); + let waker = Arc::new(Mutex::new(None)); + + let sender = ShutdownSender { + is_sent: Arc::clone(&is_sent), + waker: Arc::clone(&waker), + }; + let receiver = ShutdownReceiver { is_sent, waker }; + + (sender, receiver) +} From 36679e80f16a67264572dedcc1ba3d5862544179 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Sun, 16 Jun 2024 15:52:39 +0900 Subject: [PATCH 2/2] Add comments and clarify code --- rust_crate/src/interface_os.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rust_crate/src/interface_os.rs b/rust_crate/src/interface_os.rs index c5818968..e0629a8b 100644 --- a/rust_crate/src/interface_os.rs +++ b/rust_crate/src/interface_os.rs @@ -87,7 +87,14 @@ where }) }); if let Ok(mut guard) = TOKIO_RUNTIME.lock() { - guard.replace(tokio_runtime); + // If there was already a tokio runtime previously, + // most likely due to Dart's hot restart, + // its tasks as well as itself will be terminated, + // being replaced with the new one. + let runtime_option = guard.replace(tokio_runtime); + if let Some(previous_runtime) = runtime_option { + drop(previous_runtime); + } } }