Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/js-component-bindgen/src/function_bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3176,6 +3176,7 @@ impl Bindgen for FunctionBindgen<'_> {
const hostInjectFn = {gen_stream_host_inject_fn}({{
readFn: readFn{tmp},
hostWriteEnd: hostWriteEnd{tmp},
readEnd: readEnd{tmp},
}});
readEnd{tmp}.setHostInjectFn(hostInjectFn);

Expand Down
1 change: 1 addition & 0 deletions crates/js-component-bindgen/src/intrinsics/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ impl LowerIntrinsic {
const hostInjectFn = {gen_stream_host_inject_fn}({{
readFn: {gen_read_fn_from_lowerable_stream_fn}(stream),
hostWriteEnd: writeEnd,
readEnd,
}});
readEnd.setHostInjectFn(hostInjectFn);

Expand Down
25 changes: 7 additions & 18 deletions crates/js-component-bindgen/src/intrinsics/p3/async_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,9 @@ impl AsyncFutureIntrinsic {


// Before performing this read, if we're dealing with a host-controlled
// future, then we should inject a write, but we can't wait for it to complete
// as we must do the rendesvous read below for the write to complete.
// future, start injecting the write. The injection may depend on sibling
// guest work running, so cleanup is attached without awaiting here; the
// canonical read must be able to return BLOCKED first.
let injectedWritePromise;
if (this.#hostInjectFn) {{
injectedWritePromise = this.#hostInjectFn({{ count: 1 }});
Expand All @@ -687,8 +688,10 @@ impl AsyncFutureIntrinsic {
}});

if (injectedWritePromise) {{
const cleanupFn = await injectedWritePromise;
cleanupFn();
injectedWritePromise.then(
cleanupFn => cleanupFn(),
err => this.setPendingEvent(() => {{ throw err; }}),
);
}}

return {{ buffer }};
Expand Down Expand Up @@ -807,13 +810,6 @@ impl AsyncFutureIntrinsic {
}});
}});

// Perform another write, reusing the buffer
const {{ buffer }} = await this.guestRead({{
buffer,
stringEncoding,
isAsync: true,
}});

if (!this.hasPendingEvent()) {{
throw new Error("missing pending event after blocked future read");
}}
Expand Down Expand Up @@ -862,13 +858,6 @@ impl AsyncFutureIntrinsic {
}});
}});

// Perform another write, reusing the buffer
const {{ buffer }} = await this.guestWrite({{
buffer,
stringEncoding,
isAsync: true,
}});

if (!this.hasPendingEvent()) {{
throw new Error("missing pending event after blocked future write");
}}
Expand Down
113 changes: 58 additions & 55 deletions crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,19 +723,10 @@ impl AsyncStreamIntrinsic {
skipStateCheck,
}});

// If the stream is readable and was lowered from the host,
// when the component is doing a read (i.e. `stream.read`),
// the writer is host-side and may have already written.
//
// We effectively do a just-in-time "write" of the external value,
// if one is present, because what we got from the outside world
// was a reader
//
let onReadFinishFn;
// If the stream is readable and was lowered from the host, the
// writer is host-side. Register the read first; host injection
// will no-op if the read already produced a pending event.
const injectHostWrite = this.isReadable() && !!this.#hostInjectFn;
if (injectHostWrite) {{
onReadFinishFn = await this.#hostInjectFn({{ count }});
}}

