diff --git a/bastion-executor/examples/blocking_run.rs b/bastion-executor/examples/blocking_run.rs index f58ff334..05ff6292 100644 --- a/bastion-executor/examples/blocking_run.rs +++ b/bastion-executor/examples/blocking_run.rs @@ -1,14 +1,12 @@ use bastion_executor::prelude::*; use lightproc::proc_stack::ProcStack; - fn main() { run( async { println!("DATA"); panic!("kaka"); }, - ProcStack::default() - .with_after_panic(|| {println!("after panic")}), + ProcStack::default().with_after_panic(|| println!("after panic")), ); } diff --git a/bastion-executor/examples/spawn_async.rs b/bastion-executor/examples/spawn_async.rs index 7f01556d..10610f20 100644 --- a/bastion-executor/examples/spawn_async.rs +++ b/bastion-executor/examples/spawn_async.rs @@ -1,16 +1,18 @@ use bastion_executor::prelude::*; use lightproc::proc_stack::ProcStack; - fn main() { let pid = 1; let stack = ProcStack::default() .with_pid(pid) - .with_after_panic(move || {println!("after panic {}", pid.clone())}); + .with_after_panic(move || println!("after panic {}", pid.clone())); - let handle = spawn(async { - panic!("test"); - }, stack); + let handle = spawn( + async { + panic!("test"); + }, + stack, + ); let pid = 2; let stack = ProcStack::default().with_pid(pid); @@ -19,6 +21,6 @@ fn main() { async { handle.await; }, - stack.clone() + stack.clone(), ); } diff --git a/bastion-executor/src/distributor.rs b/bastion-executor/src/distributor.rs index 90832209..c567f8a3 100644 --- a/bastion-executor/src/distributor.rs +++ b/bastion-executor/src/distributor.rs @@ -4,8 +4,8 @@ use super::run_queue::{Stealer, Worker}; use lightproc::prelude::*; -use std::thread; use crate::worker; +use std::thread; pub(crate) struct Distributor { pub round: usize, diff --git a/bastion-executor/src/lib.rs b/bastion-executor/src/lib.rs index 8b8d5643..5983e0f4 100644 --- a/bastion-executor/src/lib.rs +++ b/bastion-executor/src/lib.rs @@ -11,13 +11,13 @@ pub mod distributor; pub mod load_balancer; pub mod placement; pub mod pool; +pub mod run; pub mod run_queue; pub mod sleepers; pub mod thread_recovery; pub mod worker; -pub mod run; pub mod prelude { - pub use crate::run::*; pub use crate::pool::*; + pub use crate::run::*; } diff --git a/bastion-executor/src/load_balancer.rs b/bastion-executor/src/load_balancer.rs index a63ce62e..953d2f84 100644 --- a/bastion-executor/src/load_balancer.rs +++ b/bastion-executor/src/load_balancer.rs @@ -1,15 +1,11 @@ - - use super::placement; use lazy_static::*; - use std::{thread, time}; - +use super::load_balancer; use crossbeam_utils::sync::ShardedLock; use rustc_hash::FxHashMap; -use super::load_balancer; const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60); @@ -23,7 +19,10 @@ impl LoadBalancer { loop { let mut m = 0_usize; if let Ok(stats) = load_balancer::stats().try_read() { - m = stats.smp_queues.values().sum::() + m = stats + .smp_queues + .values() + .sum::() .wrapping_div(placement::get_core_ids().unwrap().len()); } diff --git a/bastion-executor/src/pool.rs b/bastion-executor/src/pool.rs index 33ee182f..54a44a3c 100644 --- a/bastion-executor/src/pool.rs +++ b/bastion-executor/src/pool.rs @@ -1,6 +1,5 @@ use super::distributor::Distributor; - use super::run_queue::{Injector, Stealer, Worker}; use super::sleepers::Sleepers; use super::worker; @@ -32,7 +31,9 @@ impl Pool { pub fn fetch_proc(&self, affinity: usize, local: &Worker) -> Option { if let Ok(mut stats) = load_balancer::stats().try_write() { - stats.smp_queues.insert(affinity, local.worker_run_queue_size()); + stats + .smp_queues + .insert(affinity, local.worker_run_queue_size()); } if let Ok(stats) = load_balancer::stats().try_read() { @@ -41,13 +42,18 @@ impl Pool { return Some(proc); } } else { - let affine_core = - *stats.smp_queues.iter() - .max_by_key(|&(_core, stat)| stat).unwrap().1; - let stealer = - self.stealers.get(affine_core).unwrap(); + let affine_core = *stats + .smp_queues + .iter() + .max_by_key(|&(_core, stat)| stat) + .unwrap() + .1; + let stealer = self.stealers.get(affine_core).unwrap(); if let Some(amount) = stealer.run_queue_size().checked_sub(stats.mean_level) { - if let Some(proc) = stealer.steal_batch_and_pop_with_amount(local, amount.wrapping_add(1)).success() { + if let Some(proc) = stealer + .steal_batch_and_pop_with_amount(local, amount.wrapping_add(1)) + .success() + { return Some(proc); } } diff --git a/bastion-executor/src/run.rs b/bastion-executor/src/run.rs index 0d213c77..e1145288 100644 --- a/bastion-executor/src/run.rs +++ b/bastion-executor/src/run.rs @@ -1,19 +1,18 @@ -use std::future::Future; -use std::cell::UnsafeCell; -use std::cell::Cell; -use std::pin::Pin; -use std::{mem, panic}; -use lightproc::proc_stack::ProcStack; use super::worker; -use std::sync::Arc; use crossbeam_utils::sync::Parker; +use lightproc::proc_stack::ProcStack; +use std::cell::Cell; +use std::cell::UnsafeCell; +use std::future::Future; use std::mem::ManuallyDrop; -use std::task::{Waker, RawWaker, Context, Poll, RawWakerVTable}; - +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use std::{mem, panic}; pub fn run(future: F, stack: ProcStack) -> T - where - F: Future, +where + F: Future, { unsafe { // A place on the stack where the result will be stored. @@ -59,11 +58,9 @@ pub fn run(future: F, stack: ProcStack) -> T } } - - fn block(f: F) -> T - where - F: Future, +where + F: Future, { thread_local! { // May hold a pre-allocated parker that can be reused for efficiency.