Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 63 additions & 16 deletions crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,22 @@ impl AsyncStreamIntrinsic {
async write(v) {{
{debug_log_fn}('[{end_class_name}#write()] args', {{ v }});

let data;
if (this.#elemMeta.isNumeric) {{
if (v instanceof ArrayBuffer) {{
v = new Uint8Array(v);
}}
data = Array.isArray(v) || (ArrayBuffer.isView(v) && typeof v.length === 'number') ? Array.from(v) : [v];
}} else {{
data = [v];
}}
return this.writeMany(data);
}}

async writeMany(values) {{
{debug_log_fn}('[{end_class_name}#writeMany()] args', {{ values }});
if (!Array.isArray(values)) {{ throw new TypeError("writeMany values must be an array"); }}

// Wait for an existing write operation to end, if present,
// otherwise register this write for any future operations.
//
Expand All @@ -853,7 +869,7 @@ impl AsyncStreamIntrinsic {
this.#result = newResult;
await p;
}} catch (err) {{
{debug_log_fn}('[{end_class_name}#write()] error waiting for previous write', err);
{debug_log_fn}('[{end_class_name}#writeMany()] error waiting for previous write', err);
// If the previous write we were waiting on errors for any reason,
// we can ignore it and attempt to continue with this write
// which may also fail for a similar reason
Expand All @@ -863,7 +879,8 @@ impl AsyncStreamIntrinsic {
}}
const {{ promise, resolve, reject }} = newResult;

const count = 1;
const data = values;
const count = data.length;
if (this.#elemMeta.stringEncoding === undefined) {{
this.#elemMeta.string = 'utf8';
}}
Expand All @@ -875,9 +892,9 @@ impl AsyncStreamIntrinsic {
isReadable: true, // we need to read from this buffer later
isWritable: false,
elemMeta: this.#elemMeta,
data: v,
data,
}});
buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{v.length}}])`);
buffer.setTarget(`host stream write buffer (id [${{bufferID}}], count [${{count}}], data len [${{data.length}}])`);

let packedResult;
packedResult = await this.copy({{
Expand Down Expand Up @@ -1909,33 +1926,63 @@ impl AsyncStreamIntrinsic {
}};

let done = false;
const pendingValues = [];

return async function generatedStreamHostInject(args) {{
let {{ count }} = args;
if (count < 0) {{ throw new Error('invalid count'); }}
if (count === 0) {{ return doNothingFn; }}
if (readEnd.hasPendingEvent()) {{ return doNothingFn; }}

// If we get another read when done is already set, that was
// the case of a iterator that returned a final value
// along with `done: true`
if (done) {{
hostWriteEnd.getPendingEvent();
hostWriteEnd.drop();
return doNothingFn;
}}

if (hostWriteEnd.isDoneState()) {{
return doNothingFn;
}}

const values = [];
const elemMeta = hostWriteEnd.getElemMeta();

const appendValues = (source) => {{
const transfer = Math.min(count, source.length);
for (let i = 0; i < transfer; i++) {{
values.push(source[i]);
}}
count -= transfer;
for (let i = transfer; i < source.length; i++) {{
pendingValues.push(source[i]);
}}
return source.length;
}};

const appendReadValue = (value) => {{
if (value === undefined) {{ return 0; }}
if (elemMeta.isNumeric) {{
if (value instanceof ArrayBuffer) {{
value = new Uint8Array(value);
}}
if (Array.isArray(value) || (ArrayBuffer.isView(value) && typeof value.length === 'number')) {{
return appendValues(value);
}}
}}
return appendValues([value]);
}};

const drainPendingValues = () => {{
const transfer = Math.min(count, pendingValues.length);
for (let i = 0; i < transfer; i++) {{
values.push(pendingValues[i]);
}}
pendingValues.splice(0, transfer);
count -= transfer;
}};

drainPendingValues();

while (count > 0 && !done) {{
const res = await readFn();
if (res.value !== undefined) {{ values.push(res.value); }}
const appended = appendReadValue(res.value);
done = res.done;
if (appended === 0 && !done) {{ count -= 1; }}
if (done) {{ break; }}
count -= 1;
}}

// Iterator provided `done: true` with no final value
Expand All @@ -1945,7 +1992,7 @@ impl AsyncStreamIntrinsic {
return doNothingFn;
}}

await hostWriteEnd.write(values);
await hostWriteEnd.writeMany(values);
resetWriteEndToIdleFn();

return doNothingFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1673,11 +1673,7 @@ impl AsyncTaskIntrinsic {

if (this.#onProgressFn) {{ this.#onProgressFn(); }}

if (subtaskValue === null) {{
if (this.#cancelRequested) {{
throw new Error('cancel was not requested, but no value present at return');
}}

if (subtaskValue === null && this.#cancelRequested) {{
if (this.#state === {subtask_class}.State.STARTING) {{
this.#state = {subtask_class}.State.CANCELLED_BEFORE_STARTED;
}} else {{
Expand Down
15 changes: 6 additions & 9 deletions packages/preview3-shim/lib/nodejs/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,19 @@ export const stdin = {
const readable = Readable.toWeb(process.stdin);
const { tx: futureTx, rx: futureRx } = future();

const reader = new StreamReader(readable);
const originalRead = reader.read.bind(reader);
reader.read = async () => {
async function* stdinChunks() {
try {
const chunk = await originalRead();
if (chunk === null) {
await futureTx.write({ tag: "ok", val: undefined });
for await (const chunk of readable.values({ preventCancel: true })) {
yield chunk;
}
return chunk;
await futureTx.write({ tag: "ok", val: undefined });
} catch (err) {
await futureTx.write({ tag: "err", val: errorCode(err) });
throw err;
}
};
}

return [reader, futureRx];
return [new StreamReader(stdinChunks()), futureRx];
},
};

Expand Down
4 changes: 4 additions & 0 deletions packages/preview3-shim/lib/nodejs/future.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ export class FutureReader {
}
}

then(resolve, reject) {
return this.read().then(resolve, reject);
}

/**
* Cancels the future. Not supported for FutureReader.
*/
Expand Down
22 changes: 22 additions & 0 deletions packages/preview3-shim/test/cli.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,28 @@ describe("Node.js Preview3 wasi-cli", () => {
Object.defineProperty(process, "stdin", { value: originalStdin });
});

test("readViaStream async iterator resolves future on eof", async () => {
const { cli } = await import("@bytecodealliance/preview3-shim");

const fakeStdin = Readable.from([]);
const originalStdin = process.stdin;

Object.defineProperty(process, "stdin", {
value: fakeStdin,
configurable: true,
});

try {
const [streamReader, result] = cli.stdin.readViaStream();
const iterator = streamReader[Symbol.asyncIterator]();

expect(await iterator.next()).toEqual({ value: undefined, done: true });
expect(await result).toEqual({ tag: "ok", val: undefined });
} finally {
Object.defineProperty(process, "stdin", { value: originalStdin });
}
});

test("writeViaStream writes to stderr", async () => {
const { cli } = await import("@bytecodealliance/preview3-shim");
const { stream } = await import("@bytecodealliance/preview3-shim/stream");
Expand Down
10 changes: 10 additions & 0 deletions packages/preview3-shim/test/future.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ describe("Node.js Preview3 canon future", () => {
await expect(rx.read()).rejects.toThrow("aborted");
});

test("reader has one shot semantics", async () => {
const { future } = await import("@bytecodealliance/preview3-shim/future");
const { tx, rx } = future();

await tx.write("thenable");

expect(await rx).toBe("thenable");
expect(await rx.read()).toBeNull();
});

test("writer cannot write twice", async () => {
const { future } = await import("@bytecodealliance/preview3-shim/future");
const { tx, rx } = future();
Expand Down
Loading