diff --git a/bastion-executor/Cargo.toml b/bastion-executor/Cargo.toml
index 3474159e..4dd262ff 100644
--- a/bastion-executor/Cargo.toml
+++ b/bastion-executor/Cargo.toml
@@ -15,4 +15,9 @@
 lazy_static = "1.4"
 libc = "0.2"
 num_cpus = "1.10"
+# Allocator
+context-allocator = "0.2"
+libc-extra = "^0.3.2"
+likely = "^0.1.0"
+
 lightproc = { "path" = "../lightproc" }
diff --git a/bastion-executor/examples/new_proc.rs b/bastion-executor/examples/new_proc.rs
index 4a14e9ff..583a2465 100644
--- a/bastion-executor/examples/new_proc.rs
+++ b/bastion-executor/examples/new_proc.rs
@@ -2,7 +2,10 @@ use bastion_executor::prelude::*;
 use lightproc::proc_stack::ProcStack;
 
 fn main() {
-    spawn(async {
-        println!("DATA");
-    }, ProcStack::default());
+    spawn(
+        async {
+            println!("DATA");
+        },
+        ProcStack::default(),
+    );
 }
diff --git a/bastion-executor/src/allocator.rs b/bastion-executor/src/allocator.rs
new file mode 100644
index 00000000..540839e4
--- /dev/null
+++ b/bastion-executor/src/allocator.rs
@@ -0,0 +1,333 @@
+use context_allocator::adaptors::*;
+use context_allocator::allocators::global::*;
+use context_allocator::allocators::*;
+use context_allocator::memory_sources::arena_memory_source::ArenaMemorySource;
+use context_allocator::memory_sources::mmap::MemoryMapSource;
+use context_allocator::MemoryAddress;
+use std::alloc::{Alloc, AllocErr, CannotReallocInPlace, Excess, GlobalAlloc, Layout, System};
+use std::mem::replace;
+use std::num::NonZeroUsize;
+
+/// //////////////////////////////////////////////////
+/// /////////// GLOBAL ALLOCATOR
+/// //////////////////////////////////////////////////
+#[global_allocator]
+static GLOBAL: GlobalThreadAndCoroutineSwitchableAllocatorInstance =
+    GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+        global_allocator: GlobalAllocToAllocatorAdaptor(System),
+    };
+
+/// Effectively this is a field of `GlobalThreadAndCoroutineSwitchableAllocatorInstance` with a different value for each thread.
+///
+/// It is this piece of logic that necessitates this macro definition.
+#[thread_local]
+static mut per_thread_state: PerThreadState<
+    BumpAllocator<ArenaMemorySource<MemoryMapSource>>,
+    MultipleBinarySearchTreeAllocator<MemoryMapSource>,
+> = PerThreadState::empty();
+
+pub(crate) struct GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+    pub(crate) global_allocator: GlobalAllocToAllocatorAdaptor<System>,
+}
+
+#[automatically_derived]
+#[allow(unused_qualifications)]
+impl ::core::fmt::Debug for GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
+        match *self {
+            GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+                global_allocator: ref __self_0_0,
+            } => {
+                let mut debug_trait_builder =
+                    f.debug_struct("GlobalThreadAndCoroutineSwitchableAllocatorInstance");
+                let _ = debug_trait_builder.field("global_allocator", &&(*__self_0_0));
+                debug_trait_builder.finish()
+            }
+        }
+    }
+}
+
+unsafe impl Sync for GlobalThreadAndCoroutineSwitchableAllocatorInstance {}
+
+unsafe impl GlobalAlloc for GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+    #[inline(always)]
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        self.GlobalAlloc_alloc(layout)
+    }
+    #[inline(always)]
+    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
+        self.GlobalAlloc_alloc_zeroed(layout)
+    }
+    #[inline(always)]
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        self.GlobalAlloc_dealloc(ptr, layout)
+    }
+    #[inline(always)]
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        self.GlobalAlloc_realloc(ptr, layout, new_size)
+    }
+}
+
+unsafe impl Alloc for GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+    #[inline(always)]
+    unsafe fn alloc(&mut self, layout: Layout) -> Result<MemoryAddress, AllocErr> {
+        self.Alloc_alloc(layout)
+    }
+    #[inline(always)]
+    unsafe fn alloc_zeroed(&mut self, layout: Layout) -> Result<MemoryAddress, AllocErr> {
+        self.Alloc_alloc_zeroed(layout)
+    }
+    #[inline(always)]
+    unsafe fn dealloc(&mut self, ptr: MemoryAddress, layout: Layout) {
+        self.Alloc_dealloc(ptr, layout)
+    }
+    #[inline(always)]
+    unsafe fn realloc(
+        &mut self,
+        ptr: MemoryAddress,
+        layout: Layout,
+        new_size: usize,
+    ) -> Result<MemoryAddress, AllocErr> {
+        self.Alloc_realloc(ptr, layout, new_size)
+    }
+    #[inline(always)]
+    unsafe fn alloc_excess(&mut self, layout: Layout) -> Result<Excess, AllocErr> {
+        self.Alloc_alloc_excess(layout)
+    }
+    #[inline(always)]
+    unsafe fn realloc_excess(
+        &mut self,
+        ptr: MemoryAddress,
+        layout: Layout,
+        new_size: usize,
+    ) -> Result<Excess, AllocErr> {
+        self.Alloc_realloc_excess(ptr, layout, new_size)
+    }
+    #[inline(always)]
+    unsafe fn grow_in_place(
+        &mut self,
+        ptr: MemoryAddress,
+        layout: Layout,
+        new_size: usize,
+    ) -> Result<(), CannotReallocInPlace> {
+        self.Alloc_grow_in_place(ptr, layout, new_size)
+    }
+    #[inline(always)]
+    unsafe fn shrink_in_place(
+        &mut self,
+        ptr: MemoryAddress,
+        layout: Layout,
+        new_size: usize,
+    ) -> Result<(), CannotReallocInPlace> {
+        self.Alloc_shrink_in_place(ptr, layout, new_size)
+    }
+}
+
+impl Allocator for GlobalThreadAndCoroutineSwitchableAllocatorInstance {
+    #[inline(always)]
+    fn allocate(
+        &self,
+        non_zero_size: NonZeroUsize,
+        non_zero_power_of_two_alignment: NonZeroUsize,
+    ) -> Result<MemoryAddress, AllocErr> {
+        use self::CurrentAllocatorInUse::*;
+        match self.save_current_allocator_in_use() {
+            CoroutineLocal => self
+                .coroutine_local_allocator()
+                .expect("Should have assigned a coroutine local allocator")
+                .allocate(non_zero_size, non_zero_power_of_two_alignment),
+            ThreadLocal => self
+                .thread_local_allocator()
+                .expect("Should have assigned a thread local allocator")
+                .allocate(non_zero_size, non_zero_power_of_two_alignment),
+            Global => self
+                .global_allocator()
+                .allocate(non_zero_size, non_zero_power_of_two_alignment),
+        }
+    }
+    #[inline(always)]
+    fn deallocate(
+        &self,
+        non_zero_size: NonZeroUsize,
+        non_zero_power_of_two_alignment: NonZeroUsize,
+        current_memory: MemoryAddress,
+    ) {
+        {
+            if let Some(coroutine_local_allocator) = self.coroutine_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(coroutine_local_allocator.contains(current_memory))
+                } {
+                    return coroutine_local_allocator.deallocate(
+                        non_zero_size,
+                        non_zero_power_of_two_alignment,
+                        current_memory,
+                    );
+                }
+            }
+            if let Some(thread_local_allocator) = self.thread_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(thread_local_allocator.contains(current_memory))
+                } {
+                    return thread_local_allocator.deallocate(
+                        non_zero_size,
+                        non_zero_power_of_two_alignment,
+                        current_memory,
+                    );
+                }
+            }
+            self.global_allocator().deallocate(
+                non_zero_size,
+                non_zero_power_of_two_alignment,
+                current_memory,
+            )
+        }
+    }
+    #[inline(always)]
+    fn growing_reallocate(
+        &self,
+        non_zero_new_size: NonZeroUsize,
+        non_zero_power_of_two_alignment: NonZeroUsize,
+        non_zero_current_size: NonZeroUsize,
+        current_memory: MemoryAddress,
+    ) -> Result<MemoryAddress, AllocErr> {
+        {
+            if let Some(coroutine_local_allocator) = self.coroutine_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(coroutine_local_allocator.contains(current_memory))
+                } {
+                    return coroutine_local_allocator.growing_reallocate(
+                        non_zero_new_size,
+                        non_zero_power_of_two_alignment,
+                        non_zero_current_size,
+                        current_memory,
+                    );
+                }
+            }
+            if let Some(thread_local_allocator) = self.thread_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(thread_local_allocator.contains(current_memory))
+                } {
+                    return thread_local_allocator.growing_reallocate(
+                        non_zero_new_size,
+                        non_zero_power_of_two_alignment,
+                        non_zero_current_size,
+                        current_memory,
+                    );
+                }
+            }
+            self.global_allocator().growing_reallocate(
+                non_zero_new_size,
+                non_zero_power_of_two_alignment,
+                non_zero_current_size,
+                current_memory,
+            )
+        }
+    }
+    #[inline(always)]
+    fn shrinking_reallocate(
+        &self,
+        non_zero_new_size: NonZeroUsize,
+        non_zero_power_of_two_alignment: NonZeroUsize,
+        non_zero_current_size: NonZeroUsize,
+        current_memory: MemoryAddress,
+    ) -> Result<MemoryAddress, AllocErr> {
+        {
+            if let Some(coroutine_local_allocator) = self.coroutine_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(coroutine_local_allocator.contains(current_memory))
+                } {
+                    return coroutine_local_allocator.growing_reallocate(
+                        non_zero_new_size,
+                        non_zero_power_of_two_alignment,
+                        non_zero_current_size,
+                        current_memory,
+                    );
+                }
+            }
+            if let Some(thread_local_allocator) = self.thread_local_allocator() {
+                if unsafe {
+                    ::std::intrinsics::likely(thread_local_allocator.contains(current_memory))
+                } {
+                    return thread_local_allocator.growing_reallocate(
+                        non_zero_new_size,
+                        non_zero_power_of_two_alignment,
+                        non_zero_current_size,
+                        current_memory,
+                    );
+                }
+            }
+            self.global_allocator().growing_reallocate(
+                non_zero_new_size,
+                non_zero_power_of_two_alignment,
+                non_zero_current_size,
+                current_memory,
+            )
+        }
+    }
+}
+
+impl GlobalThreadAndCoroutineSwitchableAllocator
+    for GlobalThreadAndCoroutineSwitchableAllocatorInstance
+{
+    type CoroutineLocalAllocator = BumpAllocator<ArenaMemorySource<MemoryMapSource>>;
+    type ThreadLocalAllocator = MultipleBinarySearchTreeAllocator<MemoryMapSource>;
+    type GlobalAllocator = GlobalAllocToAllocatorAdaptor<System>;
+    #[inline(always)]
+    fn replace_coroutine_local_allocator(
+        &self,
+        replacement: Option<Self::CoroutineLocalAllocator>,
+    ) -> Option<Self::CoroutineLocalAllocator> {
+        unsafe { replace(&mut per_thread_state.coroutine_local_allocator, replacement) }
+    }
+    #[inline(always)]
+    fn initialize_thread_local_allocator(
+        &self,
+        thread_local_allocator: Self::ThreadLocalAllocator,
+    ) {
+        if true {
+            if !unsafe { per_thread_state.thread_local_allocator.is_none() } {
+                {
+                    ::std::rt::begin_panic(
+                        "Already initialized thread local allocator",
+                        &("allocator.rs", 146u32, 1u32),
+                    )
+                }
+            };
+        };
+        unsafe { per_thread_state.thread_local_allocator = Some(thread_local_allocator) }
+    }
+    #[inline(always)]
+    fn drop_thread_local_allocator(&self) {
+        if true {
+            if !unsafe { per_thread_state.thread_local_allocator.is_some() } {
+                {
+                    ::std::rt::begin_panic(
+                        "Already deinitialized thread local allocator",
+                        &("allocator.rs", 146u32, 1u32),
+                    )
+                }
+            };
+        };
+        unsafe { per_thread_state.thread_local_allocator = None }
+    }
+    #[inline(always)]
+    fn save_current_allocator_in_use(&self) -> CurrentAllocatorInUse {
+        unsafe { per_thread_state.current_allocator_in_use }
+    }
+    #[inline(always)]
+    fn restore_current_allocator_in_use(&self, restore_to: CurrentAllocatorInUse) {
+        unsafe { per_thread_state.current_allocator_in_use = restore_to }
+    }
+    #[inline(always)]
+    fn coroutine_local_allocator(&self) -> Option<&Self::CoroutineLocalAllocator> {
+        unsafe { per_thread_state.coroutine_local_allocator.as_ref() }
+    }
+    #[inline(always)]
+    fn thread_local_allocator(&self) -> Option<&Self::ThreadLocalAllocator> {
+        unsafe { per_thread_state.thread_local_allocator.as_ref() }
+    }
+    #[inline(always)]
+    fn global_allocator(&self) -> &Self::GlobalAllocator {
+        &self.global_allocator
+    }
+}
diff --git a/bastion-executor/src/distributor.rs b/bastion-executor/src/distributor.rs
index 0209c526..9a916748 100644
--- a/bastion-executor/src/distributor.rs
+++ b/bastion-executor/src/distributor.rs
@@ -1,10 +1,10 @@
 use super::placement;
 use super::placement::CoreId;
