Production-ready Pub/Sub library and standalone service for Go
Works both as a library for embedding in your application AND as a standalone microservice with REST API.
- π¨ Reliable Message Delivery - Guaranteed delivery with exponential backoff retry
- π Exponential Backoff - 30s β 1m β 2m β 4m β 8m β 16m β 30m (max)
- π Dead Letter Queue (DLQ) - Automatic handling of failed messages after 5 attempts
- π DLQ Statistics - Track failure reasons and resolution metrics
- π― Domain-Driven Design - Rich domain models with business logic
- ποΈ Repository Pattern - Clean data access abstraction
- π Pluggable - Bring your own Logger, Notification system
- βοΈ Options Pattern - Modern Go API (2025 best practices)
- ποΈ Clean Architecture - Services, Repositories, Models separation
- β Battle-Tested - Production-proven in FreiCON Railway Management System
- π¬ MySQL - Full support with Relica adapters
- π PostgreSQL - Full support with Relica adapters
- πͺΆ SQLite - Full support with Relica adapters
- β‘ Zero Dependencies - Relica query builder (no ORM bloat)
- π As Library - Embed in your Go application
- π³ As Service - Standalone PubSub server with REST API
- βΈοΈ Docker Ready - Production Dockerfile + docker-compose
- π Cloud Native - 12-factor app, ENV config, health checks
go get github.com/coregx/pubsub@latest# Using Docker (recommended)
cd cmd/pubsub-server
docker-compose up -d
# Or build from source
go build ./cmd/pubsub-server# Windows
cd cmd/pubsub-server
start.bat
# Linux/Mac
cd cmd/pubsub-server
docker-compose up -dAccess API at http://localhost:8080
See Server Documentation for API endpoints.
package main
import (
"context"
"database/sql"
"time"
"github.com/coregx/pubsub"
"github.com/coregx/pubsub/adapters/relica"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// Connect to database
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/pubsub?parseTime=true")
// Create repositories (production-ready Relica adapters!)
repos := relica.NewRepositories(db, "mysql")
// Create services
publisher, _ := pubsub.NewPublisher(
pubsub.WithPublisherRepositories(
repos.Message, repos.Queue, repos.Subscription, repos.Topic,
),
pubsub.WithPublisherLogger(logger),
)
// Publish message
result, _ := publisher.Publish(context.Background(), pubsub.PublishRequest{
TopicCode: "user.signup",
Identifier: "user-123",
Data: `{"userId": 123, "email": "user@example.com"}`,
})
// Create worker for background processing
worker, _ := pubsub.NewQueueWorker(
pubsub.WithRepositories(repos.Queue, repos.Message, repos.Subscription, repos.DLQ),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
)
// Run worker (processes queue every 30 seconds)
worker.Run(context.Background(), 30*time.Second)
}import "github.com/coregx/pubsub/migrations"
// Apply all migrations
if err := migrations.ApplyAll(db); err != nil {
log.Fatal(err)
}# MySQL
mysql -u user -p database < migrations/mysql/001_core_tables.sql
mysql -u user -p database < migrations/mysql/002_retry_fields.sql
mysql -u user -p database < migrations/mysql/003_dead_letter_queue.sql
# PostgreSQL
psql -U user -d database -f migrations/postgres/001_core_tables.sql
...
# SQLite
sqlite3 pubsub.db < migrations/sqlite/001_core_tables.sql
...βββββββββββββββββββββββββββββββββββββββ
β Your Application β
β (or REST API for standalone) β
βββββββββββββββ¬ββββββββββββββββββββββββ
β
βββββββββββββββΌββββββββββββββββββββββββ
β Services Layer β
β - Publisher β
β - SubscriptionManager β
β - QueueWorker β
βββββββββββββββ¬ββββββββββββββββββββββββ
β
βββββββββββββββΌββββββββββββββββββββββββ
β Relica Adapters β
β (Production-Ready Implementations) β
β - Zero dependencies β
β - MySQL / PostgreSQL / SQLite β
βββββββββββββββ¬ββββββββββββββββββββββββ
β
βββββββββββββββΌββββββββββββββββββββββββ
β Database β
βββββββββββββββββββββββββββββββββββββββ
When running as standalone service, PubSub-Go exposes these endpoints:
POST /api/v1/publish
Content-Type: application/json
{
"topicCode": "user.signup",
"identifier": "optional-dedup-key",
"data": {
"userId": 123,
"email": "user@example.com"
}
}POST /api/v1/subscribe
{
"subscriberId": 1,
"topicCode": "user.signup",
"identifier": "webhook-receiver-1"
}GET /api/v1/subscriptions?subscriberId=1DELETE /api/v1/subscriptions/123GET /api/v1/healthSee API Documentation for full details.
// Options Pattern (2025 best practice)
worker, err := pubsub.NewQueueWorker(
pubsub.WithRepositories(queueRepo, msgRepo, subRepo, dlqRepo),
pubsub.WithDelivery(transmitterProvider, gateway),
pubsub.WithLogger(logger),
pubsub.WithBatchSize(100), // optional
pubsub.WithRetryStrategy(customStrategy), // optional
pubsub.WithNotifications(notifService), // optional
)# Server
SERVER_HOST=0.0.0.0
SERVER_PORT=8080
# Database
DB_DRIVER=mysql
DB_HOST=localhost
DB_PORT=3306
DB_USER=pubsub
DB_PASSWORD=your_password
DB_NAME=pubsub
DB_PREFIX=pubsub_
# Worker
PUBSUB_BATCH_SIZE=100
PUBSUB_WORKER_INTERVAL=30
PUBSUB_ENABLE_NOTIFICATIONS=trueSee .env.example for all options.
1. PUBLISH
Publisher β Topic β Create Message
β Find Active Subscriptions
β Create Queue Items (one per subscription)
2. WORKER (Background)
QueueWorker β Find Pending/Retryable Items (batch)
β Deliver to Subscribers (via webhooks/gateway)
β On Success: Mark as SENT
β On Failure: Retry with exponential backoff
β After 5 failures: Move to DLQ
3. DLQ (Dead Letter Queue)
Failed items β Manual review
β Resolve or Delete
Attempt 1: Immediate
Attempt 2: +30 seconds
Attempt 3: +1 minute
Attempt 4: +2 minutes
Attempt 5: +4 minutes
Attempt 6: +8 minutes (moves to DLQ after this)
# Run all tests
go test ./...
# With coverage
go test ./... -cover
# Model tests (95.9% coverage)
go test ./model/... -cover
# Integration tests (requires database)
go test ./adapters/relica/... -covercd cmd/pubsub-server
docker-compose up -d# Build image
docker build -t pubsub-server:0.1.0 -f cmd/pubsub-server/Dockerfile .
# Run with environment
docker run -d \
-p 8080:8080 \
-e DB_DRIVER=mysql \
-e DB_HOST=mysql \
-e DB_PASSWORD=secret \
pubsub-server:0.1.0- Basic Example - Simple QueueWorker setup with Relica
- Server Example - Full standalone service
- Core PubSub functionality
- Relica adapters (MySQL/PostgreSQL/SQLite)
- Publisher + SubscriptionManager services
- Standalone REST API server
- Docker support
- Health checks
- Delivery providers (HTTP webhooks, gRPC)
- Message encryption
- Rate limiting
- Metrics (Prometheus)
- Admin UI
- OpenAPI/Swagger docs
- Authentication/Authorization
- Multi-tenancy
- Message replay
- Full test coverage (>90%)
This is an alpha release. Contributions welcome!
- Fork the repository
- Create feature branch (
git checkout -b feature/amazing) - Commit changes (
git commit -m 'feat: add amazing feature') - Push to branch (
git push origin feature/amazing) - Open Pull Request
MIT License - see LICENSE file for details.
- Relica - Type-safe query builder (github.com/coregx/relica)
- FreiCON - Original production testing ground
- CoreGX Ecosystem - Part of CoreGX microservices suite
- π Issues: GitHub Issues
- π Documentation: Wiki
- π¬ Discussions: GitHub Discussions
This is a pre-release version (v0.1.0 development). The library is production-ready and battle-tested in FreiCON Railway Management System with 95.9% test coverage and zero linter issues. APIs may evolve before v1.0.0 LTS release.
π¦ Dependencies
This library uses Relica for type-safe database operations. All dependencies are properly published and available through Go modules.
Made with β€οΈ by CoreGX Team