synapse-middleware

A message-oriented middleware built from scratch in Go — pub/sub over a custom binary TCP protocol, with automatic reconnection, keep-alive heartbeats, and a pending-message retry queue.

Built for the Master's-level Distributed Platforms course at CIn-UFPE, it implements a full middleware stack — from raw TCP framing up to a high-level publish/subscribe API — without relying on any existing message broker.


Why build a broker from scratch?

Most engineers reach for RabbitMQ, MQTT, or Kafka and never think about what's underneath. This project peels back those layers and implements everything by hand:

  • A custom binary protocol with length-prefixed framing and a JSON header
  • A broker that manages topic subscriptions and retries failed deliveries
  • CRH / SRH (Client / Server Request Handlers) that manage TCP connections, PING/PONG keep-alives, and transparent reconnection
  • A Client Proxy that exposes a clean Publish / Subscribe / Unsubscribe API to application code

Performance is benchmarked head-to-head against Mosquitto (a production MQTT broker) using 100,000-message test runs.


Flows

Message Flow — Publish to Deliver

sequenceDiagram
    participant P  as Producer
    participant CP as Client Proxy
    participant C1 as CRH (producer)
    participant S  as SRH (server)
    participant B  as Broker
    participant C2 as CRH (consumer)
    participant CS as Consumer

    P->>CP: Publish("topic", payload)
    CP->>C1: Invoke(PUBLISH, topic, payload)
    C1->>S: [4-byte len][JSON header][JSON packet]
    S->>B: Handle(clientID, PUBLISH)
    B->>S: SendTo(subscriberID, packet)
    S->>C2: [4-byte len][JSON header][JSON packet]
    C2->>CS: onMessage(topic, payload)

Connection Lifecycle

sequenceDiagram
    participant C as CRH (client)
    participant S as SRH (server)

    C->>S: dial TCP
    C->>S: CONNECT {clientID}
    loop every 5s
        C->>S: PING
        S->>C: PONG
    end
    C->>S: DATA {topic, payload}
    S->>C: DATA {topic, payload}
    note over C: connection lost
    C-->>S: reconnect (2s delay)
    C->>S: CONNECT {clientID}
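
The lifecycle above boils down to a dial / handshake / heartbeat / redial loop. Below is a hedged sketch of that shape: connectAndServe and send are placeholders for the CRH's real handshake, read loop, and frame writer (they are not part of the repo's API), and the delays mirror the defaults listed under Configuration.

import (
    "net"
    "time"
)

// Skeleton of the client-side lifecycle. connectAndServe stands in for the
// real CONNECT handshake, read loop, and PING/PONG exchange; it should block
// until the connection is lost.
func runClient(addr, clientID string, connectAndServe func(conn net.Conn, clientID string)) {
    for {
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            time.Sleep(2 * time.Second) // reconnect delay
            continue
        }
        connectAndServe(conn, clientID) // CONNECT, then heartbeats until failure
        conn.Close()
        time.Sleep(2 * time.Second) // back off, then redial with the same clientID
    }
}

// heartbeat sends a PING every 5 seconds via send (a stand-in for the frame
// writer) and returns when a send fails, i.e. the connection is lost.
func heartbeat(send func() error) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        if err := send(); err != nil {
            return
        }
    }
}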

Protocol

Two protocol layers sit between client and server, each with a clear responsibility.

Layer 1 — TCP Framing (CRH ↔ SRH)

Raw bytes on the wire follow a simple length-prefix framing:

┌──────────────────┬───────────────────────────┬────┬─────────────────┐
│  4 bytes (BE)    │  JSON protocol header     │ \n │  payload bytes  │
│  message length  │  {"type":…,"command":…}   │    │                 │
└──────────────────┴───────────────────────────┴────┴─────────────────┘

Wire example:

101{"type":"external","command":"DATA"}
{"hdr":{"message_type":0},"bd":{"topic":"my_computer/cpu","payload":72.3}}

Headers come in two types:

| Type | Commands | Behaviour |
|------|----------|-----------|
| internal | CONNECT, PING, PONG | Handled by CRH/SRH — never forwarded to application |
| external | DATA | Forwarded up to Requestor / Invoker |

Layer 2 — Application Packets (Requestor ↔ Invoker)

Above the transport sits a JSON packet format:

{
  "hdr": { "message_type": 0 },
  "bd":  { "topic": "sensors/cpu", "payload": 72.3 }
}

| message_type | Operation |
|--------------|-----------|
| 0 | Publish — broadcast to all subscribers |
| 1 | Subscribe — register interest in a topic |
| 2 | Unsubscribe — remove subscription |
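
In Go, this packet maps naturally onto a pair of tagged structs. The types below are an illustrative mirror of the JSON shown above; the repo's own type names in internal/protocol may differ.

// Illustrative Go mirror of the Layer-2 packet.
type PacketHeader struct {
    MessageType int `json:"message_type"` // 0 = Publish, 1 = Subscribe, 2 = Unsubscribe
}

type PacketBody struct {
    Topic   string      `json:"topic"`
    Payload interface{} `json:"payload"`
}

type Packet struct {
    Hdr PacketHeader `json:"hdr"`
    Bd  PacketBody   `json:"bd"`
}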

Components

| Component | Responsibility |
|-----------|----------------|
| CRH | Manages the outbound TCP connection. Spawns a read loop and heartbeat goroutine. On failure, reconnects automatically after a configurable delay. |
| SRH | TCP server that accepts connections, registers client IDs via the CONNECT handshake, and dispatches DATA messages to the Invoker. |
| Invoker | Server-side bridge between SRH and Broker. Unmarshals raw bytes into typed Packet structs and routes them accordingly. |
| Broker | Core pub/sub engine. Maintains topic → {clientID} maps with RWMutex safety. Failed sends go into a retry queue and are retried every second. |
| Requestor | Client-side bridge between Client Proxy and CRH. Marshals Packet structs and dispatches received packets back to the proxy. |
| Client Proxy | The public API. Stores per-topic callbacks and invokes them when a message arrives. Thread-safe via RWMutex. |
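
To make the Broker row concrete, here is a hedged sketch of the state and retry loop it describes; the names and exact organization of internal/broker/broker.go may differ.

import (
    "sync"
    "time"
)

// Illustrative sketch of the Broker's core state.
type pendingDelivery struct {
    clientID string
    packet   []byte
}

type Broker struct {
    mu      sync.RWMutex
    topics  map[string]map[string]struct{} // topic -> set of subscriber clientIDs
    pending []pendingDelivery              // failed sends awaiting retry
}

// retryLoop re-attempts queued deliveries once per second, matching the
// "Broker retry interval" default listed under Configuration.
func (b *Broker) retryLoop(send func(clientID string, packet []byte) error) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for range ticker.C {
        b.mu.Lock()
        queue := b.pending
        b.pending = nil
        b.mu.Unlock()
        for _, d := range queue {
            if err := send(d.clientID, d.packet); err != nil {
                b.mu.Lock()
                b.pending = append(b.pending, d) // still failing: keep for the next round
                b.mu.Unlock()
            }
        }
    }
}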

Demo Application

The repo ships with a live example: a system monitor that streams CPU and memory metrics across the middleware.

┌──────────────────────┐                       ┌──────────────────────────┐
│    app_client_1      │   synapse-middleware  │    app_client_2          │
│    (producer)        │ ───────────────────►  │    (consumer)            │
│                      │                       │                          │
│  Publishes every 1s: │                       │  Displays in real-time:  │
│  · my_computer/cpu   │                       │  CPU: 14.2%              │
│  · my_computer/memory│                       │  Mem: 8.1 GB / 16 GB 50% │
└──────────────────────┘                       └──────────────────────────┘

Latency is measured from publish timestamp to receive timestamp (nanosecond precision).
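
One way to take that measurement with the Client Proxy API (see Using the API below): embed the send time in the payload and subtract it on arrival. The topic name and payload shape here are illustrative; the demo clients may encode the timestamp differently.

import (
    "fmt"
    "strconv"
    "time"

    clientproxy "github.com/deodatomatheus/synapse-middleware/internal/client-proxy"
)

proxy := clientproxy.NewSimplePubSubProxy()

// Consumer side (app_client_2 in the demo): latency = arrival time minus the
// embedded send time.
proxy.Subscribe("bench/latency", func(msg interface{}, topic string) {
    sentNs, err := strconv.ParseInt(msg.(string), 10, 64)
    if err != nil {
        return
    }
    fmt.Printf("end-to-end latency: %d ns\n", time.Now().UnixNano()-sentNs)
})

// Producer side (app_client_1 in the demo): publish the send time (ns since
// epoch) as a string so the JSON hop does not round it.
proxy.Publish("bench/latency", strconv.FormatInt(time.Now().UnixNano(), 10))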


Getting Started

Requirements: Go 1.21+

# 1. Start the middleware server
go run ./cmd/server/

# 2. Start the consumer (another terminal)
go run ./cmd/app_client_2/

# 3. Start the producer (another terminal)
go run ./cmd/app_client_1/

Using the API

import (
    "fmt"

    clientproxy "github.com/deodatomatheus/synapse-middleware/internal/client-proxy"
)

proxy := clientproxy.NewSimplePubSubProxy()

// Subscribe — register a callback for a topic
proxy.Subscribe("sensors/temperature", func(msg interface{}, topic string) {
    fmt.Printf("[%s] %.1f°C\n", topic, msg)
})

// Publish — broadcast to all subscribers
proxy.Publish("sensors/temperature", 36.6)

// Unsubscribe
proxy.Unsubscribe("sensors/temperature")

Benchmarking

The project includes a benchmark suite that fires 100,000 messages per run and records end-to-end latency for both this middleware and a stock Mosquitto MQTT broker.

# Run the full benchmark suite
./run_bench.sh

# Analyse results — outputs P50/P95/P99 and generates histograms
python3 analize.py

The analysis script reads all latency_*.csv output files and computes:

| Metric | Description |
|--------|-------------|
| Average | Mean end-to-end latency (ns) |
| P50 | Median latency |
| P95 | 95th percentile — typical worst case |
| P99 | 99th percentile — tail latency |
| Min / Max | Best and worst observed |
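
The percentile columns are plain order statistics over the recorded samples. A minimal Go sketch of the idea follows; the Python analysis script may compute them slightly differently (e.g. with interpolation).

import "sort"

// Nearest-rank percentile over latency samples in nanoseconds.
func percentile(latencies []int64, p float64) int64 {
    if len(latencies) == 0 {
        return 0
    }
    s := append([]int64(nil), latencies...)
    sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
    return s[int(float64(len(s)-1)*p/100.0)]
}

// p50 := percentile(lat, 50)   // median
// p95 := percentile(lat, 95)   // typical worst case
// p99 := percentile(lat, 99)   // tail latency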

Configuration

| Setting | Default | Location |
|---------|---------|----------|
| Broker host | localhost | internal/common/common.go |
| Broker port | 5667 | internal/common/common.go |
| CRH reconnect delay | 2s | internal/crh/client.go |
| CRH heartbeat period | 5s | internal/crh/client.go |
| Broker retry interval | 1s | internal/broker/broker.go |
| Max message size | 50 MB | internal/protocol/message.go |
