Wasmtime(pooling allocator): Batch decommits
This introduces a `DecommitQueue` for batching decommits together in the pooling
allocator:

* Deallocating a memory/table/stack enqueues its associated regions of memory
  for decommit; it no longer immediately returns the associated slot to the
  pool's free list. If the queue's length has reached the configured batch size,
  then we flush the queue by running all the decommits and finally returning the
  memory/table/stack slots to their respective pools and free lists.

* Additionally, if allocating a new memory/table/stack fails because the free
  list is empty (i.e. we've reached the limit on how many of this entity may be
  concurrently allocated), then we fall back to a slow path before propagating
  the error. This slow path flushes the decommit queue and then retries the
  allocation, hoping that the flush reclaimed slots and made them available for
  this fallback attempt. This involved defining a new `PoolConcurrencyLimitError`
  to match on, which is also exposed in the public embedder API. (Both the queue
  and this fallback are sketched in code just below.)
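
The following is a minimal, self-contained sketch of the queue-and-fallback flow
described in the two bullets above. Only the `DecommitQueue` name and the
free-list idea come from this change; the `Region`, `Pool`, and `SlotId` types,
the method names, and the error value are illustrative stand-ins, not Wasmtime's
actual internals.

```rust
type SlotId = usize;

/// A contiguous byte range whose pages should be returned to the OS.
struct Region {
    start: usize,
    len: usize,
}

/// Stand-in for one of the pooling allocator's pools (memories, tables, or
/// stacks); only the free list matters for this sketch.
#[derive(Default)]
struct Pool {
    free_list: Vec<SlotId>,
}

struct DecommitQueue {
    batch_size: usize,
    regions: Vec<Region>,
    // Slots that become reusable only after their regions are decommitted.
    pending_slots: Vec<SlotId>,
}

impl DecommitQueue {
    fn new(batch_size: usize) -> Self {
        DecommitQueue {
            batch_size,
            regions: Vec::new(),
            pending_slots: Vec::new(),
        }
    }

    /// Deallocation path: queue the region and its slot, and flush once the
    /// configured batch size is reached. A batch size of 1 flushes on every
    /// call, i.e. the old decommit-immediately-on-deallocation behavior.
    fn enqueue(&mut self, region: Region, slot: SlotId, pool: &mut Pool) {
        self.regions.push(region);
        self.pending_slots.push(slot);
        if self.regions.len() >= self.batch_size {
            self.flush(pool);
        }
    }

    /// Run all queued decommits in one batch, then return the slots to the
    /// pool's free list so they can be handed out again.
    fn flush(&mut self, pool: &mut Pool) {
        for region in self.regions.drain(..) {
            // Stand-in for the real work, e.g. madvise(MADV_DONTNEED) on Linux.
            println!("decommit {:#x}..{:#x}", region.start, region.start + region.len);
        }
        pool.free_list.extend(self.pending_slots.drain(..));
    }
}

/// Allocation path from the second bullet: if the free list is empty, flush
/// the queue and retry once before reporting the concurrency-limit error
/// (surfaced to embedders as `PoolConcurrencyLimitError` in the real code).
fn allocate(pool: &mut Pool, queue: &mut DecommitQueue) -> Result<SlotId, &'static str> {
    if let Some(slot) = pool.free_list.pop() {
        return Ok(slot);
    }
    queue.flush(pool);
    pool.free_list.pop().ok_or("pool concurrency limit reached")
}

fn main() {
    let mut pool = Pool::default();
    let mut queue = DecommitQueue::new(2);
    queue.enqueue(Region { start: 0x1000, len: 0x1000 }, 0, &mut pool);
    assert!(pool.free_list.is_empty()); // not flushed yet: batch not full
    queue.enqueue(Region { start: 0x3000, len: 0x1000 }, 1, &mut pool);
    assert_eq!(pool.free_list, vec![0, 1]); // batch flushed, slots reclaimed
    assert_eq!(allocate(&mut pool, &mut queue), Ok(1));
}
```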

It is also worth noting that we *always* use this new decommit queue now. To
keep the existing behavior, where e.g. a memory's decommits happen immediately
on deallocation, you can use a batch size of one. This effectively disables
queueing, forcing all decommits to be flushed immediately.

The default decommit batch size is one.
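
For reference, here is how an embedder might opt into larger batches: a hedged
sketch that assumes the usual `Config`/`PoolingAllocationConfig` setup from the
pooling-allocator documentation (plus the `anyhow` crate and the
`pooling-allocator` feature); only the `decommit_batch_size` call is new in this
commit.

```rust
use wasmtime::{Config, Engine, InstanceAllocationStrategy, PoolingAllocationConfig};

fn main() -> anyhow::Result<()> {
    let mut pooling = PoolingAllocationConfig::default();
    // Flush decommits in batches of up to 64 regions; the default of 1 keeps
    // the existing decommit-immediately-on-deallocation behavior.
    pooling.decommit_batch_size(64);

    let mut config = Config::new();
    config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
    let _engine = Engine::new(&config)?;
    Ok(())
}
```

The new `pooling_decommit_batch_size` option in `crates/cli-flags` (see the diff
below) exposes the same knob on the command line, presumably spelled
`-O pooling-decommit-batch-size=N`.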

This commit, with a batch size of one, consistently gives me an increase in
`wasmtime serve`'s requests per second versus its parent commit, as measured by
`benches/wasmtime-serve-rps.sh`: I get ~39K RPS on this commit compared to ~35K
RPS on the parent. This is quite puzzling to me. I was expecting no change, and
hoping there wouldn't be a regression; I was not expecting a speedup. I cannot
explain this result at this time.

Co-Authored-By: Jamey Sharp <jsharp@fastly.com>
fitzgen and jameysharp committed May 9, 2024
1 parent 330eb20 commit 95686db
Showing 21 changed files with 657 additions and 179 deletions.
6 changes: 5 additions & 1 deletion benches/wasmtime-serve-rps.sh
@@ -15,12 +15,16 @@ set -e

repo_dir="$(dirname $0)/.."
cargo_toml="$repo_dir/Cargo.toml"
target_dir="$CARGO_TARGET_DIR"
if [[ "$target_dir" == "" ]]; then
target_dir="$repo_dir/target"
fi

# Build Wasmtime.
cargo build --manifest-path "$cargo_toml" --release -p wasmtime-cli

# Spawn `wasmtime serve` in the background.
cargo run --manifest-path "$cargo_toml" --release -- serve "$@" &
"$target_dir/release/wasmtime" serve "$@" &
pid=$!

# Give it a second to print its diagnostic information and get the server up and
4 changes: 4 additions & 0 deletions crates/cli-flags/src/lib.rs
@@ -66,6 +66,10 @@ wasmtime_option_group! {
/// Enable the pooling allocator, in place of the on-demand allocator.
pub pooling_allocator: Option<bool>,

/// The number of decommits to do per batch. A batch size of 1
/// effectively disables decommit batching. (default: 1)
pub pooling_decommit_batch_size: Option<u32>,

/// How many bytes to keep resident between instantiations for the
/// pooling allocator in linear memories.
pub pooling_memory_keep_resident: Option<usize>,
14 changes: 14 additions & 0 deletions crates/fiber/src/lib.rs
@@ -1,4 +1,5 @@
use anyhow::Error;
use core::{mem, ptr};
use std::any::Any;
use std::cell::Cell;
use std::io;
@@ -61,6 +62,12 @@ impl FiberStack {
pub fn range(&self) -> Option<Range<usize>> {
self.0.range()
}

/// Is this a manually-managed stack created from raw parts? If so, it is up
/// to whoever created it to manage the stack's memory allocation.
pub fn is_from_raw_parts(&self) -> bool {
self.0.is_from_raw_parts()
}
}

/// A creator of RuntimeFiberStacks.
@@ -160,6 +167,13 @@ impl<'a, Resume, Yield, Return> Fiber<'a, Resume, Yield, Return> {
pub fn stack(&self) -> &FiberStack {
&self.stack
}

/// When this fiber has finished executing, reclaim its stack.
pub fn into_stack(mut self) -> FiberStack {
assert!(self.done());
let null_stack = unsafe { FiberStack::from_raw_parts(ptr::null_mut(), 0).unwrap() };
mem::replace(&mut self.stack, null_stack)
}
}

impl<Resume, Yield, Return> Suspend<Resume, Yield, Return> {
12 changes: 8 additions & 4 deletions crates/fiber/src/unix.rs
@@ -41,7 +41,7 @@ pub struct FiberStack {

/// Stored here to ensure that when this `FiberStack` the backing storage,
/// if any, is additionally dropped.
_storage: FiberStackStorage,
storage: FiberStackStorage,
}

enum FiberStackStorage {
@@ -66,7 +66,7 @@ impl FiberStack {
Ok(FiberStack {
base: stack.mapping_base.wrapping_byte_add(page_size),
len: stack.mapping_len - page_size,
_storage: FiberStackStorage::Mmap(stack),
storage: FiberStackStorage::Mmap(stack),
})
}

@@ -79,10 +79,14 @@
Ok(FiberStack {
base,
len,
_storage: FiberStackStorage::Unmanaged,
storage: FiberStackStorage::Unmanaged,
})
}

pub fn is_from_raw_parts(&self) -> bool {
matches!(self.storage, FiberStackStorage::Unmanaged)
}

pub fn from_custom(custom: Box<dyn RuntimeFiberStack>) -> io::Result<Self> {
let range = custom.range();
let page_size = rustix::param::page_size();
@@ -99,7 +103,7 @@ impl FiberStack {
Ok(FiberStack {
base: start_ptr,
len: range.len(),
_storage: FiberStackStorage::Custom(custom),
storage: FiberStackStorage::Custom(custom),
})
}

4 changes: 4 additions & 0 deletions crates/fiber/src/windows.rs
@@ -19,6 +19,10 @@ impl FiberStack {
Err(io::Error::from_raw_os_error(ERROR_NOT_SUPPORTED as i32))
}

pub fn is_from_raw_parts(&self) -> bool {
false
}

pub fn from_custom(_custom: Box<dyn RuntimeFiberStack>) -> io::Result<Self> {
Err(io::Error::from_raw_os_error(ERROR_NOT_SUPPORTED as i32))
}
3 changes: 3 additions & 0 deletions crates/fuzzing/src/generators/pooling_config.rs
@@ -27,6 +27,7 @@ pub struct PoolingAllocationConfig {
pub table_keep_resident: usize,
pub linear_memory_keep_resident: usize,

pub decommit_batch_size: u32,
pub max_unused_warm_slots: u32,

pub async_stack_zeroing: bool,
@@ -61,6 +62,7 @@ impl PoolingAllocationConfig {
cfg.table_keep_resident(self.table_keep_resident);
cfg.linear_memory_keep_resident(self.linear_memory_keep_resident);

cfg.decommit_batch_size(self.decommit_batch_size);
cfg.max_unused_warm_slots(self.max_unused_warm_slots);

cfg.async_stack_zeroing(self.async_stack_zeroing);
@@ -106,6 +108,7 @@ impl<'a> Arbitrary<'a> for PoolingAllocationConfig {
table_keep_resident: u.int_in_range(0..=1 << 20)?,
linear_memory_keep_resident: u.int_in_range(0..=1 << 20)?,

decommit_batch_size: u.int_in_range(1..=1000)?,
max_unused_warm_slots: u.int_in_range(0..=total_memories + 10)?,

async_stack_zeroing: u.arbitrary()?,
14 changes: 14 additions & 0 deletions crates/wasmtime/src/config.rs
@@ -2405,6 +2405,20 @@ impl PoolingAllocationConfig {
self
}

/// The target number of decommits to do per batch.
///
/// This is not precise, as we can queue up decommits at times when we
/// aren't prepared to immediately flush them, and so we may go over this
/// target size occasionally.
///
/// A batch size of one effectively disables batching.
///
/// Defaults to `1`.
pub fn decommit_batch_size(&mut self, batch_size: u32) -> &mut Self {
self.config.decommit_batch_size = batch_size;
self
}

/// Configures whether or not stacks used for async futures are reset to
/// zero after usage.
///
3 changes: 3 additions & 0 deletions crates/wasmtime/src/runtime.rs
@@ -61,6 +61,9 @@ pub use values::*;

pub(crate) use uninhabited::*;

#[cfg(feature = "pooling-allocator")]
pub use vm::PoolConcurrencyLimitError;

#[cfg(feature = "profiling")]
mod profiling;
#[cfg(feature = "profiling")]
16 changes: 10 additions & 6 deletions crates/wasmtime/src/runtime/store.rs
@@ -2067,7 +2067,7 @@ impl<T> StoreContextMut<'_, T> {
// wrap that in a custom future implementation which does the
// translation from the future protocol to our fiber API.
FiberFuture {
fiber,
fiber: Some(fiber),
current_poll_cx,
engine,
state: Some(crate::runtime::vm::AsyncWasmCallState::new()),
@@ -2078,7 +2078,7 @@ impl<T> StoreContextMut<'_, T> {
return Ok(slot.unwrap());

struct FiberFuture<'a> {
fiber: wasmtime_fiber::Fiber<'a, Result<()>, (), Result<()>>,
fiber: Option<wasmtime_fiber::Fiber<'a, Result<()>, (), Result<()>>>,
current_poll_cx: *mut *mut Context<'static>,
engine: Engine,
// See comments in `FiberFuture::resume` for this
@@ -2149,6 +2149,10 @@ impl<T> StoreContextMut<'_, T> {
unsafe impl Send for FiberFuture<'_> {}

impl FiberFuture<'_> {
fn fiber(&self) -> &wasmtime_fiber::Fiber<'_, Result<()>, (), Result<()>> {
self.fiber.as_ref().unwrap()
}

/// This is a helper function to call `resume` on the underlying
/// fiber while correctly managing Wasmtime's thread-local data.
///
@@ -2174,7 +2178,7 @@
fiber: self,
state: Some(prev),
};
return restore.fiber.fiber.resume(val);
return restore.fiber.fiber().resume(val);
}

struct Restore<'a, 'b> {
@@ -2241,7 +2245,7 @@ impl<T> StoreContextMut<'_, T> {
// then that's a bug indicating that TLS management in
// Wasmtime is incorrect.
Err(()) => {
if let Some(range) = self.fiber.stack().range() {
if let Some(range) = self.fiber().stack().range() {
crate::runtime::vm::AsyncWasmCallState::assert_current_state_not_in_range(range);
}
Poll::Pending
@@ -2268,7 +2272,7 @@
// completion.
impl Drop for FiberFuture<'_> {
fn drop(&mut self) {
if !self.fiber.done() {
if !self.fiber().done() {
let result = self.resume(Err(anyhow!("future dropped")));
// This resumption with an error should always complete the
// fiber. While it's technically possible for host code to catch
@@ -2282,7 +2286,7 @@
unsafe {
self.engine
.allocator()
.deallocate_fiber_stack(self.fiber.stack());
.deallocate_fiber_stack(self.fiber.take().unwrap().into_stack());
}
}
}
2 changes: 1 addition & 1 deletion crates/wasmtime/src/runtime/trampoline/memory.rs
@@ -248,7 +248,7 @@ unsafe impl InstanceAllocatorImpl for SingleMemoryInstance<'_> {
}

#[cfg(feature = "async")]
unsafe fn deallocate_fiber_stack(&self, _stack: &wasmtime_fiber::FiberStack) {
unsafe fn deallocate_fiber_stack(&self, _stack: wasmtime_fiber::FiberStack) {
unreachable!()
}

Expand Down
3 changes: 2 additions & 1 deletion crates/wasmtime/src/runtime/vm.rs
@@ -55,7 +55,8 @@ pub use crate::runtime::vm::instance::{
};
#[cfg(feature = "pooling-allocator")]
pub use crate::runtime::vm::instance::{
InstanceLimits, PoolingInstanceAllocator, PoolingInstanceAllocatorConfig,
InstanceLimits, PoolConcurrencyLimitError, PoolingInstanceAllocator,
PoolingInstanceAllocatorConfig,
};
pub use crate::runtime::vm::memory::{Memory, RuntimeLinearMemory, RuntimeMemoryCreator};
pub use crate::runtime::vm::mmap::Mmap;
