From 36fdf90997de89b8eceebfaa559e2702b552cbd7 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Wed, 29 Apr 2026 18:56:14 +0200 Subject: [PATCH 1/4] fix(bindgen): avoid async future and stream host injection deadlocks The spec is subtle here but IIUC we should start host side future and stream injection without awaiting it before the canonical read operation can see BLOCKED. So if the copy does not immediately produce a pending event, the implementation must set ASYNC_COPYING and return BLOCKED. Our old code awaited the host promise during host injection before allowing this path to return BLOCKED, which deadlocks the guest scheduler. This is consistent with python code from canon ABI spec: ```python if not e.has_pending_event(): if not opts.async_: e.state = CopyState.SYNC_COPYING thread.wait_until(e.has_pending_event) else: e.state = CopyState.ASYNC_COPYING return [BLOCKED] code,index,payload = e.get_pending_event() assert(code == event_code and index == i) return [payload] ``` This is the same for futures and streams. An example of deadlock on the guest side with an old implementation looks like this: ```rust let (mut tx, rx) = wit_stream::new(); futures::join!( async { wasi::cli::stdout::write_via_stream(rx).await.unwrap(); }, async { tx.write(b"hello, world\n".to_vec()).await; drop(tx); }, ); ``` I've added concurrency tests for both streams and futures that this patch fixes. --- Cargo.lock | 1 + .../src/intrinsics/p3/async_future.rs | 25 ++--- .../src/intrinsics/p3/async_stream.rs | 20 ++-- crates/test-components/Cargo.toml | 1 + .../src/bin/future_concurrency.rs | 31 ++++++ .../src/bin/stream_concurrency.rs | 33 ++++++ crates/test-components/wit/all.wit | 22 ++++ packages/jco/test/p3/deadlock-regressions.js | 103 ++++++++++++++++++ 8 files changed, 211 insertions(+), 25 deletions(-) create mode 100644 crates/test-components/src/bin/future_concurrency.rs create mode 100644 crates/test-components/src/bin/stream_concurrency.rs create mode 100644 packages/jco/test/p3/deadlock-regressions.js diff --git a/Cargo.lock b/Cargo.lock index 53e861731..ec8a802bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -767,6 +767,7 @@ name = "jco-test-components" version = "0.0.0" dependencies = [ "anyhow", + "futures", "wit-bindgen", ] diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs index 03bc42d46..5fd81d16f 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs @@ -687,8 +687,15 @@ impl AsyncFutureIntrinsic { }}); if (injectedWritePromise) {{ - const cleanupFn = await injectedWritePromise; - cleanupFn(); + if (this.hasPendingEvent()) {{ + const cleanupFn = await injectedWritePromise; + cleanupFn(); + }} else {{ + injectedWritePromise.then( + cleanupFn => cleanupFn(), + err => this.setPendingEvent(() => {{ throw err; }}), + ); + }} }} return {{ buffer }}; @@ -807,13 +814,6 @@ impl AsyncFutureIntrinsic { }}); }}); - // Perform another write, reusing the buffer - const {{ buffer }} = await this.guestRead({{ - buffer, - stringEncoding, - isAsync: true, - }}); - if (!this.hasPendingEvent()) {{ throw new Error("missing pending event after blocked future read"); }} @@ -862,13 +862,6 @@ impl AsyncFutureIntrinsic { }}); }}); - // Perform another write, reusing the buffer - const {{ buffer }} = await this.guestWrite({{ - buffer, - stringEncoding, - isAsync: true, - }}); - if (!this.hasPendingEvent()) {{ throw new Error("missing pending event after blocked future write"); }} diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index d00150165..619a518d5 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -731,10 +731,10 @@ impl AsyncStreamIntrinsic { // if one is present, because what we got from the outside world // was a reader // - let onReadFinishFn; + let injectedWritePromise; const injectHostWrite = this.isReadable() && !!this.#hostInjectFn; if (injectHostWrite) {{ - onReadFinishFn = await this.#hostInjectFn({{ count }}); + injectedWritePromise = this.#hostInjectFn({{ count }}); }} // Perform the read/write @@ -747,13 +747,15 @@ impl AsyncStreamIntrinsic { // If sync, wait forever but allow task to do other things if (!this.hasPendingEvent()) {{ - if (injectHostWrite) {{ - throw new Error('reader unexpectedly blocked after injected write'); - }} - if (isAsync) {{ this.setCopyState({stream_end_class}.CopyState.ASYNC_COPYING); {debug_log_fn}('[{stream_end_class}#copy()] blocked', {{ componentIdx, eventCode, self: this }}); + if (injectedWritePromise) {{ + injectedWritePromise.then( + cleanupFn => cleanupFn(), + err => this.setPendingEvent(() => {{ throw err; }}), + ); + }} return {async_blocked_const}; }} else {{ this.setCopyState({stream_end_class}.CopyState.SYNC_COPYING); @@ -774,9 +776,9 @@ impl AsyncStreamIntrinsic { // If we injected a write and the read has completed, we should reset // we can skip the rest of the async machinery since there the host controlled // write end does not need to use the pending event machinery - if (injectHostWrite) {{ - if (!onReadFinishFn) {{ throw new Error('missing read finish fn'); }} - onReadFinishFn(); + if (injectedWritePromise) {{ + const cleanupFn = await injectedWritePromise; + cleanupFn(); }} const event = this.getPendingEvent(); diff --git a/crates/test-components/Cargo.toml b/crates/test-components/Cargo.toml index e541f1182..fe80e8974 100644 --- a/crates/test-components/Cargo.toml +++ b/crates/test-components/Cargo.toml @@ -8,6 +8,7 @@ Test components for use during jco testing """ [dependencies] +futures = { version = "0.3.32", default-features = false, features = ["alloc"] } wit-bindgen = { workspace = true, features = [ "macros", "async-spawn"] } [build-dependencies] diff --git a/crates/test-components/src/bin/future_concurrency.rs b/crates/test-components/src/bin/future_concurrency.rs new file mode 100644 index 000000000..33cce5160 --- /dev/null +++ b/crates/test-components/src/bin/future_concurrency.rs @@ -0,0 +1,31 @@ +mod bindings { + use super::Component; + wit_bindgen::generate!({ + world: "future-concurrency", + }); + export!(Component); +} + +use bindings::exports::jco::test_components::local_run_async; +use bindings::jco::test_components::future_concurrency_host; +use bindings::wit_stream; + +struct Component; + +impl local_run_async::Guest for Component { + async fn run() { + let (mut tx, rx) = wit_stream::new(); + + futures::join!( + async { + assert_eq!(future_concurrency_host::write_via_stream(rx).await, 42); + }, + async { + tx.write_all(vec![42]).await; + drop(tx); + }, + ); + } +} + +fn main() {} diff --git a/crates/test-components/src/bin/stream_concurrency.rs b/crates/test-components/src/bin/stream_concurrency.rs new file mode 100644 index 000000000..550d8e155 --- /dev/null +++ b/crates/test-components/src/bin/stream_concurrency.rs @@ -0,0 +1,33 @@ +mod bindings { + use super::Component; + wit_bindgen::generate!({ + world: "stream-concurrency", + }); + export!(Component); +} + +use bindings::exports::jco::test_components::stream_concurrency_test; +use bindings::jco::test_components::stream_concurrency_host; +use wit_bindgen::StreamReader; + +struct Component; + +impl stream_concurrency_test::Guest for Component { + async fn read_after_signal(mut rx: StreamReader) -> Vec { + let (values, ()) = futures::join!( + async { + let mut values = Vec::new(); + while let Some(value) = rx.next().await { + values.push(value); + } + values + }, + async { + stream_concurrency_host::signal(); + }, + ); + values + } +} + +fn main() {} diff --git a/crates/test-components/wit/all.wit b/crates/test-components/wit/all.wit index 431936cb1..446dfa7b8 100644 --- a/crates/test-components/wit/all.wit +++ b/crates/test-components/wit/all.wit @@ -242,6 +242,28 @@ world sleep-post-return-callee { export sleep-post-return; } +interface future-concurrency-host { + write-via-stream: func(s: stream) -> future; +} + +world future-concurrency { + import future-concurrency-host; + export local-run-async; +} + +interface stream-concurrency-host { + signal: func(); +} + +interface stream-concurrency-test { + read-after-signal: async func(s: stream) -> list; +} + +world stream-concurrency { + import stream-concurrency-host; + export stream-concurrency-test; +} + interface fixed-length-lists-fn { takes-returns-fixed: func(bools: list) -> result, string>; } diff --git a/packages/jco/test/p3/deadlock-regressions.js b/packages/jco/test/p3/deadlock-regressions.js new file mode 100644 index 000000000..95c578102 --- /dev/null +++ b/packages/jco/test/p3/deadlock-regressions.js @@ -0,0 +1,103 @@ +import { join } from "node:path"; + +import { suite, test, assert } from "vitest"; + +import { WASIShim } from "@bytecodealliance/preview2-shim/instantiation"; + +import { setupAsyncTest } from "../helpers.js"; +import { LOCAL_TEST_COMPONENTS_DIR } from "../common.js"; + +function deferred() { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +suite("async scheduling regressions", () => { + test("host future can be completed by a guest sibling task", async () => { + const { instance, cleanup } = await setupAsyncTest({ + asyncMode: "jspi", + component: { + path: join(LOCAL_TEST_COMPONENTS_DIR, "future-concurrency.wasm"), + imports: { + ...new WASIShim().getImportObject(), + "jco:test-components/future-concurrency-host": { + writeViaStream: async (stream) => { + const values = []; + for await (const value of stream) { + values.push(value); + } + assert.deepEqual(values, [42]); + return 42; + }, + }, + }, + }, + jco: { + transpile: { + extraArgs: { + asyncExports: ["jco:test-components/local-run-async#run"], + }, + }, + }, + }); + + try { + await instance["jco:test-components/local-run-async"].run(); + } finally { + await cleanup(); + } + }); + + test("host stream can be unblocked by a guest sibling task", async () => { + const signaled = deferred(); + let yielded = false; + + const stream = { + [Symbol.asyncIterator]() { + return { + next: async () => { + await signaled.promise; + if (yielded) { + return { value: undefined, done: true }; + } + yielded = true; + return { value: 42, done: false }; + }, + }; + }, + }; + + const { instance, cleanup } = await setupAsyncTest({ + asyncMode: "jspi", + component: { + path: join(LOCAL_TEST_COMPONENTS_DIR, "stream-concurrency.wasm"), + imports: { + ...new WASIShim().getImportObject(), + "jco:test-components/stream-concurrency-host": { + signal: () => signaled.resolve(), + }, + }, + }, + jco: { + transpile: { + extraArgs: { + asyncExports: ["jco:test-components/stream-concurrency-test#read-after-signal"], + }, + }, + }, + }); + + try { + assert.deepEqual( + await instance["jco:test-components/stream-concurrency-test"].readAfterSignal(stream), + [42], + ); + } finally { + await cleanup(); + } + }); +}); From 1e400bb6925fe7b5d7ffbc35b2f622fbda5c52a7 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Thu, 30 Apr 2026 11:32:55 +0200 Subject: [PATCH 2/4] fix(bindgen): get blocked copy result from pending event --- .../src/intrinsics/p3/async_stream.rs | 75 ++++++++++--------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index 619a518d5..524bf3cb5 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -731,11 +731,7 @@ impl AsyncStreamIntrinsic { // if one is present, because what we got from the outside world // was a reader // - let injectedWritePromise; const injectHostWrite = this.isReadable() && !!this.#hostInjectFn; - if (injectHostWrite) {{ - injectedWritePromise = this.#hostInjectFn({{ count }}); - }} // Perform the read/write this._{rw_fn_name}({{ @@ -745,6 +741,11 @@ impl AsyncStreamIntrinsic { componentIdx, }}); + let injectedWritePromise; + if (injectHostWrite && !this.hasPendingEvent()) {{ + injectedWritePromise = this.#hostInjectFn({{ count }}); + }} + // If sync, wait forever but allow task to do other things if (!this.hasPendingEvent()) {{ if (isAsync) {{ @@ -900,8 +901,8 @@ impl AsyncStreamIntrinsic { // just-before reads, no matter what the user does on the other end). // if (packedResult === {async_blocked_const} && !this.#isHostOwned) {{ - // If the write was blocked, we can only make progress when - // the read side notifies us of a read, then we must attempt the copy again + // If the write was blocked, the pending event produced by the + // read side represents the completed copy. await new Promise((resolve) => {{ let waitInterval = setInterval(async () => {{ @@ -911,18 +912,21 @@ impl AsyncStreamIntrinsic { }}); }}); - packedResult = await this.copy({{ - isAsync: true, - count, - bufferID, - buffer, - eventCode: {async_event_code_enum}.STREAM_WRITE, - // NOTE: we skip state checks only when dealing with a post blocked - // read/write in the host. This enables the host to quickly pick up the - // guest operation on the otherside quickly. - skipStateCheck: true, - componentIdx: -1, - }}); + if (!this.hasPendingEvent()) {{ + throw new Error("missing pending event after blocked stream write"); + }} + + const event = this.getPendingEvent(); + if (!event) {{ throw new Error("missing pending event after blocked stream write"); }} + + const {{ code, payload0: index, payload1: payload }} = event; + + if (code !== {async_event_code_enum}.STREAM_WRITE) {{ + throw new Error(`mismatched event code [${{code}}] for host stream write`); + }} + + if (index !== this.waitableIdx()) {{ throw new Error('invalid stream end index'); }} + packedResult = payload; const copied = packedResult >> 4; if (copied === 0 && this.isDoneState()) {{ @@ -1018,8 +1022,8 @@ impl AsyncStreamIntrinsic { }}); if (packedResult === {async_blocked_const}) {{ - // If the read was blocked, we can only make progress when - // the write side notifies us of a write, then we must attempt the copy again + // If the read was blocked, the pending event produced by the + // write side represents the completed copy. await new Promise((resolve) => {{ let waitInterval = setInterval(() => {{ @@ -1029,24 +1033,22 @@ impl AsyncStreamIntrinsic { }}); }}); - packedResult = await this.copy({{ - isAsync: true, - count, - bufferID, - buffer, - eventCode: {async_event_code_enum}.STREAM_READ, - // NOTE: we skip state checks only when dealing with a post blocked - // read/write in the host. This enables the host to quickly pick up the - // guest operation on the otherside quickly. - skipStateCheck: true, - componentIdx: -1, - }}); + if (!this.hasPendingEvent()) {{ + throw new Error("missing pending event after blocked stream read"); + }} - const copied = packedResult >> 4; - if (copied === 0 && this.isDoneState()) {{ - reject(new Error("write end dropped during read")); + const event = this.getPendingEvent(); + if (!event) {{ throw new Error("missing pending event after blocked stream read"); }} + + const {{ code, payload0: index, payload1: payload }} = event; + + if (code !== {async_event_code_enum}.STREAM_READ) {{ + throw new Error(`mismatched event code [${{code}}] for host stream read`); }} + if (index !== this.waitableIdx()) {{ throw new Error('invalid stream end index'); }} + packedResult = payload; + if (packedResult === {async_blocked_const}) {{ throw new Error("unexpected double block during read"); }} @@ -1907,8 +1909,9 @@ impl AsyncStreamIntrinsic { }} await hostWriteEnd.write(values); + resetWriteEndToIdleFn(); - return resetWriteEndToIdleFn; + return doNothingFn; }}; }} "# From deabbcd8372c47cb669e04e022524593933ed5a3 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Mon, 4 May 2026 09:58:07 +0200 Subject: [PATCH 3/4] fix(bindgen): address code review --- .../src/intrinsics/p3/async_future.rs | 18 ++++++------- .../src/intrinsics/p3/async_stream.rs | 18 ++++++------- packages/jco/test/p3/deadlock-regressions.js | 25 +------------------ 3 files changed, 15 insertions(+), 46 deletions(-) diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs index 5fd81d16f..2d6bf1d2b 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs @@ -673,8 +673,9 @@ impl AsyncFutureIntrinsic { // Before performing this read, if we're dealing with a host-controlled - // future, then we should inject a write, but we can't wait for it to complete - // as we must do the rendesvous read below for the write to complete. + // future, start injecting the write. The injection may depend on sibling + // guest work running, so cleanup is attached without awaiting here; the + // canonical read must be able to return BLOCKED first. let injectedWritePromise; if (this.#hostInjectFn) {{ injectedWritePromise = this.#hostInjectFn({{ count: 1 }}); @@ -687,15 +688,10 @@ impl AsyncFutureIntrinsic { }}); if (injectedWritePromise) {{ - if (this.hasPendingEvent()) {{ - const cleanupFn = await injectedWritePromise; - cleanupFn(); - }} else {{ - injectedWritePromise.then( - cleanupFn => cleanupFn(), - err => this.setPendingEvent(() => {{ throw err; }}), - ); - }} + injectedWritePromise.then( + cleanupFn => cleanupFn(), + err => this.setPendingEvent(() => {{ throw err; }}), + ); }} return {{ buffer }}; diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index 524bf3cb5..f558465ed 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -723,14 +723,9 @@ impl AsyncStreamIntrinsic { skipStateCheck, }}); - // If the stream is readable and was lowered from the host, - // when the component is doing a read (i.e. `stream.read`), - // the writer is host-side and may have already written. - // - // We effectively do a just-in-time "write" of the external value, - // if one is present, because what we got from the outside world - // was a reader - // + // If the stream is readable and was lowered from the host, the + // writer is host-side. Register the read first, then do a + // just-in-time host write only if the read blocked. const injectHostWrite = this.isReadable() && !!this.#hostInjectFn; // Perform the read/write @@ -752,6 +747,8 @@ impl AsyncStreamIntrinsic { this.setCopyState({stream_end_class}.CopyState.ASYNC_COPYING); {debug_log_fn}('[{stream_end_class}#copy()] blocked', {{ componentIdx, eventCode, self: this }}); if (injectedWritePromise) {{ + // Do not await here: the injected write may depend on sibling + // guest work running, so the canonical read must return BLOCKED. injectedWritePromise.then( cleanupFn => cleanupFn(), err => this.setPendingEvent(() => {{ throw err; }}), @@ -774,9 +771,8 @@ impl AsyncStreamIntrinsic { }} }} - // If we injected a write and the read has completed, we should reset - // we can skip the rest of the async machinery since there the host controlled - // write end does not need to use the pending event machinery + // If the read completed immediately after injecting a host write, + // it is safe to await injection cleanup before consuming the event. if (injectedWritePromise) {{ const cleanupFn = await injectedWritePromise; cleanupFn(); diff --git a/packages/jco/test/p3/deadlock-regressions.js b/packages/jco/test/p3/deadlock-regressions.js index 95c578102..c64ed59e7 100644 --- a/packages/jco/test/p3/deadlock-regressions.js +++ b/packages/jco/test/p3/deadlock-regressions.js @@ -7,15 +7,6 @@ import { WASIShim } from "@bytecodealliance/preview2-shim/instantiation"; import { setupAsyncTest } from "../helpers.js"; import { LOCAL_TEST_COMPONENTS_DIR } from "../common.js"; -function deferred() { - let resolve, reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - suite("async scheduling regressions", () => { test("host future can be completed by a guest sibling task", async () => { const { instance, cleanup } = await setupAsyncTest({ @@ -36,13 +27,6 @@ suite("async scheduling regressions", () => { }, }, }, - jco: { - transpile: { - extraArgs: { - asyncExports: ["jco:test-components/local-run-async#run"], - }, - }, - }, }); try { @@ -53,7 +37,7 @@ suite("async scheduling regressions", () => { }); test("host stream can be unblocked by a guest sibling task", async () => { - const signaled = deferred(); + const signaled = Promise.withResolvers(); let yielded = false; const stream = { @@ -82,13 +66,6 @@ suite("async scheduling regressions", () => { }, }, }, - jco: { - transpile: { - extraArgs: { - asyncExports: ["jco:test-components/stream-concurrency-test#read-after-signal"], - }, - }, - }, }); try { From 2ff6dd5232df7c27cbe9dc0b1c535f22149703b3 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Mon, 4 May 2026 10:29:26 +0200 Subject: [PATCH 4/4] refactor(bindgen): move stream host injection readiness check --- crates/js-component-bindgen/src/function_bindgen.rs | 1 + crates/js-component-bindgen/src/intrinsics/lower.rs | 1 + .../src/intrinsics/p3/async_stream.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/js-component-bindgen/src/function_bindgen.rs b/crates/js-component-bindgen/src/function_bindgen.rs index d2bf4c3c7..6e16099df 100644 --- a/crates/js-component-bindgen/src/function_bindgen.rs +++ b/crates/js-component-bindgen/src/function_bindgen.rs @@ -3176,6 +3176,7 @@ impl Bindgen for FunctionBindgen<'_> { const hostInjectFn = {gen_stream_host_inject_fn}({{ readFn: readFn{tmp}, hostWriteEnd: hostWriteEnd{tmp}, + readEnd: readEnd{tmp}, }}); readEnd{tmp}.setHostInjectFn(hostInjectFn); diff --git a/crates/js-component-bindgen/src/intrinsics/lower.rs b/crates/js-component-bindgen/src/intrinsics/lower.rs index 0dfdf770f..f07d6ae0b 100644 --- a/crates/js-component-bindgen/src/intrinsics/lower.rs +++ b/crates/js-component-bindgen/src/intrinsics/lower.rs @@ -1136,6 +1136,7 @@ impl LowerIntrinsic { const hostInjectFn = {gen_stream_host_inject_fn}({{ readFn: {gen_read_fn_from_lowerable_stream_fn}(stream), hostWriteEnd: writeEnd, + readEnd, }}); readEnd.setHostInjectFn(hostInjectFn); diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index f558465ed..6a6bbe128 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -724,8 +724,8 @@ impl AsyncStreamIntrinsic { }}); // If the stream is readable and was lowered from the host, the - // writer is host-side. Register the read first, then do a - // just-in-time host write only if the read blocked. + // writer is host-side. Register the read first; host injection + // will no-op if the read already produced a pending event. const injectHostWrite = this.isReadable() && !!this.#hostInjectFn; // Perform the read/write @@ -737,7 +737,7 @@ impl AsyncStreamIntrinsic { }}); let injectedWritePromise; - if (injectHostWrite && !this.hasPendingEvent()) {{ + if (injectHostWrite) {{ injectedWritePromise = this.#hostInjectFn({{ count }}); }} @@ -1860,7 +1860,8 @@ impl AsyncStreamIntrinsic { output.push_str(&format!( r#" function {gen_host_inject_fn}(genArgs) {{ - const {{ readFn, hostWriteEnd }} = genArgs; + const {{ readFn, hostWriteEnd, readEnd }} = genArgs; + if (!readEnd) {{ throw new TypeError('missing read end'); }} const doNothingFn = () => {{}}; const resetWriteEndToIdleFn = () => {{ // After the write is finished, we consume the event that was generated @@ -1874,6 +1875,7 @@ impl AsyncStreamIntrinsic { let {{ count }} = args; if (count < 0) {{ throw new Error('invalid count'); }} if (count === 0) {{ return doNothingFn; }} + if (readEnd.hasPendingEvent()) {{ return doNothingFn; }} // If we get another read when done is already set, that was // the case of a iterator that returned a final value