From c2410dc721eec4a961d209d640b1196e1ffd2411 Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Thu, 14 May 2026 10:35:09 +0200 Subject: [PATCH] feat(bindgen): allow expanding numeric chunks in stream lowering Adapt stream lowering to expand numeric chunks like Uint8Array while preserving leftovers across guest reads. Also split host stream writes into public write(value) and internal writeMany(values) batch handling. --- .../src/intrinsics/p3/async_stream.rs | 79 +++++++++++++++---- .../src/intrinsics/p3/async_task.rs | 6 +- packages/preview3-shim/lib/nodejs/cli.js | 15 ++-- packages/preview3-shim/lib/nodejs/future.js | 4 + packages/preview3-shim/test/cli.test.js | 22 ++++++ packages/preview3-shim/test/future.test.js | 10 +++ 6 files changed, 106 insertions(+), 30 deletions(-) diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index e409927a6..8983c9afd 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -839,6 +839,22 @@ impl AsyncStreamIntrinsic { async write(v) {{ {debug_log_fn}('[{end_class_name}#write()] args', {{ v }}); + let data; + if (this.#elemMeta.isNumeric) {{ + if (v instanceof ArrayBuffer) {{ + v = new Uint8Array(v); + }} + data = Array.isArray(v) || (ArrayBuffer.isView(v) && typeof v.length === 'number') ? Array.from(v) : [v]; + }} else {{ + data = [v]; + }} + return this.writeMany(data); + }} + + async writeMany(values) {{ + {debug_log_fn}('[{end_class_name}#writeMany()] args', {{ values }}); + if (!Array.isArray(values)) {{ throw new TypeError("writeMany values must be an array"); }} + // Wait for an existing write operation to end, if present, // otherwise register this write for any future operations. // @@ -853,7 +869,7 @@ impl AsyncStreamIntrinsic { this.#result = newResult; await p; }} catch (err) {{ - {debug_log_fn}('[{end_class_name}#write()] error waiting for previous write', err); + {debug_log_fn}('[{end_class_name}#writeMany()] error waiting for previous write', err); // If the previous write we were waiting on errors for any reason, // we can ignore it and attempt to continue with this write // which may also fail for a similar reason @@ -863,7 +879,8 @@ impl AsyncStreamIntrinsic { }} const {{ promise, resolve, reject }} = newResult; - const count = 1; + const data = values; + const count = data.length; if (this.#elemMeta.stringEncoding === undefined) {{ this.#elemMeta.string = 'utf8'; }} @@ -875,9 +892,9 @@ impl AsyncStreamIntrinsic { isReadable: true, // we need to read from this buffer later isWritable: false, elemMeta: this.#elemMeta, - data: v, + data, }}); - buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{v.length}}])`); + buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{data.length}}])`); let packedResult; packedResult = await this.copy({{ @@ -1909,6 +1926,7 @@ impl AsyncStreamIntrinsic { }}; let done = false; + const pendingValues = []; return async function generatedStreamHostInject(args) {{ let {{ count }} = args; @@ -1916,26 +1934,55 @@ impl AsyncStreamIntrinsic { if (count === 0) {{ return doNothingFn; }} if (readEnd.hasPendingEvent()) {{ return doNothingFn; }} - // If we get another read when done is already set, that was - // the case of a iterator that returned a final value - // along with `done: true` - if (done) {{ - hostWriteEnd.getPendingEvent(); - hostWriteEnd.drop(); - return doNothingFn; - }} - if (hostWriteEnd.isDoneState()) {{ return doNothingFn; }} const values = []; + const elemMeta = hostWriteEnd.getElemMeta(); + + const appendValues = (source) => {{ + const transfer = Math.min(count, source.length); + for (let i = 0; i < transfer; i++) {{ + values.push(source[i]); + }} + count -= transfer; + for (let i = transfer; i < source.length; i++) {{ + pendingValues.push(source[i]); + }} + return source.length; + }}; + + const appendReadValue = (value) => {{ + if (value === undefined) {{ return 0; }} + if (elemMeta.isNumeric) {{ + if (value instanceof ArrayBuffer) {{ + value = new Uint8Array(value); + }} + if (Array.isArray(value) || (ArrayBuffer.isView(value) && typeof value.length === 'number')) {{ + return appendValues(value); + }} + }} + return appendValues([value]); + }}; + + const drainPendingValues = () => {{ + const transfer = Math.min(count, pendingValues.length); + for (let i = 0; i < transfer; i++) {{ + values.push(pendingValues[i]); + }} + pendingValues.splice(0, transfer); + count -= transfer; + }}; + + drainPendingValues(); + while (count > 0 && !done) {{ const res = await readFn(); - if (res.value !== undefined) {{ values.push(res.value); }} + const appended = appendReadValue(res.value); done = res.done; + if (appended === 0 && !done) {{ count -= 1; }} if (done) {{ break; }} - count -= 1; }} // Iterator provided `done: true` with no final value @@ -1945,7 +1992,7 @@ impl AsyncStreamIntrinsic { return doNothingFn; }} - await hostWriteEnd.write(values); + await hostWriteEnd.writeMany(values); resetWriteEndToIdleFn(); return doNothingFn; diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs index 358720c7b..9c5df3abe 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs @@ -1673,11 +1673,7 @@ impl AsyncTaskIntrinsic { if (this.#onProgressFn) {{ this.#onProgressFn(); }} - if (subtaskValue === null) {{ - if (this.#cancelRequested) {{ - throw new Error('cancel was not requested, but no value present at return'); - }} - + if (subtaskValue === null && this.#cancelRequested) {{ if (this.#state === {subtask_class}.State.STARTING) {{ this.#state = {subtask_class}.State.CANCELLED_BEFORE_STARTED; }} else {{ diff --git a/packages/preview3-shim/lib/nodejs/cli.js b/packages/preview3-shim/lib/nodejs/cli.js index a580531f8..262b2d9f4 100644 --- a/packages/preview3-shim/lib/nodejs/cli.js +++ b/packages/preview3-shim/lib/nodejs/cli.js @@ -68,22 +68,19 @@ export const stdin = { const readable = Readable.toWeb(process.stdin); const { tx: futureTx, rx: futureRx } = future(); - const reader = new StreamReader(readable); - const originalRead = reader.read.bind(reader); - reader.read = async () => { + async function* stdinChunks() { try { - const chunk = await originalRead(); - if (chunk === null) { - await futureTx.write({ tag: "ok", val: undefined }); + for await (const chunk of readable.values({ preventCancel: true })) { + yield chunk; } - return chunk; + await futureTx.write({ tag: "ok", val: undefined }); } catch (err) { await futureTx.write({ tag: "err", val: errorCode(err) }); throw err; } - }; + } - return [reader, futureRx]; + return [new StreamReader(stdinChunks()), futureRx]; }, }; diff --git a/packages/preview3-shim/lib/nodejs/future.js b/packages/preview3-shim/lib/nodejs/future.js index 00d0df2eb..135fa4776 100644 --- a/packages/preview3-shim/lib/nodejs/future.js +++ b/packages/preview3-shim/lib/nodejs/future.js @@ -74,6 +74,10 @@ export class FutureReader { } } + then(resolve, reject) { + return this.read().then(resolve, reject); + } + /** * Cancels the future. Not supported for FutureReader. */ diff --git a/packages/preview3-shim/test/cli.test.js b/packages/preview3-shim/test/cli.test.js index 94b0f6025..eab4b4e56 100644 --- a/packages/preview3-shim/test/cli.test.js +++ b/packages/preview3-shim/test/cli.test.js @@ -92,6 +92,28 @@ describe("Node.js Preview3 wasi-cli", () => { Object.defineProperty(process, "stdin", { value: originalStdin }); }); + test("readViaStream async iterator resolves future on eof", async () => { + const { cli } = await import("@bytecodealliance/preview3-shim"); + + const fakeStdin = Readable.from([]); + const originalStdin = process.stdin; + + Object.defineProperty(process, "stdin", { + value: fakeStdin, + configurable: true, + }); + + try { + const [streamReader, result] = cli.stdin.readViaStream(); + const iterator = streamReader[Symbol.asyncIterator](); + + expect(await iterator.next()).toEqual({ value: undefined, done: true }); + expect(await result).toEqual({ tag: "ok", val: undefined }); + } finally { + Object.defineProperty(process, "stdin", { value: originalStdin }); + } + }); + test("writeViaStream writes to stderr", async () => { const { cli } = await import("@bytecodealliance/preview3-shim"); const { stream } = await import("@bytecodealliance/preview3-shim/stream"); diff --git a/packages/preview3-shim/test/future.test.js b/packages/preview3-shim/test/future.test.js index 0b57f3bd9..aa47d422a 100644 --- a/packages/preview3-shim/test/future.test.js +++ b/packages/preview3-shim/test/future.test.js @@ -37,6 +37,16 @@ describe("Node.js Preview3 canon future", () => { await expect(rx.read()).rejects.toThrow("aborted"); }); + test("reader has one shot semantics", async () => { + const { future } = await import("@bytecodealliance/preview3-shim/future"); + const { tx, rx } = future(); + + await tx.write("thenable"); + + expect(await rx).toBe("thenable"); + expect(await rx.read()).toBeNull(); + }); + test("writer cannot write twice", async () => { const { future } = await import("@bytecodealliance/preview3-shim/future"); const { tx, rx } = future();