A concurrent flow graph library for Go. Build data processing pipelines where data flows through decision nodes based on success/fail conditions.
go get github.com/emreisler/flowgraph
package main
import (
	"context"
	"fmt"
	"time"

	"github.com/emreisler/flowgraph"
)
func main() {
fg := flowgraph.NewFlowGraph[int]()
// Create root node
isPositive, _ := fg.Root("is_positive", func(n int) (bool, error) {
return n > 0, nil
}, 2) // 2 workers
// Add child nodes
isEven, _ := fg.AddNode("is_even", isPositive.SuccessOut(), func(n int) (bool, error) {
return n%2 == 0, nil
}, 2)
fg.AddNode("handle_negative", isPositive.FailOut(), func(n int) (bool, error) {
fmt.Println("Negative:", n)
return true, nil
}, 1)
// Create feed channel
feed := make(chan int)
go func() {
for _, n := range []int{4, 3, -1, 8, -5, 7} {
feed <- n
}
close(feed)
}()
fg.Start(feed)
// Listen to terminal states
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for ts := range fg.TerminalStates(ctx, isEven.SuccessOut(), isEven.FailOut()) {
fmt.Printf("Result: %v -> %s\n", ts.Data, ts.State)
}
fg.Stop()
}func NewFlowGraph[T any]() *FlowGraph[T]Creates a new flow graph with generic type T.
func (g *FlowGraph[T]) Root(name string, fn func(T) (bool, error), workers int) (*decisionNode[T], error)
Creates the entry point node. Returns a node with SuccessOut() and FailOut() topics.
func (g *FlowGraph[T]) AddNode(name string, listen topic, fn func(T) (bool, error), workers int) (*decisionNode[T], error)
Adds a child node that listens to a parent's output (use node.SuccessOut() or node.FailOut()).
func (g *FlowGraph[T]) Start(feed <-chan T) error
Starts the graph processing. Non-blocking.
func (g *FlowGraph[T]) TerminalStates(ctx context.Context, topics ...topic) <-chan TerminalState[T]
Subscribes to node outputs (use node.SuccessOut() or node.FailOut()). Returns a channel that closes when the context is cancelled.
func (g *FlowGraph[T]) Stop()
Stops the graph and closes all channels.
// User is the record that flows through the classification example's graph.
type User struct {
	// Age is compared against 18 by the age_check node.
	Age int

	// Country is matched against "US" and "UK" by the country_check node.
	Country string

	// Premium is returned as-is as the premium_check node's decision.
	Premium bool
}
fg := flowgraph.NewFlowGraph[User]()

// Predicates for the three decision nodes, declared up front so the wiring
// below reads as a plain list of stages.
isAdult := func(u User) (bool, error) {
	return u.Age >= 18, nil
}
inListedCountry := func(u User) (bool, error) {
	return u.Country == "US" || u.Country == "UK", nil
}
isPremium := func(u User) (bool, error) {
	return u.Premium, nil
}

// Build decision tree: each stage listens on the success output of the one before it.
ageCheck, _ := fg.Root("age_check", isAdult, 4)
countryCheck, _ := fg.AddNode("country_check", ageCheck.SuccessOut(), inListedCountry, 4)
premiumCheck, _ := fg.AddNode("premium_check", countryCheck.SuccessOut(), isPremium, 2)

// Start processing on a buffered feed channel.
feed := make(chan User, 100)
fg.Start(feed)

// Feed data from a separate goroutine and close the feed after the last user.
go func() {
	for _, u := range users {
		feed <- u
	}
	close(feed)
}()

// Collect results from both outputs of the last stage until the 10-second
// context expires.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
results := fg.TerminalStates(ctx, premiumCheck.SuccessOut(), premiumCheck.FailOut())
for res := range results {
	fmt.Printf("User classified: %s\n", res.State)
}
fg.Stop()
- Generic - works with any data type
- Concurrent - configurable workers per node
- Non-blocking - async message passing via pubsub
- Context support - graceful shutdown with context cancellation
MIT