Skip to content

Component model: host stream/future transmits leak when the guest drops its end while the host consumer/producer is HostReady #13514

@gfx

Description

@gfx

When a host registers a stream consumer via StreamReader::pipe (or hands the
guest a host-driven future/stream via FutureReader::new / a host-written
stream) and the guest then drops its end, the TransmitState and both
TransmitHandles are never reclaimed from the instance's concurrent-state
table. The host-side end is left in HostReady and is never finalized, so the
slots leak for the lifetime of the instance. A guest that performs many such
operations in a loop fills the table and eventually traps with resource table has no free keys.

This is in the core component runtime
(crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs),
independent of wasmtime-wasi.

Test Case

The leak is in host-side transmit bookkeeping, so there is no single standalone
.wasm that triggers it: reproducing it requires a host driving the public
StreamReader::pipe / FutureReader::new APIs and a guest that drops its end
while the host end is still HostReady. The minimal reproduction is therefore a
pair of in-tree crates/misc/component-async-tests cases (full source in Steps
to Reproduce
), using only public host APIs and no wasi:* data flow.

Steps to Reproduce

Case 1 — host consumer (host_drop_writer / ReadState::HostReady)

The guest hands the host the readable end of a fresh stream, keeps the
writable end
, writes one byte once the host attaches a consumer, then drops
the writer. The writer-drop reaches host_drop_writer with the read side still
HostReady.

  • Add an interface + world to crates/misc/component-async-tests/wit/test.wit:

    interface host-consumer-drop {
      // Returns the readable end of a fresh stream while the guest keeps the
      // writable end; writes one byte once a consumer attaches, then drops it.
      get: async func() -> stream<u8>;
    }
    
    world host-consumer-drop-guest {
      export host-consumer-drop;
    }
  • Add a guest test program at
    crates/test-programs/src/bin/async_host_consumer_drop.rs:

    mod bindings {
        wit_bindgen::generate!({
            path: "../misc/component-async-tests/wit",
            world: "host-consumer-drop-guest",
            async: true,
        });
    
        use super::Component;
        export!(Component);
    }
    
    use {bindings::exports::local::local::host_consumer_drop::Guest, wit_bindgen::StreamReader};
    
    struct Component;
    
    impl Guest for Component {
        async fn get() -> StreamReader<u8> {
            let (mut tx, rx) = bindings::wit_stream::new();
            // The host attaches a consumer (read side -> `HostReady`); the write
            // below blocks until that consumer reads, after which we drop the
            // writer. Dropping it while the consumer is still `HostReady` leaks.
            wit_bindgen::spawn(async move {
                assert!(tx.write_one(42).await.is_none());
                drop(tx);
            });
            rx
        }
    }
    
    fn main() {}
  • Add a host test (e.g. in tests/scenario/streams.rs):

    mod host_consumer_drop {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "host-consumer-drop-guest",
            exports: { default: store | async },
        });
    }
    
    #[tokio::test]
    pub async fn async_host_consumer_drop() -> Result<()> {
        let engine = Engine::new(&config())?;
        let component = make_component(
            &engine,
            &[test_programs_artifacts::ASYNC_HOST_CONSUMER_DROP_COMPONENT],
        )
        .await?;
    
        let mut linker = Linker::new(&engine);
        wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
    
        let mut store = Store::new(
            &engine,
            Ctx {
                wasi: WasiCtxBuilder::new().inherit_stdio().build(),
                table: ResourceTable::default(),
                continue_: false,
            },
        );
    
        let instance = linker.instantiate_async(&mut store, &component).await?;
        let guest = host_consumer_drop::HostConsumerDropGuest::new(&mut store, &instance)?;
        store
            .run_concurrent(async move |accessor| {
                let stream = guest
                    .local_local_host_consumer_drop()
                    .call_get(accessor)
                    .await?;
    
                let (tx, mut rx) = mpsc::channel(1);
                accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)))?;
                assert_eq!(rx.next().await, Some(42));
                assert!(rx.next().await.is_none());
    
                wasmtime::error::Ok(())
            })
            .await??;
    
        store.assert_concurrent_state_empty();
        Ok(())
    }

Case 2 — host producer (host_drop_reader / WriteState::HostReady)

