A typed-dict pipeline library for Python. Define transformation DAGs in code or YAML/JSON, get automatic type checking, lazy evaluation, and optional caching.
- Install
- Quickstart
- Strand kinds
- Custom inputs
- YAML / JSON config
- Built-in strands
- Custom execution backends
## Install

```bash
pip install braided
```

Optional extras:
| Extra | What it unlocks |
|---|---|
| `braided[datasets]` | HuggingFace Datasets map/cache backend (`braided.integrations.hf_datasets`) |
| `braided[torch]` | Tensor-safe pickle serialization in `Cache` |
| `braided[jaxtyping]` | Array dtype/shape awareness in the type checker |
| `braided[dev]` | pytest, pyright, and build tools |
## Quickstart

```python
from collections.abc import Iterator
from typing import TypedDict

from braided import Node, NodeSpec, SequenceInput, execute_pipeline, strand


class Record(TypedDict):
    x: int


@strand
def double(item: Record) -> Record:
    return Record(x=item["x"] * 2)


@strand.one_to_many
def up_to(item: Record) -> Iterator[Record]:
    for i in range(item["x"]):
        yield Record(x=i)


# "out" reads from the "doubled" node, which reads from the "seed" input.
nodes: NodeSpec[Record] = {
    "out": Node(function=up_to, args=["doubled"]),
    "doubled": Node(function=double, args=["seed"]),
}

result = execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=3)])})
print(list(result))
# [{"x": 0}, {"x": 1}, {"x": 2}, {"x": 3}, {"x": 4}, {"x": 5}]
```

## Strand kinds

| Decorator | Input → Output | Use for |
|---|---|---|
| `@strand` | `T → T'` | one-to-one row transforms |
| `@strand.one_to_many` | `T → Iterator[T']` | splitting or expanding rows |
| `@strand.many_to_many` | `Sequence[T] → Iterator[T']` | aggregations, joins, reordering |
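This is not braided's implementation. It is only a mental model: each strand kind corresponds to a familiar plain-Python iteration pattern.

- A one-to-one strand behaves like `map`.
- A one-to-many strand behaves like a flat map over generator outputs.
- A many-to-many strand receives the whole sequence at once.

The helpers `run_one_to_one`, `run_one_to_many` and `run_many_to_many` below are hypothetical names used for illustration only.

```python
from collections.abc import Callable, Iterable, Iterator, Sequence
from itertools import chain
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def run_one_to_one(fn: Callable[[T], U], items: Iterable[T]) -> list[U]:
    # One output per input row, in order (like the built-in map).
    return [fn(item) for item in items]


def run_one_to_many(fn: Callable[[T], Iterable[U]], items: Iterable[T]) -> list[U]:
    # Each input row may yield zero or more outputs; the outputs are concatenated.
    return list(chain.from_iterable(fn(item) for item in items))


def run_many_to_many(fn: Callable[[Sequence[T]], Iterable[U]], items: Sequence[T]) -> list[U]:
    # The function sees the whole sequence at once, so it can aggregate or reorder.
    return list(fn(items))


def double_x(row: dict) -> dict:
    return {"x": row["x"] * 2}


def range_x(row: dict) -> Iterator[dict]:
    for i in range(row["x"]):
        yield {"x": i}


def sort_desc(rows: Sequence[dict]) -> Iterator[dict]:
    # Needs every row before it can emit anything, hence many-to-many.
    yield from sorted(rows, key=lambda r: r["x"], reverse=True)


doubled = run_one_to_one(double_x, [{"x": 1}, {"x": 2}])
expanded = run_one_to_many(range_x, doubled)
reordered = run_many_to_many(sort_desc, expanded)
```

Only the many-to-many step has to hold the entire sequence. This is why aggregations, joins and reorderings fall in that category.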
A strand can take multiple input sequences by declaring multiple parameters. For `@strand` and `@strand.one_to_many`, inputs are aligned by position (zipped); for `@strand.many_to_many`, they are passed as separate sequences:
```python
# Reuses the imports and the Record type from the quickstart.
class Pair(TypedDict):
    a: int
    b: int


@strand
def zip_pair(left: Record, right: Record) -> Pair:
    # Called once per position, with the i-th item of each input.
    return Pair(a=left["x"], b=right["x"])


nodes: NodeSpec[Pair] = {
    "out": Node(function=zip_pair, args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[Record]([Record(x=1), Record(x=2)]),
    "right": SequenceInput[Record]([Record(x=10), Record(x=20)]),
})
print(list(result))  # [{"a": 1, "b": 10}, {"a": 2, "b": 20}]
```

Class-based strands inherit from `Strand[T].OneToOne()`, `Strand[T].OneToMany()`, or `Strand[T].ManyToMany()`, and can take constructor parameters:
```python
from braided import Strand


class Scale(Strand[Record].OneToOne()):
    def __init__(self, factor: int) -> None:
        self.factor = factor

    def __call__(self, item: Record) -> Record:
        return Record(x=item["x"] * self.factor)
```

## Custom inputs

Pipelines receive data through `PipelineInput` subclasses. `SequenceInput` wraps an in-memory list; for other data sources (files, databases, streaming APIs), subclass `PipelineInput` directly:
```python
import csv
from collections.abc import Iterator, Sequence
from typing import overload

from braided import PipelineInput


class CSVInput(PipelineInput[Record]):
    def __init__(self, path: str) -> None:
        # Reads the whole file eagerly; every row must have an "x" column.
        # newline="" is what the csv module expects for file objects.
        with open(path, newline="") as f:
            self._rows = [Record(x=int(r["x"])) for r in csv.DictReader(f)]

    def __len__(self) -> int:
        return len(self._rows)

    def __iter__(self) -> Iterator[Record]:
        return iter(self._rows)

    @overload
    def __getitem__(self, index: int) -> Record: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Record]: ...
    def __getitem__(self, index: int | slice) -> Record | Sequence[Record]:
        return self._rows[index]
```

Pass it like any other input:
```python
result = execute_pipeline(nodes, {"seed": CSVInput("data.csv")})
```

Custom inputs can also be instantiated from YAML config (see YAML / JSON config), as long as their constructor arguments use concrete types that `jsonargparse` can resolve.
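`CSVInput` implements `__len__`, `__iter__`, and `__getitem__` for both integers and slices. These mirror the read-only protocol that `collections.abc.Sequence` describes, so the parsing logic can be tested without braided or a file on disk.

The sketch below is a hypothetical stdlib-only stand-in, `StringCSVRecords`, that parses CSV text rather than a path. It does not subclass `PipelineInput`. Whether braided accepts a plain `Sequence` as a pipeline input is not stated here, so treat this only as a way to check the row-parsing logic.

```python
from __future__ import annotations

import csv
import io
from collections.abc import Sequence
from typing import TypedDict, overload


class Record(TypedDict):
    x: int


class StringCSVRecords(Sequence[Record]):
    """Hypothetical stand-in for CSVInput that parses CSV text instead of a file."""

    def __init__(self, text: str) -> None:
        # Parse eagerly, like CSVInput; every row must have an "x" column.
        reader = csv.DictReader(io.StringIO(text))
        self._rows = [Record(x=int(r["x"])) for r in reader]

    def __len__(self) -> int:
        return len(self._rows)

    @overload
    def __getitem__(self, index: int) -> Record: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Record]: ...
    def __getitem__(self, index: int | slice) -> Record | Sequence[Record]:
        return self._rows[index]


# __iter__, __contains__, index() and count() come from the Sequence mixins.
rows = StringCSVRecords("x\n1\n2\n3\n")
```

Replacing `io.StringIO(text)` with an opened file gives the same rows that `CSVInput` would produce for that file.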
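## YAML / JSON config

Pipelines can be defined in YAML or JSON and loaded at runtime. A node's `function` field accepts either a dotted import path (for function strands) or a `class_path` + `init_args` mapping (for class-based strands); inputs use the same `class_path` / `init_args` form.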
```yaml
# pipeline.yaml
nodes:
  out:
    function:
      class_path: mypackage.Scale
      init_args:
        factor: 10
    args: [doubled]
  doubled:
    function: mypackage.double
    args: [seed]
inputs:
  seed:
    class_path: mypackage.CSVInput
    init_args:
      path: data.csv
```

```python
from braided import execute_pipeline_from_config

result = list(execute_pipeline_from_config("pipeline.yaml"))
```

## Built-in strands

### Cache

`Cache` is a pass-through strand. On the first run it persists its input sequence to disk; on later runs it reloads that sequence and skips the upstream computation:
```python
from braided import Cache, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[Record] = {
    "out": Node(function=Cache[Record]("/tmp/my_cache"), args=["source"]),
    "source": Node(function=double, args=["seed"]),
}

# First run: computes and saves to disk.
# list() forces evaluation, in case the result is consumed lazily.
list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([Record(x=1), Record(x=2)])}))

# Second run: loads from disk; "source" is never evaluated.
result = list(execute_pipeline(nodes, {"seed": SequenceInput[Record]([])}))
```

### Join

`Join` performs an inner join on a shared key column:
```python
from braided import Join, Node, NodeSpec, SequenceInput, execute_pipeline

nodes: NodeSpec[dict] = {
    "out": Node(function=Join[dict]("id"), args=["left", "right"]),
}
result = execute_pipeline(nodes, {
    "left": SequenceInput[dict]([{"id": 1, "val": "a"}]),
    "right": SequenceInput[dict]([{"id": 1, "score": 42}]),
})
print(list(result))  # [{"id": 1, "val": "a", "score": 42}]
```

## Custom execution backends

Pass custom `map`, `flat_map`, or `many_to_many` callables to `execute_pipeline` to control how sequences are materialized. For example, to use the HuggingFace Datasets backend (requires the `braided[datasets]` extra):
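```python
from braided.integrations.hf_datasets import hf_map_funcs

# `nodes` is a NodeSpec and `inputs` a dict of input names to PipelineInput objects, as above.
result = execute_pipeline(nodes, inputs, **hf_map_funcs())
```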
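This section does not document the exact signatures braided expects for `map`, `flat_map`, and `many_to_many`. The following is therefore only a plain-Python illustration of what "controlling materialization" means: the same row function can be applied lazily or eagerly.

- Lazily: a generator computes each row only when it is consumed.
- Eagerly: a list is computed up front.

`lazy_map` and `eager_map` are hypothetical names, not braided APIs.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")

calls: list[int] = []  # records which inputs have been processed so far


def lazy_map(fn: Callable[[T], U], items: Iterable[T]) -> Iterator[U]:
    # Nothing runs until the caller iterates; rows are produced one at a time.
    return (fn(item) for item in items)


def eager_map(fn: Callable[[T], U], items: Iterable[T]) -> list[U]:
    # Every row is computed immediately and held in memory.
    return [fn(item) for item in items]


def square(x: int) -> int:
    calls.append(x)
    return x * x


lazy = lazy_map(square, [1, 2, 3])
calls_after_lazy = len(calls)  # the generator has not been consumed yet
first = next(lazy)             # computes only the first row
eager = eager_map(square, [4, 5])
```

A lazy strategy keeps memory use flat for long streams. An eager one pays the cost up front but leaves the results reusable and randomly accessible.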