Skip to content

Production-ready distributed message broker with Raft consensus, partition sharding, and horizontal scalability. Built in Go with features comparable to Apache Kafka.

Notifications You must be signed in to change notification settings

Technocrat-dev/Distributed_Broker

Repository files navigation

Distributed Message Broker

A production-ready, fault-tolerant distributed message broker built with Go

Go Version gRPC Raft Docker License

Features β€’ Architecture β€’ Quick Start β€’ Documentation β€’ Examples


πŸ“– Overview

A high-performance distributed message broker inspired by Apache Kafka, implementing partition sharding, Raft consensus, leader-follower replication, and horizontal scalability. Built from the ground up in Go with a focus on simplicity, reliability, and operational excellence.

Why This Broker?

  • πŸš€ Production-Ready: Built with fault tolerance, durability, and observability in mind
  • 🎯 Simple Architecture: Clear separation between metadata and data planes
  • πŸ“¦ Batteries Included: Docker, Kubernetes, Prometheus metrics, and client libraries
  • πŸ”§ Operator Friendly: Comprehensive health checks, structured logging, and admin tools
  • πŸ§ͺ Well Tested: Extensive integration tests including multi-node cluster scenarios

✨ Features

Core Messaging

  • βœ… Topics & Partitions β€” Horizontally scalable message streams with configurable partition counts
  • βœ… Producer API β€” Single and batch produce operations with key-based routing
  • βœ… Consumer API β€” Pull-based consumption with batch and streaming support
  • βœ… Consumer Groups β€” Automatic partition assignment with rebalancing and offset management
  • βœ… Offset Tracking β€” Durable offset commits per consumer group and partition

Distributed Systems

  • βœ… Raft Consensus β€” HashiCorp Raft for metadata replication and leader election
  • βœ… Partition Sharding β€” Distribute partitions across multiple broker nodes
  • βœ… Replication β€” Configurable replication factor with leader-follower protocol
  • βœ… ISR Management β€” In-Sync Replica tracking for data durability guarantees
  • βœ… Fault Tolerance β€” Automatic controller and partition leader failover

Advanced Features

  • βœ… Batch Operations β€” High-throughput batch produce and consume
  • βœ… Compression β€” Gzip, Snappy, and LZ4 compression codecs
  • βœ… Idempotent Producer β€” Sequence-based duplicate detection for at-least-once delivery
  • βœ… Dead Letter Queue β€” Automatic retry and failed message handling
  • βœ… Log Compaction β€” Key-based compaction for state/changelog topics
  • βœ… Retention Policies β€” Time-based and size-based log retention
  • βœ… Write-Ahead Log β€” Segmented commit logs with fsync durability

Operations & Observability

  • βœ… Prometheus Metrics β€” Comprehensive metrics for monitoring and alerting
  • βœ… Health Checks β€” HTTP endpoints for liveness and readiness probes
  • βœ… Structured Logging β€” JSON and text logging with configurable levels
  • βœ… gRPC API β€” High-performance Protocol Buffers-based communication
  • βœ… Client Library β€” Idiomatic Go client with full feature support

Deployment

  • βœ… Docker Support β€” Multi-stage Dockerfile with security best practices
  • βœ… Docker Compose β€” 3-node cluster with Prometheus and Grafana
  • βœ… Kubernetes β€” StatefulSet deployments with persistent volumes
  • βœ… Configuration β€” YAML config files with environment variable overrides

Performance

  • βœ… High Throughput β€” 100K+ msg/sec with batching and async writes
  • βœ… Compression β€” 60%+ storage reduction on text/JSON data with Gzip/Snappy/LZ4
  • βœ… Horizontal Scaling β€” Tested with 1000+ partitions across multi-node clusters
  • βœ… Configurable Durability β€” Trade-off between sync writes (durable) and async writes (fast)

πŸ—οΈ Architecture

High-Level Overview

