# StreamForge

High-performance stateful stream processing framework for Go. A Kafka Streams alternative with exactly-once semantics, windowing, and distributed checkpointing.
## Architecture

```
Sources (Kafka / Channel / Generator)
        |
        v
Processor Chain (Map -> Filter -> FlatMap -> Aggregate -> Join)
        |
        v
State Store (In-Memory / BoltDB per partition)
        |
        v
Sinks (Kafka / Stdout / File)

Checkpoint Coordinator (Chandy-Lamport barriers)
        -> File / S3 checkpoint storage
Backpressure Controller (credit-based flow control)
```
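The exactly-once guarantee rests on the Chandy-Lamport barriers named in the diagram: the checkpoint coordinator injects a barrier marker at every source, and an operator snapshots its state only once the barrier has arrived on all of its input channels, pausing channels whose barrier came early so that no post-barrier record contaminates the pre-barrier state. The sketch below illustrates that alignment step in plain Go. It is a conceptual illustration only; every type and function in it is invented for the example, and none of it is StreamForge's internal code.

```go
package main

import "fmt"

// event is an invented type for this illustration: either an ordinary
// record or a checkpoint barrier marker.
type event struct {
	barrier bool
	payload string
}

// align reads from every input until each one has delivered its barrier.
// Inputs whose barrier already arrived are paused, so no post-barrier
// record can slip into the pre-barrier state. When all barriers are in,
// the operator's state is consistent and can be snapshotted.
func align(inputs []<-chan event, process func(event), snapshot func()) {
	aligned := make([]bool, len(inputs))
	remaining := len(inputs)
	for remaining > 0 {
		for i, ch := range inputs {
			if aligned[i] {
				continue // paused until the checkpoint completes
			}
			ev := <-ch
			if ev.barrier {
				aligned[i] = true
				remaining--
			} else {
				process(ev) // pre-barrier record: handle normally
			}
		}
	}
	snapshot()
}

func main() {
	a, b := make(chan event, 3), make(chan event, 3)
	a <- event{payload: "a1"}
	a <- event{barrier: true}
	b <- event{payload: "b1"}
	b <- event{payload: "b2"}
	b <- event{barrier: true}

	align(
		[]<-chan event{a, b},
		func(ev event) { fmt.Println("process", ev.payload) },
		func() { fmt.Println("snapshot state") },
	)
}
```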
## Features

- **Fluent Topology Builder** - Compose processing pipelines with method chaining
- **Stateful Processing** - In-memory and BoltDB-backed state stores with snapshot/restore
- **Windowing** - Tumbling, sliding, and session windows with custom triggers
- **Aggregation** - Count, sum, reduce, and custom aggregate functions
- **Exactly-Once Semantics** - Chandy-Lamport distributed checkpointing (see the barrier sketch above)
- **Backpressure** - Credit-based flow control with adaptive rate limiting (sketched just below this list)
- **Multiple Sources/Sinks** - Kafka, in-memory channels, generators, stdout, files
- **Observability** - OpenTelemetry metrics and tracing, structured logging
- **Python SDK** - Build topologies in Python, deploy to the Go runtime
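Credit-based flow control means the receiver grants the sender a budget of "credits" (free buffer slots); the sender may only emit while it holds a credit, and the receiver returns one credit per record it finishes, so a slow consumer automatically throttles its producer instead of being overrun. The following is a minimal self-contained sketch of the idea in plain Go, using a buffered channel as the credit ledger; it uses no StreamForge types and is not the backpressure package's actual implementation.

```go
package main

import "fmt"

func main() {
	const budget = 4 // receiver grants 4 credits up front

	credits := make(chan struct{}, budget)
	for i := 0; i < budget; i++ {
		credits <- struct{}{}
	}

	records := make(chan int, budget)
	done := make(chan struct{})

	// Receiver: finishing a record returns one credit upstream.
	go func() {
		for r := range records {
			fmt.Println("processed", r)
			credits <- struct{}{}
		}
		close(done)
	}()

	// Sender: must acquire a credit before emitting each record,
	// so it can never have more than `budget` records in flight.
	for i := 1; i <= 10; i++ {
		<-credits
		records <- i
	}
	close(records)
	<-done
}
```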
## Installation

```bash
go get github.com/ilya-shevelev/streamforge
```

## Quick Start

```go
package main

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/ilya-shevelev/streamforge/internal/runtime"
	"github.com/ilya-shevelev/streamforge/pkg/record"
	"github.com/ilya-shevelev/streamforge/pkg/sink"
	"github.com/ilya-shevelev/streamforge/pkg/source"
	"github.com/ilya-shevelev/streamforge/pkg/topology"
)

func main() {
	// Feed two lines into an in-memory channel source.
	ch := make(chan *record.Record, 10)
	go func() {
		for _, s := range []string{"hello world", "hello streamforge"} {
			ch <- record.New([]byte("key"), []byte(s))
		}
		close(ch)
	}()

	// Split each line into words, group by word, and count occurrences.
	topo, err := topology.NewTopology("wordcount").
		Source("input", source.NewChannelSource(ch)).
		FlatMap("split", func(_ context.Context, r *record.Record) ([]*record.Record, error) {
			var out []*record.Record
			for _, w := range strings.Fields(string(r.Value)) {
				out = append(out, record.New([]byte(w), []byte(w)))
			}
			return out, nil
		}).
		GroupByKey().
		Count("counter").
		Sink("output", sink.NewStdoutSink()).
		Build()
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	engine := runtime.NewEngine(topo)
	if err := engine.Run(ctx); err != nil {
		log.Fatal(err)
	}
}
```
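The same program ships in the repository, so you can run it straight from a checkout (assuming the `examples/wordcount` layout shown under Project Structure below):

```bash
go run ./examples/wordcount
```

It should emit each word with its count: hello 2, world 1, streamforge 1 (the exact output format depends on the stdout sink).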
## Topology Builder

Build complex topologies with method chaining:

```go
topo, err := topology.NewTopology("pipeline").
	Source("events", kafkaSource).
	Map("parse", parseEvent).
	Filter("valid", isValid).
	GroupByKey().
	Window(window.NewTumblingWindow(time.Minute)).
	Count("event-count").
	Sink("output", kafkaSink).
	Build()
```
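Only `Count` appears in these examples, while the feature list also promises sum, reduce, and custom aggregates. Their signatures are not documented here, so the snippet below is a hypothetical sketch: it assumes an `Aggregate` step taking an initializer and a reducer, by analogy with `Count`, and the real method names and signatures in `pkg/processor` may well differ.

```go
// Hypothetical API sketch: `Aggregate`, its initializer, and its
// reducer signature are assumptions, not the documented interface.
topo, err := topology.NewTopology("revenue").
	Source("orders", kafkaSource).
	GroupByKey().
	Window(window.NewTumblingWindow(time.Hour)).
	Aggregate("revenue-sum",
		func() float64 { return 0 }, // initial accumulator per key/window
		func(acc float64, r *record.Record) (float64, error) {
			amount, err := strconv.ParseFloat(string(r.Value), 64)
			return acc + amount, err
		}).
	Sink("output", kafkaSink).
	Build()
```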
## Windowing

```go
// Tumbling window: fixed, non-overlapping time intervals.
window.NewTumblingWindow(1 * time.Minute)

// Sliding window: overlapping windows with configurable slide.
window.NewSlidingWindow(10*time.Minute, 5*time.Minute)

// Session window: groups events with an inactivity gap.
window.NewSessionWindow(30 * time.Second)
```
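To make sliding-window semantics concrete: with a 10-minute window sliding every 5 minutes, an event at 12:07 belongs to two windows, [12:00, 12:10) and [12:05, 12:15). The standalone helper below computes that assignment from first principles; it illustrates the standard assignment rule and is not code from the `window` package.

```go
package main

import (
	"fmt"
	"time"
)

// windowStarts returns the start of every sliding window of the given
// size and slide that contains ts. With size == slide this degenerates
// to tumbling windows, where each event lands in exactly one window.
func windowStarts(ts time.Time, size, slide time.Duration) []time.Time {
	var starts []time.Time
	for start := ts.Truncate(slide); ts.Sub(start) < size; start = start.Add(-slide) {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	ts := time.Date(2024, 1, 1, 12, 7, 0, 0, time.UTC)
	for _, s := range windowStarts(ts, 10*time.Minute, 5*time.Minute) {
		fmt.Printf("[%s, %s)\n", s.Format("15:04"), s.Add(10*time.Minute).Format("15:04"))
	}
	// Output:
	// [12:05, 12:15)
	// [12:00, 12:10)
}
```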
## State Stores

```go
// In-memory store (for development/testing).
store := state.NewMemoryStore()

// BoltDB-backed persistent store (for production).
store, err := state.NewBoltStore("/data/state")

// Snapshot and restore for checkpointing.
snapshot, _ := store.Snapshot()
store.Restore(snapshot)
```
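Snapshot/restore is what connects state stores to checkpointing: `Snapshot` serializes the store's current contents for the checkpoint coordinator to persist, and `Restore` rebuilds them after a failure. Below is a minimal in-memory store honoring that contract; it sketches the pattern only and is not the actual `state.StateStore` interface, whose method set may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memStore is an illustrative key-value store with snapshot/restore;
// it is not StreamForge's state package.
type memStore struct {
	data map[string][]byte
}

func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }

func (s *memStore) Put(k string, v []byte) { s.data[k] = v }
func (s *memStore) Get(k string) []byte    { return s.data[k] }

// Snapshot serializes the full contents into one opaque blob.
func (s *memStore) Snapshot() ([]byte, error) { return json.Marshal(s.data) }

// Restore replaces the contents with a previously taken snapshot.
func (s *memStore) Restore(snap []byte) error {
	fresh := map[string][]byte{}
	if err := json.Unmarshal(snap, &fresh); err != nil {
		return err
	}
	s.data = fresh
	return nil
}

func main() {
	s := newMemStore()
	s.Put("hello", []byte("2"))
	snap, _ := s.Snapshot()

	s.Put("hello", []byte("99"))        // state diverges after the checkpoint...
	_ = s.Restore(snap)                 // ...then rolls back to it on recovery
	fmt.Println(string(s.Get("hello"))) // prints 2
}
```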
## Python SDK

```python
from streamforge import Topology, Source, Sink, Window

topo = (
    Topology("clickstream")
    .source("events", Source.kafka(topic="clicks", brokers=["kafka:9092"]))
    .map("parse", config={"format": "json"})
    .group_by_key()
    .window(Window.tumbling(minutes=5))
    .count("click-count")
    .sink("output", Sink.kafka(topic="counts", brokers=["kafka:9092"]))
)

print(topo.to_yaml())
```
## Project Structure

```
streamforge/
├── cmd/
│   ├── streamforge/       # Main processing node
│   └── sfctl/             # CLI management tool
├── api/proto/v1/          # Protocol buffer definitions
├── pkg/
│   ├── topology/          # Fluent topology builder
│   ├── processor/         # Map, Filter, FlatMap, Aggregate, Join
│   ├── window/            # Tumbling, Sliding, Session windows
│   ├── state/             # StateStore interface + implementations
│   ├── source/            # Source interface + Kafka, Channel, Generator
│   ├── sink/              # Sink interface + Kafka, Stdout, File
│   ├── checkpoint/        # Chandy-Lamport checkpointing
│   ├── backpressure/      # Credit-based flow control
│   ├── serde/             # JSON, String, Bytes serialization
│   ├── record/            # Core Record type
│   └── observability/     # Metrics, tracing, logging
├── internal/
│   ├── runtime/           # Processing engine
│   ├── network/           # Inter-node shuffle
│   └── config/            # Configuration
├── sdk/python/            # Python topology builder
├── deploy/
│   ├── docker/            # Dockerfile (distroless)
│   └── helm/              # Helm chart
└── examples/
    ├── wordcount/         # Classic word count
    ├── clickstream/       # Page view aggregation
    └── fraud-detection/   # Transaction fraud detection
```
## Development

```bash
# Build all binaries
make build

# Run tests
make test

# Run with coverage
make test-cover

# Lint
make lint

# Docker image
make docker
```

## Docker

```bash
docker build -t streamforge -f deploy/docker/Dockerfile .
docker run -e STREAMFORGE_NODE_ID=node-1 streamforge
```

## Kubernetes (Helm)

```bash
helm install streamforge deploy/helm/streamforge \
  --set config.kafka.brokers[0]=kafka:9092 \
  --set replicaCount=3
```

## Configuration

StreamForge can be configured via a JSON file or environment variables:
| Environment Variable | Description | Default |
|---|---|---|
| `STREAMFORGE_NODE_ID` | Unique node identifier | `node-1` |
| `STREAMFORGE_LISTEN_ADDR` | HTTP listen address | `:8080` |
| `STREAMFORGE_KAFKA_BROKERS` | Kafka broker addresses | `localhost:9092` |
| `STREAMFORGE_STATE_BACKEND` | State store backend (`memory` or `bolt`) | `memory` |
| `STREAMFORGE_STATE_DIR` | State store directory | `/tmp/streamforge/state` |
| `STREAMFORGE_CHECKPOINT_DIR` | Checkpoint storage directory | `/tmp/streamforge/checkpoints` |
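For the file-based route, a JSON config would carry the same settings. The schema is not documented in this README, so the keys below are an assumed shape derived from the environment variables above (and from the `config.kafka.brokers` Helm value), not a verified format:

```json
{
  "node_id": "node-1",
  "listen_addr": ":8080",
  "kafka": { "brokers": ["localhost:9092"] },
  "state": { "backend": "memory", "dir": "/tmp/streamforge/state" },
  "checkpoint": { "dir": "/tmp/streamforge/checkpoints" }
}
```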
## License

MIT License - see LICENSE for details.