-use std::thread;
-use super::run_queue::{Worker, Stealer};
-use lightproc::prelude::*;
+use super::run_queue::{Stealer, Worker};
 use super::worker;
+use lightproc::prelude::*;
 use std::sync::Arc;
+use std::thread;
 
 pub(crate) struct Distributor {
     pub round: usize,
@@ -21,8 +21,7 @@ impl Distributor {
         }
     }
 
-    pub fn assign(mut self) -> (Vec<Stealer<LightProc>>, Vec<Worker<LightProc>>)
-    {
+    pub fn assign(mut self) -> (Vec<Stealer<LightProc>>, Vec<Worker<LightProc>>) {
         let mut stealers = Vec::<Stealer<LightProc>>::new();
         let mut workers = Vec::<Worker<LightProc>>::new();
 
@@ -40,7 +39,7 @@ impl Distributor {
                     placement::set_for_current(core);
 
                     // actual execution
-// worker::main_loop(wrk);x
+                    // worker::main_loop(wrk);x
                 })
                 .expect("cannot start the thread for running proc");
         }
diff --git a/bastion-executor/src/lib.rs b/bastion-executor/src/lib.rs
index f7214cd2..fdc1f773 100644
--- a/bastion-executor/src/lib.rs
+++ b/bastion-executor/src/lib.rs
@@ -1,13 +1,19 @@
 #![feature(asm)]
