Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ jobs:
- name: Install Python dependencies
run: uv sync --group dev

- name: Set up pnpm
uses: pnpm/action-setup@v4
with:
version: 10

- name: Set up Node.js
uses: actions/setup-node@v6
with:
node-version: "22"
cache: pnpm
cache-dependency-path: docs/pnpm-lock.yaml

- name: Install docs dependencies
run: cd docs && pnpm install --frozen-lockfile

- name: Check Rust formatting
run: cargo fmt --all --check

Expand Down Expand Up @@ -84,7 +99,7 @@ jobs:
args: --release

- name: Run tests
run: uv run pytest tests/python/ -v --tb=short --junit-xml=results.xml --ignore=tests/python/test_benchmarks.py
run: uv run pytest tests/python/ -v --tb=short --junit-xml=results.xml --ignore=tests/python/test_benchmarks.py --ignore=tests/python/test_reactive_bench.py

- name: Upload test results
if: always()
Expand Down
22 changes: 16 additions & 6 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,36 @@ concurrency:
jobs:
build:
runs-on: ubuntu-latest
defaults:
run:
working-directory: docs
steps:
- uses: actions/checkout@v6

- name: Set up pnpm
uses: pnpm/action-setup@v4
with:
version: 10

- name: Set up Node.js
uses: actions/setup-node@v6
with:
node-version: "22"
cache: npm
cache-dependency-path: docs/package-lock.json
cache: pnpm
cache-dependency-path: docs/pnpm-lock.yaml

- name: Install dependencies
run: cd docs && npm ci
run: pnpm install --frozen-lockfile

- name: Build Docusaurus
run: cd docs && npm run build
- name: Build Fumadocs site
env:
DOCS_BASE_PATH: /dagron
run: pnpm build

- name: Upload Pages artifact
uses: actions/upload-pages-artifact@v3
with:
path: docs/build
path: docs/out

deploy:
needs: build
Expand Down
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
# Anchor `lib/` to repo root so it doesn't catch e.g. docs/src/lib/.
/lib/
/lib64/
parts/
sdist/
var/
Expand Down
14 changes: 10 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ repos:
language: system
types: [rust]
pass_filenames: false
- id: biome
name: biome
entry: bash -c 'cd docs && npx biome ci src/ docusaurus.config.ts sidebars.ts'
- id: docs-biome
name: docs biome
entry: bash -c 'cd docs && pnpm exec biome check'
language: system
files: ^docs/src/|^docs/docusaurus\.config\.ts|^docs/sidebars\.ts
files: ^docs/(src/|content/|source\.config\.ts|next\.config\.mjs|biome\.json|tsconfig\.json|package\.json)
pass_filenames: false
- id: docs-types
name: docs types
entry: bash -c 'cd docs && pnpm types:check'
language: system
files: ^docs/(src/|source\.config\.ts|next\.config\.mjs|tsconfig\.json|package\.json|content/.*\.mdx)
pass_filenames: false
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,27 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [Unreleased]

### Added

- **Typed `NodeRef` handles** — `dag.add_node()` now returns a stable `NodeRef`. Every public method that takes a node identifier accepts either a `str` name or a `NodeRef`, so existing string-based code keeps working unchanged. NodeRefs survive unrelated graph mutations and detect remove-then-readd via per-node epochs (`StaleNodeRefError`).
- **`@dagron.flow` Pythonic compose API** — Tawazi-style: write a regular Python function that calls `@task`-decorated tasks; the call structure becomes the DAG. `pipeline.dag()` returns the underlying DAG; `pipeline()` runs it. Compatible with the legacy parameter-name-based `Pipeline` (the same `@task` decorator powers both).
- **Generic typing + `dagron.stubgen`** — `FlowFuture[T]`, `NodeResult[T]`, `ExecutionResult.__getitem__` overloads typed by `FlowFuture[T]`. `dagron.stubgen.generate_stub(dag, types)` emits a `.pyi` with `Literal["nodename"] -> NodeResult[T]` overloads, so even string-keyed lookups become statically typed. `@task` is a passthrough decorator with `[**P, R]` ParamSpec — IDE autocomplete and mypy both work.
- **Effect-typed nodes** — `dagron.Effect` enum (`PURE`/`READ`/`WRITE`/`NETWORK`/`NONDETERMINISTIC`) with `is_cacheable`/`is_deterministic`/`is_isolated` properties. `@task(effect=Effect.NETWORK)` tags impurity. AST-scan heuristic emits a `UserWarning` when a `PURE` task contains obviously-impure calls (`time.time`, `random.*`, `os.*`, etc.). `effects_of(dag)` reads tags back from DAG metadata. New `DAGExecutor(enforce_effect_isolation=True)` flag serializes `NONDETERMINISTIC` tasks while letting other effects parallelize freely.
- **`dagron.reactive` — Solid.js / Jane-Street-`Incremental` style reactive engine** — `Signal` / `Computed` / `Watcher` with auto-tracked dependencies. Mutating one signal that feeds 10,000 derived nodes and reading just one of them takes ~10 µs on the recompute path. `batch()` context manager guarantees glitch-free updates: multiple signal mutations coalesce into a single watcher fire. Distinct from the existing `dagron.execution.reactive.ReactiveDAG` (which wraps a pre-built DAG); the new module is for building reactive graphs from scratch.
- **`dagron.contentcache` — Nix-flake-style cross-process cache** — `ContentCache` stores cached values keyed by their content fingerprint. The filesystem itself is the index — independent processes (CI workers, two terminals) share intermediates without coordination. Atomic temp-file + rename writes, magic-byte header, sharded layout `<aa>/<bb>/<rest>.cache`. Pluggable `Hasher` protocol with `default_hash` (pickle + blake2b) and `numpy_hash` (uses `array.tobytes()`). `compute_or_cached` is effect-aware and skips the cache for `WRITE`/`NETWORK`/`NONDETERMINISTIC` automatically. Honors `$DAGRON_CACHE_DIR`.
- **`dagron.trace` — time-travel debugging with `replay(at=t)`** — `TraceWriter` appends per-node JSONL records; payloads are stored in the `ContentCache` keyed by output fingerprint, so identical values across runs deduplicate. `TraceReader` reads back. `replay(source, at=t)` reconstructs the per-node `ReplayedNode` state at any past wall-clock instant. Pure / READ nodes replay byte-identically; impure nodes are flagged `replayable=False` but their logged values are still surfaced. Re-recorded nodes (retries) take the latest value up to the cutoff. Honors `$DAGRON_TRACE_DIR`.

