Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2121,11 +2121,17 @@ impl Instance {
) -> Result<()> {
let state = store.concurrent_state_mut();
let thread_data = state.get_mut(guest_thread.thread)?;
let guest_id = match thread_data.instance_rep {
Some(id) => id,
None => bail_bug!("thread must have instance_rep set by now"),
};
let sync_call_set = thread_data.sync_call_set;
if let Some(guest_id) = thread_data.instance_rep {
store
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: runtime_instance,
})
.thread_handle_table()
.guest_thread_remove(guest_id)?;
}
let state = store.concurrent_state_mut();

// Clean up any pending subtasks in the sync_call_set
for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
Expand All @@ -2137,14 +2143,6 @@ impl Instance {
}
}

store
.instance_state(RuntimeInstance {
instance: self.id().instance(),
index: runtime_instance,
})
.thread_handle_table()
.guest_thread_remove(guest_id)?;

store.concurrent_state_mut().delete(guest_thread.thread)?;
store.concurrent_state_mut().delete(sync_call_set)?;
let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
Expand Down Expand Up @@ -3672,14 +3670,20 @@ impl Instance {
task.lower_params = None;
task.lift_result = None;
task.exited = true;

let instance = task.instance;

// Clean up the thread within this task as it's now never going
// to run.
assert_eq!(1, task.threads.len());
let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
let concurrent_state = store.concurrent_state_mut();
concurrent_state.delete(thread)?;
assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
let thread = *task.threads.iter().next().unwrap();
self.cleanup_thread(
store,
QualifiedThreadId {
task: guest_task,
thread,
},
caller_instance,
)?;

// Not yet started; cancel and remove from pending
let pending = &mut store.instance_state(instance).concurrent_state().pending;
Expand Down
15 changes: 15 additions & 0 deletions crates/wast/src/wast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,21 @@ impl WastContext {
caller.gc(None)?;
Ok(())
})?;
#[cfg(feature = "component-model")]
{
let mut i = self.component_linker.instance("wasmtime")?;
i.func_wrap(
"set-max-table-capacity",
|mut store, (capacity,): (u32,)| {
store
.as_context_mut()
.concurrent_resource_table()
.expect("table must be present")
.set_max_capacity(capacity.try_into().unwrap());
Ok(())
},
)?;
}
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
;;! component_model_async = true
;;! reference_types = true

;; This exposes a historical bug in wasmtime where when a guest subtask was
;; dropped in the `STARTING` state it leaked resources within the store. Here
;; this is done in a loop N times after setting the store's table capacity much
;; lower than the loop iterations.

(component
(import "wasmtime" (instance $wasmtime
(export "set-max-table-capacity" (func (param "max" u32)))
))

(component $A
(core module $m
(import "" "backpressure.inc" (func $backpressure.inc))

(func (export "set-backpressure") (call $backpressure.inc))
(func (export "hi"))
)
(core func $backpressure.inc (canon backpressure.inc))
(core instance $i (instantiate $m
(with "" (instance
(export "backpressure.inc" (func $backpressure.inc))
))
))

(func (export "set-backpressure") (canon lift (core func $i "set-backpressure")))
(func (export "hi") async (canon lift (core func $i "hi")))
)
(instance $a (instantiate $A))

(component $B
(import "wasmtime" (instance $wasmtime
(export "set-max-table-capacity" (func (param "max" u32)))
))
(import "a" (instance $a
(export "set-backpressure" (func))
(export "hi" (func async))
))

(core func $set-backpressure (canon lower (func $a "set-backpressure")))
(core func $hi (canon lower (func $a "hi") async))
(core func $set-max-table-capacity (canon lower (func $wasmtime "set-max-table-capacity")))
(core func $subtask.cancel (canon subtask.cancel))
(core func $subtask.drop (canon subtask.drop))

(core module $m
(import "" "set-backpressure" (func $set-backpressure))
(import "" "hi" (func $hi (result i32)))
(import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
(import "" "subtask.drop" (func $subtask.drop (param i32)))
(import "" "set-max-table-capacity" (func $set-max-table-capacity (param i32)))

(func (export "run")
(local $rc i32)
(local $task i32)
(local $cnt i32)
call $set-backpressure

(call $set-max-table-capacity (i32.const 100))

(local.set $cnt (i32.const 1000))

loop $l
(local.set $rc (call $hi))
(if (i32.ne (i32.and (local.get $rc) (i32.const 0xf)) (i32.const 0 (; STARTING ;)))
(then unreachable))
(local.set $task (i32.shr_u (local.get $rc) (i32.const 4)))
(local.set $rc (call $subtask.cancel (local.get $task)))
(if (i32.ne (i32.and (local.get $rc) (i32.const 0xf)) (i32.const 3 (; START_CANCELLED ;)))
(then unreachable))

(call $subtask.drop (local.get $task))

(local.set $cnt (i32.sub (local.get $cnt) (i32.const 1)))
(if (local.get $cnt)
(then (br $l)))
end
)
)

(core instance $i (instantiate $m
(with "" (instance
(export "set-backpressure" (func $set-backpressure))
(export "hi" (func $hi))
(export "subtask.cancel" (func $subtask.cancel))
(export "subtask.drop" (func $subtask.drop))
(export "set-max-table-capacity" (func $set-max-table-capacity))
))
))

(func (export "run") async (canon lift (core func $i "run")))
)

(instance $b (instantiate $B
(with "a" (instance $a))
(with "wasmtime" (instance $wasmtime))
))
(export "run" (func $b "run"))
)

(assert_return (invoke "run"))
Loading