+// Allocator features
+#![feature(allocator_api)]
+#![feature(core_intrinsics)]
+#![feature(libstd_sys_internals)]
+#![feature(thread_local)]
 
+pub mod allocator;
 pub mod blocking_pool;
 pub mod distributor;
+pub mod load_balancer;
 pub mod placement;
 pub mod pool;
-pub mod thread_recovery;
-pub mod load_balancer;
 pub mod run_queue;
 pub mod sleepers;
+pub mod thread_recovery;
 pub mod worker;
 
 pub mod prelude {
diff --git a/bastion-executor/src/load_balancer.rs b/bastion-executor/src/load_balancer.rs
index 23d0319b..8c5e412d 100644
--- a/bastion-executor/src/load_balancer.rs
+++ b/bastion-executor/src/load_balancer.rs
@@ -1,11 +1,12 @@
+use super::placement;
+use super::pool;
+use super::run_queue::{Stealer, Worker};
 use lazy_static::*;
-use std::{thread, time};
-use std::collections::VecDeque;
 use lightproc::lightproc::LightProc;
-use super::run_queue::{Worker, Stealer};
-use super::pool;
-use super::placement;
+use std::collections::VecDeque;
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
+use std::{thread, time};
 
 const SIXTY_MILLIS: time::Duration = time::Duration::from_millis(60);
 
@@ -21,30 +22,29 @@ impl LoadBalancer {
                        pool::get().injector.steal_batch_and_pop(w);
                    });
 