The broker follows a clean metadata-data plane separation architecture:

graph TB
    subgraph "Client Applications"
        Producer[Producers]
        Consumer[Consumers]
    end
    
    subgraph "Broker Cluster"
        subgraph "Control Plane - Raft Consensus"
            Controller1[Controller Node 1<br/>Leader]
            Controller2[Controller Node 2]
            Controller3[Controller Node 3]
            
            Controller1 -.->|Raft| Controller2
            Controller2 -.->|Raft| Controller3
            Controller3 -.->|Raft| Controller1
        end
        
        subgraph "Data Plane - Message Processing"
            Broker1[Broker 1<br/>Partitions: P0, P3]
            Broker2[Broker 2<br/>Partitions: P1, P4]
            Broker3[Broker 3<br/>Partitions: P2, P5]
            
            Broker1 -->|Replicate| Broker2
            Broker2 -->|Replicate| Broker3
            Broker3 -->|Replicate| Broker1
        end
    end
    
    subgraph "Monitoring Stack"
        Prometheus[Prometheus]
        Grafana[Grafana]
    end
    
    Producer -->|Produce gRPC| Broker1
    Producer -->|Produce gRPC| Broker2
    Consumer -->|Consume gRPC| Broker2
    Consumer -->|Consume gRPC| Broker3
    
    Broker1 -.->|Metrics| Prometheus
    Broker2 -.->|Metrics| Prometheus
    Broker3 -.->|Metrics| Prometheus
    Prometheus --> Grafana
    
    style Controller1 fill:#ff6b6b
    style Broker1 fill:#4ecdc4
    style Broker2 fill:#4ecdc4
    style Broker3 fill:#4ecdc4
Loading

Component Architecture

graph TB
    subgraph "Broker Process"
        subgraph "Server Layer"
            gRPCServer[gRPC Server<br/>Client API]
            MetricsServer[HTTP Server<br/>Metrics & Health]
        end
        
        subgraph "Broker Layer"
            DistBroker[Distributed Broker<br/>Request Router]
            PartMgr[Partition Manager<br/>Local Partitions]
            ConsumerMgr[Consumer Group Manager<br/>Assignments & Offsets]
            TopicMgr[Topic Manager<br/>Topic Metadata]
        end
        
        subgraph "Controller Layer"
            RaftController[Raft Controller<br/>Cluster Metadata]
            MetadataFSM[Metadata FSM<br/>State Machine]
            MetadataStore[Metadata Store<br/>Topics, Brokers, ISR]
        end
        
        subgraph "Storage Layer"
            CommitLog[Commit Log<br/>Segmented WAL]
            Segments[Segments<br/>Data + Index]
            Compactor[Log Compactor<br/>Key-based Cleanup]
        end
        
        subgraph "Middleware"
            Compression[Compression<br/>Gzip/Snappy/LZ4]
            Idempotency[Idempotency Manager<br/>Producer Sessions]
            DLQ[Dead Letter Queue<br/>Failed Messages]
        end
        
        subgraph "Replication"
            ReplService[Replication Service<br/>Leader-Follower Sync]
            ISRTracker[ISR Tracker<br/>Replica State]
        end
    end
    
    gRPCServer --> DistBroker
    MetricsServer --> DistBroker
    
    DistBroker --> PartMgr
    DistBroker --> ConsumerMgr
    DistBroker --> TopicMgr
    DistBroker --> RaftController
    
    PartMgr --> CommitLog
    PartMgr --> ReplService
    
    RaftController --> MetadataFSM
    MetadataFSM --> MetadataStore
    
    CommitLog --> Segments
    CommitLog --> Compactor
    
    DistBroker --> Compression
    DistBroker --> Idempotency
    DistBroker --> DLQ
    
    ReplService --> ISRTracker
    
    style gRPCServer fill:#667eea
    style RaftController fill:#ff6b6b
    style PartMgr fill:#4ecdc4
    style CommitLog fill:#ffd93d
