bhatti/pipeflow

PipeFlow

A functional observability data pipeline built in Rust, demonstrating how Domain-Driven Design (DDD), hexagonal architecture, and functional programming patterns solve common software design antipatterns.

Architecture

Layers (Hexagonal / Clean Architecture)

graph TD
    classDef domain fill:#2563EB,stroke:#1E40AF,color:#FFFFFF
    classDef infra fill:#059669,stroke:#047857,color:#FFFFFF
    classDef app fill:#D97706,stroke:#B45309,color:#FFFFFF
    classDef iface fill:#7C3AED,stroke:#6D28D9,color:#FFFFFF
    
    subgraph Interfaces
        API[REST API - Axum]:::iface
        CLI[CLI - Clap]:::iface
    end
    
    subgraph Application
        CMD[Commands]:::app
        QRY[Queries]:::app
        SVC[Streaming Service]:::app
    end
    
    subgraph Domain
        MDL[Model: Event, Pipeline, RouteTable]:::domain
        SRV[Services: PipelineEngine, RoutingEngine]:::domain
        PRT[Ports: Repository, EventBus, PipelineFn]:::domain
    end
    
    subgraph Infrastructure
        DB[SQLite Repos]:::infra
        MSG[Channel EventBus]:::infra
        FNS[Pipeline Functions + Registry]:::infra
        ACT[Streaming Actors]:::infra
    end
    
    API --> CMD
    API --> QRY
    CLI --> CMD
    CMD --> SRV
    QRY --> MDL
    SVC --> SRV
    DB --> PRT
    MSG --> PRT
    FNS --> PRT
    ACT --> SRV
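The layering rule fits in a few lines of code. The domain defines a port as a trait, infrastructure supplies an adapter that implements it, and the composition root injects the adapter through a constructor instead of reaching for a global singleton. The sketch below uses only std and simplified, hypothetical names (`PipelineRepository`, `InMemoryPipelineRepository`, `CreatePipelineHandler`). PipeFlow's real repositories are SQLite-backed, and their signatures may differ.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// --- Domain: a port (trait) with no I/O details. Hypothetical shape. ---
#[derive(Clone, Debug, PartialEq)]
pub struct Pipeline {
    pub id: String,
    pub description: String,
}

pub trait PipelineRepository: Send + Sync {
    fn save(&self, pipeline: Pipeline) -> Result<(), String>;
    fn find(&self, id: &str) -> Option<Pipeline>;
}

// --- Infrastructure: an adapter implementing the port. ---
// Hypothetical in-memory adapter standing in for the SQLite repository.
#[derive(Default)]
pub struct InMemoryPipelineRepository {
    rows: Mutex<HashMap<String, Pipeline>>,
}

impl PipelineRepository for InMemoryPipelineRepository {
    fn save(&self, pipeline: Pipeline) -> Result<(), String> {
        let mut rows = self.rows.lock().map_err(|e| e.to_string())?;
        rows.insert(pipeline.id.clone(), pipeline);
        Ok(())
    }

    fn find(&self, id: &str) -> Option<Pipeline> {
        let rows = self.rows.lock().ok()?;
        rows.get(id).cloned()
    }
}

// --- Application: a command handler that only knows the port. ---
pub struct CreatePipelineHandler {
    repo: Arc<dyn PipelineRepository>, // injected via the constructor, not a global
}

impl CreatePipelineHandler {
    pub fn new(repo: Arc<dyn PipelineRepository>) -> Self {
        Self { repo }
    }

    pub fn handle(&self, id: &str, description: &str) -> Result<Pipeline, String> {
        let pipeline = Pipeline { id: id.to_string(), description: description.to_string() };
        self.repo.save(pipeline.clone())?;
        Ok(pipeline)
    }
}

// --- Composition root: the only place that chooses concrete adapters. ---
fn main() {
    let repo: Arc<dyn PipelineRepository> = Arc::new(InMemoryPipelineRepository::default());
    let handler = CreatePipelineHandler::new(Arc::clone(&repo));
    let created = handler.handle("p1", "Log enrichment pipeline").unwrap();
    assert_eq!(repo.find("p1"), Some(created));
    println!("stored pipeline p1");
}
```

Swapping the in-memory adapter for a database-backed one changes only `main`. The handler and the domain types stay the same.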

Event Flow (Pipes and Filters)

graph LR
    classDef source fill:#2563EB,stroke:#1E40AF,color:#FFFFFF
    classDef pipe fill:#D97706,stroke:#B45309,color:#FFFFFF
    classDef filter fill:#059669,stroke:#047857,color:#FFFFFF
    classDef sink fill:#DC2626,stroke:#B91C1C,color:#FFFFFF
    
    S[HTTP Source]:::source
    R[Router]:::pipe
    F1[Eval]:::filter
    F2[Mask]:::filter
    F3[RegexExtract]:::filter
    K[Sink]:::sink
    
    S -->|bounded channel| R
    R -->|route decision| F1
    F1 -->|enriched event| F2
    F2 -->|masked event| F3
    F3 -->|extracted fields| K
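Each arrow in the flow is a channel between independent stages. The following std-only sketch shows that shape, using a simplified `Event` and two hypothetical stand-in filters for Eval and Mask. It relies on `std::sync::mpsc::sync_channel`, which blocks the sender once `capacity` messages are buffered. The pattern table below points to an async `mpsc::channel(buffer_size)` instead, so treat this as the synchronous analogue rather than the project's actual code.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical, simplified event type.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    raw: String,
    fields: BTreeMap<String, String>,
}

/// Spawns one filter stage: read from `input`, apply `f`, write into a new bounded channel.
fn stage<F>(input: Receiver<Event>, capacity: usize, f: F) -> Receiver<Event>
where
    F: Fn(Event) -> Event + Send + 'static,
{
    let (tx, rx): (SyncSender<Event>, Receiver<Event>) = sync_channel(capacity);
    thread::spawn(move || {
        // The loop ends when the upstream sender is dropped.
        for event in input {
            // `send` blocks while the downstream buffer is full: this is the backpressure.
            if tx.send(f(event)).is_err() {
                break; // downstream hung up
            }
        }
    });
    rx
}

fn run(raws: Vec<&str>, capacity: usize) -> Vec<Event> {
    let (source_tx, source_rx) = sync_channel::<Event>(capacity);

    // Hypothetical Eval-like filter: add a constant field.
    let evaluated = stage(source_rx, capacity, |mut e| {
        e.fields.insert("env".into(), "production".into());
        e
    });
    // Hypothetical Mask-like filter: replace ASCII digits in the raw message (no regex).
    let masked = stage(evaluated, capacity, |mut e| {
        e.raw = e.raw.chars().map(|c| if c.is_ascii_digit() { '*' } else { c }).collect();
        e
    });

    let raws: Vec<String> = raws.into_iter().map(String::from).collect();
    let producer = thread::spawn(move || {
        for raw in raws {
            source_tx.send(Event { raw, fields: BTreeMap::new() }).expect("pipeline closed");
        }
        // Dropping `source_tx` here closes the chain stage by stage.
    });

    let sink: Vec<Event> = masked.into_iter().collect();
    producer.join().unwrap();
    sink
}

fn main() {
    for event in run(vec!["login from 192.168.1.1", "logout"], 2) {
        println!("{:?}", event);
    }
}
```

With a small `capacity`, a slow sink stalls the filters and eventually the producer. That is the backpressure a bounded channel is meant to provide.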

Quick Start

# Build
cargo build

# Run tests
cargo test --workspace

# Start the server
cargo run -- --port 3000 --database pipeflow.db

# Create a pipeline
curl -X POST http://localhost:3000/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "description": "Log enrichment pipeline",
    "functions": [
      {"type": "eval", "field": "env", "expression": "production"},
      {"type": "mask", "field": "message", "pattern": "\\d{3}-\\d{2}-\\d{4}", "replacement": "***-**-****"}
    ]
  }'

# Ingest events
curl -X POST http://localhost:3000/events/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {"raw": "User login from 192.168.1.1", "fields": {"source": "auth", "severity": "info"}}
    ]
  }'
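Each entry in `functions` above carries a `type` string. Rows 4 and 12 of the pattern table below describe resolving such strings through a registry of factories rather than a string `match`. The sketch below illustrates that idea under several assumptions: the trait signatures are hypothetical, `FunctionRegistry` is an invented type, and config is passed as a string map. Only an `eval`-like factory is registered, because a faithful `mask` would need a regex engine such as the `regex` crate.

```rust
use std::collections::{BTreeMap, HashMap};

type Fields = BTreeMap<String, String>;
type Config = HashMap<String, String>;

/// A pipeline function transforms event fields (hypothetical signature).
trait PipelineFn: Send + Sync {
    fn apply(&self, fields: Fields) -> Fields;
}

/// Builds a `PipelineFn` from its config (hypothetical signature).
trait PipelineFnFactory: Send + Sync {
    fn create(&self, config: &Config) -> Result<Box<dyn PipelineFn>, String>;
}

// Hypothetical eval: sets `field` to a constant `expression`.
struct Eval {
    field: String,
    value: String,
}

impl PipelineFn for Eval {
    fn apply(&self, mut fields: Fields) -> Fields {
        fields.insert(self.field.clone(), self.value.clone());
        fields
    }
}

struct EvalFactory;

impl PipelineFnFactory for EvalFactory {
    fn create(&self, config: &Config) -> Result<Box<dyn PipelineFn>, String> {
        let get = |key: &str| {
            config.get(key).cloned().ok_or_else(|| format!("eval: missing `{key}`"))
        };
        Ok(Box::new(Eval { field: get("field")?, value: get("expression")? }))
    }
}

#[derive(Default)]
struct FunctionRegistry {
    factories: HashMap<String, Box<dyn PipelineFnFactory>>,
}

impl FunctionRegistry {
    fn register(mut self, type_name: &str, factory: Box<dyn PipelineFnFactory>) -> Self {
        self.factories.insert(type_name.to_string(), factory);
        self
    }

    /// Adding a new function type means one `register` call, not a new `match` arm.
    fn create(&self, type_name: &str, config: &Config) -> Result<Box<dyn PipelineFn>, String> {
        self.factories
            .get(type_name)
            .ok_or_else(|| format!("unknown function type `{type_name}`"))?
            .create(config)
    }
}

fn main() {
    let registry = FunctionRegistry::default().register("eval", Box::new(EvalFactory));
    let config: Config = [("field", "env"), ("expression", "production")]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    let eval = registry.create("eval", &config).unwrap();
    println!("{:?}", eval.apply(Fields::new()));
    println!("{:?}", registry.create("mask", &config).err());
}
```

An unknown `type` surfaces as an `Err` when the pipeline is created, instead of falling through a default branch.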

Pattern Reference

| # | Antipattern | Functional pattern | Location |
|---|---|---|---|
| 1 | Singleton | DI via constructor | `main.rs`, all handlers |
| 2 | Module-level mutable state | Immutable values | `Event::set_field()` returns new `Event` |
| 3 | Mode/env branching | Sum types (ADTs) | `FunctionConfig`, `FnResult`, `SinkConfig` |
| 4 | Type-string dispatch | Registry + Factory | `DefaultFunctionRegistry` |
| 5 | God class | Bounded contexts | Crate-level separation |
| 6 | `forEach` + `push` | Iterator combinators | `PipelineEngine::process_batch()` |
| 7 | Error swallowing | `Result<T, E>` | `DomainError` enum everywhere |
| 8 | Temporal coupling | Typestate builder | `PipelineBuilder<NoFunctions>` -> `<HasFunctions>` |
| 9 | Global mutable registry | Persistent data structures | `RouteTable::add_rule()` returns new table |
| 10 | Callback chains | Async composition | `async fn` chains in handlers |
| 11 | Primitive obsession | Newtypes | `PipelineId(String)`, `FieldName(String)` |
| 12 | Signal switch | Handler registry | `HashMap<String, Box<dyn PipelineFnFactory>>` |
| 13 | Anemic domain model | Rich domain objects | `Pipeline::add_function()`, `validate()` |
| 14 | Eager loading | Lazy evaluation | `once_cell::Lazy`, demand-driven streams |
| 15 | Mixed I/O + logic | Effect separation | Domain returns data; infra executes I/O |
| 16 | Monolithic functions | Function composition | `process_event()` folds over fn chain |
| 17 | No rollback | Saga pattern | Command handlers with compensation |
| 18 | `any` types | Generics + trait bounds | `Box<dyn PipelineFn>` with typed trait |
| 19 | Feature flag conditionals | Capability interfaces | `FunctionResolver` trait |
| 20 | Monolithic startup | Plugin architecture | Function registry + Cargo features |
| 21 | OS process forking | Actor model | `PipelineActor` with mpsc channels |
| 22 | Leader bottleneck | Version vectors | `Pipeline.version` field |
| 23 | Shared code bloat | Feature-gated modules | `Cargo.toml` features |
| 24 | Push without backpressure | Bounded channels | `mpsc::channel(buffer_size)` |
| 25 | Eager fetch | Lazy pull streams | `Stream` trait, async iteration |
| 26 | Deep inheritance | Trait composition | `trait PipelineFn: Send + Sync` |
| 27 | Unbounded recursion | Iterative fold | Config resolution via fold |
| 28 | Ad-hoc recursion | Catamorphism | `evaluate_filter()` recursive descent |
| 29 | Hardcoded parsers | Parser combinators | `nom` for filter expression parsing |
| 30 | Nested string access | Typed lenses | `Event::get_field()` typed accessors |
| 31 | Implicit mutable state | Reducer pattern | Actor: `match msg { ... }` -> new state |
| 32 | Monkey-patching | Extension via traits | `impl PipelineFn for CustomFn` |
| 33 | Implicit ordering | Typestate lifecycle | Actor: Created -> Running -> Stopped |
| 34 | Window via mutation | Comonad-style | `SlidingWindow::extend()` |
| 35 | Static worker assignment | Work-stealing | `rayon::par_iter()` for batch processing |
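Rows 2, 3, 6, and 16 fit together in one small example. `Event::set_field()` returns a new event, each step returns a sum type instead of a flag, `process_event()` folds an event through the chain, and `process_batch()` is built from iterator combinators. The sketch assumes hypothetical shapes for `Event` and `FnResult` (with `Continue`/`Drop` variants). It also uses plain function pointers where PipeFlow uses `Box<dyn PipelineFn>`.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified event type.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    raw: String,
    fields: BTreeMap<String, String>,
}

impl Event {
    fn new(raw: &str, pairs: &[(&str, &str)]) -> Self {
        let fields: BTreeMap<String, String> =
            pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
        Event { raw: raw.to_string(), fields }
    }

    /// Returns a new Event; `self` is left untouched (cloned here for simplicity).
    fn set_field(&self, key: &str, value: &str) -> Event {
        let mut next = self.clone();
        next.fields.insert(key.to_string(), value.to_string());
        next
    }
}

/// Outcome of one step: a sum type instead of a bool flag or a null (hypothetical variants).
#[derive(Debug, PartialEq)]
enum FnResult {
    Continue(Event),
    Drop { reason: String },
}

/// Hypothetical: plain function pointers stand in for `Box<dyn PipelineFn>`.
type StepFn = fn(Event) -> FnResult;

fn drop_debug(e: Event) -> FnResult {
    let is_debug = e.fields.get("severity").map(String::as_str) == Some("debug");
    if is_debug {
        FnResult::Drop { reason: "debug events are filtered".to_string() }
    } else {
        FnResult::Continue(e)
    }
}

fn add_env(e: Event) -> FnResult {
    FnResult::Continue(e.set_field("env", "production"))
}

/// Folds one event through the chain; once a step returns `Drop`, later steps are skipped.
fn process_event(chain: &[StepFn], event: Event) -> FnResult {
    chain.iter().fold(FnResult::Continue(event), |acc, step| match acc {
        FnResult::Continue(e) => step(e),
        dropped => dropped,
    })
}

/// Iterator combinators instead of a `for` loop that pushes into a Vec.
fn process_batch(chain: &[StepFn], events: Vec<Event>) -> Vec<Event> {
    events
        .into_iter()
        .filter_map(|e| match process_event(chain, e) {
            FnResult::Continue(e) => Some(e),
            FnResult::Drop { .. } => None,
        })
        .collect()
}

fn main() {
    let chain: [StepFn; 2] = [drop_debug, add_env];
    let batch = vec![
        Event::new("user login", &[("severity", "info")]),
        Event::new("cache miss", &[("severity", "debug")]),
    ];
    for event in process_batch(&chain, batch) {
        println!("{:?}", event);
    }
}
```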

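Rows 8 and 11 can be sketched the same way. In the hypothetical builder below, zero-sized marker types ensure that `build()` exists only after at least one function has been added. Forgetting that step therefore becomes a compile error rather than a runtime check. Newtypes likewise keep a `PipelineId` from being passed where a `FieldName` is expected. The type names follow the table, but the fields, the `parse` constructor, and the method signatures are assumptions.

```rust
use std::marker::PhantomData;

/// Newtypes: distinct types for values that would otherwise all be `String`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PipelineId(String);

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FieldName(String);

impl PipelineId {
    /// Hypothetical validating constructor: every `PipelineId` is non-empty.
    pub fn parse(raw: &str) -> Result<Self, String> {
        if raw.trim().is_empty() {
            Err("pipeline id must not be empty".to_string())
        } else {
            Ok(PipelineId(raw.to_string()))
        }
    }
}

// Hypothetical pipeline: each function is (type, target field), mirroring the JSON above.
#[derive(Debug)]
pub struct Pipeline {
    pub id: PipelineId,
    pub functions: Vec<(String, FieldName)>,
}

// Typestate markers: zero-sized types that exist only at compile time.
pub struct NoFunctions;
pub struct HasFunctions;

pub struct PipelineBuilder<State> {
    id: PipelineId,
    functions: Vec<(String, FieldName)>,
    _state: PhantomData<State>,
}

impl PipelineBuilder<NoFunctions> {
    pub fn new(id: PipelineId) -> Self {
        PipelineBuilder { id, functions: Vec::new(), _state: PhantomData }
    }
}

impl<State> PipelineBuilder<State> {
    /// Adding a function always moves the builder into the `HasFunctions` state.
    pub fn add_function(mut self, kind: &str, field: FieldName) -> PipelineBuilder<HasFunctions> {
        self.functions.push((kind.to_string(), field));
        PipelineBuilder { id: self.id, functions: self.functions, _state: PhantomData }
    }
}

impl PipelineBuilder<HasFunctions> {
    /// Only defined for the `HasFunctions` state.
    pub fn build(self) -> Pipeline {
        Pipeline { id: self.id, functions: self.functions }
    }
}

fn main() {
    let id = PipelineId::parse("log-enrichment").unwrap();
    let pipeline = PipelineBuilder::new(id)
        .add_function("eval", FieldName("env".to_string()))
        .add_function("mask", FieldName("message".to_string()))
        .build();
    println!("{:?}", pipeline);
    // PipelineBuilder::new(PipelineId::parse("empty").unwrap()).build();
    // ^ would not compile: `build` is not defined for PipelineBuilder<NoFunctions>.
}
```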
Project Structure

pipeflow/
├── Cargo.toml                 # Workspace root
├── src/main.rs                # Composition root (DI wiring)
└── crates/
    ├── domain/                # Pure logic, zero I/O deps
    │   └── src/
    │       ├── model/         # Event, Pipeline, RouteTable, IDs
    │       ├── services/      # PipelineEngine, RoutingEngine
    │       ├── ports/         # Repository traits, PipelineFn
    │       ├── events/        # Domain events
    │       └── error.rs       # DomainError ADT
    ├── application/           # Use cases, orchestration
    │   └── src/
    │       ├── commands/      # CreatePipeline, IngestEvent
    │       ├── queries/       # GetPipeline
    │       └── services/      # StreamingService
    ├── infrastructure/        # Adapters (SQLite, channels, parsers)
    │   └── src/
    │       ├── persistence/   # SQLite repos + migrations
    │       ├── messaging/     # Channel-based event bus
    │       ├── pipeline/      # Functions, registry, parser, window
    │       ├── streaming/     # Actor-based pipeline execution
    │       ├── sources/       # HTTP event source
    │       └── sinks/         # Memory sink (testing)
    └── interfaces/            # HTTP API (Axum)
        └── src/api/           # REST handlers

Design Principles

  1. Domain purity: The domain crate has zero I/O dependencies. Business logic consists of pure functions operating on immutable data.
  2. Compile-time boundaries: Rust's crate system enforces architectural layers. The domain crate does not depend on infrastructure, so it cannot accidentally import it. Because infrastructure depends on the domain, adding the reverse dependency would create a cycle, which Cargo rejects.
  3. Exhaustive matching: Sum types (enums) with match ensure all cases are handled. As long as a match has no wildcard (_) arm, the compiler rejects it when a variant is missing.
  4. Ownership = lifecycle: Rust's ownership model makes resource management explicit. Shared mutable state has to appear in the types (for example Arc<Mutex<T>>), so it cannot be hidden.
  5. Errors as values: Result<T, DomainError> makes failure explicit in the type signature. No silent swallowing.
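Principles 3 and 5 reinforce each other. An error enum names every failure mode, functions return Result<T, DomainError>, and a match without a wildcard arm must handle each variant. In the sketch below, the `DomainError` variants and the `validate` function are hypothetical illustrations, not PipeFlow's actual definitions.

```rust
use std::fmt;

/// Every failure mode is a named variant (hypothetical variants).
#[derive(Debug, PartialEq)]
enum DomainError {
    EmptyPipeline,
    InvalidField { name: String },
    VersionConflict { expected: u64, found: u64 },
}

impl fmt::Display for DomainError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No `_ =>` arm: adding a variant makes this match fail to compile until it is handled.
        match self {
            DomainError::EmptyPipeline => write!(f, "pipeline has no functions"),
            DomainError::InvalidField { name } => write!(f, "invalid field name `{name}`"),
            DomainError::VersionConflict { expected, found } => {
                write!(f, "version conflict: expected {expected}, found {found}")
            }
        }
    }
}

impl std::error::Error for DomainError {}

/// Hypothetical pure validation: no I/O, and failure is visible in the return type.
fn validate(fields: &[&str], expected_version: u64, current_version: u64) -> Result<usize, DomainError> {
    if fields.is_empty() {
        return Err(DomainError::EmptyPipeline);
    }
    if let Some(bad) = fields.iter().find(|f| f.is_empty() || f.contains(' ')) {
        return Err(DomainError::InvalidField { name: bad.to_string() });
    }
    if expected_version != current_version {
        return Err(DomainError::VersionConflict { expected: expected_version, found: current_version });
    }
    Ok(fields.len())
}

fn main() {
    match validate(&["env", "message"], 3, 3) {
        Ok(n) => println!("valid pipeline with {n} functions"),
        Err(e) => println!("rejected: {e}"),
    }
    if let Err(e) = validate(&[], 1, 1) {
        println!("rejected: {e}");
    }
}
```

Adding a fourth variant would make the `Display` impl fail to compile until the new case is handled.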

License

MIT