-                   let stealer =
-                       pool::get().stealers.iter().min_by_key(|e| e.run_queue_size())
-                           .unwrap();
+                   let stealer = pool::get()
+                       .stealers
+                       .iter()
+                       .min_by_key(|e| e.run_queue_size())
+                       .unwrap();
 
-                   let worker =
-                       workers.iter().min_by_key(|e| e.worker_run_queue_size())
-                           .unwrap();
+                   let worker = workers
+                       .iter()
+                       .min_by_key(|e| e.worker_run_queue_size())
+                       .unwrap();
 
                    let big = worker.worker_run_queue_size();
                    let small = stealer.run_queue_size();
                    let m = (big & small) + ((big ^ small) >> 1);
 
-                   stealer
-                       .steal_batch_and_pop_with_amount(
-                           &worker,
-                           big.wrapping_sub(m)
-                       );
+                   stealer.steal_batch_and_pop_with_amount(&worker, big.wrapping_sub(m));
 
                    // General suspending is equal to cache line size in ERTS
                    // https://github.com/erlang/otp/blob/master/erts/emulator/beam/erl_process.c#L10887
                    // https://github.com/erlang/otp/blob/ea7d6c39f2179b2240d55df4a1ddd515b6d32832/erts/emulator/beam/erl_thr_progress.c#L237
                    // thread::sleep(SIXTY_MILLIS);