// Perform the read/write
this._{rw_fn_name}({{
Expand All @@ -745,15 +736,24 @@ impl AsyncStreamIntrinsic {
componentIdx,
}});

let injectedWritePromise;
if (injectHostWrite) {{
injectedWritePromise = this.#hostInjectFn({{ count }});
}}

// If sync, wait forever but allow task to do other things
if (!this.hasPendingEvent()) {{
if (injectHostWrite) {{
throw new Error('reader unexpectedly blocked after injected write');
}}

if (isAsync) {{
this.setCopyState({stream_end_class}.CopyState.ASYNC_COPYING);
{debug_log_fn}('[{stream_end_class}#copy()] blocked', {{ componentIdx, eventCode, self: this }});
if (injectedWritePromise) {{
// Do not await here: the injected write may depend on sibling
// guest work running, so the canonical read must return BLOCKED.
injectedWritePromise.then(
Comment thread
andreiltd marked this conversation as resolved.
cleanupFn => cleanupFn(),
err => this.setPendingEvent(() => {{ throw err; }}),
);
}}
return {async_blocked_const};
}} else {{
this.setCopyState({stream_end_class}.CopyState.SYNC_COPYING);
Expand All @@ -771,12 +771,11 @@ impl AsyncStreamIntrinsic {
}}
}}

// If we injected a write and the read has completed, we should reset
// we can skip the rest of the async machinery since there the host controlled
// write end does not need to use the pending event machinery
if (injectHostWrite) {{
if (!onReadFinishFn) {{ throw new Error('missing read finish fn'); }}
onReadFinishFn();
// If the read completed immediately after injecting a host write,
// it is safe to await injection cleanup before consuming the event.
if (injectedWritePromise) {{
const cleanupFn = await injectedWritePromise;
cleanupFn();
}}

const event = this.getPendingEvent();
Expand Down Expand Up @@ -898,8 +897,8 @@ impl AsyncStreamIntrinsic {
// just-before reads, no matter what the user does on the other end).
//
if (packedResult === {async_blocked_const} && !this.#isHostOwned) {{
// If the write was blocked, we can only make progress when
// the read side notifies us of a read, then we must attempt the copy again
// If the write was blocked, the pending event produced by the
// read side represents the completed copy.

await new Promise((resolve) => {{
let waitInterval = setInterval(async () => {{
Expand All @@ -909,18 +908,21 @@ impl AsyncStreamIntrinsic {
}});
}});

packedResult = await this.copy({{
isAsync: true,
count,
bufferID,
buffer,
eventCode: {async_event_code_enum}.STREAM_WRITE,
// NOTE: we skip state checks only when dealing with a post blocked
// read/write in the host. This enables the host to quickly pick up the
// guest operation on the otherside quickly.
skipStateCheck: true,
componentIdx: -1,
}});
if (!this.hasPendingEvent()) {{
throw new Error("missing pending event after blocked stream write");
}}

const event = this.getPendingEvent();
if (!event) {{ throw new Error("missing pending event after blocked stream write"); }}

const {{ code, payload0: index, payload1: payload }} = event;

if (code !== {async_event_code_enum}.STREAM_WRITE) {{
throw new Error(`mismatched event code [${{code}}] for host stream write`);
}}

if (index !== this.waitableIdx()) {{ throw new Error('invalid stream end index'); }}
packedResult = payload;

const copied = packedResult >> 4;
if (copied === 0 && this.isDoneState()) {{
Expand Down Expand Up @@ -1016,8 +1018,8 @@ impl AsyncStreamIntrinsic {
}});

if (packedResult === {async_blocked_const}) {{
// If the read was blocked, we can only make progress when
// the write side notifies us of a write, then we must attempt the copy again
// If the read was blocked, the pending event produced by the
// write side represents the completed copy.

await new Promise((resolve) => {{
let waitInterval = setInterval(() => {{
Expand All @@ -1027,24 +1029,22 @@ impl AsyncStreamIntrinsic {
}});
}});

packedResult = await this.copy({{
isAsync: true,
count,
bufferID,
buffer,
eventCode: {async_event_code_enum}.STREAM_READ,
// NOTE: we skip state checks only when dealing with a post blocked
// read/write in the host. This enables the host to quickly pick up the
// guest operation on the otherside quickly.
skipStateCheck: true,
componentIdx: -1,
}});
if (!this.hasPendingEvent()) {{
throw new Error("missing pending event after blocked stream read");
}}

const copied = packedResult >> 4;
if (copied === 0 && this.isDoneState()) {{
reject(new Error("write end dropped during read"));
const event = this.getPendingEvent();
if (!event) {{ throw new Error("missing pending event after blocked stream read"); }}

const {{ code, payload0: index, payload1: payload }} = event;

if (code !== {async_event_code_enum}.STREAM_READ) {{
throw new Error(`mismatched event code [${{code}}] for host stream read`);
}}

if (index !== this.waitableIdx()) {{ throw new Error('invalid stream end index'); }}
packedResult = payload;

if (packedResult === {async_blocked_const}) {{
throw new Error("unexpected double block during read");
}}
Expand Down Expand Up @@ -1860,7 +1860,8 @@ impl AsyncStreamIntrinsic {
output.push_str(&format!(
r#"
function {gen_host_inject_fn}(genArgs) {{
const {{ readFn, hostWriteEnd }} = genArgs;
const {{ readFn, hostWriteEnd, readEnd }} = genArgs;
if (!readEnd) {{ throw new TypeError('missing read end'); }}
const doNothingFn = () => {{}};
const resetWriteEndToIdleFn = () => {{
// After the write is finished, we consume the event that was generated
Expand All @@ -1874,6 +1875,7 @@ impl AsyncStreamIntrinsic {
let {{ count }} = args;
if (count < 0) {{ throw new Error('invalid count'); }}
if (count === 0) {{ return doNothingFn; }}
if (readEnd.hasPendingEvent()) {{ return doNothingFn; }}

// If we get another read when done is already set, that was
// the case of a iterator that returned a final value
Expand Down Expand Up @@ -1905,8 +1907,9 @@ impl AsyncStreamIntrinsic {
}}

await hostWriteEnd.write(values);
resetWriteEndToIdleFn();

return resetWriteEndToIdleFn;
return doNothingFn;
}};
}}
"#
Expand Down
1 change: 1 addition & 0 deletions crates/test-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Test components for use during jco testing
"""

[dependencies]
futures = { version = "0.3.32", default-features = false, features = ["alloc"] }
wit-bindgen = { workspace = true, features = [ "macros", "async-spawn"] }

[build-dependencies]
Expand Down
31 changes: 31 additions & 0 deletions crates/test-components/src/bin/future_concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
mod bindings {
use super::Component;
wit_bindgen::generate!({
world: "future-concurrency",
});
export!(Component);
}

use bindings::exports::jco::test_components::local_run_async;
use bindings::jco::test_components::future_concurrency_host;
use bindings::wit_stream;

struct Component;

impl local_run_async::Guest for Component {
async fn run() {
let (mut tx, rx) = wit_stream::new();

futures::join!(
async {
assert_eq!(future_concurrency_host::write_via_stream(rx).await, 42);
},
async {
tx.write_all(vec![42]).await;
drop(tx);
},
);
}
}

fn main() {}
33 changes: 33 additions & 0 deletions crates/test-components/src/bin/stream_concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
mod bindings {
use super::Component;
wit_bindgen::generate!({
world: "stream-concurrency",
});
export!(Component);
}

use bindings::exports::jco::test_components::stream_concurrency_test;
use bindings::jco::test_components::stream_concurrency_host;
use wit_bindgen::StreamReader;

struct Component;

impl stream_concurrency_test::Guest for Component {
async fn read_after_signal(mut rx: StreamReader<u8>) -> Vec<u8> {
let (values, ()) = futures::join!(
async {
let mut values = Vec::new();
while let Some(value) = rx.next().await {
values.push(value);
}
values
},
async {
stream_concurrency_host::signal();
},
);
values
}
}

fn main() {}
22 changes: 22 additions & 0 deletions crates/test-components/wit/all.wit
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,28 @@ world sleep-post-return-callee {
export sleep-post-return;
}

interface future-concurrency-host {
write-via-stream: func(s: stream<u8>) -> future<u32>;
}

world future-concurrency {
import future-concurrency-host;
export local-run-async;
}

interface stream-concurrency-host {
signal: func();
}

interface stream-concurrency-test {
read-after-signal: async func(s: stream<u8>) -> list<u8>;
}

world stream-concurrency {
import stream-concurrency-host;
export stream-concurrency-test;
}

interface fixed-length-lists-fn {
takes-returns-fixed: func(bools: list<bool, 17>) -> result<list<u8, 32>, string>;
}
Expand Down
Loading
Loading