diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index e409927a6..8983c9afd 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -839,6 +839,22 @@ impl AsyncStreamIntrinsic { async write(v) {{ {debug_log_fn}('[{end_class_name}#write()] args', {{ v }}); + let data; + if (this.#elemMeta.isNumeric) {{ + if (v instanceof ArrayBuffer) {{ + v = new Uint8Array(v); + }} + data = Array.isArray(v) || (ArrayBuffer.isView(v) && typeof v.length === 'number') ? Array.from(v) : [v]; + }} else {{ + data = [v]; + }} + return this.writeMany(data); + }} + + async writeMany(values) {{ + {debug_log_fn}('[{end_class_name}#writeMany()] args', {{ values }}); + if (!Array.isArray(values)) {{ throw new TypeError("writeMany values must be an array"); }} + // Wait for an existing write operation to end, if present, // otherwise register this write for any future operations. // @@ -853,7 +869,7 @@ impl AsyncStreamIntrinsic { this.#result = newResult; await p; }} catch (err) {{ - {debug_log_fn}('[{end_class_name}#write()] error waiting for previous write', err); + {debug_log_fn}('[{end_class_name}#writeMany()] error waiting for previous write', err); // If the previous write we were waiting on errors for any reason, // we can ignore it and attempt to continue with this write // which may also fail for a similar reason @@ -863,7 +879,8 @@ impl AsyncStreamIntrinsic { }} const {{ promise, resolve, reject }} = newResult; - const count = 1; + const data = values; + const count = data.length; if (this.#elemMeta.stringEncoding === undefined) {{ this.#elemMeta.string = 'utf8'; }} @@ -875,9 +892,9 @@ impl AsyncStreamIntrinsic { isReadable: true, // we need to read from this buffer later isWritable: false, elemMeta: this.#elemMeta, - data: v, + data, }}); - buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{v.length}}])`); + buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{data.length}}])`); let packedResult; packedResult = await this.copy({{ @@ -1909,6 +1926,7 @@ impl AsyncStreamIntrinsic { }}; let done = false; + const pendingValues = []; return async function generatedStreamHostInject(args) {{ let {{ count }} = args; @@ -1916,26 +1934,55 @@ impl AsyncStreamIntrinsic { if (count === 0) {{ return doNothingFn; }} if (readEnd.hasPendingEvent()) {{ return doNothingFn; }} - // If we get another read when done is already set, that was - // the case of a iterator that returned a final value - // along with `done: true` - if (done) {{ - hostWriteEnd.getPendingEvent(); - hostWriteEnd.drop(); - return doNothingFn; - }} - if (hostWriteEnd.isDoneState()) {{ return doNothingFn; }} const values = []; + const elemMeta = hostWriteEnd.getElemMeta(); + + const appendValues = (source) => {{ + const transfer = Math.min(count, source.length); + for (let i = 0; i < transfer; i++) {{ + values.push(source[i]); + }} + count -= transfer; + for (let i = transfer; i < source.length; i++) {{ + pendingValues.push(source[i]); + }} + return source.length; + }}; + + const appendReadValue = (value) => {{ + if (value === undefined) {{ return 0; }} + if (elemMeta.isNumeric) {{ + if (value instanceof ArrayBuffer) {{ + value = new Uint8Array(value); + }} + if (Array.isArray(value) || (ArrayBuffer.isView(value) && typeof value.length === 'number')) {{ + return appendValues(value); + }} + }} + return appendValues([value]); + }}; + + const drainPendingValues = () => {{ + const transfer = Math.min(count, pendingValues.length); + for (let i = 0; i < transfer; i++) {{ + values.push(pendingValues[i]); + }} + pendingValues.splice(0, transfer); + count -= transfer; + }}; + + drainPendingValues(); + while (count > 0 && !done) {{ const res = await readFn(); - if (res.value !== undefined) {{ values.push(res.value); }} + const appended = appendReadValue(res.value); done = res.done; + if (appended === 0 && !done) {{ count -= 1; }} if (done) {{ break; }} - count -= 1; }} // Iterator provided `done: true` with no final value @@ -1945,7 +1992,7 @@ impl AsyncStreamIntrinsic { return doNothingFn; }} - await hostWriteEnd.write(values); + await hostWriteEnd.writeMany(values); resetWriteEndToIdleFn(); return doNothingFn; diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs index 358720c7b..9c5df3abe 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs @@ -1673,11 +1673,7 @@ impl AsyncTaskIntrinsic { if (this.#onProgressFn) {{ this.#onProgressFn(); }} - if (subtaskValue === null) {{ - if (this.#cancelRequested) {{ - throw new Error('cancel was not requested, but no value present at return'); - }} - + if (subtaskValue === null && this.#cancelRequested) {{ if (this.#state === {subtask_class}.State.STARTING) {{ this.#state = {subtask_class}.State.CANCELLED_BEFORE_STARTED; }} else {{ diff --git a/packages/preview3-shim/lib/nodejs/cli.js b/packages/preview3-shim/lib/nodejs/cli.js index a580531f8..262b2d9f4 100644 --- a/packages/preview3-shim/lib/nodejs/cli.js +++ b/packages/preview3-shim/lib/nodejs/cli.js @@ -68,22 +68,19 @@ export const stdin = { const readable = Readable.toWeb(process.stdin); const { tx: futureTx, rx: futureRx } = future(); - const reader = new StreamReader(readable); - const originalRead = reader.read.bind(reader); - reader.read = async () => { + async function* stdinChunks() { try { - const chunk = await originalRead(); - if (chunk === null) { - await futureTx.write({ tag: "ok", val: undefined }); + for await (const chunk of readable.values({ preventCancel: true })) { + yield chunk; } - return chunk; + await futureTx.write({ tag: "ok", val: undefined }); } catch (err) { await futureTx.write({ tag: "err", val: errorCode(err) }); throw err; } - }; + } - return [reader, futureRx]; + return [new StreamReader(stdinChunks()), futureRx]; }, }; diff --git a/packages/preview3-shim/lib/nodejs/future.js b/packages/preview3-shim/lib/nodejs/future.js index 00d0df2eb..135fa4776 100644 --- a/packages/preview3-shim/lib/nodejs/future.js +++ b/packages/preview3-shim/lib/nodejs/future.js @@ -74,6 +74,10 @@ export class FutureReader { } } + then(resolve, reject) { + return this.read().then(resolve, reject); + } + /** * Cancels the future. Not supported for FutureReader. */ diff --git a/packages/preview3-shim/test/cli.test.js b/packages/preview3-shim/test/cli.test.js index 94b0f6025..eab4b4e56 100644 --- a/packages/preview3-shim/test/cli.test.js +++ b/packages/preview3-shim/test/cli.test.js @@ -92,6 +92,28 @@ describe("Node.js Preview3 wasi-cli", () => { Object.defineProperty(process, "stdin", { value: originalStdin }); }); + test("readViaStream async iterator resolves future on eof", async () => { + const { cli } = await import("@bytecodealliance/preview3-shim"); + + const fakeStdin = Readable.from([]); + const originalStdin = process.stdin; + + Object.defineProperty(process, "stdin", { + value: fakeStdin, + configurable: true, + }); + + try { + const [streamReader, result] = cli.stdin.readViaStream(); + const iterator = streamReader[Symbol.asyncIterator](); + + expect(await iterator.next()).toEqual({ value: undefined, done: true }); + expect(await result).toEqual({ tag: "ok", val: undefined }); + } finally { + Object.defineProperty(process, "stdin", { value: originalStdin }); + } + }); + test("writeViaStream writes to stderr", async () => { const { cli } = await import("@bytecodealliance/preview3-shim"); const { stream } = await import("@bytecodealliance/preview3-shim/stream"); diff --git a/packages/preview3-shim/test/future.test.js b/packages/preview3-shim/test/future.test.js index 0b57f3bd9..aa47d422a 100644 --- a/packages/preview3-shim/test/future.test.js +++ b/packages/preview3-shim/test/future.test.js @@ -37,6 +37,16 @@ describe("Node.js Preview3 canon future", () => { await expect(rx.read()).rejects.toThrow("aborted"); }); + test("reader has one shot semantics", async () => { + const { future } = await import("@bytecodealliance/preview3-shim/future"); + const { tx, rx } = future(); + + await tx.write("thenable"); + + expect(await rx).toBe("thenable"); + expect(await rx.read()).toBeNull(); + }); + test("writer cannot write twice", async () => { const { future } = await import("@bytecodealliance/preview3-shim/future"); const { tx, rx } = future();