Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 286 additions & 7 deletions execution_graph/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,13 @@ impl Node {
/// If you want to invalidate using the tape key type directly, use
/// [`ExecutionGraph::invalidate_tape_key`].
/// - Dependencies are refined dynamically: after each run, each output key’s dependency set is
/// replaced with “all reads observed during that run”. The [`connect`](ExecutionGraph::connect)
/// method adds conservative edges to enforce initial topological ordering before the first run.
/// replaced with “all reads observed during that run, minus any key the node also wrote”. A node
/// is therefore never re-triggered by its own writes — a node that reads and writes the same key
/// (a read-modify-write) reaches a fixpoint — while *other* nodes that read the written key are
/// still invalidated. Because such a key is excluded from the writer's own dependency set, even
/// an external invalidation of it will not re-run that node: treat a key a node writes as an
/// output it owns, not an input. The [`connect`](ExecutionGraph::connect) method adds
/// conservative edges to enforce initial topological ordering before the first run.
/// - [`ExecutionGraph::run_all`] / [`ExecutionGraph::run_node`] execute dirty work and return a
/// cheap executed-node summary.
/// - If you need “why re-ran” data, use [`ExecutionGraph::run_all_with_report`] /
Expand Down Expand Up @@ -274,6 +279,7 @@ struct Scratch {
to_run: Vec<NodeId>,
seen_stamp: Vec<u32>,
read_ids: Vec<DirtyKey>,
write_ids: Vec<DirtyKey>,
args: Vec<Value>,
stamp: u32,
}
Expand Down Expand Up @@ -312,6 +318,28 @@ impl Scratch {
self.to_run.push(node);
true
}

/// Reduces `read_ids`, in place, to the canonical dependency set for the node that just ran.
///
/// The caller is responsible for installing the result as the dependencies of the node's
/// outputs (via [`DirtyEngine::set_dependencies`]); nothing in here touches the dirty engine.
///
/// Two steps are applied:
///
/// 1. `read_ids` is sorted and deduplicated, giving it set semantics. A host that emits the
///    same reads in a different order on the next run therefore produces an identical list,
///    and no spurious dependency-set "change" is observed.
/// 2. If the node recorded any writes, `write_ids` is sorted and deduplicated too, and every
///    key present in it is removed from `read_ids`. The write has already marked that key
///    dirty, so *other* readers are still invalidated; but a node must not depend on — and
///    thereby be re-triggered by — its own writes, otherwise a read-modify-write node would
///    never reach a fixpoint.
#[inline]
fn finalize_node_deps(&mut self) {
    let reads = &mut self.read_ids;
    reads.sort_unstable();
    reads.dedup();

    let writes = &mut self.write_ids;
    if writes.is_empty() {
        // Nothing was written, so there is nothing to subtract from the read set.
        return;
    }

    // Sorting is what makes the binary search below valid.
    writes.sort_unstable();
    writes.dedup();

    reads.retain(|key| writes.binary_search(key).is_err());
}
}

impl<H: Host> ExecutionGraph<H> {
Expand Down Expand Up @@ -984,6 +1012,7 @@ impl<H: Host> ExecutionGraph<H> {
let mut log = collect_access.then(AccessLog::new);

self.scratch.read_ids.clear();
self.scratch.write_ids.clear();

for (slot, name) in n.input_names.iter().enumerate() {
let b = n.inputs.get(slot).and_then(Option::as_ref).ok_or_else(|| {
Expand Down Expand Up @@ -1042,6 +1071,7 @@ impl<H: Host> ExecutionGraph<H> {
&mut self.host_state_ids,
&mut self.opaque_host_ids,
&mut self.scratch.read_ids,
&mut self.scratch.write_ids,
log,
&access_count,
))
Expand All @@ -1052,6 +1082,7 @@ impl<H: Host> ExecutionGraph<H> {
&mut self.host_state_ids,
&mut self.opaque_host_ids,
&mut self.scratch.read_ids,
&mut self.scratch.write_ids,
&access_count,
))
};
Expand Down Expand Up @@ -1112,11 +1143,9 @@ impl<H: Host> ExecutionGraph<H> {
}
}

// Update dirty dependencies: each output depends on all reads observed during the run.
// Canonicalize to set semantics so host/read emission order does not cause
// spurious dependency-set "changes" across runs.
self.scratch.read_ids.sort_unstable();
self.scratch.read_ids.dedup();
// Refine this node's dependency set from the reads observed during the run (dedup to set
// semantics, then drop any key the node also wrote — see `Scratch::finalize_node_deps`).
self.scratch.finalize_node_deps();

let deps_changed = !self.nodes[node_index].deps_initialized
|| self.nodes[node_index].last_read_ids != self.scratch.read_ids;
Expand Down Expand Up @@ -1572,6 +1601,77 @@ mod tests {
);
}

