Skip to content

Commit

Permalink
Merge pull request #373 from cunarist/efficient-single-threaded-runtime
Browse files Browse the repository at this point in the history
Use the true current-thread tokio runtime by default
  • Loading branch information
temeddix committed Jun 16, 2024
2 parents c3c5243 + 36679e8 commit 5133402
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 22 deletions.
4 changes: 2 additions & 2 deletions rust_crate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ documentation = "https://rinf.cunarist.com"
rust-version = "1.70"

[features]
multi-worker = []
multi-worker = ["tokio/rt-multi-thread"]

[target.'cfg(not(target_family = "wasm"))'.dependencies]
os-thread-local = "0.1.3"
Expand All @@ -18,7 +18,7 @@ protoc-prebuilt = "0.3.0"
home = "0.5.9"
which = "6.0.0"
allo-isolate = "0.1.25"
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio = { version = "1", features = ["rt"] }

[target.'cfg(target_family = "wasm")'.dependencies]
js-sys = "0.3.69"
Expand Down
121 changes: 101 additions & 20 deletions rust_crate/src/interface_os.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use allo_isolate::{IntoDart, Isolate, ZeroCopyBuffer};
use os_thread_local::ThreadLocal;
use std::cell::RefCell;
use std::future::Future;
use std::sync::{Mutex, OnceLock};
use tokio::runtime::{Builder, Runtime};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::task::{Context, Poll, Waker};
use std::thread;
use tokio::runtime::Builder;

static DART_ISOLATE: Mutex<Option<Isolate>> = Mutex::new(None);

Expand All @@ -23,11 +27,12 @@ pub extern "C" fn prepare_isolate_extern(port: i64) {

// We use `os_thread_local` so that when the program fails
// and the main thread exits unexpectedly,
// the whole async tokio runtime can disappear as well.
// the whole async tokio runtime can shut down as well
// by receiving a signal via the shutdown channel.
// Without this solution, zombie threads inside the tokio runtime
// might outlive the app.
type TokioRuntime = OnceLock<ThreadLocal<RefCell<Option<Runtime>>>>;
static TOKIO_RUNTIME: TokioRuntime = OnceLock::new();
type ShutdownSenderLock = OnceLock<ThreadLocal<RefCell<Option<ShutdownSender>>>>;
static SHUTDOWN_SENDER: ShutdownSenderLock = OnceLock::new();

pub fn start_rust_logic_real<F>(main_future: F) -> Result<()>
where
Expand All @@ -42,28 +47,56 @@ where
}));
}

// Prepare the channel that will notify tokio runtime to shutdown
// after the main Dart thread has gone.
let (shutdown_sender, shutdown_receiver) = shutdown_channel();
let shutdown_sender_lock =
SHUTDOWN_SENDER.get_or_init(move || ThreadLocal::new(|| RefCell::new(None)));
shutdown_sender_lock.with(|cell| cell.replace(Some(shutdown_sender)));

// Build the tokio runtime.
#[cfg(not(feature = "multi-worker"))]
let tokio_runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()?;
{
let tokio_runtime = Builder::new_current_thread().enable_all().build()?;
thread::spawn(move || {
tokio_runtime.spawn(async {
main_future.await;
});
tokio_runtime.block_on(shutdown_receiver);
// Dropping the tokio runtime makes it shut down.
drop(tokio_runtime);
});
}
#[cfg(feature = "multi-worker")]
let tokio_runtime = Builder::new_multi_thread().enable_all().build()?;

// Run the main function.
tokio_runtime.spawn(async {
main_future.await;
});
TOKIO_RUNTIME
.get_or_init(|| ThreadLocal::new(|| RefCell::new(None)))
.with(move |cell| {
{
static TOKIO_RUNTIME: Mutex<Option<tokio::runtime::Runtime>> = Mutex::new(None);
let tokio_runtime = Builder::new_multi_thread().enable_all().build()?;
tokio_runtime.spawn(async {
main_future.await;
});
tokio_runtime.spawn(async {
shutdown_receiver.await;
thread::spawn(|| {
if let Ok(mut guard) = TOKIO_RUNTIME.lock() {
let runtime_option = guard.take();
if let Some(runtime) = runtime_option {
// Dropping the tokio runtime makes it shut down.
drop(runtime);
}
}
})
});
if let Ok(mut guard) = TOKIO_RUNTIME.lock() {
// If there was already a tokio runtime previously,
// most likely due to Dart's hot restart,
// its tasks as well as itself will be terminated,
// being replaced with the new one.
cell.replace(Some(tokio_runtime));
});
let runtime_option = guard.replace(tokio_runtime);
if let Some(previous_runtime) = runtime_option {
drop(previous_runtime);
}
}
}

Ok(())
}
Expand Down Expand Up @@ -105,3 +138,51 @@ pub fn send_rust_signal_real(

Ok(())
}

struct ShutdownSender {
is_sent: Arc<AtomicBool>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Drop for ShutdownSender {
fn drop(&mut self) {
self.is_sent.store(true, Ordering::SeqCst);
if let Ok(mut guard) = self.waker.lock() {
if let Some(waker) = guard.take() {
waker.wake();
}
}
}
}

struct ShutdownReceiver {
is_sent: Arc<AtomicBool>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for ShutdownReceiver {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.is_sent.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
if let Ok(mut guard) = self.waker.lock() {
guard.replace(cx.waker().clone());
}
Poll::Pending
}
}
}

fn shutdown_channel() -> (ShutdownSender, ShutdownReceiver) {
let is_sent = Arc::new(AtomicBool::new(false));
let waker = Arc::new(Mutex::new(None));

let sender = ShutdownSender {
is_sent: Arc::clone(&is_sent),
waker: Arc::clone(&waker),
};
let receiver = ShutdownReceiver { is_sent, waker };

(sender, receiver)
}

0 comments on commit 5133402

Please sign in to comment.