Loading

Data Flow: Produce Request

sequenceDiagram
    participant P as Producer
    participant B as Broker (Leader)
    participant PM as Partition Manager
    participant L as Commit Log
    participant R as Replication Service
    participant F as Follower Brokers
    participant C as Controller
    
    P->>B: Produce(topic, key, value)
    B->>PM: Route to partition (hash key)
    PM->>PM: Check if leader for partition
    PM->>L: Append to local log
    L->>L: Write to segment + fsync
    L-->>PM: Return offset
    
    PM->>R: Trigger replication
    R->>F: Push records to followers
    F->>F: Append to follower logs
    F-->>R: ACK replication
    
    R->>PM: Update high watermark
    PM->>C: Report ISR status (if changed)
    
    PM-->>B: Return offset
    B-->>P: ProduceResponse(offset, partition)
Loading

Data Flow: Consume Request

sequenceDiagram
    participant C as Consumer
    participant B as Broker
    participant CG as Consumer Group Mgr
    participant PM as Partition Manager
    participant L as Commit Log
    participant Ctrl as Controller
    
    C->>B: JoinConsumerGroup(group, consumer, topic)
    B->>Ctrl: Request partition assignment
    Ctrl->>Ctrl: Rebalance partitions
    Ctrl-->>B: Assigned partitions
    B->>CG: Store assignment
    CG-->>C: Assignment(partitions, generation)
    
    loop For each partition
        C->>B: Consume(topic, partition, offset)
        B->>PM: Get partition
        PM->>L: Read from offset
        L->>L: Locate segment + read
        L-->>PM: Records
        PM-->>B: Records
        B-->>C: ConsumeResponse(records)
        
        C->>B: CommitOffset(group, partition, offset)
        B->>CG: Store committed offset
        CG-->>C: ACK
    end
Loading

Partition Replication

graph LR
    subgraph "Topic: orders (3 partitions, RF=3)"
        subgraph "Partition 0"
            P0L[Leader: Broker 1<br/>Offset: 1000]
            P0F1[Follower: Broker 2<br/>Offset: 999]
            P0F2[Follower: Broker 3<br/>Offset: 998]
            
            P0L -->|Replicate| P0F1
            P0L -->|Replicate| P0F2
        end
        
        subgraph "Partition 1"
            P1L[Leader: Broker 2<br/>Offset: 2000]
            P1F1[Follower: Broker 1<br/>Offset: 2000]
            P1F2[Follower: Broker 3<br/>Offset: 1999]
            
            P1L -->|Replicate| P1F1
            P1L -->|Replicate| P1F2
        end
        
        subgraph "Partition 2"
            P2L[Leader: Broker 3<br/>Offset: 500]
            P2F1[Follower: Broker 1<br/>Offset: 500]
            P2F2[Follower: Broker 2<br/>Offset: 500]
            
            P2L -->|Replicate| P2F1
            P2L -->|Replicate| P2F2
        end
    end
    
    style P0L fill:#ff6b6b
    style P1L fill:#ff6b6b
    style P2L fill:#ff6b6b
    style P0F1 fill:#4ecdc4
    style P0F2 fill:#ffd93d
    style P1F1 fill:#4ecdc4
    style P1F2 fill:#ffd93d
    style P2F1 fill:#4ecdc4
    style P2F2 fill:#4ecdc4
Loading

πŸš€ Quick Start

Prerequisites

  • Go 1.21 or higher
  • Docker (optional, for containerized deployment)
  • Kubernetes (optional, for K8s deployment)

Single Node Setup

  1. Clone the repository
git clone https://github.com/yourusername/distributed-message-broker.git
cd distributed-message-broker
  1. Build the broker
go build -o broker .
  1. Start the broker
./broker -id node-1
  1. Verify it's running
# Health check
curl http://localhost:9090/health
# Expected: OK

