diff --git a/execution_graph/src/graph.rs b/execution_graph/src/graph.rs index 38f6e6e..5035f7b 100644 --- a/execution_graph/src/graph.rs +++ b/execution_graph/src/graph.rs @@ -242,8 +242,13 @@ impl Node { /// If you want to invalidate using the tape key type directly, use /// [`ExecutionGraph::invalidate_tape_key`]. /// - Dependencies are refined dynamically: after each run, each output key’s dependency set is -/// replaced with “all reads observed during that run”. The [`connect`](ExecutionGraph::connect) -/// method adds conservative edges to enforce initial topological ordering before the first run. +/// replaced with “all reads observed during that run, minus any key the node also wrote”. A node +/// is therefore never re-triggered by its own writes — a node that reads and writes the same key +/// (a read-modify-write) reaches a fixpoint — while *other* nodes that read the written key are +/// still invalidated. Because such a key is excluded from the writer's own dependency set, even +/// an external invalidation of it will not re-run that node: treat a key a node writes as an +/// output it owns, not an input. The [`connect`](ExecutionGraph::connect) method adds +/// conservative edges to enforce initial topological ordering before the first run. /// - [`ExecutionGraph::run_all`] / [`ExecutionGraph::run_node`] execute dirty work and return a /// cheap executed-node summary. /// - If you need “why re-ran” data, use [`ExecutionGraph::run_all_with_report`] / @@ -274,6 +279,7 @@ struct Scratch { to_run: Vec, seen_stamp: Vec, read_ids: Vec, + write_ids: Vec, args: Vec, stamp: u32, } @@ -312,6 +318,28 @@ impl Scratch { self.to_run.push(node); true } + + /// Canonicalizes `read_ids` in place into the set of dependencies the caller will install for + /// this node's outputs (via [`DirtyEngine::set_dependencies`]); this method does not touch the + /// dirty engine itself. + /// + /// Reads are sorted and deduped to set semantics, so host/read emission order does not cause + /// spurious dependency-set "changes" across runs. Any key the node also *wrote* this run is + /// then removed from `read_ids`: the write already marked that key dirty (so *other* nodes + /// that read it are still invalidated), but a node must not depend on — and so be re-triggered + /// by — its own writes, or a read-modify-write node would never reach a fixpoint. + #[inline] + fn finalize_node_deps(&mut self) { + self.read_ids.sort_unstable(); + self.read_ids.dedup(); + if !self.write_ids.is_empty() { + self.write_ids.sort_unstable(); + self.write_ids.dedup(); + let write_ids = &self.write_ids; + self.read_ids + .retain(|id| write_ids.binary_search(id).is_err()); + } + } } impl ExecutionGraph { @@ -984,6 +1012,7 @@ impl ExecutionGraph { let mut log = collect_access.then(AccessLog::new); self.scratch.read_ids.clear(); + self.scratch.write_ids.clear(); for (slot, name) in n.input_names.iter().enumerate() { let b = n.inputs.get(slot).and_then(Option::as_ref).ok_or_else(|| { @@ -1042,6 +1071,7 @@ impl ExecutionGraph { &mut self.host_state_ids, &mut self.opaque_host_ids, &mut self.scratch.read_ids, + &mut self.scratch.write_ids, log, &access_count, )) @@ -1052,6 +1082,7 @@ impl ExecutionGraph { &mut self.host_state_ids, &mut self.opaque_host_ids, &mut self.scratch.read_ids, + &mut self.scratch.write_ids, &access_count, )) }; @@ -1112,11 +1143,9 @@ impl ExecutionGraph { } } - // Update dirty dependencies: each output depends on all reads observed during the run. - // Canonicalize to set semantics so host/read emission order does not cause - // spurious dependency-set "changes" across runs. - self.scratch.read_ids.sort_unstable(); - self.scratch.read_ids.dedup(); + // Refine this node's dependency set from the reads observed during the run (dedup to set + // semantics, then drop any key the node also wrote — see `Scratch::finalize_node_deps`). + self.scratch.finalize_node_deps(); let deps_changed = !self.nodes[node_index].deps_initialized || self.nodes[node_index].last_read_ids != self.scratch.read_ids; @@ -1572,6 +1601,77 @@ mod tests { ); } + #[test] + fn strict_deps_rejects_host_call_whose_only_access_is_an_ignored_input_write() { + // Writes to graph-owned Input keys are ignored (no dependency, no invalidation, no log). + // In strict-deps mode such a write must NOT count as "this host call recorded an access": + // a call whose only event is an ignored Input write reports nothing usable and must trip a + // StrictDepsViolation, just like a call that records nothing at all. + #[derive(Debug, Default)] + struct InputWriteOnly; + + impl Host for InputWriteOnly { + fn call( + &mut self, + symbol: &str, + _sig_hash: SigHash, + _args: &[ValueRef<'_>], + rets: &mut [Value], + mut ctx: HostContext<'_, '_>, + ) -> Result { + if symbol != "write_input" { + return Err(HostError::UnknownSymbol); + } + ctx.record_write(ResourceKeyRef::Input("x")); + rets[0] = Value::I64(7); + Ok(0) + } + } + + let mut pb = ProgramBuilder::new(); + let host_sig = pb.host_sig_for( + "write_input", + HostSig { + args: vec![ValueType::I64], + rets: vec![ValueType::I64], + }, + ); + + let mut a = Asm::new(); + a.const_i64(1, 42); + a.host_call(0, host_sig, 0, &[1], &[2]); + a.ret(0, &[2]); + + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(InputWriteOnly, Limits::default()); + let n = g.add_node(prog, f, vec![]).unwrap(); + g.set_strict_deps(true); + + assert_eq!( + g.run_all(), + Err(GraphError::StrictDepsViolation { + node: n, + symbol: "write_input".into(), + sig_hash: sig_hash(&HostSig { + args: vec![ValueType::I64], + rets: vec![ValueType::I64], + }), + }) + ); + } + #[test] fn run_all_errors_on_missing_input_binding() { let mut pb = ProgramBuilder::new(); @@ -2003,6 +2103,185 @@ mod tests { assert_eq!(g.node_run_count(reader), Some(2)); } + #[test] + fn node_that_reads_and_writes_same_key_reaches_fixpoint() { + // A single node whose host call both reads and writes the SAME host-state key + // (a read-modify-write). The write marks the key dirty so other readers would be + // invalidated, but the node must not invalidate *itself*: excluding self-written keys + // from its own dependency set keeps it convergent. Without that exclusion the node + // re-runs on every run_all() forever (run_count would grow 1, 2, 3, ...). + #[derive(Clone)] + struct BumpHost { + kv: Rc>>, + sig: SigHash, + } + + impl Host for BumpHost { + fn call( + &mut self, + symbol: &str, + sig_hash: SigHash, + args: &[ValueRef<'_>], + rets: &mut [Value], + mut ctx: HostContext<'_, '_>, + ) -> Result { + if symbol != "kv.bump" { + return Err(HostError::UnknownSymbol); + } + if sig_hash != self.sig { + return Err(HostError::SignatureMismatch); + } + let [ValueRef::U64(key)] = args else { + return Err(HostError::Failed); + }; + // Read the current value (records a dependency on the key)... + ctx.record_read(ResourceKeyRef::HostState { + op: self.sig, + key: *key, + }); + let next = self.kv.borrow().get(key).unwrap_or(&0) + 1; + self.kv.borrow_mut().insert(*key, next); + // ...then write the bumped value back under the SAME key. + ctx.record_write(ResourceKeyRef::HostState { + op: self.sig, + key: *key, + }); + rets[0] = Value::I64(next); + Ok(0) + } + } + + let bump_sig = HostSig { + args: vec![ValueType::U64], + rets: vec![ValueType::I64], + }; + let bump_hash = sig_hash(&bump_sig); + + let mut pb = ProgramBuilder::new(); + let bump_host = pb.host_sig_for("kv.bump", bump_sig); + let mut asm = Asm::new(); + asm.const_u64(1, 1); + asm.host_call(0, bump_host, 0, &[1], &[2]); + asm.ret(0, &[2]); + let entry = pb + .push_function_checked( + asm, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(entry, 0, "value").unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let kv = Rc::new(RefCell::new(BTreeMap::new())); + let host = BumpHost { kv, sig: bump_hash }; + + let mut g = ExecutionGraph::new(host, Limits::default()); + let n = g.add_node(prog, entry, vec![]).unwrap(); + + g.run_all().unwrap(); + assert_eq!(g.node_run_count(n), Some(1)); + assert_eq!( + g.node_outputs(n).unwrap().get("value"), + Some(&Value::I64(1)) + ); + + // No external invalidation between calls: the node's own write must not re-trigger it, + // so repeated run_all() calls are no-ops and the bumped value stays put. + g.run_all().unwrap(); + g.run_all().unwrap(); + assert_eq!( + g.node_run_count(n), + Some(1), + "a node that reads and writes the same key must not re-run itself" + ); + assert_eq!( + g.node_outputs(n).unwrap().get("value"), + Some(&Value::I64(1)) + ); + } + + #[test] + fn host_write_to_input_key_does_not_drop_graph_input_dependency() { + // A host call writes a graph `Input` key whose name matches the node's own input binding. + // `Input` keys are graph-owned, so the write must be ignored — otherwise it would intern + // to the same id as the binding dependency and the self-write filter would strip it, + // leaving the node stale after a later `invalidate_input`. + struct PublishHost; + + impl Host for PublishHost { + fn call( + &mut self, + symbol: &str, + _sig_hash: SigHash, + args: &[ValueRef<'_>], + rets: &mut [Value], + mut ctx: HostContext<'_, '_>, + ) -> Result { + if symbol != "publish" { + return Err(HostError::UnknownSymbol); + } + let [ValueRef::I64(v)] = args else { + return Err(HostError::Failed); + }; + // Host misuses a graph-owned Input key as a write target. + ctx.record_write(ResourceKeyRef::Input("x")); + rets[0] = Value::I64(*v); + Ok(0) + } + } + + let publish_sig = HostSig { + args: vec![ValueType::I64], + rets: vec![ValueType::I64], + }; + + let mut pb = ProgramBuilder::new(); + let publish = pb.host_sig_for("publish", publish_sig); + let mut asm = Asm::new(); + asm.host_call(0, publish, 0, &[1], &[2]); + asm.ret(0, &[2]); + let entry = pb + .push_function_checked( + asm, + FunctionSig { + arg_types: vec![ValueType::I64], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(entry, 0, "value").unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(PublishHost, Limits::default()); + let n = g.add_node(prog, entry, vec!["x".into()]).unwrap(); + g.set_input_value(n, "x", Value::I64(1)).unwrap(); + + g.run_all().unwrap(); + assert_eq!(g.node_run_count(n), Some(1)); + assert_eq!( + g.node_outputs(n).unwrap().get("value"), + Some(&Value::I64(1)) + ); + + // The host's Input-key write is ignored, so the node keeps its "x" binding dependency: + // changing and invalidating "x" must still rerun the node and refresh its output. + g.set_input_value(n, "x", Value::I64(2)).unwrap(); + g.invalidate_input("x"); + g.run_all().unwrap(); + assert_eq!( + g.node_run_count(n), + Some(2), + "graph input dependency must survive a host write to the same Input key" + ); + assert_eq!( + g.node_outputs(n).unwrap().get("value"), + Some(&Value::I64(2)) + ); + } + #[test] fn host_read_order_changes_do_not_change_last_read_ids() { #[derive(Clone)] diff --git a/execution_graph/src/tape_access.rs b/execution_graph/src/tape_access.rs index 8c0c1dd..915cccd 100644 --- a/execution_graph/src/tape_access.rs +++ b/execution_graph/src/tape_access.rs @@ -36,6 +36,7 @@ pub(crate) struct CollectingAccessSink<'a> { host_state_ids: &'a mut HashMap<(HostOpId, u64), DirtyKey>, opaque_host_ids: &'a mut HashMap, read_ids: &'a mut Vec, + write_ids: &'a mut Vec, log: &'a mut AccessLog, counter: &'a Cell, } @@ -55,6 +56,7 @@ impl<'a> CollectingAccessSink<'a> { host_state_ids: &'a mut HashMap<(HostOpId, u64), DirtyKey>, opaque_host_ids: &'a mut HashMap, read_ids: &'a mut Vec, + write_ids: &'a mut Vec, log: &'a mut AccessLog, counter: &'a Cell, ) -> Self { @@ -64,6 +66,7 @@ impl<'a> CollectingAccessSink<'a> { host_state_ids, opaque_host_ids, read_ids, + write_ids, log, counter, } @@ -97,15 +100,17 @@ impl AccessSink for CollectingAccessSink<'_> { } fn write(&mut self, key: ResourceKeyRef<'_>) { - self.counter.set(self.counter.get().saturating_add(1)); - let key = mark_tape_key_dirty( - self.dirty, - self.input_ids, - self.host_state_ids, - self.opaque_host_ids, - key, - ); - self.log.push(Access::Write(key)); + if let Some((id, key)) = + mark_tape_key_dirty(self.dirty, self.host_state_ids, self.opaque_host_ids, key) + { + // Count only accepted writes as strict-deps access events: an ignored write (e.g. to a + // graph-owned Input key) reports nothing usable, so it must not satisfy strict-deps. + self.counter.set(self.counter.get().saturating_add(1)); + // Record the write so a node is not re-triggered by its own write (see + // `run_node_internal`, which excludes self-written keys from the node's dependency set). + self.write_ids.push(id); + self.log.push(Access::Write(key)); + } } } @@ -159,31 +164,39 @@ pub(crate) fn intern_opaque_host_key_id( id } +/// Interns a host-written key, marks it dirty, and returns its [`DirtyKey`] id and owned +/// [`ResourceKey`]. +/// +/// Marking the key dirty is what invalidates *other* nodes that read it. The returned id lets the +/// caller record the write so the writing node can exclude its own writes from its dependency set +/// (preventing a read-modify-write node from re-triggering itself indefinitely). +/// +/// Writes to [`ResourceKeyRef::Input`] are ignored (returns `None`): graph inputs are owned by the +/// graph (set via `set_input_value` / `invalidate_input`), not by host calls. An `Input` write +/// would intern to the same id as a node's input-binding dependency, and recording it would let +/// the self-write filter strip that binding — silently dropping a real dependency. Hosts should +/// only write the host-owned `HostState` / `OpaqueHost` namespaces. #[inline] fn mark_tape_key_dirty( dirty: &mut DirtyEngine, - input_ids: &mut BTreeMap, DirtyKey>, host_state_ids: &mut HashMap<(HostOpId, u64), DirtyKey>, opaque_host_ids: &mut HashMap, key: ResourceKeyRef<'_>, -) -> ResourceKey { +) -> Option<(DirtyKey, ResourceKey)> { match key { - ResourceKeyRef::Input(name) => { - let id = intern_input_key_id(dirty, input_ids, name); - dirty.mark_dirty(id); - ResourceKey::input(name) - } + // Graph-owned: not a valid host write target. Ignore rather than collide with bindings. + ResourceKeyRef::Input(_) => None, ResourceKeyRef::HostState { op, key } => { let op = HostOpId::new(op.0); let id = intern_host_state_key_id(dirty, host_state_ids, op, key); dirty.mark_dirty(id); - ResourceKey::host_state(op, key) + Some((id, ResourceKey::host_state(op, key))) } ResourceKeyRef::OpaqueHost { op } => { let op = HostOpId::new(op.0); let id = intern_opaque_host_key_id(dirty, opaque_host_ids, op); dirty.mark_dirty(id); - ResourceKey::opaque_host(op) + Some((id, ResourceKey::opaque_host(op))) } } } @@ -199,6 +212,7 @@ pub(crate) struct DepsOnlyAccessSink<'a> { host_state_ids: &'a mut HashMap<(HostOpId, u64), DirtyKey>, opaque_host_ids: &'a mut HashMap, read_ids: &'a mut Vec, + write_ids: &'a mut Vec, counter: &'a Cell, } @@ -215,6 +229,7 @@ impl<'a> DepsOnlyAccessSink<'a> { host_state_ids: &'a mut HashMap<(HostOpId, u64), DirtyKey>, opaque_host_ids: &'a mut HashMap, read_ids: &'a mut Vec, + write_ids: &'a mut Vec, counter: &'a Cell, ) -> Self { Self { @@ -223,6 +238,7 @@ impl<'a> DepsOnlyAccessSink<'a> { host_state_ids, opaque_host_ids, read_ids, + write_ids, counter, } } @@ -247,15 +263,16 @@ impl AccessSink for DepsOnlyAccessSink<'_> { #[inline] fn write(&mut self, key: ResourceKeyRef<'_>) { - // Strict-deps mode requires host scopes to emit at least one access event. - self.counter.set(self.counter.get().saturating_add(1)); - let _ = mark_tape_key_dirty( - self.dirty, - self.input_ids, - self.host_state_ids, - self.opaque_host_ids, - key, - ); + if let Some((id, _key)) = + mark_tape_key_dirty(self.dirty, self.host_state_ids, self.opaque_host_ids, key) + { + // Count only accepted writes as strict-deps access events: an ignored write (e.g. to a + // graph-owned Input key) reports nothing usable, so it must not satisfy strict-deps. + self.counter.set(self.counter.get().saturating_add(1)); + // Record the write so a node is not re-triggered by its own write (see + // `run_node_internal`, which excludes self-written keys from the node's dependency set). + self.write_ids.push(id); + } } }