fitzee/flownet

FlowNet

Deterministic stream/process network library for Modula-2+.

FlowNet provides a small, practical runtime for building streaming pipelines inspired by Kahn Process Networks (KPN). Processes communicate exclusively through bounded FIFO channels, enabling clean dataflow architectures with backpressure and deterministic execution for linear pipelines.

Design Philosophy

FlowNet is designed as a small, deterministic pipeline runtime.

It intentionally avoids:

  • Actor frameworks
  • Message brokers
  • Dynamic graph mutation
  • Distributed execution
  • Heavy runtime systems

Instead FlowNet focuses on:

  • Explicit dataflow
  • Bounded memory usage
  • Deterministic pipelines
  • Simple concurrency semantics
  • Minimal runtime overhead

Quick Start

Source ──> Channel ch1 ──> Map ──> Channel ch2 ──> Sink

(* channels *)
Channel.Init(ch1, ADR(slots1), 8);
Channel.Init(ch2, ADR(slots2), 8);

(* source *)
srcCtx.outCh := ADR(ch1);
srcCtx.genFn := MyGenerator;
srcCtx.userData := ADR(myData);
srcCtx.err := NIL;

(* map *)
mapCtx.inCh := ADR(ch1);
mapCtx.outCh := ADR(ch2);
mapCtx.mapFn := MyTransform;
mapCtx.userData := NIL;
mapCtx.err := NIL;

(* sink *)
sinkCtx.inCh := ADR(ch2);
sinkCtx.consumeFn := MyConsumer;
sinkCtx.userData := ADR(results);
sinkCtx.err := NIL;

(* network *)
Network.Init(net, ADR(procs), 3);
Network.AddProc(net, "source", SourceRun, ADR(srcCtx));
Network.AddProc(net, "map", MapRun, ADR(mapCtx));
Network.AddProc(net, "sink", SinkRun, ADR(sinkCtx));

Network.Start(net);
Network.Wait(net);

Network.Destroy(net);
Channel.Destroy(ch2);
Channel.Destroy(ch1);

Or use the Pipe builder for less boilerplate:

Pipe.Init(p);
Pipe.AddNode(p, "source", SourceRun, ADR(srcCtx));
Pipe.AddNode(p, "map",    MapRun,    ADR(mapCtx));
Pipe.AddNode(p, "sink",   SinkRun,   ADR(sinkCtx));

Pipe.Connect(p, ADR(srcCtx.outCh), ADR(mapCtx.inCh), 8);
Pipe.Connect(p, ADR(mapCtx.outCh), ADR(sinkCtx.inCh), 8);

Pipe.Start(p);
Pipe.Wait(p);
Pipe.Destroy(p);

Pipe.Connect creates a channel, heap-allocates its slot buffer, and wires both context fields in one call. Pipe.Destroy cleans up everything the builder created.

Features

  • Bounded FIFO channels with blocking read/write and backpressure
  • Typed channels with runtime type tags for safety
  • Thread-per-process execution model via m2pthreads
  • 10 standard nodes: Source, Sink, Map, Filter, Tee, Broadcast, Merge, Batch, Reduce, Window
  • Error propagation via NodeError records — stops pipeline without deadlocks
  • Topology validation — detect disconnected processes before start
  • Deterministic linear pipelines (merge is nondeterministic by nature)
  • Clean shutdown propagation through channel close semantics
  • No heap allocation in core channel runtime
  • Pipeline builder (Pipe) — ergonomic API for common topologies
  • Diagnostics: process state, channel depth, aggregate metrics

Data Ownership

FlowNet channels transmit ADDRESS references only. FlowNet does not allocate, copy, or free application data.

Ownership rules:

  • The producer owns the data it writes
  • Consumers must not free or mutate data unless the pipeline contract allows it
  • Tee/Broadcast duplicate the pointer only, not the underlying data
  • If multiple branches share a value, the producer must ensure it remains valid until all consumers finish

Typical patterns for safe shared data:

  • Immutable objects (most common — producer writes, consumers only read)
  • Static or stack-allocated arrays where lifetime outlives the pipeline
  • Object pools with explicit release after pipeline completion
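The object-pool pattern in the last bullet can be sketched roughly as follows. This is only an illustration under stated assumptions: Sample, Acquire, and ReleaseAll are hypothetical names that do not come from FlowNet, and the point where the producer would write the item to a channel is left as a comment because the exact write call is not shown in this README.

```modula2
MODULE PoolSketch;
(* Illustrative only: a fixed-size object pool. Sample, Acquire and
   ReleaseAll are hypothetical names, not part of FlowNet. *)

FROM SYSTEM IMPORT ADDRESS, ADR;

CONST
  PoolSize = 16;

TYPE
  Sample = RECORD
    value: INTEGER;
    inUse: BOOLEAN;
  END;
  SamplePtr = POINTER TO Sample;

VAR
  pool: ARRAY [0..PoolSize-1] OF Sample;
  i: CARDINAL;
  item: ADDRESS;
  s: SamplePtr;

(* Returns the address of a free Sample, or NIL when the pool is exhausted. *)
PROCEDURE Acquire(): ADDRESS;
  VAR k: CARDINAL;
BEGIN
  FOR k := 0 TO PoolSize-1 DO
    IF NOT pool[k].inUse THEN
      pool[k].inUse := TRUE;
      RETURN ADR(pool[k])
    END
  END;
  RETURN NIL
END Acquire;

(* Call only after the pipeline has finished, so that no consumer
   still holds a pointer into the pool. *)
PROCEDURE ReleaseAll;
  VAR k: CARDINAL;
BEGIN
  FOR k := 0 TO PoolSize-1 DO pool[k].inUse := FALSE END
END ReleaseAll;

BEGIN
  FOR i := 0 TO PoolSize-1 DO pool[i].inUse := FALSE END;
  item := Acquire();
  IF item # NIL THEN
    s := item;
    s^.value := 42  (* the producer fills the object, then writes item to a channel *)
  END;
  (* ... run the pipeline to completion here ... *)
  ReleaseAll
END PoolSketch.
```

Because the pool is a module-level array, its storage outlives the pipeline, and releasing everything in one step after completion avoids having to track which consumer saw a pointer last.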

Channel Buffer Model

Each channel is backed by a caller-provided array of ADDRESS slots:

VAR slots: ARRAY [0..7] OF ADDRESS;
Channel.Init(ch, ADR(slots), 8);

Each slot stores a pointer-sized value (ADDRESS). Channels never copy objects — they only move ADDRESS values between processes. The channel is a circular buffer of pointers, not a buffer of data.
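To make the "buffer of pointers" idea concrete, here is a minimal single-threaded sketch of a ring of ADDRESS slots. It is not FlowNet's Channel implementation (which also handles locking and blocking); Ring, Put, and Get are hypothetical names, and the sketch assumes a PIM-style InOut module is available for output.

```modula2
MODULE RingSketch;
(* Illustrative only: a single-threaded ring of ADDRESS slots.
   Not FlowNet's Channel implementation; all names are hypothetical. *)

FROM SYSTEM IMPORT ADDRESS, ADR;
FROM InOut IMPORT WriteInt, WriteLn;

CONST
  Capacity = 8;

TYPE
  IntPtr = POINTER TO INTEGER;
  Ring = RECORD
    slots: ARRAY [0..Capacity-1] OF ADDRESS;
    head:  CARDINAL;  (* index of the oldest item *)
    count: CARDINAL;  (* number of occupied slots *)
  END;

VAR
  r: Ring;
  a, b: INTEGER;
  p: IntPtr;
  item: ADDRESS;
  ok: BOOLEAN;

(* Stores x in the next free slot; returns FALSE if the ring is full. *)
PROCEDURE Put(VAR q: Ring; x: ADDRESS): BOOLEAN;
BEGIN
  IF q.count = Capacity THEN RETURN FALSE END;
  q.slots[(q.head + q.count) MOD Capacity] := x;
  INC(q.count);
  RETURN TRUE
END Put;

(* Removes the oldest address into x; returns FALSE if the ring is empty. *)
PROCEDURE Get(VAR q: Ring; VAR x: ADDRESS): BOOLEAN;
BEGIN
  IF q.count = 0 THEN RETURN FALSE END;
  x := q.slots[q.head];
  q.head := (q.head + 1) MOD Capacity;
  DEC(q.count);
  RETURN TRUE
END Get;

BEGIN
  r.head := 0; r.count := 0;
  a := 10; b := 20;
  ok := Put(r, ADR(a));
  ok := Put(r, ADR(b));
  a := 11;  (* the reader will see 11: only the address was stored *)
  WHILE Get(r, item) DO
    p := item;
    WriteInt(p^, 0); WriteLn  (* writes 11, then 20 *)
  END
END RingSketch.
```

The assignment to a after Put shows why the ownership rules matter: the ring holds the address of a, not a copy of its value.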

Channel State

A channel transitions through a single state change:

Open ──> Closed

Behavior by state:

Operation   Open                   Closed (items remain)   Closed (drained)
Write       Ok (blocks if full)    Closed                  Closed
Read        Ok (blocks if empty)   Ok (drains)             Closed
TryWrite    Ok or Full             Closed                  Closed
TryRead     Ok or Empty            Ok (drains)             Closed

Close is idempotent — calling it multiple times is safe. After close, reads drain any buffered items before returning Closed.
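The non-blocking rows of the table can be read as two small decision functions. The sketch below models only the outcome of a call from the channel's state; it is not FlowNet code, the procedure names are hypothetical, and the Status values simply reuse the result names from the table. It assumes a PIM-style InOut module for output.

```modula2
MODULE StateSketch;
(* Illustrative model of the state table; TryWriteResult and
   TryReadResult are hypothetical names, not FlowNet's API. *)

FROM InOut IMPORT WriteString, WriteLn;

TYPE
  Status = (Ok, Full, Empty, Closed);

(* Outcome of a non-blocking write: a closed channel rejects every write. *)
PROCEDURE TryWriteResult(closed: BOOLEAN; count, capacity: CARDINAL): Status;
BEGIN
  IF closed THEN RETURN Closed END;
  IF count = capacity THEN RETURN Full END;
  RETURN Ok
END TryWriteResult;

(* Outcome of a non-blocking read: buffered items drain even after close. *)
PROCEDURE TryReadResult(closed: BOOLEAN; count: CARDINAL): Status;
BEGIN
  IF count > 0 THEN RETURN Ok END;
  IF closed THEN RETURN Closed END;
  RETURN Empty
END TryReadResult;

PROCEDURE Show(s: Status);
BEGIN
  CASE s OF
    Ok:     WriteString("Ok")
  | Full:   WriteString("Full")
  | Empty:  WriteString("Empty")
  | Closed: WriteString("Closed")
  END;
  WriteLn
END Show;

BEGIN
  Show(TryWriteResult(FALSE, 8, 8));  (* open and full: Full *)
  Show(TryWriteResult(TRUE, 3, 8));   (* closed: Closed *)
  Show(TryReadResult(TRUE, 3));       (* closed, items remain: Ok *)
  Show(TryReadResult(TRUE, 0));       (* closed and drained: Closed *)
  Show(TryReadResult(FALSE, 0))       (* open and empty: Empty *)
END StateSketch.
```

Checking the item count before the closed flag in TryReadResult is what gives the "drain, then report Closed" behavior.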

Merge Ordering

Merge reads from inputs using round-robin scheduling. If multiple inputs are ready simultaneously, the ordering depends on thread scheduling.

FlowNet guarantees:

  • Per-channel FIFO ordering — items from the same input channel arrive in order
  • No global ordering between channels — the interleaving is nondeterministic

The set of output items is deterministic; their order is not.
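A single-threaded simulation can illustrate the per-channel FIFO guarantee. The sketch below visits two in-memory inputs in round-robin order and skips an input once it is exhausted. It shows only one possible interleaving; in a threaded run the interleaving depends on which inputs happen to be ready. All names are hypothetical, and a PIM-style InOut module is assumed for output.

```modula2
MODULE MergeSketch;
(* Single-threaded illustration of round-robin merging. It shows one
   possible interleaving; a threaded run may interleave differently.
   All names are hypothetical. *)

FROM InOut IMPORT WriteInt, WriteString, WriteLn;

CONST
  NumInputs = 2;
  MaxItems = 4;

TYPE
  Input = RECORD
    items: ARRAY [0..MaxItems-1] OF INTEGER;
    len:   CARDINAL;  (* number of valid items *)
    next:  CARDINAL;  (* index of the next unread item *)
  END;

VAR
  ins: ARRAY [0..NumInputs-1] OF Input;
  cur, remaining: CARDINAL;

BEGIN
  (* input 0 carries 1, 2, 3; input 1 carries 100, 200 *)
  ins[0].items[0] := 1; ins[0].items[1] := 2; ins[0].items[2] := 3;
  ins[0].len := 3; ins[0].next := 0;
  ins[1].items[0] := 100; ins[1].items[1] := 200;
  ins[1].len := 2; ins[1].next := 0;

  remaining := ins[0].len + ins[1].len;
  cur := 0;
  WHILE remaining > 0 DO
    IF ins[cur].next < ins[cur].len THEN
      WriteInt(ins[cur].items[ins[cur].next], 0); WriteString(" ");
      INC(ins[cur].next);
      DEC(remaining)
    END;
    cur := (cur + 1) MOD NumInputs  (* move on even if this input had nothing *)
  END;
  WriteLn  (* output so far: 1 100 2 200 3 *)
END MergeSketch.
```

Whatever the interleaving, 1, 2, 3 stay in order relative to each other, and so do 100, 200, which is the only ordering the merge promises.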

Performance Characteristics

  • One OS thread per process
  • Lock-based bounded channels (mutex + two condition variables)
  • No heap allocation in channel runtime
  • Constant-time enqueue/dequeue (circular buffer)
  • Backpressure via blocking writes
  • Shutdown cascade via channel close propagation

Implementation Limits

Resource                     Maximum
Processes per network        256
Fanout outputs (Broadcast)   8
Fanin inputs (Merge)         8
Batch size                   64 items
Window size                  64 items

Building

mx test                      # run test suite (103 assertions in 26 test groups)

Project Structure

src/
  Channel.def/.mod    bounded FIFO channel (typed channels)
  Process.def/.mod    process abstraction
  Network.def/.mod    topology and runtime
  Nodes.def/.mod      standard pipeline nodes (10 types)
  Pipe.def/.mod       pipeline builder (ergonomic API)
  Topology.def/.mod   topology validation
  FlowDiag.def/.mod   diagnostics and metrics
tests/
  flownet_tests.mod   test suite (103 assertions, 26 test groups)
examples/
  pipeline.mod        source -> map -> filter -> sink
  fanout.mod          source -> broadcast -> maps -> merge -> sink
  aggregate.mod       source -> transform -> aggregate -> sink
  reduce.mod          source -> reduce(sum) -> sink
  window.mod          source -> window(sliding sum) -> sink
  multistage.mod      source -> parse -> filter -> batch(avg) -> sink
  telemetry.mod       source -> broadcast -> {window, reduce} -> merge -> sink
  tee.mod             source -> tee -> {map(x2), filter(odd)} -> 2 sinks
  pipe_demo.mod       source -> map -> filter -> sink (Pipe builder)

Dependencies

  • m2pthreads — POSIX threads (mutex, condition variable, thread spawning)

Documentation

  • Architecture — design decisions and component overview
  • Usage Guide — API reference, node types, error propagation

Limitations

  • Bounded queues only (no infinite buffers)
  • Merge is nondeterministic (round-robin, see Merge Ordering)
  • Static topology (no runtime reconfiguration)
  • Thread-based runtime (one OS thread per process)
  • No cooperative scheduling (no coroutine primitives in mx)
  • No compile-time typed channels (Modula-2 lacks generics; runtime tags only)
  • No distributed execution
  • No persistence or checkpointing
  • No timeout on channel operations

Copyright (c) 2026 Matt Fitzgerald. Licensed under the MIT License.