# Check leader status
curl http://localhost:9090/ready
# Expected: LEADER
  1. Run the demo client
go run ./examples/demo

Expected Output:

πŸš€ Broker Demo
==================================================

1️⃣  Creating topic 'demo-topic'...

2️⃣  Producing single messages...
   βœ… Produced message 0 at offset 0
   βœ… Produced message 1 at offset 1
   βœ… Produced message 2 at offset 2

3️⃣  Consuming messages...
   πŸ“– Offset 0: Hello, World!
   πŸ“– Offset 1: {"user": "alice", "action": "login"}
   πŸ“– Offset 2: {"user": "bob", "action": "purchase", "amount": 99.99}

βœ… Demo complete!

3-Node Cluster with Docker Compose

  1. Start the cluster
docker-compose up -d
  1. Verify cluster formation
# Check broker logs
docker-compose logs broker-1 | grep "Distributed broker ready"

# Check cluster status
curl http://localhost:9090/ready
  1. Start with monitoring (Prometheus + Grafana)
docker-compose --profile monitoring up -d

# Access Grafana
open http://localhost:3000
# Username: admin, Password: admin
  1. Test the cluster
# Run demo against the cluster
go run ./examples/demo

# Kill the leader to test failover
docker-compose stop broker-1

# Cluster automatically elects new leader
# Demo still works!
go run ./examples/demo
  1. Clean up
docker-compose down -v

Kubernetes Deployment

  1. Deploy the StatefulSet
kubectl apply -f deploy/kubernetes/broker.yaml
  1. Verify deployment
kubectl get pods -l app=broker
kubectl logs broker-0 -f
  1. Connect to the cluster
# Port forward to access locally
kubectl port-forward broker-0 8080:8080

# Run demo
go run ./examples/demo

🎯 Examples

Basic Producer

package main

import (
    "context"
    "log"
    "my-broker/pkg/client"
)