### Changed

- `DAG.add_node()` now returns `NodeRef` instead of `NodeId`. `NodeId` is still returned by enumeration methods (`nodes()`, `successors()`, `roots()`, …) where a snapshot identifier is appropriate.
- `NodeData::name` is now `Arc<str>` (was `String`) — cheaper to share between handles.
- `DAGExecutor.execute` and `AsyncDAGExecutor.execute` accept `Mapping[str | NodeRef, Callable]` for the `tasks` parameter.
- `ExecutionResult.__getitem__` and `__contains__` accept `str`, `NodeRef`, or `FlowFuture[T]`.
- `NodeResult` is now `NodeResult[T]` (PEP 695 generic). Existing references default to `NodeResult[Any]` and remain backwards compatible.
- `@task` is now flow-aware: outside a `@flow` body it executes normally; inside one it records the call and returns `FlowFuture[T]`. The same decorator works for both the legacy `Pipeline` and the new `@flow` API.

## [0.1.0] - 2026-03-06

### Added
Expand Down
5 changes: 5 additions & 0 deletions crates/dagron-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ pub enum DagronError {
#[error("Edge not found: {0} -> {1}")]
EdgeNotFound(String, String),

#[error(
"Stale node reference: {0} (the node was removed or replaced after the ref was created)"
)]
StaleNodeRef(String),

#[error("Graph error: {0}")]
Graph(String),
}
20 changes: 12 additions & 8 deletions crates/dagron-core/src/graph/construction.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::algorithms;
use crate::errors::DagronError;
use crate::node::NodeId;
use crate::node::NodeRef;
use crate::types::EdgeData;

use super::DAG;