#[test]
fn strict_deps_rejects_host_call_whose_only_access_is_an_ignored_input_write() {
    // Graph-owned `Input` keys cannot be written by a host: such a write is ignored (it
    // creates no dependency, triggers no invalidation and is not logged). Under strict-deps
    // an ignored write must therefore NOT satisfy "this host call recorded an access". A call
    // whose single event is an ignored Input write has reported nothing usable, and has to
    // raise StrictDepsViolation exactly like a call that records nothing whatsoever.
    #[derive(Debug, Default)]
    struct InputWriteOnly;

    impl Host for InputWriteOnly {
        fn call(
            &mut self,
            symbol: &str,
            _sig_hash: SigHash,
            _args: &[ValueRef<'_>],
            rets: &mut [Value],
            mut ctx: HostContext<'_, '_>,
        ) -> Result<u64, HostError> {
            match symbol {
                "write_input" => {
                    // The only access this host ever reports: a write to an Input key.
                    ctx.record_write(ResourceKeyRef::Input("x"));
                    rets[0] = Value::I64(7);
                    Ok(0)
                }
                _ => Err(HostError::UnknownSymbol),
            }
        }
    }

    // Build the signature once; its hash is what the expected error must carry.
    let write_sig = HostSig {
        args: vec![ValueType::I64],
        rets: vec![ValueType::I64],
    };
    let expected_hash = sig_hash(&write_sig);

    let mut builder = ProgramBuilder::new();
    let host_sig = builder.host_sig_for("write_input", write_sig);

    // r1 = 42; r2 = write_input(r1); return r2
    let mut code = Asm::new();
    code.const_i64(1, 42);
    code.host_call(0, host_sig, 0, &[1], &[2]);
    code.ret(0, &[2]);

    let entry_sig = FunctionSig {
        arg_types: vec![],
        ret_types: vec![ValueType::I64],
    };
    let func = builder.push_function_checked(code, entry_sig).unwrap();
    builder.set_function_output_name(func, 0, "value").unwrap();

    let program = Arc::new(builder.build_verified().unwrap());

    let mut graph = ExecutionGraph::new(InputWriteOnly, Limits::default());
    let node = graph.add_node(program, func, vec![]).unwrap();
    graph.set_strict_deps(true);

    let expected = GraphError::StrictDepsViolation {
        node,
        symbol: "write_input".into(),
        sig_hash: expected_hash,
    };
    assert_eq!(graph.run_all(), Err(expected));
}

#[test]
fn run_all_errors_on_missing_input_binding() {
let mut pb = ProgramBuilder::new();
Expand Down Expand Up @@ -2003,6 +2103,185 @@ mod tests {
assert_eq!(g.node_run_count(reader), Some(2));
}

#[test]
fn node_that_reads_and_writes_same_key_reaches_fixpoint() {
    // One node, one host call, and that call performs a read-modify-write: it reads and then
    // writes the SAME host-state key. The write dirties the key, which is what lets other
    // readers be invalidated, yet the writer must not invalidate *itself*. Dropping
    // self-written keys from the node's own dependency set is what makes it converge; if that
    // exclusion were missing, every run_all() would re-run the node (run_count 1, 2, 3, ...).
    #[derive(Clone)]
    struct BumpHost {
        kv: Rc<RefCell<BTreeMap<u64, i64>>>,
        sig: SigHash,
    }

    impl Host for BumpHost {
        fn call(
            &mut self,
            symbol: &str,
            sig_hash: SigHash,
            args: &[ValueRef<'_>],
            rets: &mut [Value],
            mut ctx: HostContext<'_, '_>,
        ) -> Result<u64, HostError> {
            if symbol != "kv.bump" {
                return Err(HostError::UnknownSymbol);
            }
            if sig_hash != self.sig {
                return Err(HostError::SignatureMismatch);
            }
            let key = match args {
                [ValueRef::U64(key)] => *key,
                _ => return Err(HostError::Failed),
            };

            // Step 1: report the read, which records a dependency on the key.
            ctx.record_read(ResourceKeyRef::HostState { op: self.sig, key });

            // Step 2: bump the stored value (a missing entry counts as 0).
            let next = {
                let mut store = self.kv.borrow_mut();
                let slot = store.entry(key).or_insert(0);
                *slot += 1;
                *slot
            };

            // Step 3: report the write-back under the SAME key.
            ctx.record_write(ResourceKeyRef::HostState { op: self.sig, key });

            rets[0] = Value::I64(next);
            Ok(0)
        }
    }

    let bump_sig = HostSig {
        args: vec![ValueType::U64],
        rets: vec![ValueType::I64],
    };
    let bump_hash = sig_hash(&bump_sig);

    // r1 = 1u64; r2 = kv.bump(r1); return r2
    let mut builder = ProgramBuilder::new();
    let bump_host = builder.host_sig_for("kv.bump", bump_sig);
    let mut code = Asm::new();
    code.const_u64(1, 1);
    code.host_call(0, bump_host, 0, &[1], &[2]);
    code.ret(0, &[2]);
    let entry_sig = FunctionSig {
        arg_types: vec![],
        ret_types: vec![ValueType::I64],
    };
    let entry = builder.push_function_checked(code, entry_sig).unwrap();
    builder.set_function_output_name(entry, 0, "value").unwrap();
    let program = Arc::new(builder.build_verified().unwrap());

    let host = BumpHost {
        kv: Rc::new(RefCell::new(BTreeMap::new())),
        sig: bump_hash,
    };

    let mut graph = ExecutionGraph::new(host, Limits::default());
    let node = graph.add_node(program, entry, vec![]).unwrap();

    // First pass: the node runs once and bumps 0 -> 1.
    graph.run_all().unwrap();
    assert_eq!(graph.node_run_count(node), Some(1));
    assert_eq!(
        graph.node_outputs(node).unwrap().get("value"),
        Some(&Value::I64(1))
    );

    // Nothing is invalidated externally between these calls. Because the node's own write
    // must not re-trigger it, both passes have to be no-ops and the bumped value is unchanged.
    for _ in 0..2 {
        graph.run_all().unwrap();
    }
    assert_eq!(
        graph.node_run_count(node),
        Some(1),
        "a node that reads and writes the same key must not re-run itself"
    );
    assert_eq!(
        graph.node_outputs(node).unwrap().get("value"),
        Some(&Value::I64(1))
    );
}

#[test]
fn host_write_to_input_key_does_not_drop_graph_input_dependency() {
    // The host call below writes a graph `Input` key named exactly like the node's own input
    // binding. Since `Input` keys belong to the graph, that write has to be ignored. Were it
    // honoured, it would intern to the same id as the binding dependency, the self-write
    // filter would remove it, and a later `invalidate_input` would leave the node stale.
    struct PublishHost;

    impl Host for PublishHost {
        fn call(
            &mut self,
            symbol: &str,
            _sig_hash: SigHash,
            args: &[ValueRef<'_>],
            rets: &mut [Value],
            mut ctx: HostContext<'_, '_>,
        ) -> Result<u64, HostError> {
            if symbol != "publish" {
                return Err(HostError::UnknownSymbol);
            }
            let published = match args {
                [ValueRef::I64(v)] => *v,
                _ => return Err(HostError::Failed),
            };
            // Deliberate misuse: a graph-owned Input key is used as the write target.
            ctx.record_write(ResourceKeyRef::Input("x"));
            rets[0] = Value::I64(published);
            Ok(0)
        }
    }

    let publish_sig = HostSig {
        args: vec![ValueType::I64],
        rets: vec![ValueType::I64],
    };

    // r2 = publish(r1); return r2   (r1 is the function's single I64 argument)
    let mut builder = ProgramBuilder::new();
    let publish = builder.host_sig_for("publish", publish_sig);
    let mut code = Asm::new();
    code.host_call(0, publish, 0, &[1], &[2]);
    code.ret(0, &[2]);
    let entry_sig = FunctionSig {
        arg_types: vec![ValueType::I64],
        ret_types: vec![ValueType::I64],
    };
    let entry = builder.push_function_checked(code, entry_sig).unwrap();
    builder.set_function_output_name(entry, 0, "value").unwrap();
    let program = Arc::new(builder.build_verified().unwrap());

    let mut graph = ExecutionGraph::new(PublishHost, Limits::default());
    let node = graph.add_node(program, entry, vec!["x".into()]).unwrap();
    graph.set_input_value(node, "x", Value::I64(1)).unwrap();

    // Initial run: the output mirrors the first input value.
    graph.run_all().unwrap();
    assert_eq!(graph.node_run_count(node), Some(1));
    assert_eq!(
        graph.node_outputs(node).unwrap().get("value"),
        Some(&Value::I64(1))
    );

    // With the host's Input-key write ignored, the "x" binding dependency is still in place,
    // so updating and invalidating "x" has to re-run the node and refresh what it outputs.
    graph.set_input_value(node, "x", Value::I64(2)).unwrap();
    graph.invalidate_input("x");
    graph.run_all().unwrap();
    assert_eq!(
        graph.node_run_count(node),
        Some(2),
        "graph input dependency must survive a host write to the same Input key"
    );
    assert_eq!(
        graph.node_outputs(node).unwrap().get("value"),
        Some(&Value::I64(2))
    );
}

#[test]
fn host_read_order_changes_do_not_change_last_read_ids() {
#[derive(Clone)]
Expand Down
Loading
Loading