func main() {
    // Connect to broker
    c, err := client.New("localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Create topic
    if err := c.CreateTopic(ctx, "events", 3); err != nil {
        log.Fatal(err)
    }

    // Produce message
    offset, err := c.Produce(ctx, "events", []byte("Hello, Broker!"))
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Produced at offset: %d", offset)
}

Basic Consumer

package main

import (
    "context"
    "log"
    "my-broker/pkg/client"
)

func main() {
    c, err := client.New("localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Consume from offset 0
    record, err := c.Consume(ctx, "events", 0)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Consumed: %s", string(record))
}

Consumer Group

package main

import (
    "context"
    "log"
    "time"
    "my-broker/pkg/client"
)

func main() {
    c, err := client.New("localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    ctx := context.Background()

    // Join consumer group
    partitions, genID, err := c.JoinConsumerGroup(
        ctx, 
        "my-group",     // Group ID
        "consumer-1",   // Consumer ID
        "events",       // Topic
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Assigned partitions: %v (generation: %d)", partitions, genID)

    // Consume from assigned partitions
    for _, partition := range partitions {
        // Fetch committed offset
        offset, err := c.FetchOffset(ctx, "my-group", "consumer-1", "events", int(partition))
        if err != nil {
            offset = 0 // Start from beginning
        }

        // Consume records
        records, nextOffset, err := c.ConsumeBatch(ctx, "events", offset, 100)
        if err != nil {
            log.Fatal(err)
        }

        log.Printf("Consumed %d records from partition %d", len(records), partition)

        // Commit offset
        err = c.CommitOffset(ctx, "my-group", "consumer-1", "events", int(partition), nextOffset)
        if err != nil {
            log.Fatal(err)
        }
    }

    // Send heartbeat to maintain group membership
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        rebalance, err := c.Heartbeat(ctx, "my-group", "consumer-1", genID)
        if err != nil {
            log.Fatal(err)
        }
        if rebalance {
            log.Println("Rebalance needed, rejoining group...")
            break
        }
    }
}

Batch Operations

// Batch produce
records := [][]byte{
    []byte("message 1"),
    []byte("message 2"),
    []byte("message 3"),
}

baseOffset, count, err := c.ProduceBatch(ctx, "events", records)
if err != nil {
    log.Fatal(err)
}

log.Printf("Produced %d records starting at offset %d", count, baseOffset)

// Batch consume
records, nextOffset, err := c.ConsumeBatch(ctx, "events", baseOffset, 10)
if err != nil {
    log.Fatal(err)
}

log.Printf("Consumed %d records, next offset: %d", len(records), nextOffset)

πŸ“¦ Project Structure

.
β”œβ”€β”€ api/                    # Protocol definitions
β”‚   β”œβ”€β”€ v1/                 # Client API (gRPC/Protobuf)
β”‚   β”‚   β”œβ”€β”€ log.proto       # Producer, consumer, topic APIs
β”‚   β”‚   β”œβ”€β”€ log.pb.go       # Generated code
β”‚   β”‚   └── log_grpc.pb.go  # Generated gRPC stubs
β”‚   └── replication/        # Replication protocol
β”‚       β”œβ”€β”€ replication.proto
β”‚       └── *.pb.go
β”‚
β”œβ”€β”€ broker/                 # Core broker implementation
β”‚   β”œβ”€β”€ distributed_broker.go    # Main distributed broker
β”‚   β”œβ”€β”€ partition_manager.go     # Partition management
β”‚   β”œβ”€β”€ consumer.go              # Consumer group logic
β”‚   β”œβ”€β”€ replication.go           # Replication service
β”‚   β”œβ”€β”€ topic.go                 # Topic management
β”‚   β”œβ”€β”€ idempotency.go           # Exactly-once semantics
β”‚   β”œβ”€β”€ dlq.go                   # Dead letter queue
β”‚   └── retention.go             # Log retention policies
β”‚
β”œβ”€β”€ controller/             # Raft-based metadata management
β”‚   β”œβ”€β”€ controller.go       # Controller implementation
β”‚   β”œβ”€β”€ fsm.go              # Raft finite state machine
β”‚   β”œβ”€β”€ types.go            # Metadata types
β”‚   └── commands.go         # Raft command processing
β”‚
β”œβ”€β”€ storage/                # Persistent storage layer
β”‚   β”œβ”€β”€ log.go              # Commit log implementation
β”‚   β”œβ”€β”€ segment.go          # Log segment management
β”‚   β”œβ”€β”€ partition.go        # Partition storage
β”‚   β”œβ”€β”€ compaction.go       # Log compaction
β”‚   └── record.go           # Record serialization
β”‚
β”œβ”€β”€ server/                 # gRPC server layer
β”‚   β”œβ”€β”€ distributed_server.go    # Server implementation
β”‚   └── config.go                # Server configuration
β”‚
β”œβ”€β”€ middleware/             # Request processing middleware
β”‚   β”œβ”€β”€ compression.go      # Gzip, Snappy, LZ4
β”‚   β”œβ”€β”€ ratelimit.go        # Rate limiting
β”‚   └── interceptor.go      # gRPC interceptors
β”‚
β”œβ”€β”€ metrics/                # Observability
β”‚   └── metrics.go          # Prometheus metrics
β”‚
β”œβ”€β”€ logger/                 # Structured logging
β”‚   └── logger.go
β”‚
β”œβ”€β”€ config/                 # Configuration management
β”‚   └── config.go
β”‚
β”œβ”€β”€ pkg/client/             # Client library
β”‚   └── client.go           # Go client implementation
β”‚
β”œβ”€β”€ cmd/                    # Command-line tools
β”‚   β”œβ”€β”€ broker-admin/       # Admin CLI
β”‚   └── distributed/        # Distributed broker binary
β”‚
β”œβ”€β”€ tests/                  # Test suites
β”‚   β”œβ”€β”€ integration/        # Multi-node integration tests
β”‚   └── stress/             # Load and stress tests
β”‚
β”œβ”€β”€ examples/               # Example applications
β”‚   └── demo/               # Demo producer/consumer
β”‚
β”œβ”€β”€ deploy/                 # Deployment configurations
β”‚   β”œβ”€β”€ kubernetes/         # K8s manifests
β”‚   β”œβ”€β”€ docker-compose.yml  # Docker Compose
β”‚   β”œβ”€β”€ prometheus/         # Prometheus config
β”‚   └── grafana/            # Grafana dashboards
β”‚
β”œβ”€β”€ docs/                   # Documentation
β”‚   └── OPERATIONS.md
β”‚
β”œβ”€β”€ Dockerfile              # Docker image
β”œβ”€β”€ docker-compose.yml      # Multi-node cluster
β”œβ”€β”€ config.yaml.example     # Configuration template
β”œβ”€β”€ main.go                 # Main entry point
β”œβ”€β”€ README.md               # This file
β”œβ”€β”€ QUICKSTART.md           # Quick start guide
└── TESTING.md              # Testing guide

βš™οΈ Configuration

Configuration File

Copy config.yaml.example to config.yaml:

# Node Configuration
node:
  id: "broker-1"                # Unique node identifier
  data_dir: "./data"            # Data storage directory

# Network Configuration
network:
  grpc_port: 8080               # Client API port
  raft_port: 9080               # Raft consensus port
  data_port: 9180               # Inter-broker replication port
  metrics_port: 9090            # Prometheus metrics port
  bind_addr: "0.0.0.0"          # Bind address (0.0.0.0 = all interfaces)
  advertise_addr: ""            # Advertise address (empty = localhost)

# Cluster Configuration
cluster:
  bootstrap: false              # Is this the first node?
  join_addr: ""                 # Join existing cluster (e.g., "broker-1:8080")
  controller: true              # Run as controller node

# Storage Configuration
storage:
  max_segment_bytes: 16777216   # 16 MB per segment
  sync_writes: true             # Fsync after writes

# Retention Configuration
retention:
  default_max_age: "168h"       # 7 days
  default_max_bytes: 1073741824 # 1 GB
  cleanup_interval: "5m"

# Replication Configuration
replication:
  default_replication_factor: 3
  fetch_batch_size: 100
  fetch_interval: "100ms"

# Consumer Group Configuration
consumer_groups:
  session_timeout: "30s"
  heartbeat_interval: "3s"
  rebalance_timeout: "60s"

# Logging Configuration
logging:
  level: "info"                 # debug, info, warn, error
  format: "json"                # json, text

# Compression Configuration
compression:
  default: "none"               # none, gzip, snappy, lz4
  producer_compression: true

Environment Variables

All configuration options can be overridden via environment variables with the BROKER_ prefix:

export BROKER_NODE_ID=broker-1
export BROKER_GRPC_PORT=8080
export BROKER_LOG_LEVEL=debug
export BROKER_BOOTSTRAP=true

CLI Flags

./broker \
  -id node-1 \
  -grpc-port 8080 \
  -raft-port 9080 \
  -data-port 9180 \
  -metrics-port 9090 \
  -data-dir ./data \
  -join broker-1:8080 \
  -controller true \
  -log-level info

Flag Reference:

Flag Type Default Description
-id string required Unique node identifier
-grpc-port int 8080 Client gRPC API port
-raft-port int 9080 Raft controller port
-data-port int 9180 Inter-broker replication port
-metrics-port int 9090 Prometheus metrics port
-data-dir string ./data/<node-id> Data directory path
-join string - Address of node to join (e.g., localhost:8080)
-bind-addr string 0.0.0.0 Bind address for all services
-advertise-addr string localhost Address to advertise to cluster
-controller bool true Run as controller node
-log-level string info Log level (debug, info, warn, error)

πŸ§ͺ Testing

Unit Tests

# Run all unit tests
go test ./... -v

# Run specific package tests
go test ./storage/... -v
go test ./middleware/... -v
go test ./broker/... -v

# Run with coverage
go test ./... -cover -coverprofile=coverage.out
go tool cover -html=coverage.out

Integration Tests

# Run all integration tests
go test ./tests/integration/... -v -timeout 180s

# Run specific test
go test ./tests/integration/... -v -run TestDistributedBrokerClusterFormation

# Run multi-node tests
go test ./tests/integration/... -v -run TestMultiNode

Manual Testing

Test 3-Node Cluster Locally

# Terminal 1: Start bootstrap node
go run . -id node-1 -grpc-port 8080 -raft-port 9080 -data-port 9180

# Terminal 2: Join cluster
go run . -id node-2 -grpc-port 8081 -raft-port 9081 -data-port 9181 -join localhost:8080

# Terminal 3: Join cluster
go run . -id node-3 -grpc-port 8082 -raft-port 9082 -data-port 9182 -join localhost:8080

# Terminal 4: Run demo
go run ./examples/demo

Test Controller Failover

# 1. Start 3-node cluster (see above)
# 2. Run demo to create topic
go run ./examples/demo

# 3. Kill leader (Ctrl+C in leader's terminal)
# 4. Wait ~5 seconds for new leader election
# 5. Run demo again - should still work!
go run ./examples/demo

Load Testing

# Stress test
go test ./tests/stress/... -v -timeout 10m

# Benchmark storage layer
go test -bench=. ./storage/...

# Benchmark compression
go test -bench=. ./middleware/...

πŸ“Š Monitoring

Prometheus Metrics

Available at http://localhost:9090/metrics:

Broker Metrics:

  • broker_messages_produced_total - Total messages produced
  • broker_messages_consumed_total - Total messages consumed
  • broker_bytes_in_total - Total bytes written
  • broker_bytes_out_total - Total bytes read
  • broker_active_partitions - Number of active partitions
  • broker_under_replicated_partitions - Partitions below replication factor

Storage Metrics:

  • storage_segment_count - Number of log segments
  • storage_total_bytes - Total storage used
  • storage_compaction_runs_total - Log compaction runs

Controller Metrics:

  • controller_is_leader - Whether this node is controller leader (0 or 1)
  • controller_metadata_updates_total - Metadata update operations
  • controller_cluster_size - Number of brokers in cluster

Consumer Group Metrics:

  • consumer_group_members - Active consumers per group
  • consumer_group_lag - Consumer lag by partition

Health Checks

Health Endpoint

curl http://localhost:9090/health
# Returns: OK (200) if broker is running

Readiness Endpoint

curl http://localhost:9090/ready
# Returns: 
#   LEADER (200) - This node is controller leader
#   FOLLOWER (200) - This node is controller follower
#   NOT_READY (503) - Node is not ready to serve traffic

Grafana Dashboards

Pre-built dashboards available in deploy/grafana/dashboards/:

  • Broker Overview - Throughput, latency, partition health
  • Storage Metrics - Disk usage, segment statistics, compaction
  • Consumer Groups - Group membership, lag, rebalancing
  • Cluster Health - Node status, replication status, ISR

πŸ”§ Operations

Cluster Management

Adding a Node

# On new node
./broker -id node-4 -grpc-port 8083 -join existing-node:8080

# Partitions will be automatically rebalanced

Removing a Node

# Gracefully shutdown
# Send SIGTERM (Ctrl+C)
# Node will deregister and partitions will be reassigned

Checking Cluster Status

# Using client library
go run - <<'EOF'
package main
import (
    "context"
    "fmt"
    "my-broker/pkg/client"
)
func main() {
    c, _ := client.New("localhost:8080")
    defer c.Close()
    servers, _ := c.GetServers(context.Background())
    for _, s := range servers {
        leader := ""
        if s.IsLeader {
            leader = " [LEADER]"
        }
        fmt.Printf("%s: %s%s\n", s.ID, s.Address, leader)
    }
}
EOF

Topic Management

# Using admin tool (if implemented)
./broker-admin create-topic --name orders --partitions 3 --replication-factor 3
./broker-admin list-topics
./broker-admin delete-topic --name orders

Backup and Recovery

Backup Data

# Stop broker
./broker stop

# Backup data directory
tar -czf backup-$(date +%Y%m%d).tar.gz data/

# Backup Raft state
tar -czf raft-backup-$(date +%Y%m%d).tar.gz data/node-*/raft/

Restore Data

# Extract backup
tar -xzf backup-20260131.tar.gz

# Start broker
./broker -id node-1

Log Management

Enable Debug Logging

# Via flag
./broker -id node-1 -log-level debug

# Via environment
export BROKER_LOG_LEVEL=debug
./broker -id node-1

Structured Log Output

{
  "level": "info",
  "time": "2026-01-31T11:30:00Z",
  "node": "node-1",
  "msg": "Produced message",
  "topic": "orders",
  "partition": 2,
  "offset": 12345
}

πŸ”’ Security Considerations

⚠️ Note: Security features (TLS, authentication, authorization) are planned but not yet implemented.

Planned Security Features

# Future configuration
tls:
  enabled: true
  cert_file: "/path/to/cert.pem"
  key_file: "/path/to/key.pem"
  ca_file: "/path/to/ca.pem"
  client_auth: true  # mTLS

auth:
  enabled: true
  type: "api_key"  # or "sasl"
  api_keys_file: "/path/to/keys.yaml"

acls:
  enabled: true
  default_deny: true

Current Security Recommendations

For production deployment:

  1. Network Security

    • Deploy behind VPN or private network
    • Use firewall rules to restrict access
    • Enable network encryption at infrastructure level
  2. Access Control

    • Restrict metrics port (9090) to monitoring systems only
    • Use Kubernetes NetworkPolicies or security groups
  3. Data Security

    • Encrypt data at rest using filesystem encryption
    • Secure backup storage with encryption

πŸ“ˆ Performance Tuning

Producer Optimization

// Use batch operations for high throughput
records := make([][]byte, 1000)
for i := range records {
    records[i] = generateMessage()
}
c.ProduceBatch(ctx, "events", records)

// Enable compression
// (Configure server-side or via client)

Consumer Optimization

// Batch consume for efficiency
records, nextOffset, _ := c.ConsumeBatch(ctx, "events", offset, 500)

// Process in parallel
for _, record := range records {
    go processRecord(record)
}

Storage Tuning

storage:
  max_segment_bytes: 268435456   # 256 MB for fewer segments
  sync_writes: false              # Disable for higher throughput (less durable)

Replication Tuning

replication:
  fetch_batch_size: 1000          # Larger batches
  fetch_interval: "50ms"          # More frequent fetches

🀝 Contributing

Contributions are welcome! Please feel free to submit issues and pull requests.

Development Setup

# Clone repository
git clone https://github.com/yourusername/distributed-message-broker.git
cd distributed-message-broker

# Install dependencies
go mod download

# Run tests
go test ./... -v

# Build
go build -o broker .

Code Style

  • Follow standard Go conventions
  • Use gofmt for formatting
  • Add tests for new features
  • Update documentation

πŸ“š Documentation


πŸ“œ License

This project is licensed under the MIT License.

MIT License

Copyright (c) 2024

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

πŸ™ Acknowledgments


πŸ“ž Support


Built with Go

⭐ Star this repository if you find it useful!

Report Bug Β· Request Feature Β· Documentation

About

Production-ready distributed message broker with Raft consensus, partition sharding, and horizontal scalability. Built in Go with features comparable to Apache Kafka.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published