impl<P> DAG<P> {
/// Add a single node to the graph.
///
/// Returns NodeId for the newly created node.
/// Returns DagronError::DuplicateNode if a node with this name already exists.
pub fn add_node(&mut self, name: String, payload: P) -> Result<NodeId, DagronError> {
/// Returns a [`NodeRef`] for the newly created node. The ref is stable —
/// it remains valid across edge mutations and other-node changes, and is
/// invalidated only if this node is removed.
///
/// Returns `DagronError::DuplicateNode` if a node with this name already exists.
pub fn add_node(&mut self, name: String, payload: P) -> Result<NodeRef, DagronError> {
if self.name_to_index.contains_key(&name) {
return Err(DagronError::DuplicateNode(name));
}
Expand All @@ -19,12 +22,12 @@ impl<P> DAG<P> {
payload,
};
let idx = self.graph.add_node(node_data);
let epoch = self.next_node_epoch;
self.next_node_epoch = self.next_node_epoch.wrapping_add(1);
self.name_to_index.insert(name.clone(), idx);
self.node_epochs.insert(name.clone(), epoch);
self.bump_generation();
Ok(NodeId {
index: idx.index() as u32,
name,
})
Ok(NodeRef::new(name, epoch))
}

/// Add a directed edge from one node to another.
Expand Down Expand Up @@ -71,6 +74,7 @@ impl<P> DAG<P> {
let idx = self.resolve_name(name)?;
self.graph.remove_node(idx);
self.name_to_index.remove(name);
self.node_epochs.remove(name);
self.bump_generation();
Ok(())
}
Expand Down
34 changes: 34 additions & 0 deletions crates/dagron-core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ use std::sync::RwLock;
use ahash::AHashMap;

use crate::errors::DagronError;
use crate::node::NodeRef;
use crate::types::{InternalGraph, InternalNodeIndex};

pub struct DAG<P = ()> {
pub(crate) graph: InternalGraph<P>,
pub(crate) name_to_index: AHashMap<String, InternalNodeIndex>,
/// Per-node creation epoch — used to validate `NodeRef`s against
/// remove/re-add cycles. The entry is removed when the node is removed.
pub(crate) node_epochs: AHashMap<String, u64>,
/// Monotonic counter; assigned to each newly created node.
pub(crate) next_node_epoch: u64,
generation: u64,
pub(crate) cache: RwLock<cache::DagCache>,
}
Expand All @@ -35,6 +41,8 @@ impl<P> DAG<P> {
DAG {
graph: InternalGraph::default(),
name_to_index: AHashMap::new(),
node_epochs: AHashMap::new(),
next_node_epoch: 0,
generation: 0,
cache: RwLock::new(cache::DagCache::new()),
}
Expand All @@ -48,6 +56,30 @@ impl<P> DAG<P> {
.ok_or_else(|| DagronError::NodeNotFound(name.to_string()))
}

/// Resolve a `NodeRef` to its current index, validating that the node
/// still exists with the same creation epoch. Returns
/// `DagronError::NodeNotFound` if the node has been removed and
/// `DagronError::StaleNodeRef` if a different node now occupies the name.
pub fn resolve_ref(&self, r: &NodeRef) -> Result<InternalNodeIndex, DagronError> {
let stored_epoch = self
.node_epochs
.get(r.name.as_ref())
.ok_or_else(|| DagronError::NodeNotFound(r.name.to_string()))?;
if *stored_epoch != r.epoch {
return Err(DagronError::StaleNodeRef(r.name.to_string()));
}
self.name_to_index
.get(r.name.as_ref())
.copied()
.ok_or_else(|| DagronError::NodeNotFound(r.name.to_string()))
}

/// Look up the current `NodeRef` for a name, if it exists.
pub fn node_ref(&self, name: &str) -> Option<NodeRef> {
let epoch = *self.node_epochs.get(name)?;
Some(NodeRef::new(name, epoch))
}

/// Access the underlying petgraph.
pub fn inner_graph(&self) -> &InternalGraph<P> {
&self.graph
Expand Down Expand Up @@ -160,6 +192,8 @@ impl<P: Clone> DAG<P> {
DAG {
graph: self.graph.clone(),
name_to_index: self.name_to_index.clone(),
node_epochs: self.node_epochs.clone(),
next_node_epoch: self.next_node_epoch,
generation: self.generation,
cache: RwLock::new(cache::DagCache::new()),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/dagron-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ pub use graph::serialization::{SerializableEdge, SerializableGraph, Serializable
pub use graph::stats::GraphStats;
pub use graph::transforms::MergeConflict;
pub use graph::DAG;
pub use node::NodeId;
pub use node::{NodeId, NodeRef};
pub use types::{EdgeData, InternalGraph, InternalNodeIndex, NodeData};
71 changes: 66 additions & 5 deletions crates/dagron-core/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// A unique identifier for a node in a DAG.
/// A snapshot identifier for a node in a DAG.
///
/// **Note:** The `index` field corresponds to the internal `petgraph` node index
/// and is only valid for the lifetime of that node in the graph. After a node is
/// removed, its index may be reused by a subsequently added node. Do not persist
/// or compare `index` values across graph mutations that involve removals.
/// `NodeId` is returned by enumeration methods (`nodes()`, `successors()`, …).
/// It carries the node's `name` plus its current `petgraph` index. The `index`
/// field is a *snapshot*: after a node is removed, its index may be reused by
/// a subsequently added node, so do not persist or compare `index` values
/// across graph mutations that involve removals.
///
/// For a stable, persistent handle that survives mutations, use [`NodeRef`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId {
pub index: u32,
Expand All @@ -17,3 +22,59 @@ impl fmt::Display for NodeId {
write!(f, "{}", self.name)
}
}

/// A stable, persistent handle to a node.
///
/// `NodeRef` is returned by [`DAG::add_node`] and remains valid as long as the
/// node it points to has not been removed (or removed-and-readded with the
/// same name, which produces a fresh `epoch`). Use it anywhere a `&str` name
/// is accepted; resolution is O(1) and detects stale references.
///
/// `NodeRef` clones cheaply: the name is reference-counted via `Arc<str>`.
#[derive(Debug, Clone)]
pub struct NodeRef {
pub name: Arc<str>,
pub epoch: u64,
}

impl NodeRef {
/// Construct a `NodeRef` directly. Prefer obtaining one from
/// [`DAG::add_node`] or [`DAG::node_ref`].
pub fn new(name: impl Into<Arc<str>>, epoch: u64) -> Self {
NodeRef {
name: name.into(),
epoch,
}
}

/// Borrow the name as a string slice.
pub fn name(&self) -> &str {
&self.name
}

/// The creation epoch this ref was minted with.
pub fn epoch(&self) -> u64 {
self.epoch
}
}

impl PartialEq for NodeRef {
fn eq(&self, other: &Self) -> bool {
self.epoch == other.epoch && self.name == other.name
}
}

impl Eq for NodeRef {}

impl Hash for NodeRef {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.epoch.hash(state);
}
}

impl fmt::Display for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name)
}
}
Loading
Loading