From 95686db41dabc8270132a58e68014b3d744e8202 Mon Sep 17 00:00:00 2001
From: Nick Fitzgerald
Date: Wed, 8 May 2024 17:39:25 -0700
Subject: [PATCH] Wasmtime(pooling allocator): Batch decommits

This introduces a `DecommitQueue` for batching decommits together in the
pooling allocator:

* Deallocating a memory/table/stack enqueues its associated regions of
  memory for decommit; it no longer immediately returns the associated
  slot to the pool's free list. If the queue's length has reached the
  configured batch size, then we flush the queue by running all the
  decommits and finally returning the memory/table/stack slots to their
  respective pools and free lists.

* Additionally, if allocating a new memory/table/stack fails because the
  free list is empty (aka we've reached the max concurrently-allocated
  limit for this entity), then we fall back to a slow path before
  propagating the error. This slow path flushes the decommit queue and
  then retries allocation, hoping that the queue flush reclaimed slots
  and made them available for this fallback allocation attempt. This
  involved defining a new `PoolConcurrencyLimitError` to match on, which
  is also exposed in the public embedder API.

It is also worth noting that we *always* use this new decommit queue now.
To keep the existing behavior, where e.g. a memory's decommits happen
immediately on deallocation, you can use a batch size of one. This
effectively disables queueing, forcing all decommits to be flushed
immediately.

The default decommit batch size is one.

This commit, with a batch size of one, consistently gives me an increase
in `wasmtime serve`'s requests-per-second versus its parent commit, as
measured by `benches/wasmtime-serve-rps.sh`. I get ~39K RPS on this
commit compared to ~35K RPS on the parent commit. This is quite puzzling
to me: I was expecting no change, and hoping there wouldn't be a
regression, but I was not expecting a speed-up. I cannot explain this
result at this time.

Co-Authored-By: Jamey Sharp
---
 benches/wasmtime-serve-rps.sh                 |   6 +-
 crates/cli-flags/src/lib.rs                   |   4 +
 crates/fiber/src/lib.rs                       |  14 ++
 crates/fiber/src/unix.rs                      |  12 +-
 crates/fiber/src/windows.rs                   |   4 +
 .../fuzzing/src/generators/pooling_config.rs  |   3 +
 crates/wasmtime/src/config.rs                 |  14 ++
 crates/wasmtime/src/runtime.rs                |   3 +
 crates/wasmtime/src/runtime/store.rs          |  16 +-
 .../wasmtime/src/runtime/trampoline/memory.rs |   2 +-
 crates/wasmtime/src/runtime/vm.rs             |   3 +-
 crates/wasmtime/src/runtime/vm/cow.rs         | 110 +++++++---
 .../src/runtime/vm/instance/allocator.rs      |   7 +-
 .../vm/instance/allocator/on_demand.rs        |   4 +-
 .../runtime/vm/instance/allocator/pooling.rs  | 180 +++++++++++++++--
 .../allocator/pooling/decommit_queue.rs       | 189 ++++++++++++++++++
 .../allocator/pooling/generic_stack_pool.rs   |  14 +-
 .../instance/allocator/pooling/memory_pool.rs |  64 +++---
 .../instance/allocator/pooling/table_pool.rs  |  53 +++--
 .../allocator/pooling/unix_stack_pool.rs      |  98 ++++++---
 tests/all/pooling_allocator.rs                |  36 +---
 21 files changed, 657 insertions(+), 179 deletions(-)
 create mode 100644 crates/wasmtime/src/runtime/vm/instance/allocator/pooling/decommit_queue.rs

diff --git a/benches/wasmtime-serve-rps.sh b/benches/wasmtime-serve-rps.sh
index 19262b5392b2..33d83a74ab15 100755
--- a/benches/wasmtime-serve-rps.sh
+++ b/benches/wasmtime-serve-rps.sh
@@ -15,12 +15,16 @@ set -e
 repo_dir="$(dirname $0)/.."
 cargo_toml="$repo_dir/Cargo.toml"
+target_dir="$CARGO_TARGET_DIR"
+if [[ "$target_dir" == "" ]]; then
+  target_dir="$repo_dir/target"
+fi
 
 # Build Wasmtime.
cargo build --manifest-path "$cargo_toml" --release -p wasmtime-cli # Spawn `wasmtime serve` in the background. -cargo run --manifest-path "$cargo_toml" --release -- serve "$@" & +"$target_dir/release/wasmtime" serve "$@" & pid=$! # Give it a second to print its diagnostic information and get the server up and diff --git a/crates/cli-flags/src/lib.rs b/crates/cli-flags/src/lib.rs index 50ddc944103d..9f196f45966d 100644 --- a/crates/cli-flags/src/lib.rs +++ b/crates/cli-flags/src/lib.rs @@ -66,6 +66,10 @@ wasmtime_option_group! { /// Enable the pooling allocator, in place of the on-demand allocator. pub pooling_allocator: Option, + /// The number of decommits to do per batch. A batch size of 1 + /// effectively disables decommit batching. (default: 1) + pub pooling_decommit_batch_size: Option, + /// How many bytes to keep resident between instantiations for the /// pooling allocator in linear memories. pub pooling_memory_keep_resident: Option, diff --git a/crates/fiber/src/lib.rs b/crates/fiber/src/lib.rs index 6292079e3214..4d48d5e80cb4 100644 --- a/crates/fiber/src/lib.rs +++ b/crates/fiber/src/lib.rs @@ -1,4 +1,5 @@ use anyhow::Error; +use core::{mem, ptr}; use std::any::Any; use std::cell::Cell; use std::io; @@ -61,6 +62,12 @@ impl FiberStack { pub fn range(&self) -> Option> { self.0.range() } + + /// Is this a manually-managed stack created from raw parts? If so, it is up + /// to whoever created it to manage the stack's memory allocation. + pub fn is_from_raw_parts(&self) -> bool { + self.0.is_from_raw_parts() + } } /// A creator of RuntimeFiberStacks. @@ -160,6 +167,13 @@ impl<'a, Resume, Yield, Return> Fiber<'a, Resume, Yield, Return> { pub fn stack(&self) -> &FiberStack { &self.stack } + + /// When this fiber has finished executing, reclaim its stack. + pub fn into_stack(mut self) -> FiberStack { + assert!(self.done()); + let null_stack = unsafe { FiberStack::from_raw_parts(ptr::null_mut(), 0).unwrap() }; + mem::replace(&mut self.stack, null_stack) + } } impl Suspend { diff --git a/crates/fiber/src/unix.rs b/crates/fiber/src/unix.rs index f14a5cf5394a..b1600654361a 100644 --- a/crates/fiber/src/unix.rs +++ b/crates/fiber/src/unix.rs @@ -41,7 +41,7 @@ pub struct FiberStack { /// Stored here to ensure that when this `FiberStack` the backing storage, /// if any, is additionally dropped. 
- _storage: FiberStackStorage, + storage: FiberStackStorage, } enum FiberStackStorage { @@ -66,7 +66,7 @@ impl FiberStack { Ok(FiberStack { base: stack.mapping_base.wrapping_byte_add(page_size), len: stack.mapping_len - page_size, - _storage: FiberStackStorage::Mmap(stack), + storage: FiberStackStorage::Mmap(stack), }) } @@ -79,10 +79,14 @@ impl FiberStack { Ok(FiberStack { base, len, - _storage: FiberStackStorage::Unmanaged, + storage: FiberStackStorage::Unmanaged, }) } + pub fn is_from_raw_parts(&self) -> bool { + matches!(self.storage, FiberStackStorage::Unmanaged) + } + pub fn from_custom(custom: Box) -> io::Result { let range = custom.range(); let page_size = rustix::param::page_size(); @@ -99,7 +103,7 @@ impl FiberStack { Ok(FiberStack { base: start_ptr, len: range.len(), - _storage: FiberStackStorage::Custom(custom), + storage: FiberStackStorage::Custom(custom), }) } diff --git a/crates/fiber/src/windows.rs b/crates/fiber/src/windows.rs index 9a0b3497e9f0..e15d1bb2bc40 100644 --- a/crates/fiber/src/windows.rs +++ b/crates/fiber/src/windows.rs @@ -19,6 +19,10 @@ impl FiberStack { Err(io::Error::from_raw_os_error(ERROR_NOT_SUPPORTED as i32)) } + pub fn is_from_raw_parts(&self) -> bool { + false + } + pub fn from_custom(_custom: Box) -> io::Result { Err(io::Error::from_raw_os_error(ERROR_NOT_SUPPORTED as i32)) } diff --git a/crates/fuzzing/src/generators/pooling_config.rs b/crates/fuzzing/src/generators/pooling_config.rs index 0997597f0350..a832bd047b8f 100644 --- a/crates/fuzzing/src/generators/pooling_config.rs +++ b/crates/fuzzing/src/generators/pooling_config.rs @@ -27,6 +27,7 @@ pub struct PoolingAllocationConfig { pub table_keep_resident: usize, pub linear_memory_keep_resident: usize, + pub decommit_batch_size: u32, pub max_unused_warm_slots: u32, pub async_stack_zeroing: bool, @@ -61,6 +62,7 @@ impl PoolingAllocationConfig { cfg.table_keep_resident(self.table_keep_resident); cfg.linear_memory_keep_resident(self.linear_memory_keep_resident); + cfg.decommit_batch_size(self.decommit_batch_size); cfg.max_unused_warm_slots(self.max_unused_warm_slots); cfg.async_stack_zeroing(self.async_stack_zeroing); @@ -106,6 +108,7 @@ impl<'a> Arbitrary<'a> for PoolingAllocationConfig { table_keep_resident: u.int_in_range(0..=1 << 20)?, linear_memory_keep_resident: u.int_in_range(0..=1 << 20)?, + decommit_batch_size: u.int_in_range(1..=1000)?, max_unused_warm_slots: u.int_in_range(0..=total_memories + 10)?, async_stack_zeroing: u.arbitrary()?, diff --git a/crates/wasmtime/src/config.rs b/crates/wasmtime/src/config.rs index 7c7da779ce79..c63e8b32b0d0 100644 --- a/crates/wasmtime/src/config.rs +++ b/crates/wasmtime/src/config.rs @@ -2405,6 +2405,20 @@ impl PoolingAllocationConfig { self } + /// The target number of decommits to do per batch. + /// + /// This is not precise, as we can queue up decommits at times when we + /// aren't prepared to immediately flush them, and so we may go over this + /// target size occasionally. + /// + /// A batch size of one effectively disables batching. + /// + /// Defaults to `1`. + pub fn decommit_batch_size(&mut self, batch_size: u32) -> &mut Self { + self.config.decommit_batch_size = batch_size; + self + } + /// Configures whether or not stacks used for async futures are reset to /// zero after usage. 
/// diff --git a/crates/wasmtime/src/runtime.rs b/crates/wasmtime/src/runtime.rs index b3a5ecd527ea..0750ad42451b 100644 --- a/crates/wasmtime/src/runtime.rs +++ b/crates/wasmtime/src/runtime.rs @@ -61,6 +61,9 @@ pub use values::*; pub(crate) use uninhabited::*; +#[cfg(feature = "pooling-allocator")] +pub use vm::PoolConcurrencyLimitError; + #[cfg(feature = "profiling")] mod profiling; #[cfg(feature = "profiling")] diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index 599960ebdf98..e84491ab3584 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -2067,7 +2067,7 @@ impl StoreContextMut<'_, T> { // wrap that in a custom future implementation which does the // translation from the future protocol to our fiber API. FiberFuture { - fiber, + fiber: Some(fiber), current_poll_cx, engine, state: Some(crate::runtime::vm::AsyncWasmCallState::new()), @@ -2078,7 +2078,7 @@ impl StoreContextMut<'_, T> { return Ok(slot.unwrap()); struct FiberFuture<'a> { - fiber: wasmtime_fiber::Fiber<'a, Result<()>, (), Result<()>>, + fiber: Option, (), Result<()>>>, current_poll_cx: *mut *mut Context<'static>, engine: Engine, // See comments in `FiberFuture::resume` for this @@ -2149,6 +2149,10 @@ impl StoreContextMut<'_, T> { unsafe impl Send for FiberFuture<'_> {} impl FiberFuture<'_> { + fn fiber(&self) -> &wasmtime_fiber::Fiber<'_, Result<()>, (), Result<()>> { + self.fiber.as_ref().unwrap() + } + /// This is a helper function to call `resume` on the underlying /// fiber while correctly managing Wasmtime's thread-local data. /// @@ -2174,7 +2178,7 @@ impl StoreContextMut<'_, T> { fiber: self, state: Some(prev), }; - return restore.fiber.fiber.resume(val); + return restore.fiber.fiber().resume(val); } struct Restore<'a, 'b> { @@ -2241,7 +2245,7 @@ impl StoreContextMut<'_, T> { // then that's a bug indicating that TLS management in // Wasmtime is incorrect. Err(()) => { - if let Some(range) = self.fiber.stack().range() { + if let Some(range) = self.fiber().stack().range() { crate::runtime::vm::AsyncWasmCallState::assert_current_state_not_in_range(range); } Poll::Pending @@ -2268,7 +2272,7 @@ impl StoreContextMut<'_, T> { // completion. impl Drop for FiberFuture<'_> { fn drop(&mut self) { - if !self.fiber.done() { + if !self.fiber().done() { let result = self.resume(Err(anyhow!("future dropped"))); // This resumption with an error should always complete the // fiber. 
While it's technically possible for host code to catch @@ -2282,7 +2286,7 @@ impl StoreContextMut<'_, T> { unsafe { self.engine .allocator() - .deallocate_fiber_stack(self.fiber.stack()); + .deallocate_fiber_stack(self.fiber.take().unwrap().into_stack()); } } } diff --git a/crates/wasmtime/src/runtime/trampoline/memory.rs b/crates/wasmtime/src/runtime/trampoline/memory.rs index 9afffeec1984..a13923d62b47 100644 --- a/crates/wasmtime/src/runtime/trampoline/memory.rs +++ b/crates/wasmtime/src/runtime/trampoline/memory.rs @@ -248,7 +248,7 @@ unsafe impl InstanceAllocatorImpl for SingleMemoryInstance<'_> { } #[cfg(feature = "async")] - unsafe fn deallocate_fiber_stack(&self, _stack: &wasmtime_fiber::FiberStack) { + unsafe fn deallocate_fiber_stack(&self, _stack: wasmtime_fiber::FiberStack) { unreachable!() } diff --git a/crates/wasmtime/src/runtime/vm.rs b/crates/wasmtime/src/runtime/vm.rs index 95e85777bec1..3683b3182a80 100644 --- a/crates/wasmtime/src/runtime/vm.rs +++ b/crates/wasmtime/src/runtime/vm.rs @@ -55,7 +55,8 @@ pub use crate::runtime::vm::instance::{ }; #[cfg(feature = "pooling-allocator")] pub use crate::runtime::vm::instance::{ - InstanceLimits, PoolingInstanceAllocator, PoolingInstanceAllocatorConfig, + InstanceLimits, PoolConcurrencyLimitError, PoolingInstanceAllocator, + PoolingInstanceAllocatorConfig, }; pub use crate::runtime::vm::memory::{Memory, RuntimeLinearMemory, RuntimeMemoryCreator}; pub use crate::runtime::vm::mmap::Mmap; diff --git a/crates/wasmtime/src/runtime/vm/cow.rs b/crates/wasmtime/src/runtime/vm/cow.rs index c511b64bb458..da3f0e229eeb 100644 --- a/crates/wasmtime/src/runtime/vm/cow.rs +++ b/crates/wasmtime/src/runtime/vm/cow.rs @@ -470,11 +470,15 @@ impl MemoryImageSlot { /// process's memory on Linux. Up to that much memory will be `memset` to /// zero where the rest of it will be reset or released with `madvise`. 
#[allow(dead_code)] // ignore warnings as this is only used in some cfgs - pub(crate) fn clear_and_remain_ready(&mut self, keep_resident: usize) -> Result<()> { + pub(crate) fn clear_and_remain_ready( + &mut self, + keep_resident: usize, + decommit: impl FnMut(*mut u8, usize), + ) -> Result<()> { assert!(self.dirty); unsafe { - self.reset_all_memory_contents(keep_resident)?; + self.reset_all_memory_contents(keep_resident, decommit)?; } self.dirty = false; @@ -482,7 +486,11 @@ impl MemoryImageSlot { } #[allow(dead_code)] // ignore warnings as this is only used in some cfgs - unsafe fn reset_all_memory_contents(&mut self, keep_resident: usize) -> Result<()> { + unsafe fn reset_all_memory_contents( + &mut self, + keep_resident: usize, + decommit: impl FnMut(*mut u8, usize), + ) -> Result<()> { match vm::decommit_behavior() { DecommitBehavior::Zero => { // If we're not on Linux then there's no generic platform way to @@ -494,13 +502,18 @@ impl MemoryImageSlot { self.reset_with_anon_memory() } DecommitBehavior::RestoreOriginalMapping => { - self.reset_with_original_mapping(keep_resident) + self.reset_with_original_mapping(keep_resident, decommit); + Ok(()) } } } #[allow(dead_code)] // ignore warnings as this is only used in some cfgs - unsafe fn reset_with_original_mapping(&mut self, keep_resident: usize) -> Result<()> { + unsafe fn reset_with_original_mapping( + &mut self, + keep_resident: usize, + mut decommit: impl FnMut(*mut u8, usize), + ) { match &self.image { Some(image) => { assert!(self.accessible >= image.linear_memory_offset + image.len); @@ -544,7 +557,11 @@ impl MemoryImageSlot { ptr::write_bytes(self.base.as_ptr(), 0u8, image.linear_memory_offset); // This is madvise (2) - self.restore_original_mapping(image.linear_memory_offset, image.len)?; + self.restore_original_mapping( + image.linear_memory_offset, + image.len, + &mut decommit, + ); // This is memset (3) ptr::write_bytes(self.base.as_ptr().add(image_end), 0u8, remaining_memset); @@ -553,7 +570,8 @@ impl MemoryImageSlot { self.restore_original_mapping( image_end + remaining_memset, mem_after_image - remaining_memset, - )?; + &mut decommit, + ); } else { // If the image starts after the `keep_resident` threshold // then we memset the start of linear memory and then use @@ -578,7 +596,11 @@ impl MemoryImageSlot { ptr::write_bytes(self.base.as_ptr(), 0u8, keep_resident); // This is madvise (2) - self.restore_original_mapping(keep_resident, self.accessible - keep_resident)?; + self.restore_original_mapping( + keep_resident, + self.accessible - keep_resident, + decommit, + ); } } @@ -588,26 +610,32 @@ impl MemoryImageSlot { None => { let size_to_memset = keep_resident.min(self.accessible); ptr::write_bytes(self.base.as_ptr(), 0u8, size_to_memset); - self.restore_original_mapping(size_to_memset, self.accessible - size_to_memset)?; + self.restore_original_mapping( + size_to_memset, + self.accessible - size_to_memset, + decommit, + ); } } - - Ok(()) } #[allow(dead_code)] // ignore warnings as this is only used in some cfgs - unsafe fn restore_original_mapping(&self, base: usize, len: usize) -> Result<()> { + unsafe fn restore_original_mapping( + &self, + base: usize, + len: usize, + mut decommit: impl FnMut(*mut u8, usize), + ) { assert!(base + len <= self.accessible); if len == 0 { - return Ok(()); + return; } assert_eq!( vm::decommit_behavior(), DecommitBehavior::RestoreOriginalMapping ); - vm::decommit_pages(self.base.as_ptr().add(base), len).err2anyhow()?; - Ok(()) + decommit(self.base.as_ptr().add(base), len); } fn 
set_protection(&self, range: Range, readwrite: bool) -> Result<()> { @@ -699,11 +727,11 @@ impl Drop for MemoryImageSlot { #[cfg(all(test, target_os = "linux", not(miri)))] mod test { - use std::sync::Arc; - use super::{MemoryImage, MemoryImageSlot, MemoryImageSource, MemoryPlan, MemoryStyle}; use crate::runtime::vm::mmap::Mmap; + use crate::runtime::vm::sys::vm::decommit_pages; use anyhow::Result; + use std::sync::Arc; use wasmtime_environ::Memory; fn create_memfd_with_data(offset: usize, data: &[u8]) -> Result { @@ -762,7 +790,9 @@ mod test { assert_eq!(0, slice[131071]); // instantiate again; we should see zeroes, even as the // reuse-anon-mmap-opt kicks in - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); assert!(!memfd.is_dirty()); memfd.instantiate(64 << 10, None, &plan).unwrap(); let slice = unsafe { mmap.slice(0..65536) }; @@ -786,31 +816,41 @@ mod test { assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); slice[4096] = 5; // Clear and re-instantiate same image - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); memfd.instantiate(64 << 10, Some(&image), &plan).unwrap(); let slice = unsafe { mmap.slice_mut(0..65536) }; // Should not see mutation from above assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); // Clear and re-instantiate no image - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); memfd.instantiate(64 << 10, None, &plan).unwrap(); assert!(!memfd.has_image()); let slice = unsafe { mmap.slice_mut(0..65536) }; assert_eq!(&[0, 0, 0, 0], &slice[4096..4100]); // Clear and re-instantiate image again - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); memfd.instantiate(64 << 10, Some(&image), &plan).unwrap(); let slice = unsafe { mmap.slice_mut(0..65536) }; assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); // Create another image with different data. let image2 = Arc::new(create_memfd_with_data(4096, &[10, 11, 12, 13]).unwrap()); - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); memfd.instantiate(128 << 10, Some(&image2), &plan).unwrap(); let slice = unsafe { mmap.slice_mut(0..65536) }; assert_eq!(&[10, 11, 12, 13], &slice[4096..4100]); // Instantiate the original image again; we should notice it's // a different image and not reuse the mappings. 
- memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); memfd.instantiate(64 << 10, Some(&image), &plan).unwrap(); let slice = unsafe { mmap.slice_mut(0..65536) }; assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); @@ -838,7 +878,11 @@ mod test { assert_eq!(&[1, 2, 3, 4], &slice[image_off..][..4]); slice[image_off] = 5; assert_eq!(&[5, 2, 3, 4], &slice[image_off..][..4]); - memfd.clear_and_remain_ready(amt_to_memset).unwrap(); + memfd + .clear_and_remain_ready(amt_to_memset, |ptr, len| unsafe { + decommit_pages(ptr, len).unwrap() + }) + .unwrap(); } } @@ -850,7 +894,11 @@ mod test { assert_eq!(chunk[0], 0); chunk[0] = 5; } - memfd.clear_and_remain_ready(amt_to_memset).unwrap(); + memfd + .clear_and_remain_ready(amt_to_memset, |ptr, len| unsafe { + decommit_pages(ptr, len).unwrap() + }) + .unwrap(); } } @@ -873,7 +921,9 @@ mod test { assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); slice[4096] = 5; assert_eq!(&[5, 2, 3, 4], &slice[4096..4100]); - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); assert_eq!(&[1, 2, 3, 4], &slice[4096..4100]); // Re-instantiate make sure it preserves memory. Grow a bit and set data @@ -884,7 +934,9 @@ mod test { assert_eq!(&[0, 0], &slice[initial..initial + 2]); slice[initial] = 100; assert_eq!(&[100, 0], &slice[initial..initial + 2]); - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); // Test that memory is still accessible, but it's been reset assert_eq!(&[0, 0], &slice[initial..initial + 2]); @@ -897,7 +949,9 @@ mod test { assert_eq!(&[0, 0], &slice[initial..initial + 2]); slice[initial] = 100; assert_eq!(&[100, 0], &slice[initial..initial + 2]); - memfd.clear_and_remain_ready(0).unwrap(); + memfd + .clear_and_remain_ready(0, |ptr, len| unsafe { decommit_pages(ptr, len).unwrap() }) + .unwrap(); // Reset the image to none and double-check everything is back to zero memfd.instantiate(64 << 10, None, &plan).unwrap(); diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator.rs b/crates/wasmtime/src/runtime/vm/instance/allocator.rs index 3fa10e930b69..6309cdf8b228 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator.rs @@ -30,7 +30,10 @@ pub use self::on_demand::OnDemandInstanceAllocator; #[cfg(feature = "pooling-allocator")] mod pooling; #[cfg(feature = "pooling-allocator")] -pub use self::pooling::{InstanceLimits, PoolingInstanceAllocator, PoolingInstanceAllocatorConfig}; +pub use self::pooling::{ + InstanceLimits, PoolConcurrencyLimitError, PoolingInstanceAllocator, + PoolingInstanceAllocatorConfig, +}; /// Represents a request for a new runtime instance. pub struct InstanceAllocationRequest<'a> { @@ -290,7 +293,7 @@ pub unsafe trait InstanceAllocatorImpl { /// The provided stack is required to have been allocated with /// `allocate_fiber_stack`. #[cfg(feature = "async")] - unsafe fn deallocate_fiber_stack(&self, stack: &wasmtime_fiber::FiberStack); + unsafe fn deallocate_fiber_stack(&self, stack: wasmtime_fiber::FiberStack); /// Allocate a GC heap for allocating Wasm GC objects within. 
#[cfg(feature = "gc")] diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/on_demand.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/on_demand.rs index 48fd1009e7be..c7286c867803 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/on_demand.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/on_demand.rs @@ -171,8 +171,10 @@ unsafe impl InstanceAllocatorImpl for OnDemandInstanceAllocator { } #[cfg(feature = "async")] - unsafe fn deallocate_fiber_stack(&self, _stack: &wasmtime_fiber::FiberStack) { + unsafe fn deallocate_fiber_stack(&self, stack: wasmtime_fiber::FiberStack) { // The on-demand allocator has no further bookkeeping for fiber stacks + // beyond dropping them. + let _ = stack; } fn purge_module(&self, _: CompiledModuleId) {} diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling.rs index 6b64fde95b11..28380a364af4 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling.rs @@ -18,6 +18,7 @@ //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`], //! [`stack_pool`]. See those modules for more details. +mod decommit_queue; mod index_allocator; mod memory_pool; mod table_pool; @@ -39,6 +40,7 @@ cfg_if::cfg_if! { } } +use self::decommit_queue::DecommitQueue; use self::memory_pool::MemoryPool; use self::table_pool::TablePool; use super::{ @@ -51,6 +53,9 @@ use crate::runtime::vm::{ CompiledModuleId, Memory, Table, }; use anyhow::{bail, Result}; +use std::borrow::Cow; +use std::fmt::Display; +use std::sync::{Mutex, MutexGuard}; use std::{ mem, sync::atomic::{AtomicU64, Ordering}, @@ -174,6 +179,11 @@ impl Default for InstanceLimits { pub struct PoolingInstanceAllocatorConfig { /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime` pub max_unused_warm_slots: u32, + /// The target number of decommits to do per batch. This is not precise, as + /// we can queue up decommits at times when we aren't prepared to + /// immediately flush them, and so we may go over this target size + /// occasionally. + pub decommit_batch_size: u32, /// The size, in bytes, of async stacks to allocate (not including the guard /// page). pub stack_size: usize, @@ -207,6 +217,7 @@ impl Default for PoolingInstanceAllocatorConfig { fn default() -> PoolingInstanceAllocatorConfig { PoolingInstanceAllocatorConfig { max_unused_warm_slots: 100, + decommit_batch_size: 1, stack_size: 2 << 20, limits: InstanceLimits::default(), async_stack_zeroing: false, @@ -219,6 +230,34 @@ impl Default for PoolingInstanceAllocatorConfig { } } +/// An error returned when the pooling allocator cannot allocate a table, +/// memory, etc... because the maximum number of concurrent allocations for that +/// entity has been reached. +#[derive(Debug)] +pub struct PoolConcurrencyLimitError { + limit: usize, + kind: Cow<'static, str>, +} + +impl std::error::Error for PoolConcurrencyLimitError {} + +impl Display for PoolConcurrencyLimitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let limit = self.limit; + let kind = &self.kind; + write!(f, "maximum concurrent limit of {limit} for {kind} reached") + } +} + +impl PoolConcurrencyLimitError { + fn new(limit: usize, kind: impl Into>) -> Self { + Self { + limit, + kind: kind.into(), + } + } +} + /// Implements the pooling instance allocator. 
/// /// This allocator internally maintains pools of instances, memories, tables, @@ -240,6 +279,7 @@ pub struct PoolingInstanceAllocator { live_core_instances: AtomicU64, live_component_instances: AtomicU64, + decommit_queue: Mutex, memories: MemoryPool, tables: TablePool, @@ -273,6 +313,7 @@ impl PoolingInstanceAllocator { limits: config.limits, live_component_instances: AtomicU64::new(0), live_core_instances: AtomicU64::new(0), + decommit_queue: Mutex::new(DecommitQueue::new(config.decommit_batch_size)), memories: MemoryPool::new(config, tunables)?, tables: TablePool::new(config)?, #[cfg(feature = "gc")] @@ -370,6 +411,14 @@ impl PoolingInstanceAllocator { self.limits.component_instance_size ) } + + fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) { + // First, take the queue out of the mutex and drop the lock, to minimize + // contention. + let queue = locked_queue.take(); + drop(locked_queue); + queue.flush(self); + } } unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { @@ -450,10 +499,11 @@ unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel); if old_count >= u64::from(self.limits.total_component_instances) { self.decrement_component_instance_count(); - bail!( - "maximum concurrent component instance limit of {} reached", - self.limits.total_component_instances - ); + return Err(PoolConcurrencyLimitError::new( + usize::try_from(self.limits.total_component_instances).unwrap(), + "component instances", + ) + .into()); } Ok(()) } @@ -466,10 +516,11 @@ unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel); if old_count >= u64::from(self.limits.total_core_instances) { self.decrement_core_instance_count(); - bail!( - "maximum concurrent core instance limit of {} reached", - self.limits.total_core_instances - ); + return Err(PoolConcurrencyLimitError::new( + usize::try_from(self.limits.total_core_instances).unwrap(), + "core instances", + ) + .into()); } Ok(()) } @@ -484,7 +535,17 @@ unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { memory_plan: &MemoryPlan, memory_index: DefinedMemoryIndex, ) -> Result<(MemoryAllocationIndex, Memory)> { - self.memories.allocate(request, memory_plan, memory_index) + self.memories + .allocate(request, memory_plan, memory_index) + .or_else(|e| { + if e.is::() { + let queue = self.decommit_queue.lock().unwrap(); + self.flush_decommit_queue(queue); + self.memories.allocate(request, memory_plan, memory_index) + } else { + Err(e) + } + }) } unsafe fn deallocate_memory( @@ -493,7 +554,41 @@ unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { allocation_index: MemoryAllocationIndex, memory: Memory, ) { - self.memories.deallocate(allocation_index, memory); + // Reset the image slot. If there is any error clearing the + // image, just drop it here, and let the drop handler for the + // slot unmap in a way that retains the address space + // reservation. 
+ let mut image = memory.unwrap_static_image(); + + let mut any_enqueued = false; + let mut queue_is_full = false; + let mut queue = self.decommit_queue.lock().unwrap(); + + image + .clear_and_remain_ready(self.memories.keep_resident, |ptr, len| { + any_enqueued = true; + queue_is_full |= queue.enqueue_raw(ptr, len); + }) + .expect("failed to reset memory image"); + + if any_enqueued { + // If we enqueued parts of this memory for batched decommit, then + // don't return this index to the memory pool's free list yet + // because this slot isn't ready for reuse until the queue is + // flushed. + queue.push_memory(allocation_index, image); + + // If the queue reached capacity, then flush it. + if queue_is_full { + self.flush_decommit_queue(queue); + } + } else { + // If we didn't enqueue any regions for decommit, then we must have + // either memset the whole memory or eagerly remapped it to zero + // because we don't have linux's `madvise(DONTNEED)` semantics. In + // either case, the memory slot is ready for reuse immediately. + self.memories.deallocate(allocation_index, image); + } } unsafe fn allocate_table( @@ -502,26 +597,75 @@ unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator { table_plan: &TablePlan, _table_index: DefinedTableIndex, ) -> Result<(super::TableAllocationIndex, Table)> { - self.tables.allocate(request, table_plan) + self.tables.allocate(request, table_plan).or_else(|e| { + if e.is::() { + let queue = self.decommit_queue.lock().unwrap(); + self.flush_decommit_queue(queue); + self.tables.allocate(request, table_plan) + } else { + Err(e) + } + }) } unsafe fn deallocate_table( &self, _table_index: DefinedTableIndex, allocation_index: TableAllocationIndex, - table: Table, + mut table: Table, ) { - self.tables.deallocate(allocation_index, table); + let mut any_enqueued = false; + let mut queue_is_full = false; + let mut queue = self.decommit_queue.lock().unwrap(); + + self.tables + .reset_table_pages_to_zero(allocation_index, &mut table, |ptr, len| { + any_enqueued = true; + queue_is_full |= queue.enqueue_raw(ptr, len); + }); + + if any_enqueued { + queue.push_table(allocation_index, table); + if queue_is_full { + self.flush_decommit_queue(queue); + } + } else { + self.tables.deallocate(allocation_index, table); + } } #[cfg(feature = "async")] fn allocate_fiber_stack(&self) -> Result { - self.stacks.allocate() + self.stacks.allocate().or_else(|e| { + if e.is::() { + let queue = self.decommit_queue.lock().unwrap(); + self.flush_decommit_queue(queue); + self.stacks.allocate() + } else { + Err(e) + } + }) } #[cfg(feature = "async")] - unsafe fn deallocate_fiber_stack(&self, stack: &wasmtime_fiber::FiberStack) { - self.stacks.deallocate(stack); + unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) { + let mut any_enqueued = false; + let mut queue_is_full = false; + let mut queue = self.decommit_queue.lock().unwrap(); + + self.stacks.zero_stack(&mut stack, |ptr, len| { + any_enqueued = true; + queue_is_full |= queue.enqueue_raw(ptr, len); + }); + + if any_enqueued { + queue.push_stack(stack); + if queue_is_full { + self.flush_decommit_queue(queue); + } + } else { + self.stacks.deallocate(stack); + } } fn purge_module(&self, module: CompiledModuleId) { @@ -613,7 +757,7 @@ mod test { assert_eq!(*addr, 0); *addr = 1; - allocator.deallocate_fiber_stack(&stack); + allocator.deallocate_fiber_stack(stack); } } @@ -647,7 +791,7 @@ mod test { assert_eq!(*addr, i); *addr = i + 1; - allocator.deallocate_fiber_stack(&stack); + 
allocator.deallocate_fiber_stack(stack); } } diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/decommit_queue.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/decommit_queue.rs new file mode 100644 index 000000000000..b3b7a899427c --- /dev/null +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/decommit_queue.rs @@ -0,0 +1,189 @@ +//! A queue for batching decommits together. +//! +//! We don't immediately decommit a Wasm table/memory/stack/etc... eagerly, but +//! instead batch them up to be decommitted together. This module implements +//! that queuing and batching. +//! +//! Even when batching is "disabled" we still use this queue. Batching is +//! disabled by specifying a batch size of one, in which case, this queue will +//! immediately get flushed everytime we push onto it. + +use super::PoolingInstanceAllocator; +use crate::vm::{MemoryAllocationIndex, MemoryImageSlot, Table, TableAllocationIndex}; +use core::mem; +use smallvec::SmallVec; + +#[cfg(feature = "async")] +use wasmtime_fiber::FiberStack; + +#[repr(transparent)] +struct IoVec(libc::iovec); + +unsafe impl Send for IoVec {} +unsafe impl Sync for IoVec {} + +impl std::fmt::Debug for IoVec { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("IoVec") + .field("base", &self.0.iov_base) + .field("len", &self.0.iov_len) + .finish() + } +} + +#[cfg(feature = "async")] +struct SendSyncStack(FiberStack); +#[cfg(feature = "async")] +unsafe impl Send for SendSyncStack {} +#[cfg(feature = "async")] +unsafe impl Sync for SendSyncStack {} + +pub struct DecommitQueue { + batch_size: usize, + raw: SmallVec<[IoVec; 2]>, + memories: SmallVec<[(MemoryAllocationIndex, MemoryImageSlot); 1]>, + tables: SmallVec<[(TableAllocationIndex, Table); 1]>, + #[cfg(feature = "async")] + stacks: SmallVec<[SendSyncStack; 1]>, + // + // TODO: GC heaps are not well-integrated with the pooling allocator + // yet. Once we better integrate them, we should start (optionally) zeroing + // them, and batching that up here. + // + // #[cfg(feature = "gc")] + // pub gc_heaps: SmallVec<[(GcHeapAllocationIndex, Box); 1]>, +} + +impl std::fmt::Debug for DecommitQueue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DecommitQueue") + .field("batch_size", &self.batch_size) + .field("raw", &self.raw) + .field("memories", &"..") + .field("tables", &"..") + .field("stacks", &"..") + .finish() + } +} + +impl DecommitQueue { + /// Construct a new queue with the given batch size. + pub fn new(batch_size: u32) -> Self { + // Don't try to `madvise` more than the kernel can put in a single + // iovec-array at the same time. + let iov_max = u32::try_from(unsafe { libc::sysconf(libc::_SC_IOV_MAX) }).unwrap(); + let batch_size = core::cmp::min(batch_size, iov_max); + let batch_size = usize::try_from(batch_size).unwrap(); + + Self { + batch_size, + raw: Default::default(), + memories: Default::default(), + tables: Default::default(), + #[cfg(feature = "async")] + stacks: Default::default(), + } + } + + /// Take this queue and leave behind an empty queue with the same batch + /// size. + pub fn take(&mut self) -> Self { + let new = Self::new(u32::try_from(self.batch_size).unwrap()); + mem::replace(self, new) + } + + /// Enqueue a region of memory for decommit. + /// + /// It is the caller's responsibility to push the associated data via + /// `self.push_{memory,table,stack}` as appropriate. + /// + /// Returns `true` if the queue is "full", i.e. 
has reached the target batch + /// size, and should be flushed; returns `false` otherwise. + /// + /// Callers may continue to enqueue more memory regions even after this + /// returns `true` if they must, but they should arrange for the queue to be + /// flushed as soon as possible. + /// + /// # Safety + /// + /// The enqueued memory regions must be safe to decommit when `flush` is + /// called (no other references, not in use, won't be otherwise unmapped, + /// etc...). + pub unsafe fn enqueue_raw(&mut self, ptr: *mut u8, len: usize) -> bool { + self.raw.push(IoVec(libc::iovec { + iov_base: ptr.cast(), + iov_len: len, + })); + self.raw.len() >= self.batch_size + } + + /// Push a memory into the queue. + /// + /// # Safety + /// + /// This memory should not be in use, and its decommit regions must have + /// already been enqueued via `self.enqueue_raw`. + pub unsafe fn push_memory( + &mut self, + allocation_index: MemoryAllocationIndex, + image: MemoryImageSlot, + ) { + self.memories.push((allocation_index, image)); + } + + /// Push a table into the queue. + /// + /// # Safety + /// + /// This table should not be in use, and its decommit regions must have + /// already been enqueued via `self.enqueue_raw`. + pub unsafe fn push_table(&mut self, allocation_index: TableAllocationIndex, table: Table) { + self.tables.push((allocation_index, table)); + } + + /// Push a stack into the queue. + /// + /// # Safety + /// + /// This stack should not be in use, and its decommit regions must have + /// already been enqueued via `self.enqueue_raw`. + #[cfg(feature = "async")] + pub unsafe fn push_stack(&mut self, stack: FiberStack) { + self.stacks.push(SendSyncStack(stack)); + } + + fn decommit_all_raw(&mut self) { + for iovec in self.raw.drain(..) { + unsafe { + crate::vm::sys::vm::decommit_pages(iovec.0.iov_base.cast(), iovec.0.iov_len) + .expect("failed to decommit pages"); + } + } + } + + /// Flush this queue + pub fn flush(mut self, pool: &PoolingInstanceAllocator) { + // First, do the raw decommit syscall(s). + self.decommit_all_raw(); + + // Second, restore the various entities to their associated pools' free + // lists. This is safe, and they are ready for reuse, now that their + // memory regions have been decommitted. + for (allocation_index, image) in self.memories { + unsafe { + pool.memories.deallocate(allocation_index, image); + } + } + for (allocation_index, table) in self.tables { + unsafe { + pool.tables.deallocate(allocation_index, table); + } + } + #[cfg(feature = "async")] + for stack in self.stacks { + unsafe { + pool.stacks.deallocate(stack.0); + } + } + } +} diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/generic_stack_pool.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/generic_stack_pool.rs index 3b8bc99203be..9bd6e4258336 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/generic_stack_pool.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/generic_stack_pool.rs @@ -58,9 +58,19 @@ impl StackPool { } } - pub unsafe fn deallocate(&self, stack: &wasmtime_fiber::FiberStack) { + pub unsafe fn zero_stack( + &self, + _stack: &mut wasmtime_fiber::FiberStack, + _decommit: impl FnMut(*mut u8, usize), + ) { + // No need to actually zero the stack, since the stack won't ever be + // reused on non-unix systems. + } + + /// Safety: see the unix implementation. 
+ pub unsafe fn deallocate(&self, stack: wasmtime_fiber::FiberStack) { self.live_stacks.fetch_sub(1, Ordering::AcqRel); - // A no-op as we don't own the fiber stack on Windows. + // A no-op as we don't actually own the fiber stack on Windows. let _ = stack; } } diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/memory_pool.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/memory_pool.rs index 6615707bb73c..df7d237427e2 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/memory_pool.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/memory_pool.rs @@ -109,29 +109,33 @@ pub struct MemoryPool { /// will contain one stripe per available key; otherwise, a single stripe /// with an empty key. stripes: Vec, - // If using a copy-on-write allocation scheme, the slot management. We - // dynamically transfer ownership of a slot to a Memory when in - // use. + + /// If using a copy-on-write allocation scheme, the slot management. We + /// dynamically transfer ownership of a slot to a Memory when in use. image_slots: Vec>>, + /// A description of the various memory sizes used in allocating the /// `mapping` slab. layout: SlabLayout, - // The maximum number of memories that a single core module instance may - // use. - // - // NB: this is needed for validation but does not affect the pool's size. + + /// The maximum number of memories that a single core module instance may + /// use. + /// + /// NB: this is needed for validation but does not affect the pool's size. memories_per_instance: usize, - // How much linear memory, in bytes, to keep resident after resetting for - // use with the next instance. This much memory will be `memset` to zero - // when a linear memory is deallocated. - // - // Memory exceeding this amount in the wasm linear memory will be released - // with `madvise` back to the kernel. - // - // Only applicable on Linux. - keep_resident: usize, - // Keep track of protection keys handed out to initialized stores; this - // allows us to round-robin the assignment of stores to stripes. + + /// How much linear memory, in bytes, to keep resident after resetting for + /// use with the next instance. This much memory will be `memset` to zero + /// when a linear memory is deallocated. + /// + /// Memory exceeding this amount in the wasm linear memory will be released + /// with `madvise` back to the kernel. + /// + /// Only applicable on Linux. + pub(super) keep_resident: usize, + + /// Keep track of protection keys handed out to initialized stores; this + /// allows us to round-robin the assignment of stores to stripes. next_available_pkey: AtomicUsize, } @@ -321,10 +325,9 @@ impl MemoryPool { ) .map(|slot| StripedAllocationIndex(u32::try_from(slot.index()).unwrap())) .ok_or_else(|| { - anyhow!( - "maximum concurrent memory limit of {} reached for stripe {}", + super::PoolConcurrencyLimitError::new( self.stripes[stripe_index].allocator.len(), - stripe_index + format!("memory stripe {stripe_index}"), ) })?; let allocation_index = @@ -390,16 +393,15 @@ impl MemoryPool { /// The memory must have been previously allocated from this pool and /// assigned the given index, must currently be in an allocated state, and /// must never be used again. - pub unsafe fn deallocate(&self, allocation_index: MemoryAllocationIndex, memory: Memory) { - let mut image = memory.unwrap_static_image(); - - // Reset the image slot. 
If there is any error clearing the - // image, just drop it here, and let the drop handler for the - // slot unmap in a way that retains the address space - // reservation. - if image.clear_and_remain_ready(self.keep_resident).is_ok() { - self.return_memory_image_slot(allocation_index, image); - } + /// + /// The caller must have already called `clear_and_remain_ready` on the + /// memory's image and flushed any enqueued decommits for this memory. + pub unsafe fn deallocate( + &self, + allocation_index: MemoryAllocationIndex, + image: MemoryImageSlot, + ) { + self.return_memory_image_slot(allocation_index, image); let (stripe_index, striped_allocation_index) = StripedAllocationIndex::from_unstriped_slot_index(allocation_index, self.stripes.len()); diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/table_pool.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/table_pool.rs index 223cca4ff77a..c8010db8bd0a 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/table_pool.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/table_pool.rs @@ -2,7 +2,7 @@ use super::{ index_allocator::{SimpleIndexAllocator, SlotId}, round_up_to_pow2, TableAllocationIndex, }; -use crate::runtime::vm::sys::vm::{commit_pages, decommit_pages}; +use crate::runtime::vm::sys::vm::commit_pages; use crate::runtime::vm::{ InstanceAllocationRequest, Mmap, PoolingInstanceAllocatorConfig, SendSyncPtr, Table, }; @@ -122,10 +122,7 @@ impl TablePool { .alloc() .map(|slot| TableAllocationIndex(slot.0)) .ok_or_else(|| { - anyhow!( - "maximum concurrent table limit of {} reached", - self.max_total_tables - ) + super::PoolConcurrencyLimitError::new(self.max_total_tables, "tables") })?; match (|| { @@ -166,31 +163,45 @@ impl TablePool { /// The table must have been previously-allocated by this pool and assigned /// the given allocation index, it must currently be allocated, and it must /// never be used again. + /// + /// The caller must have already called `reset_table_pages_to_zero` on the + /// memory and flushed any enqueued decommits for this table's memory. pub unsafe fn deallocate(&self, allocation_index: TableAllocationIndex, table: Table) { assert!(table.is_static()); + drop(table); + self.index_allocator.free(SlotId(allocation_index.0)); + } + + /// Reset the given table's memory to zero. + /// + /// Invokes the given `decommit` function for each region of memory that + /// needs to be decommitted. It is the caller's responsibility to actually + /// perform that decommit before this table is reused. + /// + /// # Safety + /// + /// This table must not be in active use, and ready for returning to the + /// table pool once it is zeroed and decommitted. + pub unsafe fn reset_table_pages_to_zero( + &self, + allocation_index: TableAllocationIndex, + table: &mut Table, + mut decommit: impl FnMut(*mut u8, usize), + ) { + assert!(table.is_static()); + let base = self.get(allocation_index); let size = round_up_to_pow2( table.size() as usize * mem::size_of::<*mut u8>(), self.page_size, ); - drop(table); - - let base = self.get(allocation_index); - self.reset_table_pages_to_zero(base, size) - .expect("failed to decommit table pages"); - - self.index_allocator.free(SlotId(allocation_index.0)); - } - - fn reset_table_pages_to_zero(&self, base: *mut u8, size: usize) -> Result<()> { + // `memset` the first `keep_resident` bytes. 
let size_to_memset = size.min(self.keep_resident); - unsafe { - std::ptr::write_bytes(base, 0, size_to_memset); - decommit_pages(base.add(size_to_memset), size - size_to_memset) - .context("failed to decommit table page")?; - } - Ok(()) + std::ptr::write_bytes(base, 0, size_to_memset); + + // And decommit the the rest of it. + decommit(base.add(size_to_memset), size - size_to_memset) } } diff --git a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/unix_stack_pool.rs b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/unix_stack_pool.rs index f314333a8ffd..48ffaa8308c8 100644 --- a/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/unix_stack_pool.rs +++ b/crates/wasmtime/src/runtime/vm/instance/allocator/pooling/unix_stack_pool.rs @@ -4,7 +4,7 @@ use super::{ index_allocator::{SimpleIndexAllocator, SlotId}, round_up_to_pow2, }; -use crate::runtime::vm::sys::vm::{commit_pages, decommit_pages}; +use crate::runtime::vm::sys::vm::commit_pages; use crate::runtime::vm::{Mmap, PoolingInstanceAllocatorConfig}; use anyhow::{anyhow, bail, Context, Result}; @@ -90,12 +90,7 @@ impl StackPool { let index = self .index_allocator .alloc() - .ok_or_else(|| { - anyhow!( - "maximum concurrent fiber limit of {} reached", - self.max_stacks - ) - })? + .ok_or_else(|| super::PoolConcurrencyLimitError::new(self.max_stacks, "fibers"))? .index(); assert!(index < self.max_stacks); @@ -118,13 +113,27 @@ impl StackPool { } } - /// Deallocate a previously-allocated fiber. + /// Zero the given stack, if we are configured to do so. + /// + /// This will call the given `decommit` function for each region of memory + /// that should be decommitted. It is the caller's responsibility to ensure + /// that those decommits happen before this stack is reused. /// /// # Safety /// - /// The fiber must have been allocated by this pool, must be in an allocated - /// state, and must never be used again. - pub unsafe fn deallocate(&self, stack: &wasmtime_fiber::FiberStack) { + /// The stack must no longer be in use, and ready for returning to the pool + /// after it is zeroed and decommitted. + pub unsafe fn zero_stack( + &self, + stack: &mut wasmtime_fiber::FiberStack, + mut decommit: impl FnMut(*mut u8, usize), + ) { + assert!(stack.is_from_raw_parts()); + + if !self.async_stack_zeroing { + return; + } + let top = stack .top() .expect("fiber stack not allocated from the pool") as usize; @@ -143,17 +152,6 @@ impl StackPool { assert!(start_of_stack >= base && start_of_stack < (base + len)); assert!((start_of_stack - base) % self.stack_size == 0); - let index = (start_of_stack - base) / self.stack_size; - assert!(index < self.max_stacks); - - if self.async_stack_zeroing { - self.zero_stack(bottom_of_stack, stack_size); - } - - self.index_allocator.free(SlotId(index as u32)); - } - - fn zero_stack(&self, bottom: usize, size: usize) { // Manually zero the top of the stack to keep the pages resident in // memory and avoid future page faults. Use the system to deallocate // pages past this. 
This hopefully strikes a reasonable balance between: @@ -161,17 +159,51 @@ impl StackPool { // * memset for the whole range is probably expensive // * madvise for the whole range incurs expensive future page faults // * most threads probably don't use most of the stack anyway - let size_to_memset = size.min(self.async_stack_keep_resident); - unsafe { - std::ptr::write_bytes( - (bottom + size - size_to_memset) as *mut u8, - 0, - size_to_memset, - ); + let size_to_memset = stack_size.min(self.async_stack_keep_resident); + std::ptr::write_bytes( + (bottom_of_stack + stack_size - size_to_memset) as *mut u8, + 0, + size_to_memset, + ); - // Use the system to reset remaining stack pages to zero. - decommit_pages(bottom as _, size - size_to_memset).unwrap(); - } + // Use the system to reset remaining stack pages to zero. + decommit(bottom_of_stack as _, stack_size - size_to_memset); + } + + /// Deallocate a previously-allocated fiber. + /// + /// # Safety + /// + /// The fiber must have been allocated by this pool, must be in an allocated + /// state, and must never be used again. + /// + /// The caller must have already called `zero_stack` on the fiber stack and + /// flushed any enqueued decommits for this stack's memory. + pub unsafe fn deallocate(&self, stack: wasmtime_fiber::FiberStack) { + assert!(stack.is_from_raw_parts()); + + let top = stack + .top() + .expect("fiber stack not allocated from the pool") as usize; + + let base = self.mapping.as_ptr() as usize; + let len = self.mapping.len(); + assert!( + top > base && top <= (base + len), + "fiber stack top pointer not in range" + ); + + // Remove the guard page from the size + let stack_size = self.stack_size - self.page_size; + let bottom_of_stack = top - stack_size; + let start_of_stack = bottom_of_stack - self.page_size; + assert!(start_of_stack >= base && start_of_stack < (base + len)); + assert!((start_of_stack - base) % self.stack_size == 0); + + let index = (start_of_stack - base) / self.stack_size; + assert!(index < self.max_stacks); + + self.index_allocator.free(SlotId(index as u32)); } } @@ -220,7 +252,7 @@ mod tests { for stack in stacks { unsafe { - pool.deallocate(&stack); + pool.deallocate(stack); } } diff --git a/tests/all/pooling_allocator.rs b/tests/all/pooling_allocator.rs index b33910209a19..b24aa7412e8a 100644 --- a/tests/all/pooling_allocator.rs +++ b/tests/all/pooling_allocator.rs @@ -430,13 +430,7 @@ fn total_core_instances_limit() -> Result<()> { match Instance::new(&mut store, &module, &[]) { Ok(_) => panic!("instantiation should fail"), - Err(e) => assert_eq!( - e.to_string(), - format!( - "maximum concurrent core instance limit of {} reached", - INSTANCE_LIMIT - ) - ), + Err(e) => assert!(e.is::()), } } @@ -650,7 +644,7 @@ fn instance_too_large() -> Result<()> { let engine = Engine::new(&config)?; let expected = if cfg!(feature = "wmemcheck") { "\ -instance allocation for this module requires 336 bytes which exceeds the \ + instance allocation for this module requires 336 bytes which exceeds the \ configured maximum of 16 bytes; breakdown of allocation requirement: * 76.19% - 256 bytes - instance state management @@ -867,13 +861,7 @@ fn total_component_instances_limit() -> Result<()> { match linker.instantiate(&mut store, &component) { Ok(_) => panic!("should have hit component instance limit"), - Err(e) => assert_eq!( - e.to_string(), - format!( - "maximum concurrent component instance limit of {} reached", - TOTAL_COMPONENT_INSTANCES - ), - ), + Err(e) => assert!(e.is::()), } drop(store); @@ -929,10 +917,7 @@ 
fn total_tables_limit() -> Result<()> { match linker.instantiate(&mut store, &module) { Ok(_) => panic!("should have hit table limit"), - Err(e) => assert_eq!( - e.to_string(), - format!("maximum concurrent table limit of {} reached", TOTAL_TABLES), - ), + Err(e) => assert!(e.is::()), } drop(store); @@ -1006,10 +991,7 @@ async fn total_stacks_limit() -> Result<()> { let mut store3 = Store::new(&engine, ()); match linker.instantiate_async(&mut store3, &module).await { Ok(_) => panic!("should have hit stack limit"), - Err(e) => assert_eq!( - e.to_string(), - format!("maximum concurrent fiber limit of {} reached", TOTAL_STACKS), - ), + Err(e) => assert!(e.is::()), } // Finish the futures and return their Wasm stacks to the pool. @@ -1185,13 +1167,7 @@ fn total_memories_limit() -> Result<()> { match linker.instantiate(&mut store, &module) { Ok(_) => panic!("should have hit memory limit"), - Err(e) => assert_eq!( - e.to_string(), - format!( - "maximum concurrent memory limit of {} reached for stripe 0", - TOTAL_MEMORIES - ), - ), + Err(e) => assert!(e.is::()), } drop(store);
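For a sense of how this surfaces in the embedder API, here is a minimal,
untested sketch of opting into batched decommits and matching on the new
error type. It assumes Wasmtime's default crate features (notably
`pooling-allocator`) and the existing `Config::allocation_strategy` /
`PoolingAllocationConfig` APIs; the batch size of 64 and the limit of 10
core instances are arbitrary illustration values, not recommendations.

```rust
use anyhow::Result;
use wasmtime::{
    Config, Engine, Instance, InstanceAllocationStrategy, Module,
    PoolConcurrencyLimitError, PoolingAllocationConfig, Store,
};

fn main() -> Result<()> {
    // Queue decommits and only flush them once 64 regions have accumulated.
    // The default batch size of 1 flushes on every deallocation, matching the
    // pre-existing behavior.
    let mut pool = PoolingAllocationConfig::default();
    pool.decommit_batch_size(64);
    pool.total_core_instances(10);

    let mut config = Config::new();
    config.allocation_strategy(InstanceAllocationStrategy::Pooling(pool));
    let engine = Engine::new(&config)?;

    let module = Module::new(&engine, "(module (memory 1))")?;

    // Instantiate until the pool's concurrency limit is hit, then match on the
    // new, public error type instead of string-comparing the error message.
    let mut stores = Vec::new();
    loop {
        let mut store = Store::new(&engine, ());
        match Instance::new(&mut store, &module, &[]) {
            Ok(_) => stores.push(store),
            Err(e) if e.is::<PoolConcurrencyLimitError>() => {
                println!("pool is full after {} instances", stores.len());
                break;
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```

The updated tests in `tests/all/pooling_allocator.rs` above rely on exactly
this kind of `e.is::<PoolConcurrencyLimitError>()` check in place of the old
error-message string comparisons.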