-                   (0..64).for_each(|_| {
-                       unsafe { asm!("NOP"); }
+                   (0..64).for_each(|_| unsafe {
+                       asm!("NOP");
                    })
                }
            })
@@ -53,3 +53,16 @@
         self
     }
 }
+
+pub struct Stats {
+    // global_run_queue: AtomicUsize,
+// smp_queues: Vec,
+}
+
+#[inline]
+pub fn stats() -> &'static Stats {
+    lazy_static! {
+        static ref LB_STATS: Stats = { Stats {} };
+    }
+    &*LB_STATS
+}
diff --git a/bastion-executor/src/pool.rs b/bastion-executor/src/pool.rs
index d764d3de..f97fc55d 100644
--- a/bastion-executor/src/pool.rs
+++ b/bastion-executor/src/pool.rs
@@ -1,19 +1,18 @@
 use super::distributor::Distributor;
-use super::run_queue::{Worker, Injector, Stealer};
-use lazy_static::*;
-use lightproc::prelude::*;
-use super::sleepers::Sleepers;
 use super::load_balancer;
 use super::load_balancer::LoadBalancer;
-use std::future::Future;
+use super::run_queue::{Injector, Stealer, Worker};
+use super::sleepers::Sleepers;
 use super::worker;
+use lazy_static::*;
+use lightproc::prelude::*;
+use std::future::Future;
 use std::sync::Arc;
-
 
 pub fn spawn<F, T>(future: F, stack: ProcStack) -> RecoverableHandle<T>
-    where
-        F: Future<Output = T> + Send + 'static,
-        T: Send + 'static,
+where
+    F: Future<Output = T> + Send + 'static,
+    T: Send + 'static,
 {
     self::get().spawn(future, stack)
 }
@@ -36,21 +35,18 @@ impl Pool {
     }
 
     pub fn spawn<F, T>(&self, future: F, stack: ProcStack) -> RecoverableHandle<T>
