diff --git a/examples/jpar-stream.ilo b/examples/jpar-stream.ilo new file mode 100644 index 00000000..2eb03f2e --- /dev/null +++ b/examples/jpar-stream.ilo @@ -0,0 +1,14 @@ +-- JSONL streaming: rdjl reads a JSONL file line by line and returns +-- L (R _ t) — a list of per-line parse results. Empty lines are skipped, +-- so log files with stray blank lines parse cleanly. Each entry is +-- wrapped in a Result so a single malformed line doesn't poison the +-- whole stream. + +-- Hydrate a JSONL fixture, then count the parsed entries. In a real +-- pipeline rdjl is used on a file produced by a log shipper or another +-- process; here we generate one in-place so the example is hermetic. +prepare p:t>R t t;wrl p ["{\"k\":1}", "{\"k\":2}", "{\"k\":3}"] +n-lines p:t>n;w=prepare p;es=rdjl p;len es + +-- run: n-lines /tmp/ilo-jpar-stream-example.jsonl +-- out: 3 diff --git a/src/builtins.rs b/src/builtins.rs index c0442c8b..87c16a99 100644 --- a/src/builtins.rs +++ b/src/builtins.rs @@ -109,6 +109,7 @@ pub enum Builtin { Jpth, Jdmp, Jpar, + Rdjl, // HTTP Get, @@ -221,6 +222,7 @@ impl Builtin { "jpth" => Some(Builtin::Jpth), "jdmp" => Some(Builtin::Jdmp), "jpar" => Some(Builtin::Jpar), + "rdjl" => Some(Builtin::Rdjl), "get" => Some(Builtin::Get), "post" => Some(Builtin::Post), "get-many" => Some(Builtin::GetMany), @@ -328,6 +330,7 @@ impl Builtin { Builtin::Jpth => "jpth", Builtin::Jdmp => "jdmp", Builtin::Jpar => "jpar", + Builtin::Rdjl => "rdjl", Builtin::Get => "get", Builtin::Post => "post", Builtin::GetMany => "get-many", @@ -456,6 +459,7 @@ mod tests { "solve", "inv", "det", + "rdjl", ]; for name in &all { let b = Builtin::from_name(name).unwrap_or_else(|| panic!("missing builtin: {name}")); diff --git a/src/interpreter/mod.rs b/src/interpreter/mod.rs index f0942e66..38f28a09 100644 --- a/src/interpreter/mod.rs +++ b/src/interpreter/mod.rs @@ -2197,6 +2197,34 @@ fn call_function(env: &mut Env, name: &str, args: Vec) -> Result { )), }; } + if builtin == Some(Builtin::Rdjl) && args.len() == 1 { + return match &args[0] { + Value::Text(path) => match std::fs::read_to_string(path) { + Ok(content) => { + let mut items: Vec = Vec::new(); + for line in content.split('\n') { + if line.is_empty() { + continue; + } + let parsed = match serde_json::from_str::(line) { + Ok(v) => Value::Ok(Box::new(serde_json_to_value(v))), + Err(e) => Value::Err(Box::new(Value::Text(e.to_string()))), + }; + items.push(parsed); + } + Ok(Value::List(items)) + } + Err(e) => Err(RuntimeError::new( + "ILO-R009", + format!("rdjl failed to read '{}': {}", path, e), + )), + }, + other => Err(RuntimeError::new( + "ILO-R009", + format!("rdjl requires text path, got {:?}", other), + )), + }; + } if builtin == Some(Builtin::Env) && args.len() == 1 { return match &args[0] { diff --git a/src/verify.rs b/src/verify.rs index 4fe6f177..15f056a6 100644 --- a/src/verify.rs +++ b/src/verify.rs @@ -319,6 +319,7 @@ const BUILTINS: &[(&str, &[&str], &str)] = &[ ("fmt", &["t"], "t"), // variadic: fmt template arg1 arg2 … — checked specially ("fmt2", &["n", "n"], "t"), ("jpar", &["t"], "R ? t"), + ("rdjl", &["t"], "L (R ? t)"), // Higher-order: map/flt/fld take a function ref as first arg (special-cased in builtin_check_args) ("map", &["fn", "list"], "list"), ("flt", &["fn", "list"], "list"), @@ -1429,6 +1430,28 @@ fn builtin_check_args( errors, ) } + "rdjl" => { + if let Some(arg) = arg_types.first() + && !compatible(arg, &Ty::Text) + { + errors.push(VerifyError { + code: "ILO-T013", + function: func_ctx.to_string(), + message: format!("'rdjl' expects t (path), got {arg}"), + hint: None, + span, + is_warning: false, + }); + } + // rdjl path → L (R _ t): list of per-line parse results + ( + Ty::List(Box::new(Ty::Result( + Box::new(Ty::Unknown), + Box::new(Ty::Text), + ))), + errors, + ) + } "map" => { // map fn:F a b xs:L a → L b // map fn:F a c b ctx:c xs:L a → L b (closure-bind variant) diff --git a/src/vm/compile_cranelift.rs b/src/vm/compile_cranelift.rs index a8987bc6..a58de5b8 100644 --- a/src/vm/compile_cranelift.rs +++ b/src/vm/compile_cranelift.rs @@ -113,6 +113,7 @@ struct HelperFuncs { jpth: FuncId, jdmp: FuncId, jpar: FuncId, + rdjl: FuncId, call: FuncId, // Type predicates isnum: FuncId, @@ -273,6 +274,7 @@ fn declare_all_helpers(module: &mut ObjectModule) -> HelperFuncs { jpth: declare_helper(module, "jit_jpth", 2, 1), jdmp: declare_helper(module, "jit_jdmp", 1, 1), jpar: declare_helper(module, "jit_jpar", 1, 1), + rdjl: declare_helper(module, "jit_rdjl", 1, 1), call: declare_helper(module, "jit_call", 4, 1), // Type predicates isnum: declare_helper(module, "jit_isnum", 1, 1), @@ -993,13 +995,14 @@ fn compile_function_body( | OP_UNWRAP | OP_RECFLD | OP_RECFLD_NAME | OP_LISTGET | OP_INDEX | OP_STR | OP_HD | OP_AT | OP_FMT2 | OP_TL | OP_REV | OP_SRT | OP_SRTDESC | OP_SLC | OP_TAKE | OP_DROP | OP_SPL | OP_CAT | OP_GET | OP_POST | OP_GETH | OP_POSTH - | OP_GETMANY | OP_ENV | OP_JPTH | OP_JDMP | OP_JPAR | OP_MAPNEW | OP_MGET - | OP_MSET | OP_MDEL | OP_MKEYS | OP_MVALS | OP_LISTNEW | OP_LISTAPPEND - | OP_RECNEW | OP_RECWITH | OP_PRT | OP_RD | OP_RDL | OP_WR | OP_WRL | OP_TRM - | OP_UPR | OP_LWR | OP_CAP | OP_PADL | OP_PADR | OP_UNQ | OP_UNIQBY - | OP_PARTITION | OP_FRQ | OP_NUM | OP_RGXSUB | OP_ZIP | OP_ENUMERATE | OP_RANGE - | OP_WINDOW | OP_CHUNKS | OP_CUMSUM | OP_SETUNION | OP_SETINTER | OP_SETDIFF - | OP_FFT | OP_IFFT | OP_TRANSPOSE | OP_MATMUL | OP_INV | OP_SOLVE => { + | OP_GETMANY | OP_ENV | OP_JPTH | OP_JDMP | OP_JPAR | OP_RDJL | OP_MAPNEW + | OP_MGET | OP_MSET | OP_MDEL | OP_MKEYS | OP_MVALS | OP_LISTNEW + | OP_LISTAPPEND | OP_RECNEW | OP_RECWITH | OP_PRT | OP_RD | OP_RDL | OP_WR + | OP_WRL | OP_TRM | OP_UPR | OP_LWR | OP_CAP | OP_PADL | OP_PADR | OP_UNQ + | OP_UNIQBY | OP_PARTITION | OP_FRQ | OP_NUM | OP_RGXSUB | OP_ZIP + | OP_ENUMERATE | OP_RANGE | OP_WINDOW | OP_CHUNKS | OP_CUMSUM | OP_SETUNION + | OP_SETINTER | OP_SETDIFF | OP_FFT | OP_IFFT | OP_TRANSPOSE | OP_MATMUL + | OP_INV | OP_SOLVE => { non_num_write[a] = true; non_bool_write[a] = true; } @@ -3055,6 +3058,13 @@ fn compile_function_body( let result = builder.inst_results(call_inst)[0]; builder.def_var(vars[a_idx], result); } + OP_RDJL => { + let bv = builder.use_var(vars[b_idx]); + let fref = get_func_ref(&mut builder, module, helpers.rdjl); + let call_inst = builder.ins().call(fref, &[bv]); + let result = builder.inst_results(call_inst)[0]; + builder.def_var(vars[a_idx], result); + } // ── Type predicates ── OP_ISNUM => { let bv = builder.use_var(vars[b_idx]); diff --git a/src/vm/jit_cranelift.rs b/src/vm/jit_cranelift.rs index 1a56ab81..492340a9 100644 --- a/src/vm/jit_cranelift.rs +++ b/src/vm/jit_cranelift.rs @@ -121,6 +121,7 @@ struct HelperFuncs { jpth: FuncId, jdmp: FuncId, jpar: FuncId, + rdjl: FuncId, call: FuncId, // Type predicates isnum: FuncId, @@ -271,6 +272,7 @@ fn register_helpers(builder: &mut JITBuilder) { ("jit_jpth", jit_jpth as *const u8), ("jit_jdmp", jit_jdmp as *const u8), ("jit_jpar", jit_jpar as *const u8), + ("jit_rdjl", jit_rdjl as *const u8), ("jit_call", jit_call as *const u8), // Type predicates ("jit_isnum", jit_isnum as *const u8), @@ -411,6 +413,7 @@ fn declare_all_helpers(module: &mut JITModule) -> HelperFuncs { jpth: declare_helper(module, "jit_jpth", 2, 1), jdmp: declare_helper(module, "jit_jdmp", 1, 1), jpar: declare_helper(module, "jit_jpar", 1, 1), + rdjl: declare_helper(module, "jit_rdjl", 1, 1), call: declare_helper(module, "jit_call", 4, 1), // Type predicates isnum: declare_helper(module, "jit_isnum", 1, 1), @@ -1016,7 +1019,7 @@ fn compile_function_body( | OP_SETUNION | OP_SETINTER | OP_SETDIFF | OP_INV | OP_SOLVE | OP_SPL | OP_CAT | OP_GET | OP_POST | OP_GETH | OP_POSTH | OP_GETMANY - | OP_ENV | OP_JPTH | OP_JDMP | OP_JPAR + | OP_ENV | OP_JPTH | OP_JDMP | OP_JPAR | OP_RDJL | OP_MAPNEW | OP_MGET | OP_MSET | OP_MDEL | OP_MKEYS | OP_MVALS | OP_LISTNEW | OP_LISTAPPEND | OP_RECNEW | OP_RECWITH @@ -3602,6 +3605,13 @@ fn compile_function_body( let result = builder.inst_results(call_inst)[0]; builder.def_var(vars[a_idx], result); } + OP_RDJL => { + let bv = builder.use_var(vars[b_idx]); + let fref = get_func_ref(&mut builder, module, helpers.rdjl); + let call_inst = builder.ins().call(fref, &[bv]); + let result = builder.inst_results(call_inst)[0]; + builder.def_var(vars[a_idx], result); + } // ── Type predicates (1-arg → 1 return) ── OP_ISNUM => { let bv = builder.use_var(vars[b_idx]); diff --git a/src/vm/mod.rs b/src/vm/mod.rs index a51481cd..690b2c47 100644 --- a/src/vm/mod.rs +++ b/src/vm/mod.rs @@ -250,6 +250,7 @@ pub(crate) const OP_PADL: u8 = 121; // R[A] = pad_left(R[B], R[C]) (text, width pub(crate) const OP_PADR: u8 = 122; // R[A] = pad_right(R[B], R[C]) (text, width → text) pub(crate) const OP_GETMANY: u8 = 136; // R[A] = get_many(R[B]) (L t → L (R t t), concurrent fan-out) +pub(crate) const OP_RDJL: u8 = 135; // R[A] = rdjl(R[B]) (read JSONL file → L (R _ t)) // Linear algebra advanced. pub(crate) const OP_SOLVE: u8 = 126; // R[A] = solve(R[B], R[C]) — solve Ax = b @@ -2271,6 +2272,15 @@ impl RegCompiler { } return ra; } + (Builtin::Rdjl, 1) => { + // rdjl path → L (R _ t). Not a Result-returning op, so `!` + // is unsupported here; the verifier rejects it via the + // standard return-type check. + let rb = self.compile_expr(&args[0]); + let ra = self.alloc_reg(); + self.emit_abc(OP_RDJL, ra, rb, 0); + return ra; + } // Map builtins (Builtin::Mmap, 0) => { let ra = self.alloc_reg(); @@ -2786,10 +2796,10 @@ fn chunk_is_all_numeric(chunk: &Chunk) -> bool { OP_RECNEW | OP_LISTNEW | OP_RECWITH | OP_WRAPOK | OP_WRAPERR | OP_STR | OP_CAT | OP_SPL | OP_REV | OP_SRT | OP_SRTDESC | OP_SLC | OP_TAKE | OP_DROP | OP_UNQ | OP_UNIQBY | OP_FRQ | OP_PARTITION | OP_LISTAPPEND | OP_JPAR | OP_JDMP | OP_ENV - | OP_GET | OP_GETH | OP_GETMANY | OP_POST | OP_POSTH | OP_RD | OP_RDL | OP_WR - | OP_WRL | OP_MAPNEW | OP_MGET | OP_MSET | OP_MKEYS | OP_MVALS | OP_HD | OP_AT - | OP_LST | OP_TL | OP_FMT2 | OP_RGXSUB | OP_ZIP | OP_ENUMERATE | OP_WINDOW | OP_FFT - | OP_IFFT | OP_RANGE | OP_CHUNKS | OP_CUMSUM | OP_SETUNION | OP_SETINTER + | OP_GET | OP_GETH | OP_GETMANY | OP_POST | OP_POSTH | OP_RD | OP_RDL | OP_RDJL + | OP_WR | OP_WRL | OP_MAPNEW | OP_MGET | OP_MSET | OP_MKEYS | OP_MVALS | OP_HD + | OP_AT | OP_LST | OP_TL | OP_FMT2 | OP_RGXSUB | OP_ZIP | OP_ENUMERATE | OP_WINDOW + | OP_FFT | OP_IFFT | OP_RANGE | OP_CHUNKS | OP_CUMSUM | OP_SETUNION | OP_SETINTER | OP_SETDIFF | OP_TRANSPOSE | OP_MATMUL | OP_INV | OP_SOLVE => { return false; } @@ -6169,6 +6179,40 @@ impl<'a> VM<'a> { }; reg_set!(a, result); } + OP_RDJL => { + let a = ((inst >> 16) & 0xFF) as usize + base; + let b = ((inst >> 8) & 0xFF) as usize + base; + let v = reg!(b); + if !v.is_string() { + vm_err!(VmError::Type("rdjl requires a string path")); + } + // SAFETY: is_string() confirmed heap-tagged string with live RC. + let path = unsafe { + match v.as_heap_ref() { + HeapObj::Str(s) => s.as_str().to_owned(), + _ => unreachable!(), + } + }; + match std::fs::read_to_string(&path) { + Ok(content) => { + let mut items: Vec = Vec::new(); + for line in content.split('\n') { + if line.is_empty() { + continue; + } + let entry = match serde_json::from_str::(line) { + Ok(parsed) => NanVal::heap_ok(serde_json_to_nanval(parsed)), + Err(e) => NanVal::heap_err(NanVal::heap_string(e.to_string())), + }; + items.push(entry); + } + reg_set!(a, NanVal::heap_list(items)); + } + Err(_) => { + vm_err!(VmError::Type("rdjl failed to read file")); + } + } + } OP_SPL => { let a = ((inst >> 16) & 0xFF) as usize + base; let b = ((inst >> 8) & 0xFF) as usize + base; @@ -10083,6 +10127,42 @@ pub(crate) extern "C" fn jit_jpar(a: u64) -> u64 { } } +#[cfg(feature = "cranelift")] +#[unsafe(no_mangle)] +pub(crate) extern "C" fn jit_rdjl(a: u64) -> u64 { + let v = NanVal(a); + if !v.is_string() { + return TAG_NIL; + } + let path = unsafe { + match v.as_heap_ref() { + HeapObj::Str(s) => s.as_str().to_owned(), + _ => unreachable!(), + } + }; + match std::fs::read_to_string(&path) { + Ok(content) => { + let mut items: Vec = Vec::new(); + for line in content.split('\n') { + if line.is_empty() { + continue; + } + let entry = match serde_json::from_str::(line) { + Ok(parsed) => NanVal::heap_ok(serde_json_to_nanval(parsed)), + Err(e) => NanVal::heap_err(NanVal::heap_string(e.to_string())), + }; + items.push(entry); + } + NanVal::heap_list(items).0 + } + // On file-read failure JIT returns nil; the VM dispatch path raises a + // typed runtime error. JIT helper-callers don't have a runtime-error + // channel, so nil is the conventional signal here (matches jit_rd / + // jit_jpar conventions for non-string inputs). + Err(_) => TAG_NIL, + } +} + /// Call a VM function from JIT code. `func_idx` is the chunk index, /// `regs` points to `n_args` u64 values. Returns the result as u64. #[cfg(feature = "cranelift")] diff --git a/tests/regression_jpar_stream.rs b/tests/regression_jpar_stream.rs new file mode 100644 index 00000000..21f348a0 --- /dev/null +++ b/tests/regression_jpar_stream.rs @@ -0,0 +1,155 @@ +// Cross-engine regression tests for `rdjl` — JSONL streaming. +// +// rdjl path:t → L (R _ t) +// +// Reads a file line by line, parses each non-empty line as JSON, and +// wraps the result so a single malformed line never poisons the whole +// stream. These tests cover: +// 1. happy path: every line parses +// 2. mixed valid / invalid: malformed lines yield Err entries +// 3. empty file: empty list +// 4. blank lines: skipped, not surfaced as Err +// Every case is exercised through the tree-walker, the register VM, and +// (when compiled in) the Cranelift JIT, matching the cross-engine +// convention used elsewhere in this crate. + +use std::path::PathBuf; +use std::process::Command; +use std::sync::atomic::{AtomicUsize, Ordering}; + +fn ilo() -> Command { + Command::new(env!("CARGO_BIN_EXE_ilo")) +} + +// Unique temp paths per call: pid + monotonic counter keeps cross-engine +// runs from racing on the same fixture when `cargo test` schedules them +// in parallel. +fn temp_path(tag: &str) -> PathBuf { + static COUNTER: AtomicUsize = AtomicUsize::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + std::env::temp_dir().join(format!("ilo_rdjl_{tag}_{pid}_{n}.jsonl")) +} + +fn write_fixture(path: &PathBuf, contents: &str) { + std::fs::write(path, contents).expect("write fixture"); +} + +fn run(engine: &str, src: &str, entry: &str, extra: &[&str]) -> String { + let mut cmd = ilo(); + cmd.arg(src).arg(engine).arg(entry); + for a in extra { + cmd.arg(a); + } + let out = cmd.output().expect("failed to run ilo"); + assert!( + out.status.success(), + "ilo {engine} failed for `{src}` (entry={entry}): stderr={}", + String::from_utf8_lossy(&out.stderr) + ); + String::from_utf8_lossy(&out.stdout).trim().to_string() +} + +fn engines() -> Vec<&'static str> { + let mut v = vec!["--run-tree", "--run-vm"]; + if cfg!(feature = "cranelift") { + v.push("--run-cranelift"); + } + v +} + +// ── len of result list: works on every engine (no higher-order calls) ─ +// +// All four numeric assertions share this one entry point — we just +// vary the file contents — which keeps the test surface small while +// still exercising rdjl on each engine. +const COUNT_SRC: &str = "count p:t>n;es=rdjl p;len es"; + +#[test] +fn rdjl_three_well_formed_lines() { + let path = temp_path("happy"); + write_fixture(&path, "{\"amount\":10}\n{\"amount\":20}\n{\"amount\":12}\n"); + for engine in engines() { + let got = run(engine, COUNT_SRC, "count", &[path.to_str().unwrap()]); + assert_eq!(got, "3", "engine={engine}"); + } + let _ = std::fs::remove_file(&path); +} + +#[test] +fn rdjl_empty_file_yields_empty_list() { + let path = temp_path("empty"); + write_fixture(&path, ""); + for engine in engines() { + let got = run(engine, COUNT_SRC, "count", &[path.to_str().unwrap()]); + assert_eq!(got, "0", "engine={engine}"); + } + let _ = std::fs::remove_file(&path); +} + +#[test] +fn rdjl_skips_blank_lines() { + let path = temp_path("blanks"); + write_fixture(&path, "{\"x\":1}\n\n{\"x\":2}\n\n\n{\"x\":3}\n"); + for engine in engines() { + let got = run(engine, COUNT_SRC, "count", &[path.to_str().unwrap()]); + assert_eq!(got, "3", "engine={engine}"); + } + let _ = std::fs::remove_file(&path); +} + +#[test] +fn rdjl_mixed_lines_each_wrapped() { + let path = temp_path("mixed"); + // Three valid lines and two malformed ones interleaved. rdjl is + // expected to yield five entries total (3 Ok + 2 Err) rather than + // halting at the first parse error. + write_fixture( + &path, + "{\"a\":1}\nnot json\n{\"a\":2}\n{also bad\n{\"a\":3}\n", + ); + for engine in engines() { + let got = run(engine, COUNT_SRC, "count", &[path.to_str().unwrap()]); + assert_eq!(got, "5", "engine={engine}"); + } + let _ = std::fs::remove_file(&path); +} + +// ── tree-only: verify Ok and Err entries are distinguishable ───────── +// +// `?r{~v:..;^e:..}` Result-matching is supported on the tree-walker. +// The VM/JIT lack higher-order builtins for the same expressivity, so +// the structural Ok/Err assertion is tree-only — the count tests above +// already confirm the entry shape on the other engines. +const FIRST_OK_SRC: &str = "head-amt p:t>n;es=rdjl p;e=hd es;?e{~v:v.amount;^er:999}"; + +#[test] +fn rdjl_first_line_unwraps_to_record_field() { + let path = temp_path("first"); + write_fixture(&path, "{\"amount\":7}\n{\"amount\":8}\n"); + let got = run( + "--run-tree", + FIRST_OK_SRC, + "head-amt", + &[path.to_str().unwrap()], + ); + assert_eq!(got, "7"); + let _ = std::fs::remove_file(&path); +} + +const HEAD_ERR_SRC: &str = "head-tag p:t>n;es=rdjl p;e=hd es;?e{~v:1;^er:0}"; + +#[test] +fn rdjl_malformed_first_line_is_err() { + let path = temp_path("err"); + write_fixture(&path, "not json\n{\"ok\":true}\n"); + let got = run( + "--run-tree", + HEAD_ERR_SRC, + "head-tag", + &[path.to_str().unwrap()], + ); + // First line is unparseable, so head returns the Err arm (0). + assert_eq!(got, "0"); + let _ = std::fs::remove_file(&path); +}