clingier/forge

Forge

An embeddable asset materialization engine for Rust with built-in cryptographic lineage. Define data assets as async functions, declare dependencies via type signatures, and let Forge handle execution order, parallelism, retries, and provenance tracking — from cargo add to a full Kubernetes-native platform.

use forge_core::prelude::*;

#[asset(name = "raw_users")]
async fn raw_users(_ctx: &AssetContext) -> Result<AssetOutput<Vec<String>>, ForgeError> {
    let users = vec![
        "alice@example.com".to_string(),
        "bob@test.com".to_string(),
    ];
    Ok(AssetOutput::new(users))
}

#[asset(name = "valid_users")]
async fn valid_users(
    ctx: &AssetContext,
    raw_users: &Asset<Vec<String>>,
) -> Result<AssetOutput<Vec<String>>, ForgeError> {
    let raw = raw_users.read(ctx).await?;
    let valid: Vec<String> = raw.into_iter().filter(|e| e.contains('@')).collect();
    Ok(AssetOutput::new(valid))
}

What it does

Forge takes a set of typed asset functions, resolves their dependency graph, and executes them in parallel where possible. Data flows between assets through an in-memory store. Failed assets are retried with exponential backoff. Downstream assets are skipped when their dependencies fail. Every materialization produces a cryptographic receipt — a Merkle hash that ties output data, code identity, and upstream inputs into a verifiable chain.
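
As a rough illustration of the scheduling behaviour described above, the sketch below groups a dependency graph into waves whose members could run concurrently, and computes which downstream assets would be skipped after a failure. It uses only the standard library. The names plan_waves, skipped_after_failure, and example_graph, along with the raw_orders and user_orders assets, are hypothetical and exist only for this example; this is not Forge's actual API or scheduling algorithm.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical example graph: asset name -> names of its upstream assets.
fn example_graph() -> BTreeMap<&'static str, Vec<&'static str>> {
    BTreeMap::from([
        ("raw_users", vec![]),
        ("raw_orders", vec![]),
        ("valid_users", vec!["raw_users"]),
        ("user_orders", vec!["valid_users", "raw_orders"]),
    ])
}

/// Groups assets into waves: every asset in a wave depends only on assets
/// from earlier waves, so the members of one wave could run in parallel.
/// Returns None if the graph has a cycle or references an undeclared asset.
fn plan_waves<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Option<Vec<Vec<String>>> {
    let mut done: BTreeSet<&'a str> = BTreeSet::new();
    let mut waves = Vec::new();
    while done.len() < deps.len() {
        // An asset is ready once all of its upstream assets are done.
        let ready: Vec<&'a str> = deps
            .iter()
            .filter(|(name, ups)| !done.contains(*name) && ups.iter().all(|u| done.contains(u)))
            .map(|(name, _)| *name)
            .collect();
        if ready.is_empty() {
            return None; // no progress possible: cycle or unknown dependency
        }
        done.extend(ready.iter().copied());
        waves.push(ready.iter().map(|s| s.to_string()).collect());
    }
    Some(waves)
}

/// Returns every asset that has `failed` among its direct or transitive
/// upstream assets and therefore would be skipped.
fn skipped_after_failure(deps: &BTreeMap<&str, Vec<&str>>, failed: &str) -> BTreeSet<String> {
    let mut blocked: BTreeSet<String> = BTreeSet::new();
    blocked.insert(failed.to_string());
    loop {
        let before = blocked.len();
        for (name, ups) in deps {
            if ups.iter().any(|u| blocked.contains(*u)) {
                blocked.insert(name.to_string());
            }
        }
        if blocked.len() == before {
            break; // fixed point reached: no new assets were blocked
        }
    }
    blocked.remove(failed); // the failed asset itself is not "skipped"
    blocked
}

fn main() {
    let deps = example_graph();
    match plan_waves(&deps) {
        Some(waves) => {
            for (i, wave) in waves.iter().enumerate() {
                println!("wave {}: {}", i, wave.join(", "));
            }
        }
        None => println!("dependency cycle detected"),
    }
    println!(
        "skipped if raw_users fails: {:?}",
        skipped_after_failure(&deps, "raw_users")
    );
}
```

In this example graph, raw_users and raw_orders share the first wave, valid_users forms the second, and user_orders the third. A failure in raw_users blocks both valid_users and user_orders.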

Features

  • #[asset] proc macro — define assets as annotated async functions with automatic dependency inference
  • #[derive(Schema)] — deterministic schema hashing for type evolution detection
  • Cryptographic lineage — Merkle-tree provenance for every materialization with chain verification
  • Persistent storage — SQLite (default) and Postgres (--features postgres) backends for run history, lineage, and asset catalog
  • Partitioned assets — daily/hourly/monthly partitions with cross-partition dependencies
  • Staleness detection — four-signal reconciliation (code change, upstream change, freshness SLA, schema evolution)
  • REST API — Axum-based API with Swagger UI, SSE backfill streaming, and API key auth
  • Embedded dashboard — Leptos WASM single-page application with pipeline registry, run monitoring, lineage visualization, and verification panels
  • CLI: materialize, backfill, reconcile, status, verify, lineage, and serve commands
  • Run tracking — full run lifecycle with DAG topology, per-node snapshots, and structured logs
  • Telemetry — OpenTelemetry integration with OTLP export and W3C trace context propagation
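
To make the four staleness signals listed above more concrete, here is a minimal sketch of how such a check could be expressed. The Fingerprint struct, the StaleReason enum, and the stale_reasons function are hypothetical names chosen for illustration, and plain u64 values stand in for real hashes. Forge's reconciler may work quite differently.

```rust
use std::time::Duration;

/// Hypothetical reasons an asset could be considered stale, one per signal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StaleReason {
    CodeChanged,
    UpstreamChanged,
    FreshnessSlaExceeded,
    SchemaChanged,
}

/// Hypothetical snapshot of what an asset was built from.
/// Plain u64 values stand in for real hashes.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Fingerprint {
    transform_hash: u64,       // identity of the asset's code
    upstream_hashes: Vec<u64>, // output hashes of upstream assets
    schema_hash: u64,          // hash of the output type's schema
}

/// Compares the fingerprint recorded at the last materialization with the
/// current one and returns every signal that fired. `max_age` is an optional
/// freshness SLA; `age` is the time since the last materialization.
fn stale_reasons(
    recorded: &Fingerprint,
    current: &Fingerprint,
    age: Duration,
    max_age: Option<Duration>,
) -> Vec<StaleReason> {
    let mut reasons = Vec::new();
    if recorded.transform_hash != current.transform_hash {
        reasons.push(StaleReason::CodeChanged);
    }
    if recorded.upstream_hashes != current.upstream_hashes {
        reasons.push(StaleReason::UpstreamChanged);
    }
    if max_age.map_or(false, |limit| age > limit) {
        reasons.push(StaleReason::FreshnessSlaExceeded);
    }
    if recorded.schema_hash != current.schema_hash {
        reasons.push(StaleReason::SchemaChanged);
    }
    reasons
}

fn main() {
    let recorded = Fingerprint { transform_hash: 1, upstream_hashes: vec![10, 20], schema_hash: 7 };
    let current = Fingerprint { transform_hash: 2, upstream_hashes: vec![10, 21], schema_hash: 7 };
    let reasons = stale_reasons(
        &recorded,
        &current,
        Duration::from_secs(2 * 60 * 60),   // last built two hours ago
        Some(Duration::from_secs(60 * 60)), // SLA: at most one hour old
    );
    println!("stale because: {:?}", reasons);
}
```

An empty result means the asset is up to date under all four signals; any non-empty result would make it a candidate for rematerialization.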

Install

[dependencies]
forge-core = "0.1"
tokio = { version = "1", features = ["full"] }

To enable the Postgres storage backend, depend on the umbrella forge crate with the postgres feature:

[dependencies]
forge = { version = "0.1", features = ["postgres"] }

Concepts

Asset — a named async function that produces typed data. Can depend on other assets.

Engine — resolves the dependency graph, manages parallelism and retries, passes data between assets.
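
The retry behaviour can be pictured with a small standard-library sketch of exponential backoff. The function names backoff_delay and retry_with_backoff, the doubling factor, and the base and cap parameters are all assumptions made for this example. The engine itself is async, so treat this synchronous version as a simplified model only, not as Forge's implementation.

```rust
use std::thread;
use std::time::Duration;

/// Delay before the retry that follows failed attempt number `attempt`
/// (0-based): base * 2^attempt, never larger than `cap`.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // Saturate so that very large attempt numbers cannot overflow.
    let factor = 2u32.saturating_pow(attempt);
    base.checked_mul(factor).map_or(cap, |d| d.min(cap))
}

/// Calls `op` until it succeeds or `max_attempts` attempts have been made,
/// sleeping with exponential backoff between attempts. `op` receives the
/// 0-based attempt number. At least one attempt is always made.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    cap: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                thread::sleep(backoff_delay(attempt, base, cap));
                attempt += 1;
            }
        }
    }
}

fn main() {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(1);
    for attempt in 0..6 {
        println!("after attempt {}: wait {:?}", attempt, backoff_delay(attempt, base, cap));
    }

    // A hypothetical flaky operation that succeeds on its third attempt.
    let result: Result<u32, &str> =
        retry_with_backoff(5, Duration::from_millis(1), Duration::from_millis(4), |n| {
            if n < 2 { Err("transient failure") } else { Ok(n) }
        });
    println!("result: {:?}", result);
}
```

Capping the delay keeps a long run of failures from producing unbounded waits, and bounding the number of attempts lets a persistent failure surface so that downstream assets can be skipped.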

AssetContext — runtime context passed to every asset: variables, typed resources, run ID, scratch directory.

Asset<T> — a handle to upstream data. Call .read(ctx).await to get the value.

AssetOutput<T> — wrapper around your return value. Exists so the engine can intercept it for storage and hashing.

MaterializationRecord — cryptographic receipt: SHA-256 output hash, transform hash, and Merkle hash of the full provenance chain.
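
The idea behind such a receipt can be sketched with the standard library alone. Note the loud assumption: std has no SHA-256, so this example substitutes the non-cryptographic DefaultHasher purely to show how the hashes chain together. The Record struct and the digest, combine, materialize, and verify functions are hypothetical and do not reflect Forge's actual types or hash layout.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in digest. NOT cryptographic: a real receipt would use SHA-256.
fn digest(bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    bytes.hash(&mut h);
    h.finish()
}

/// Combines output hash, transform hash, and upstream Merkle hashes into
/// one hash, so a change to any of them changes the result.
fn combine(output_hash: u64, transform_hash: u64, upstream: &[u64]) -> u64 {
    let mut h = DefaultHasher::new();
    output_hash.hash(&mut h);
    transform_hash.hash(&mut h);
    upstream.hash(&mut h);
    h.finish()
}

/// Hypothetical receipt for one materialization.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    asset: String,
    output_hash: u64,
    transform_hash: u64,
    upstream_merkle: Vec<u64>,
    merkle_hash: u64,
}

/// Builds a receipt from the output bytes, a code identity, and the
/// receipts of the upstream assets that were read.
fn materialize(asset: &str, output: &[u8], code_id: &[u8], upstream: &[&Record]) -> Record {
    let output_hash = digest(output);
    let transform_hash = digest(code_id);
    let upstream_merkle: Vec<u64> = upstream.iter().map(|r| r.merkle_hash).collect();
    let merkle_hash = combine(output_hash, transform_hash, &upstream_merkle);
    Record { asset: asset.to_string(), output_hash, transform_hash, upstream_merkle, merkle_hash }
}

/// Checks that `output` matches the receipt and that the receipt's Merkle
/// hash is consistent with its own fields.
fn verify(record: &Record, output: &[u8]) -> bool {
    digest(output) == record.output_hash
        && combine(record.output_hash, record.transform_hash, &record.upstream_merkle)
            == record.merkle_hash
}

fn main() {
    let raw = materialize("raw_users", b"alice,bob", b"raw_users@v1", &[]);
    let valid = materialize("valid_users", b"alice,bob", b"valid_users@v1", &[&raw]);
    println!("{} receipt: {:016x}", valid.asset, valid.merkle_hash);
    println!("untampered output verifies: {}", verify(&valid, b"alice,bob"));
    println!("tampered output verifies: {}", verify(&valid, b"alice,mallory"));
}
```

Because each receipt folds in the Merkle hashes of its upstream receipts, a change anywhere upstream produces a different hash downstream, which is what makes the chain checkable end to end.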

Project structure

crates/
  forge/             # Umbrella crate (re-exports)
  forge-core/        # Asset types, engine, execution, in-memory store, staleness, partitions
  forge-lineage/     # Lineage store trait, in-memory implementation, verification
  forge-storage/     # SQLite and Postgres backends for catalog, lineage, metadata, and run tracking
  forge-executor/    # Executor trait and implementations
  forge-reconciler/  # Staleness computation and materialization planning
  forge-api/         # Axum REST API, Swagger UI
  forge-api-models/  # Shared API response types (native + WASM compatible)
  forge-cli/         # CLI: materialize, backfill, reconcile, status, verify, lineage, serve
  forge-dashboard/   # Leptos WASM dashboard SPA (registry, monitor, lineage, verify)
  forge-wasm/        # Optional WASM runtime integration
  forge-macros/      # #[asset] proc macro, #[derive(Schema)]
  forge-telemetry/   # OpenTelemetry initialization and trace context propagation

Design principles

Library-first. Forge is a crate you embed, not a service you deploy. Engine::new() gives you a working engine with zero infrastructure.

No magic. Assets are regular async functions. The engine is a struct. There's no global state, no runtime reflection, no hidden threads.

Cost is a feature. In-memory by default. Optional persistence. The binary stays small and starts instantly.

Ring architecture. Same asset code runs everywhere — from a CLI tool (Ring 0) to a distributed Kubernetes platform (Ring 2). You change the engine configuration, not your asset functions.

Development

Enable the pre-commit hook to run formatting and unit tests before each commit:

git config core.hooksPath .githooks

The hook runs cargo fmt --check and cargo test --workspace --lib. To bypass it temporarily, use git commit --no-verify.

Status

Alpha. The core engine, proc macros, lineage, SQLite and Postgres storage, CLI, REST API, and dashboard are working. Higher-ring features (Postgres coordinator, HA leader election) are on the roadmap. The API may change.

License

Apache-2.0

About

Forge is a high-performance, type-safe workflow orchestration engine built in Rust for data engineering workloads.
