queue-wrap is an enterprise-grade, strictly typed, and highly reliable distributed job queue for Go. Built with Clean Architecture principles and leveraging Go 1.18+ Generics, it provides a robust foundation for distributed task processing, delayed execution, and background worker orchestration.
Out of the box it is powered by Redis, while an extensible transport layer allows seamless integration with NATS, RabbitMQ, Kafka, and other messaging systems.
- Pure Generics: `Job[T]` type safety with zero type assertions and zero runtime panics
- At-Least-Once Delivery: Reliable Queue pattern with automatic recovery from worker crashes
- Idempotency: Built-in deduplication with atomic Redis locking
- Scheduling: First-class support for delayed jobs, exponential backoff with jitter
- Observability: Middleware hooks for Prometheus, OpenTelemetry, structured logging
- Testing Ready: In-memory backend for fast, deterministic unit tests
- Transport Agnostic: Pluggable repository interface for Redis, NATS, RabbitMQ, Kafka
Ensure Go 1.18+:

```bash
go get github.com/CMPNION/queue-wrap
```

Prerequisites:

- Redis 6.2+ (for the default Redis backend with `BLMOVE` support)
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/CMPNION/queue-wrap/pkg/queuewrap"
	"github.com/redis/go-redis/v9"
)

type OrderPayload struct {
	OrderID   string `json:"order_id"`
	UserEmail string `json:"user_email"`
}

func main() {
	ctx := context.Background()

	redisOpts := &redis.Options{
		Addr: "localhost:6379",
	}

	// Create a queue
	queue := queuewrap.NewQueue[OrderPayload]("order-processing", redisOpts)

	// Enqueue a job
	job, err := queue.Add(ctx, "process-payment", OrderPayload{
		OrderID:   "ORD-778899",
		UserEmail: "customer@example.com",
	}, queuewrap.WithExponentialBackoff(
		1*time.Second, 30*time.Second, 2.0, true,
	))
	if err != nil {
		log.Fatalf("Enqueue failed: %v", err)
	}
	fmt.Printf("Job enqueued: %s\n", job.ID)

	// Define processor
	processor := func(ctx context.Context, j *queuewrap.Job[OrderPayload]) error {
		fmt.Printf("Processing order %s for %s\n", j.Data.OrderID, j.Data.UserEmail)
		time.Sleep(500 * time.Millisecond)
		return nil
	}

	// Start worker
	worker := queuewrap.NewWorker[OrderPayload](
		"order-processing",
		redisOpts,
		processor,
		queuewrap.WithConcurrency[OrderPayload](10),
		queuewrap.WithVisibilityTimeout[OrderPayload](5*time.Minute),
	)
	worker.Start(ctx)
	fmt.Println("Worker started. Press Ctrl+C to exit.")

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	fmt.Println("Shutting down gracefully...")
	worker.Stop()
}
```

Every `Queue` and `Worker` is parameterized with your payload type (`[T any]`). Serialization happens automatically; no manual type assertions are needed.
```go
queue := queuewrap.NewQueue[OrderPayload]("orders", redisOpts)
job, err := queue.Add(ctx, "process", OrderPayload{...})
// Type-safe: j.Data is OrderPayload, not map[string]interface{}
```

Uses Redis `BLMOVE` (the Reliable Queue pattern) plus an automatic Reaper to recover orphaned jobs:
- Jobs move atomically from `wait` to `active:<worker_id>`
- Each worker maintains a TTL-based heartbeat
- If a worker crashes, the Reaper sweeps its stale active list and restores the jobs
```go
queuewrap.WithVisibilityTimeout[Payload](10 * time.Minute)
```

Atomic Redis `SETNX` prevents duplicate processing for 24 hours:
```go
job, err := queue.Add(ctx, "charge-card", payload,
	queuewrap.WithIdempotencyKey("charge-req-uuid-1234"),
)
if err == queuewrap.ErrDuplicateJob {
	// Handle duplicate safely
}
```

Delayed jobs are stored in a Redis ZSET and promoted by the Worker scheduler:
```go
// Delay 15 minutes
queuewrap.WithDelay(15 * time.Minute)

// Fixed backoff on retry
queuewrap.WithBackoff(10 * time.Second)

// Exponential with jitter (prevents thundering herd)
queuewrap.WithExponentialBackoff(initial, max, factor, true)
```

Middleware-like hooks for APM, metrics, and error tracking:
```go
hooks := queuewrap.WorkerHooks[MyPayload]{
	BeforeProcess: func(ctx context.Context, job *queuewrap.Job[MyPayload]) {
		metrics.IncActiveJobs(job.Name)
	},
	AfterProcess: func(ctx context.Context, job *queuewrap.Job[MyPayload], err error) {
		if err != nil {
			metrics.IncFailedJobs(job.Name)
		} else {
			metrics.IncCompletedJobs(job.Name)
		}
	},
	OnRetry: func(ctx context.Context, job *queuewrap.Job[MyPayload], err error) {
		logger.Warn("Retrying", "attempt", job.Attempts, "err", err)
	},
}

worker := queuewrap.NewWorker[MyPayload](..., queuewrap.WithHooks[MyPayload](hooks))
```

Fast, deterministic unit tests without Redis:
```go
memRepo := queuewrap.NewMemoryQueueRepository[MyPayload]()
queue := queuewrap.NewQueueWithRepo("test-queue", memRepo)
worker := queuewrap.NewWorkerWithRepo("test-queue", memRepo, processor)
// Run tests...
```

Clean Architecture ensures maintainability and extensibility:
- `internal/domain`: business rules, interfaces (`QueueRepository[T]`), entities (`Job[T]`)
- `internal/usecase`: application logic (Queue, Worker orchestration, Reaper, Heartbeat)
- `internal/infrastructure`: data persistence (Redis and in-memory implementations)
- `pkg/queuewrap`: public facade API (all consumers import this)
Extend with custom transports by implementing `QueueRepository[T]`:
```go
type QueueRepository[T any] interface {
	Enqueue(ctx context.Context, queueName string, job *Job[T]) error
	Dequeue(ctx context.Context, queueName, workerID string) (*Job[T], error)
	Update(ctx context.Context, queueName string, job *Job[T]) error
	GetJob(ctx context.Context, queueName, jobID string) (*Job[T], error)
	Acknowledge(ctx context.Context, queueName, workerID, jobID string) error
	PromoteDelayed(ctx context.Context, queueName string) error
	Heartbeat(ctx context.Context, queueName, workerID string, timeout time.Duration) error
	ReclaimStalled(ctx context.Context, queueName string, timeout time.Duration) error
}
```

Implement it for NATS, RabbitMQ, or Kafka and swap backends seamlessly:

```go
// Example: NATS backend
repo, _ := nats.NewNATSRepository[T](conn, "queuewrap")
queue := queuewrap.NewQueueWithRepo("orders", repo)
```

Enqueue options:

```go
queuewrap.WithJobID("custom-id")                // Custom job ID
queuewrap.WithMaxAttempts(5)                    // Retry limit
queuewrap.WithDelay(time.Minute)                // Delayed execution
queuewrap.WithBackoff(time.Minute)              // Fixed backoff
queuewrap.WithExponentialBackoff(i, m, f, true) // Exponential + jitter
queuewrap.WithIdempotencyKey("unique-key")      // Prevent duplicates
```

Worker options:

```go
queuewrap.WithConcurrency[T](10)                    // Parallel jobs
queuewrap.WithVisibilityTimeout[T](5 * time.Minute) // Crash recovery window
queuewrap.WithHooks[T](hooks)                       // Observability
```

| Aspect | Redis | NATS | RabbitMQ | Kafka |
|---|---|---|---|---|
| Latency | ~1ms | ~1ms | ~10ms | ~50ms |
| Throughput | 100K msg/s | 100K msg/s | 1M msg/s | 10M+ msg/s |
| Push Model | Yes (BLMOVE) | Yes (JetStream) | Yes (Native) | No (Pull) |
| Delayed Jobs | Yes (ZSET) | Yes | Yes | No |
| Recovery | Yes (Reaper) | Yes | Yes (DLQ) | Yes |
| Setup Complexity | Low | Low | Medium | High |
| Best For | General purpose | Microservices | Enterprise | Real-time streams |
- Configure an appropriate `VisibilityTimeout` based on job duration
- Set `MaxAttempts` to balance retries against permanent failure
- Use exponential backoff with jitter for high-concurrency scenarios
- Attach observability hooks (metrics, logging, tracing)
- Monitor worker heartbeats and stalled-job metrics
- Configure Redis persistence or replication for durability
- Use idempotency keys for payment/billing operations
- Test graceful shutdown with `worker.Stop()`
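The idempotency-key practice above hinges on an atomic set-if-absent. A minimal in-memory sketch of that primitive (the `dedup` type here is hypothetical; the real Redis backend uses `SETNX` with a TTL instead):

```go
package main

import (
	"fmt"
	"sync"
)

// dedup records idempotency keys; only the first caller to claim
// a key wins, mirroring what SETNX provides atomically in Redis.
type dedup struct {
	keys sync.Map
}

// claim returns true if the key was not seen before.
func (d *dedup) claim(key string) bool {
	_, loaded := d.keys.LoadOrStore(key, struct{}{})
	return !loaded
}

func main() {
	var d dedup
	fmt.Println(d.claim("charge-req-uuid-1234")) // first enqueue proceeds
	fmt.Println(d.claim("charge-req-uuid-1234")) // duplicate is rejected
}
```

Because `LoadOrStore` is atomic, concurrent producers racing on the same key still yield exactly one successful claim, which is the property a payment enqueue needs.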
- Batch Processing: Use higher concurrency for I/O-bound jobs
- Job Serialization: Keep payloads small (< 1MB recommended)
- Backoff Strategy: Start low (1s) and scale exponentially to avoid thundering herd
- Memory Backend: Use for tests, not production
- Redis Persistence: Enable AOF or RDB snapshots for durability
Licensed under GPL-3.0. See LICENSE for details.
Contributions welcome! Please open issues and PRs.
Future: Native NATS, RabbitMQ, and Kafka transports coming soon.