hajirufai/streamlite

StreamLite

A lightweight stream processing engine built from scratch in Python. Zero external dependencies.

StreamLite implements the core concepts from Apache Flink and Kafka Streams — windowing, keyed state, watermarks, checkpointing — in ~4,000 lines of pure Python. No frameworks, no magic. Just the fundamentals of how real stream processors work under the hood.


Why?

Most "stream processing" tutorials stop at reading from Kafka and writing to a database. They don't explain:

  • How does a tumbling window actually assign records to time buckets?
  • What happens when data arrives out of order?
  • How does a watermark decide when "all data for this window" has arrived?
  • What does checkpointing look like without a distributed coordinator?

StreamLite answers these questions with working code.

Features

Stream API

from streamlite import StreamLite
from streamlite.window import TumblingWindow

# Word count: split each line into words, key by word, count per key
result = (
    StreamLite.from_collection(["the quick brown fox", "the lazy dog"])
    .flat_map(str.split)
    .key_by(lambda w: w)
    .count()
)
# [("the", 2), ("quick", 1), ("brown", 1), ...]

Windowing

Type      Description
Tumbling  Fixed-size, non-overlapping time windows
Sliding   Overlapping windows with configurable slide
Session   Gap-based windows that merge on activity
Count     Fire every N elements
Global    Single window for all time
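
Assigning a record to a time bucket comes down to integer arithmetic on its timestamp. The following is a minimal sketch of that idea, not StreamLite's own code: the function names `tumbling_window` and `sliding_windows` are hypothetical, and windows are assumed to be half-open `[start, end)` intervals in milliseconds, aligned to the epoch.

```python
def tumbling_window(timestamp_ms: int, size_ms: int) -> tuple[int, int]:
    """Return the single [start, end) window that contains the timestamp."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return (start, start + size_ms)


def sliding_windows(timestamp_ms: int, size_ms: int, slide_ms: int) -> list[tuple[int, int]]:
    """Return every [start, end) window that contains the timestamp."""
    windows = []
    # Latest window start at or before the timestamp.
    start = timestamp_ms - (timestamp_ms % slide_ms)
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


print(tumbling_window(12_500, 5_000))          # (10000, 15000)
print(sliding_windows(12_500, 10_000, 5_000))  # [(10000, 20000), (5000, 15000)]
```

A tumbling window is the special case where the slide equals the size, so each record belongs to exactly one window. With a smaller slide, a record belongs to roughly size / slide windows.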

Keyed State

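def running_average(value, ctx):
    # State handles are scoped to the current key (here: one sensor)
    total = ctx.get_value_state("total", default=0)
    count = ctx.get_value_state("count", default=0)

    # Fold the new reading into the running sum and count
    t = (total.get() or 0) + value["temp"]
    c = (count.get() or 0) + 1
    total.update(t)
    count.update(c)

    # Emit one output record per input record
    return [{"sensor": value["sensor"], "avg": t / c}]

# `stream` is assumed to be an existing stream of {"sensor": ..., "temp": ...} dicts
result = stream.key_by(lambda r: r["sensor"]).process(running_average)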

Four state primitives: ValueState, ListState, MapState, ReducingState — all with TTL support and snapshot/restore for checkpointing.
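
To make the TTL and snapshot/restore ideas concrete, here is a small sketch of a keyed value store. It is an illustration under stated assumptions, not StreamLite's `StateBackend`: the class `TinyStateBackend`, its method names, and the lazy expire-on-read policy are all hypothetical.

```python
import copy
import time


class TinyStateBackend:
    """Hypothetical in-memory store: (key, state name) -> (value, last write time)."""

    def __init__(self, ttl_seconds=None, clock=time.time):
        self._data = {}
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time

    def get(self, key, name, default=None):
        entry = self._data.get((key, name))
        if entry is None:
            return default
        value, written_at = entry
        # Expired entries are dropped lazily, on read.
        if self._ttl is not None and self._clock() - written_at >= self._ttl:
            del self._data[(key, name)]
            return default
        return value

    def update(self, key, name, value):
        self._data[(key, name)] = (value, self._clock())

    def snapshot(self):
        # Deep copy so later updates do not leak into the snapshot.
        return copy.deepcopy(self._data)

    def restore(self, snapshot):
        self._data = copy.deepcopy(snapshot)


backend = TinyStateBackend()
backend.update("sensor-1", "total", 42.0)
saved = backend.snapshot()
backend.update("sensor-1", "total", 99.0)  # progress made after the snapshot
backend.restore(saved)                      # simulate recovery
print(backend.get("sensor-1", "total"))     # 42.0
```

Keying every entry by `(key, name)` is what keeps one sensor's running total separate from another's, even though both use the state name "total".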

Watermarks & Late Data

from streamlite.watermark import BoundedOutOfOrderness

# Allow up to 5 seconds of out-of-order data
strategy = BoundedOutOfOrderness(max_delay_ms=5000)
strategy.on_event(10000)
strategy.current_watermark()  # → 5000

Four strategies: BoundedOutOfOrderness, AscendingTimestamps, PeriodicWatermark, PunctuatedWatermark.
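
The bounded out-of-orderness rule can be sketched in a few lines: the watermark trails the largest timestamp seen so far by a fixed delay. This is a hedged illustration that reproduces the numbers in the example above; the class name `BoundedLatenessSketch` and the `is_late` helper are hypothetical and not part of StreamLite.

```python
class BoundedLatenessSketch:
    """Watermark = largest event timestamp seen so far minus a fixed delay."""

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        self.max_timestamp = None  # no events seen yet

    def on_event(self, timestamp_ms: int) -> None:
        # Only a new maximum moves the watermark; older events leave it alone.
        if self.max_timestamp is None or timestamp_ms > self.max_timestamp:
            self.max_timestamp = timestamp_ms

    def current_watermark(self):
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.max_delay_ms

    def is_late(self, timestamp_ms: int) -> bool:
        # Assumption: a record is "late" if it is older than the watermark.
        watermark = self.current_watermark()
        return watermark is not None and timestamp_ms < watermark


sketch = BoundedLatenessSketch(max_delay_ms=5000)
sketch.on_event(10000)
sketch.on_event(7000)              # out of order, within the 5 s bound
print(sketch.current_watermark())  # 5000
print(sketch.is_late(4000))        # True
```

Because the watermark never moves backwards, a window ending at or before 5000 could be fired at this point, and the record at 4000 would have to be handled as late data.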

Stream Operations

  • map / flat_map / filter — standard transforms
  • key_by — partition by key for stateful processing
  • split / union — branch and merge streams
  • join — inner, left, full outer joins plus interval and window joins
  • connected streams — co-process two streams with shared state
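
Of the operations listed above, the interval join is probably the least obvious. A minimal sketch, assuming records are `(key, timestamp_ms, value)` tuples and both inputs fit in memory; the function `interval_join` and its parameters are hypothetical and do not mirror StreamLite's join API.

```python
from collections import defaultdict


def interval_join(left, right, lower_ms, upper_ms):
    """Inner-join (key, timestamp_ms, value) records whose timestamps are close.

    A pair matches when the keys are equal and
    left_ts + lower_ms <= right_ts <= left_ts + upper_ms.
    """
    # Index the right side by key so each left record only scans its own key.
    right_by_key = defaultdict(list)
    for key, ts, value in right:
        right_by_key[key].append((ts, value))

    joined = []
    for key, left_ts, left_value in left:
        for right_ts, right_value in right_by_key.get(key, []):
            if left_ts + lower_ms <= right_ts <= left_ts + upper_ms:
                joined.append((key, left_value, right_value))
    return joined


clicks = [("u1", 1000, "click-a"), ("u2", 2000, "click-b")]
purchases = [("u1", 1500, "buy-x"), ("u1", 9000, "buy-y"), ("u3", 2100, "buy-z")]

# Purchases that happen within 2 seconds after a click by the same user
print(interval_join(clicks, purchases, lower_ms=0, upper_ms=2000))
# [('u1', 'click-a', 'buy-x')]
```

A streaming version would typically keep both buffers in keyed state and use the watermark to evict records that can no longer match; this batch form only shows the matching rule.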

Pipeline Builder (DAG)

from streamlite.pipeline import Pipeline
from streamlite.source import CollectionSource, FileSource, CSVSource
from streamlite.sink import CollectionSink, FileSink, CSVSink
# Assumption: MapOperator is exported by streamlite/operators.py (see Project Structure)
from streamlite.operators import MapOperator

# parse_fn stands for your own record-parsing function; it is not part of StreamLite

pipeline = Pipeline("etl")
pipeline.add_source("raw", CSVSource("data.csv"))
pipeline.add_operator("clean", MapOperator(str.strip), inputs=["raw"])
pipeline.add_operator("parse", MapOperator(parse_fn), inputs=["clean"])
pipeline.add_sink("output", CSVSink("output.csv"), inputs=["parse"])

results = pipeline.execute()
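
Executing a DAG like this one means running every node after all of its inputs. The sketch below shows one way to do that with the standard library's `graphlib.TopologicalSorter`; whether StreamLite's executor works this way is an assumption, and `run_dag` with its three dictionaries is a hypothetical interface made up for illustration.

```python
from graphlib import TopologicalSorter


def run_dag(sources, operators, edges):
    """Run a tiny DAG of record-list transforms in dependency order.

    sources:   {node name: list of input records}
    operators: {node name: function taking a list and returning a list}
    edges:     {node name: list of upstream node names}
    """
    outputs = dict(sources)
    # static_order() yields each node only after all of its upstream nodes
    # and raises graphlib.CycleError if the graph contains a cycle.
    for node in TopologicalSorter(edges).static_order():
        if node in outputs:  # sources already have their output
            continue
        # Concatenate upstream outputs in the declared input order.
        upstream = [rec for parent in edges[node] for rec in outputs[parent]]
        outputs[node] = operators[node](upstream)
    return outputs


outputs = run_dag(
    sources={"raw": [" 1 ", " 2 "]},
    operators={
        "clean": lambda records: [r.strip() for r in records],
        "parse": lambda records: [int(r) for r in records],
    },
    edges={"clean": ["raw"], "parse": ["clean"]},
)
print(outputs["parse"])  # [1, 2]
```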

Checkpointing

from streamlite.checkpoint import CheckpointCoordinator, CheckpointStorage
from streamlite.state import StateBackend

backend = StateBackend()
storage = CheckpointStorage("/tmp/checkpoints", max_retained=3)
coordinator = CheckpointCoordinator(backend, storage, interval_ms=30000)

# Periodic snapshots
coordinator.trigger_checkpoint()

# Restore on failure
coordinator.restore_latest()
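
Without a distributed coordinator, checkpoint storage can be as simple as numbered files in a directory plus a retention rule. The following sketch illustrates that idea under stated assumptions: `TinyCheckpointStorage`, the `chk-<id>.json` naming scheme, and the JSON encoding are hypothetical and are not taken from StreamLite's `CheckpointStorage`.

```python
import json
import tempfile
from pathlib import Path


class TinyCheckpointStorage:
    """Hypothetical file-based storage that keeps only the newest checkpoints."""

    def __init__(self, directory, max_retained=3):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.max_retained = max_retained

    def _files(self):
        # Zero-padded ids make lexicographic order equal numeric order.
        return sorted(self.directory.glob("chk-*.json"))

    def save(self, state: dict) -> Path:
        files = self._files()
        next_id = int(files[-1].stem.split("-")[1]) + 1 if files else 1
        path = self.directory / f"chk-{next_id:08d}.json"
        # Write to a temp file, then rename, so readers never see a partial file.
        tmp = path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state))
        tmp.replace(path)
        # Drop everything except the newest max_retained checkpoints.
        for old in self._files()[:-self.max_retained]:
            old.unlink()
        return path

    def load_latest(self):
        files = self._files()
        return json.loads(files[-1].read_text()) if files else None


with tempfile.TemporaryDirectory() as tmpdir:
    storage = TinyCheckpointStorage(tmpdir, max_retained=3)
    for n in range(5):
        storage.save({"count": n})
    print(storage.load_latest())                       # {'count': 4}
    print(len(list(Path(tmpdir).glob("chk-*.json"))))  # 3
```

The state passed to `save` must be JSON-serializable here; a real backend would need its own serializer for arbitrary keys and values.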

Metrics & Observability

from streamlite.metrics import MetricsRegistry

registry = MetricsRegistry()
registry.counter("records").increment()
registry.gauge("buffer_size", 1024)

report = registry.report()
# {"counters": {"records": 1}, "gauges": {"buffer_size": 1024}, ...}

Architecture

                    ┌─────────────────────────────────────┐
                    │            StreamLite               │
                    ├─────────────────────────────────────┤
                    │                                     │
  Sources           │  CollectionSource  FileSource       │
  (data in)         │  CSVSource  GeneratorSource         │
                    │  RateSource                         │
                    ├─────────────────────────────────────┤
                    │                                     │
  Operators         │  Map  Filter  FlatMap  KeyBy        │
  (transforms)      │  Peek  TimestampAssigner  Chain     │
                    ├─────────────────────────────────────┤
                    │                                     │
  Windowing         │  Tumbling  Sliding  Session         │
                    │  Count  Global                      │
                    ├─────────────────────────────────────┤
                    │                                     │
  State             │  ValueState  ListState  MapState    │
                    │  ReducingState  StateBackend        │
                    ├─────────────────────────────────────┤
                    │                                     │
  Time              │  Watermarks  Triggers  EventTime    │
                    │  ProcessingTime  Ingestion          │
                    ├─────────────────────────────────────┤
                    │                                     │
  Topology          │  Split  Union  Join  Connected      │
                    │  Pipeline (DAG)  Executor           │
                    ├─────────────────────────────────────┤
                    │                                     │
  Infrastructure    │  Checkpoint  Serialize  Partition   │
                    │  Metrics  Errors  Utils             │
                    ├─────────────────────────────────────┤
                    │                                     │
  Sinks             │  CollectionSink  FileSink  CSVSink  │
  (data out)        │  CallbackSink  PrintSink  NullSink  │
                    │                                     │
                    └─────────────────────────────────────┘

Project Structure

streamlite/
├── __init__.py          # StreamLite entry point
├── types.py             # StreamRecord, TimeWindow, Watermark
├── errors.py            # Exception hierarchy
├── source.py            # Data sources (collection, file, CSV, generator, rate)
├── sink.py              # Data sinks (collection, file, CSV, callback, print)
├── operators.py         # Map, Filter, FlatMap, KeyBy, Peek, Chain
├── stream.py            # DataStream with fluent API
├── keyed.py             # KeyedStream + WindowedStream
├── window.py            # Window assigners (tumbling, sliding, session, count, global)
├── trigger.py           # Window triggers (event-time, count, delta, purging)
├── watermark.py         # Watermark strategies + multi-source tracker
├── state.py             # Keyed state (value, list, map, reducing) + backend
├── context.py           # Processing context for stateful operations
├── join.py              # Stream joins (inner, left, full, interval, window)
├── split.py             # Stream splitting + side outputs
├── merge.py             # Stream union + connected streams
├── serializer.py        # JSON, CSV, string serialization + schema validation
├── partition.py         # Hash, round-robin, broadcast, custom partitioning
├── checkpoint.py        # Checkpoint storage, metadata, coordinator
├── metrics.py           # Counters, throughput, latency, backpressure
├── pipeline.py          # DAG-based pipeline builder
├── executor.py          # Pipeline/stream executor with metrics
├── time_utils.py        # Timestamp parsing, window math
└── utils.py             # ID generation, hashing, formatting
tests/                   # 281 tests across 20 test files
examples/                # 5 runnable examples

Quick Start

git clone https://github.com/hajirufai/streamlite.git
cd streamlite

# Run tests
python -m pytest tests/ -v

# Try an example
python examples/word_count.py
python examples/clickstream.py
python examples/sensor_alerts.py

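No runtime dependencies to install; Python 3.11+ is required. Running the test suite with the command above additionally needs pytest, which is not part of the standard library.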

24 Modules, Zero Dependencies

Every component — from the HTTP timestamp parser to the checkpoint serializer — is implemented from scratch using only the Python standard library. The goal is understanding, not convenience.

License

MIT