-        where
-            F: Future<Output = T> + Send + 'static,
-            T: Send + 'static,
+    where
+        F: Future<Output = T> + Send + 'static,
+        T: Send + 'static,
     {
         // Log this `spawn` operation.
         let child_id = stack.get_pid() as u64;
-        let parent_id =
-            worker::get_proc_stack(|t| t.get_pid() as u64)
-                .unwrap_or(0);
+        let parent_id = worker::get_proc_stack(|t| t.get_pid() as u64).unwrap_or(0);
 
         dbg!(parent_id);
         dbg!(child_id);
 
-        let (task, handle) =
-            LightProc::recoverable(future, worker::schedule, stack);
+        let (task, handle) = LightProc::recoverable(future, worker::schedule, stack);
         task.schedule();
         handle
     }
diff --git a/bastion-executor/src/run_queue.rs b/bastion-executor/src/run_queue.rs
index ad5f0be4..15b54983 100644
--- a/bastion-executor/src/run_queue.rs
+++ b/bastion-executor/src/run_queue.rs
@@ -353,7 +353,7 @@ impl<T> Worker<T> {
         let f = self.inner.front.load(Ordering::SeqCst);
         match b.wrapping_sub(f) {
             x if x <= 0 => 0_usize,
-            y@_ => y as usize
+            y @ _ => y as usize,
         }
     }
 
@@ -669,18 +669,16 @@ impl<T> Stealer<T> {
         b.wrapping_sub(f) <= 0
     }
 
-
     pub fn run_queue_size(&self) -> usize {
         let b = self.inner.back.load(Ordering::Acquire);
         atomic::fence(Ordering::SeqCst);
         let f = self.inner.front.load(Ordering::Acquire);
         match b.wrapping_sub(f) {
             x if x <= 0 => 0_usize,
-            y@_ => y as usize
+            y @ _ => y as usize,
         }
     }
 
-
     /// Steals a task from the queue.
     ///
     /// # Examples
@@ -2128,8 +2126,8 @@ impl<T> Steal<T> {
     /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
     /// ```
     pub fn or_else<F>(self, f: F) -> Steal<T>
-        where
-            F: FnOnce() -> Steal<T>,
+    where
+        F: FnOnce() -> Steal<T>,
     {
         match self {
             Steal::Empty => f(),
@@ -2161,8 +2159,8 @@ impl<T> FromIterator<Steal<T>> for Steal<T> {
     /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
     /// Otherwise, `Empty` is returned.
     fn from_iter<I>(iter: I) -> Steal<T>
-        where
-            I: IntoIterator<Item = Steal<T>>,
+    where
+        I: IntoIterator<Item = Steal<T>>,
     {
         let mut retry = false;
         for s in iter {
diff --git a/bastion-executor/src/worker.rs b/bastion-executor/src/worker.rs
index c4eda515..ae0085d4 100644
--- a/bastion-executor/src/worker.rs
+++ b/bastion-executor/src/worker.rs
@@ -2,8 +2,8 @@ use std::cell::Cell;
 use std::ptr;
 
 use super::pool;
-use lightproc::prelude::*;
 use super::run_queue::Worker;
+use lightproc::prelude::*;
 
 pub fn current() -> ProcStack {
     get_proc_stack(|proc| proc.clone())
@@ -15,8 +15,8 @@
 }
 
 pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
-    where
-        F: FnOnce() -> R,
+where
+    F: FnOnce() -> R,
 {
     struct ResetStack<'a>(&'a Cell<*const ProcStack>);
 
@@ -35,12 +35,10 @@ pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
 }
 
 pub(crate) fn get_proc_stack<F, R>(f: F) -> Option<R>
-    where
-        F: FnOnce(&ProcStack) -> R,
+where
+    F: FnOnce(&ProcStack) -> R,
 {
-    let res = STACK.try_with(|st| unsafe {
-        st.get().as_ref().map(f)
-    });
+    let res = STACK.try_with(|st| unsafe { st.get().as_ref().map(f) });
 
     match res {
         Ok(Some(val)) => Some(val),