Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/perry-codegen/src/expr/property_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ pub(crate) fn lower(ctx: &mut FnCtx<'_>, expr: &Expr) -> Result<String> {
(class_name.as_str(), property.as_str()),
(
"ReadableStream",
"getReader" | "cancel" | "tee" | "pipeTo" | "pipeThrough"
"getReader" | "cancel" | "tee" | "pipeTo" | "pipeThrough" | "values"
) | (
"ReadableStreamDefaultReader",
"read" | "releaseLock" | "cancel"
Expand Down
75 changes: 58 additions & 17 deletions crates/perry-hir/src/lower/stmt_loops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,56 @@ use swc_ecma_ast as ast;
use super::*;
use crate::ir::*;

fn unwrap_stream_expr(mut expr: &ast::Expr) -> &ast::Expr {
loop {
expr = match expr {
ast::Expr::TsAs(ts_as) => &ts_as.expr,
ast::Expr::TsNonNull(non_null) => &non_null.expr,
ast::Expr::TsConstAssertion(assertion) => &assertion.expr,
ast::Expr::TsTypeAssertion(assertion) => &assertion.expr,
ast::Expr::Paren(paren) => &paren.expr,
_ => break,
};
}
expr
}

fn web_readable_stream_values_receiver(expr: &ast::Expr) -> Option<&ast::Expr> {
let ast::Expr::Call(call) = unwrap_stream_expr(expr) else {
return None;
};
let ast::Callee::Expr(callee_expr) = &call.callee else {
return None;
};
let ast::Expr::Member(member) = callee_expr.as_ref() else {
return None;
};
if !matches!(&member.prop, ast::MemberProp::Ident(prop) if prop.sym.as_ref() == "values") {
return None;
}
Some(member.obj.as_ref())
}

fn is_web_readable_stream_expr(ctx: &LoweringContext, expr: &ast::Expr) -> bool {
match unwrap_stream_expr(expr) {
ast::Expr::Ident(ident) => {
let name = ident.sym.as_ref();
matches!(
ctx.lookup_native_instance(name),
Some((_, "ReadableStream"))
) || matches!(
ctx.lookup_local_type(name),
Some(Type::Named(n)) if n == "ReadableStream"
)
}
ast::Expr::New(new_expr) => matches!(
new_expr.callee.as_ref(),
ast::Expr::Ident(callee) if callee.sym.as_ref() == "ReadableStream"
),
_ => false,
}
}

pub(crate) fn lower_stmt_for_of(
ctx: &mut LoweringContext,
module: &mut Module,
Expand Down Expand Up @@ -243,7 +293,9 @@ pub(crate) fn lower_stmt_for_of(
// `.length` on the numeric stream handle (0) and silently iterates zero
// times. Mirrors the function-body path in `lower_decl/body_stmt.rs`.
if for_of_stmt.is_await {
let mut iter_inner: &ast::Expr = &for_of_stmt.right;
let stream_source =
web_readable_stream_values_receiver(&for_of_stmt.right).unwrap_or(&for_of_stmt.right);
let mut iter_inner: &ast::Expr = stream_source;
loop {
iter_inner = match iter_inner {
ast::Expr::TsAs(x) => &x.expr,
Expand All @@ -254,20 +306,7 @@ pub(crate) fn lower_stmt_for_of(
};
}
let is_readable_stream = match iter_inner {
ast::Expr::Ident(ident) => {
let name = ident.sym.as_ref();
matches!(
ctx.lookup_native_instance(name),
Some((_, "ReadableStream"))
) || matches!(
ctx.lookup_local_type(name),
Some(Type::Named(n)) if n == "ReadableStream"
)
}
ast::Expr::New(new_expr) => matches!(
new_expr.callee.as_ref(),
ast::Expr::Ident(c) if c.sym.as_ref() == "ReadableStream"
),
ast::Expr::Ident(_) | ast::Expr::New(_) => is_web_readable_stream_expr(ctx, iter_inner),
// #1670: `for await (const c of res.body)` — `res.body` is a
// `ReadableStream` but arrives as a bare `Member` (Any-typed), so
// the Ident arm above misses it. Recognise `<obj>.body` on a
Expand Down Expand Up @@ -298,8 +337,10 @@ pub(crate) fn lower_stmt_for_of(

if is_readable_stream {
let for_scope_mark = ctx.push_block_scope();
// `as T` etc. are erased by lower_expr, so lower the full receiver.
let stream_expr = lower_expr(ctx, &for_of_stmt.right)?;
// `as T` etc. are erased by lower_expr; for `rs.values()` lower
// the underlying stream receiver because this branch drives the
// reader loop directly.
let stream_expr = lower_expr(ctx, stream_source)?;

// const __reader = stream.getReader();
let reader_id = ctx.fresh_local();
Expand Down
72 changes: 56 additions & 16 deletions crates/perry-hir/src/lower_decl/body_stmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,56 @@ use crate::lower_types::*;

use super::*;

fn unwrap_stream_expr(mut expr: &ast::Expr) -> &ast::Expr {
loop {
expr = match expr {
ast::Expr::TsAs(ts_as) => &ts_as.expr,
ast::Expr::TsNonNull(non_null) => &non_null.expr,
ast::Expr::TsConstAssertion(assertion) => &assertion.expr,
ast::Expr::TsTypeAssertion(assertion) => &assertion.expr,
ast::Expr::Paren(paren) => &paren.expr,
_ => break,
};
}
expr
}

fn web_readable_stream_values_receiver(expr: &ast::Expr) -> Option<&ast::Expr> {
let ast::Expr::Call(call) = unwrap_stream_expr(expr) else {
return None;
};
let ast::Callee::Expr(callee_expr) = &call.callee else {
return None;
};
let ast::Expr::Member(member) = callee_expr.as_ref() else {
return None;
};
if !matches!(&member.prop, ast::MemberProp::Ident(prop) if prop.sym.as_ref() == "values") {
return None;
}
Some(member.obj.as_ref())
}

fn is_web_readable_stream_expr(ctx: &LoweringContext, expr: &ast::Expr) -> bool {
match unwrap_stream_expr(expr) {
ast::Expr::Ident(ident) => {
let name = ident.sym.as_ref();
matches!(
ctx.lookup_native_instance(name),
Some((_, "ReadableStream"))
) || matches!(
ctx.lookup_local_type(name),
Some(Type::Named(n)) if n == "ReadableStream"
)
}
ast::Expr::New(new_expr) => matches!(
new_expr.callee.as_ref(),
ast::Expr::Ident(callee) if callee.sym.as_ref() == "ReadableStream"
),
_ => false,
}
}

pub fn lower_body_stmt(ctx: &mut LoweringContext, stmt: &ast::Stmt) -> Result<Vec<Stmt>> {
let mut result = Vec::new();

Expand Down Expand Up @@ -701,7 +751,9 @@ pub fn lower_body_stmt(ctx: &mut LoweringContext, stmt: &ast::Stmt) -> Result<Ve
// idiom — the WHATWG ReadableStream async-iterator isn't in
// the lib.dom.d.ts types Perry sees) is still recognised as a
// ReadableStream and lowered to the getReader/read loop below.
let mut iter_inner: &ast::Expr = &for_of_stmt.right;
let stream_source = web_readable_stream_values_receiver(&for_of_stmt.right)
.unwrap_or(&for_of_stmt.right);
let mut iter_inner: &ast::Expr = stream_source;
loop {
iter_inner = match iter_inner {
ast::Expr::TsAs(x) => &x.expr,
Expand All @@ -712,20 +764,8 @@ pub fn lower_body_stmt(ctx: &mut LoweringContext, stmt: &ast::Stmt) -> Result<Ve
};
}
let is_readable_stream = match iter_inner {
ast::Expr::Ident(ident) => {
let name = ident.sym.as_ref();
// A `new ReadableStream(...)` local is tracked by its
// inferred `Named("ReadableStream")` type, not the
// native-instance registry (that's only populated for
// `response.body` / `blob.stream()` factories). Accept
// either so a directly-constructed Web stream iterates too.
matches!(
ctx.lookup_native_instance(name),
Some((_, "ReadableStream"))
) || matches!(
ctx.lookup_local_type(name),
Some(Type::Named(n)) if n == "ReadableStream"
)
ast::Expr::Ident(_) | ast::Expr::New(_) => {
is_web_readable_stream_expr(ctx, iter_inner)
}
// #1670: `for await (const c of res.body)` — the stream
// arrives as a bare `Member` (Any-typed). Recognise
Expand Down Expand Up @@ -755,7 +795,7 @@ pub fn lower_body_stmt(ctx: &mut LoweringContext, stmt: &ast::Stmt) -> Result<Ve

if is_readable_stream {
let scope_mark = ctx.push_block_scope();
let stream_expr = lower_expr(ctx, &for_of_stmt.right)?;
let stream_expr = lower_expr(ctx, stream_source)?;

// const __reader = stream.getReader();
let reader_id = ctx.fresh_local();
Expand Down
24 changes: 24 additions & 0 deletions crates/perry-runtime/src/symbol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,30 @@ pub unsafe extern "C" fn js_object_get_symbol_property(obj_f64: f64, sym_f64: f6
}
}
}
// #1545: Web ReadableStream handles are normal finite numbers, not
// heap objects. Expose `rs[Symbol.asyncIterator]` as the same bound method
// as `rs.values`, matching Node's Web Streams surface while leaving
// `Symbol.iterator` absent.
if obj_f64.is_finite() && obj_f64 > 0.0 && obj_f64.fract() == 0.0 {
if let Some(kind_probe) = crate::object::stream_handle_kind_probe() {
if kind_probe(obj_f64 as usize) == 1 {
let async_iterator = well_known_symbol("asyncIterator");
if !async_iterator.is_null() {
let async_iterator_f64 = f64::from_bits(
crate::value::JSValue::pointer(async_iterator as *const u8).bits(),
);
if sym_key_from_f64(sym_f64) == sym_key_from_f64(async_iterator_f64) {
let mname = b"values";
return crate::object::js_class_method_bind(
obj_f64,
mname.as_ptr(),
mname.len(),
);
}
}
}
}
}
// #321: arrays expose `Symbol.iterator`. perry has no standalone array
// iterator object (for-of is special-cased), but `arr[Symbol.iterator]`
// must resolve to a callable so `Symbol.iterator in arr` is true
Expand Down
90 changes: 90 additions & 0 deletions crates/perry-stdlib/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,95 @@ pub unsafe extern "C" fn js_reader_read(reader_handle: f64) -> *mut Promise {
promise
}

fn resolved_done_promise() -> f64 {
unsafe {
let promise = js_promise_new();
let result = build_iter_result(TAG_UNDEFINED, true);
js_promise_resolve(promise, f64::from_bits(result));
box_promise(promise)
}
}

fn closure_capture_value(
func: extern "C" fn(*const ClosureHeader) -> f64,
value: f64,
) -> *mut ClosureHeader {
let fn_ptr = func as *const u8;
perry_runtime::closure::js_register_closure_arity(fn_ptr, 0);
let closure = perry_runtime::closure::js_closure_alloc(fn_ptr, 1);
perry_runtime::closure::js_closure_set_capture_ptr(closure, 0, value.to_bits() as i64);
closure
}

fn closure_capture_value_get(closure: *const ClosureHeader) -> f64 {
if closure.is_null() {
return f64::from_bits(TAG_UNDEFINED);
}
let bits = perry_runtime::closure::js_closure_get_capture_ptr(closure, 0) as u64;
f64::from_bits(bits)
}

extern "C" fn readable_stream_iterator_next(closure: *const ClosureHeader) -> f64 {
let reader = closure_capture_value_get(closure);
if reader.to_bits() == TAG_UNDEFINED {
return resolved_done_promise();
}
unsafe { box_promise(js_reader_read(reader)) }
}

extern "C" fn readable_stream_iterator_return(closure: *const ClosureHeader) -> f64 {
let reader = closure_capture_value_get(closure);
if reader.to_bits() != TAG_UNDEFINED {
unsafe {
let _ = js_reader_release_lock(reader);
}
}
resolved_done_promise()
}

extern "C" fn readable_stream_iterator_self(closure: *const ClosureHeader) -> f64 {
closure_capture_value_get(closure)
}

unsafe fn build_readable_stream_iterator(stream_handle: f64) -> f64 {
let reader = js_readable_stream_get_reader(stream_handle);
let obj = js_object_alloc(0, 2);
let keys = js_array_alloc(2);
let k_next = js_string_from_bytes(b"next".as_ptr(), 4);
let k_return = js_string_from_bytes(b"return".as_ptr(), 6);
js_array_push(keys, JSValue::string_ptr(k_next));
js_array_push(keys, JSValue::string_ptr(k_return));
js_object_set_field(
obj,
0,
JSValue::pointer(closure_capture_value(readable_stream_iterator_next, reader) as *const u8),
);
js_object_set_field(
obj,
1,
JSValue::pointer(
closure_capture_value(readable_stream_iterator_return, reader) as *const u8,
),
);
js_object_set_keys(obj, keys);
let iterator = f64::from_bits(JSValue::object_ptr(obj as *mut u8).bits());

let async_iterator = perry_runtime::symbol::well_known_symbol("asyncIterator");
if !async_iterator.is_null() {
let self_closure = closure_capture_value(readable_stream_iterator_self, iterator);
let symbol_value = f64::from_bits(JSValue::pointer(async_iterator as *const u8).bits());
let closure_value = f64::from_bits(JSValue::pointer(self_closure as *const u8).bits());
perry_runtime::symbol::js_object_set_symbol_property(iterator, symbol_value, closure_value);
}

iterator
}

#[no_mangle]
pub unsafe extern "C" fn js_readable_stream_values(stream_handle: f64) -> f64 {
build_readable_stream_iterator(stream_handle)
}

#[no_mangle]
pub unsafe extern "C" fn js_reader_release_lock(reader_handle: f64) -> f64 {
let reader_id = reader_handle as usize;
Expand Down Expand Up @@ -1642,6 +1731,7 @@ pub(crate) unsafe fn dispatch_stream_method(
if is_readable {
match method {
"getReader" => return Some(js_readable_stream_get_reader(handle)),
"values" | "@@asyncIterator" => return Some(js_readable_stream_values(handle)),
"cancel" => return Some(box_promise(js_readable_stream_cancel(handle, arg0))),
"tee" => return Some(js_readable_stream_tee(handle)),
"pipeTo" => return Some(box_promise(js_readable_stream_pipe_to(handle, arg0))),
Expand Down
12 changes: 0 additions & 12 deletions test-parity/known_failures.json
Original file line number Diff line number Diff line change
Expand Up @@ -1280,12 +1280,6 @@
"category": "bug-open",
"reason": "node:stream: 'readable' event ordering relative to 'end' — extra readable before end missing or out-of-order. Flips to PASS when #1532 lands."
},
"node-suite/stream/web/readable-values-iterator": {
"issue": "1545",
"added": "2026-05-24",
"category": "bug-open",
"reason": "node:stream/web: ReadableStream.values() async-iterator method not yet exposed. Flips to PASS when #1545 lands."
},
"node-suite/stream/events/pipe-arg-source": {
"issue": "1532",
"added": "2026-05-24",
Expand Down Expand Up @@ -1976,12 +1970,6 @@
"category": "bug-open",
"reason": "node:stream/web: pipeThrough(transform, {preventClose:true}) — transform.writable should remain unlocked. Flips to PASS when #1545 lands."
},
"node-suite/stream/web/readable-no-sync-iterator": {
"issue": "1545",
"added": "2026-05-24",
"category": "bug-open",
"reason": "node:stream/web: Web ReadableStream exposes Symbol.iterator (should be undefined; only Symbol.asyncIterator). Flips to PASS when #1545 lands."
},
"node-suite/stream/promises/finished-returns-promise": {
"issue": "1532",
"added": "2026-05-24",
Expand Down