The host hands the guest two host-produced futures via FutureReader::new; the
guest reads one and drops the other. This reuses the existing closed-streams
guest (local::local::closed::read-future, whose _rx_ignored argument is
dropped), so it is a drop-in host test with no new guest:

#[tokio::test]
pub async fn async_host_producer_drop() -> Result<()> {
    let engine = Engine::new(&config())?;
    let component = make_component(
        &engine,
        &[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
    )
    .await?;

    let mut linker = Linker::new(&engine);
    wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

    let mut store = Store::new(
        &engine,
        Ctx {
            wasi: WasiCtxBuilder::new().inherit_stdio().build(),
            table: ResourceTable::default(),
            continue_: false,
        },
    );

    let instance = linker.instantiate_async(&mut store, &component).await?;

    let value = 42_u8;
    let (tx, rx) = oneshot::channel();
    let rx = FutureReader::new(&mut store, OneshotProducer::new(rx))?;
    let (_, rx_ignored) = oneshot::channel();
    let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored))?;

    let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
    store
        .run_concurrent(async move |accessor| {
            _ = tx.send(value);
            closed_streams
                .local_local_closed()
                .call_read_future(accessor, rx, value, rx_ignored)
                .await
        })
        .await??;

    store.assert_concurrent_state_empty();
    Ok(())
}
  • Run: cargo test -p component-async-tests --test test_all async_host_

Expected Results

Both tests pass: when the guest drops its end of a stream/future, the host
consumer/producer is finalized and the transmit (state + both handles) is
reclaimed, so assert_concurrent_state_empty() succeeds after a clean run.

Actual Results

Both tests fail with leftover entries in the concurrent-state table:

non-empty table: [3, 4, 5]            // Case 1: 1 stream transmit (state + 2 handles)
non-empty table: [0, 1, 2, 3, 4, 5]   // Case 2: 2 future transmits

(In Case 2 both futures leak — reading a host-produced future and then
dropping the reader still strands the producer, alongside the never-read
rx_ignored.) The transmits are never reclaimed, so a long-running guest that
loops over such operations eventually traps with resource table has no free keys.

Root cause, in
crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs — two
match arms are no-ops where they must finalize the stranded host end. Both
functions are StoreOpaque methods reached from Instance::guest_drop_readable
/ Instance::guest_drop_writable:

  • host_drop_reader — guest drops the readable end while the host producer
    is WriteState::HostReady:

    WriteState::HostReady { .. } => {}
  • host_drop_writer — guest drops the writable end while the host consumer
    is ReadState::HostReady:

    ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {}

A TransmitState is only removed by delete_transmit, which is reached only
once the other end is already Dropped. With these no-ops, one end stays
HostReady forever and the transmit is never deleted. (The host producer set up
by new_transmit is purely reactive — it has no self-cleanup future that
observes the drop — so nothing else reclaims it either.)

Versions and Environment

Wasmtime version or commit: main, 46.0.0-dev (commit 9c49989a2e)

Operating system: macOS 26.5

Architecture: aarch64 (Apple Silicon)

The leak is deterministic host-side bookkeeping and is not OS/architecture
specific.

Extra Info

Proposed fix: finalize the stranded host end on guest drop — when the guest's
end is now Dropped and the host end is HostReady, set the host end to
Dropped and call delete_transmit (reclaiming the state + both handles and
dropping the host consumer/producer). A change covering exactly these two arms
takes both reproductions above from leaking to empty, and the full
component-async-tests suite stays green. I'm happy to open a PR with the fix
and these two tests.

On test coverage: round_trip* / post_return call
assert_concurrent_state_empty, but exercise read-based guest↔guest flows. The
existing .pipe()-based host-consumer scenarios (tests/scenario/streams.rs,
tests/scenario/transmit.rs) do not assert an empty concurrent state, so this
guest-drop path was untested.

Related: #12091 proposes a close() method on the
{Future,Stream}{Producer,Consumer} traits, motivated by the fact that today a
host can only detect a guest-side drop via Drop. This bug compounds that: on
the HostReady guest-drop path the host producer/consumer is never finalized at
all, so even the current Drop-based detection never fires. Finalizing the
stranded host end (the fix above) is also the natural place from which a future
close() hook would be invoked.

Related: wado-lang/wado#1236

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugIncorrect behavior in the current implementation that needs fixing

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions