MQ 是一个使用 Go 实现的消息队列,提供基于 gRPC 的服务接口,并包含一个 Go 客户端 SDK。
项目支持两种运行方式:
- 单机模式:所有
Topic、Subscription和消息状态维护在单个进程内,可选持久化到本地磁盘。 - 集群模式:按
topic做一致性哈希分片,每个shard使用hashicorp/raft复制状态。
当前实现适合本地开发、功能验证和分布式机制实验。集群能力已具备基本骨架,但仍处于偏实验性质的阶段。
Topic / Subscription模型- 推模式消费和拉模式消费
ACK、可见性超时、超时重投、续期- 延时消息
- 按消息 ID 查询消息副本
- 消息持久化、重启恢复、TTL 清理
- 基于消费速率和内存预算的简单流控
- 一致性哈希分片
- 基于
hashicorp/raft和 BoltDB 的 shard 内复制
- 生产者向
Topic发布消息。 - 每个
Subscription拥有独立的消费进度和确认状态。 - 同一
Topic下的多个Subscription为扇出关系。 - 同一
Subscription下的多个消费者为竞争消费关系。
单机模式适合本地开发、功能验证和阅读代码:
- 所有元数据和消费状态都保存在一个进程内
- 可选择仅内存运行,或将消息副本持久化到磁盘
- 启动和调试成本最低
集群模式的基本流程如下:
topic先通过一致性哈希映射到目标shard- 每个
shard对应一个独立的 Raft 组 - 写请求在目标
shard的 leader 上提交并复制 - 非 leader 节点会将请求转发到 leader
当前集群实现主要覆盖分片、复制和 leader 转发等核心路径。成员变更、监控、鉴权和运维能力仍需进一步完善。
- Go
1.26+ protocprotoc-gen-goprotoc-gen-go-grpc
其中 protobuf 相关工具仅在修改 proto 文件时需要。
go run ./cmd/server/默认监听地址为 0.0.0.0:9090。
也可以通过命令行参数或环境变量覆盖:
go run ./cmd/server/ --host 127.0.0.1 --port 8080
MQ_HOST=127.0.0.1 MQ_PORT=8080 go run ./cmd/server/启动服务后,在另一个终端执行:
go run ./cmd/example/示例程序覆盖以下能力:
- 推模式消费
- 拉模式消费
- 延时消息
- 可见性超时续期
- 按消息 ID 查询
| 参数 | 环境变量 | 默认值 | 说明 |
|---|---|---|---|
--host |
MQ_HOST |
0.0.0.0 |
gRPC 监听地址 |
--port |
MQ_PORT |
9090 |
gRPC 监听端口 |
--data-dir |
MQ_DATA_DIR |
data |
消息持久化目录;为空时仅使用内存 |
--retention-days |
MQ_RETENTION_DAYS |
3 |
消息保留天数 |
--max-store-bytes |
MQ_MAX_STORE_BYTES |
0 |
消息存储上限,0 表示不限 |
--max-memory-pct |
MQ_MAX_MEMORY_PCT |
50 |
内存队列可使用的系统内存百分比 |
--max-memory-bytes |
MQ_MAX_MEMORY_BYTES |
0 |
显式指定内存上限,优先级高于百分比配置 |
当 --node-id 非空时,服务以集群模式启动。
| 参数 | 环境变量 | 默认值 | 说明 |
|---|---|---|---|
--node-id |
MQ_NODE_ID |
空 | 节点唯一标识;非空即启用集群模式 |
--raft-addr |
MQ_RAFT_ADDR |
:7000 |
Raft 传输地址 |
--raft-dir |
MQ_RAFT_DIR |
raft-data |
Raft 数据目录 |
--shard-count |
MQ_SHARD_COUNT |
8 |
逻辑分片数量 |
--bootstrap |
MQ_BOOTSTRAP |
false |
是否引导新集群 |
--join |
无 | 空 | 其他节点信息,格式为 nodeID@raftAddr@grpcAddr,多个节点以逗号分隔 |
节点 1:
go run ./cmd/server/ \
--host 127.0.0.1 \
--port 9090 \
--node-id node-1 \
--raft-addr 127.0.0.1:7000 \
--raft-dir ./data/node-1 \
--shard-count 4 \
--bootstrap \
--join "node-2@127.0.0.1:7100@127.0.0.1:9091,node-3@127.0.0.1:7200@127.0.0.1:9092"节点 2:
go run ./cmd/server/ \
--host 127.0.0.1 \
--port 9091 \
--node-id node-2 \
--raft-addr 127.0.0.1:7100 \
--raft-dir ./data/node-2 \
--shard-count 4节点 3:
go run ./cmd/server/ \
--host 127.0.0.1 \
--port 9092 \
--node-id node-3 \
--raft-addr 127.0.0.1:7200 \
--raft-dir ./data/node-3 \
--shard-count 4客户端 SDK 位于 pkg/client,常用入口为 Client、Producer 和 Consumer。
import mqclient "github.com/Rehtt/mq2/pkg/client"
c, err := mqclient.New("localhost:9090")
if err != nil {
return err
}
defer c.Close()
if _, err := c.CreateTopic(ctx, "orders"); err != nil {
return err
}
if _, err := c.CreateSubscription(ctx, "order-processor", "orders", 30*time.Second); err != nil {
return err
}
topics, err := c.ListTopics(ctx)producer := c.NewProducer("orders")
id, err := producer.Send(
ctx,
[]byte(`{"id":1}`),
map[string]string{"type": "new"},
)
ids, err := producer.SendBatch(ctx,
&mqclient.OutgoingMessage{Payload: []byte("msg-1")},
&mqclient.OutgoingMessage{
Payload: []byte("msg-2"),
Attributes: map[string]string{"k": "v"},
},
)发送延时消息:
id, err := producer.SendDelay(ctx, []byte("delayed"), nil, 5*time.Second)consumer := c.NewConsumer("order-processor",
mqclient.WithAutoAck(true),
mqclient.WithConcurrency(4),
mqclient.WithReconnect(true),
)
err := consumer.Start(ctx, func(ctx context.Context, msg *mqclient.Message) error {
fmt.Printf("id=%s payload=%s\n", msg.ID, string(msg.Payload))
return nil
})consumer := c.NewConsumer("order-processor")
msgs, err := consumer.Pull(ctx, 10, 30*time.Second)
if err != nil {
return err
}
for _, msg := range msgs {
if err := process(msg); err != nil {
continue
}
if err := consumer.Ack(ctx, msg.ID); err != nil {
return err
}
}err := consumer.ExtendAck(ctx, 10*time.Second, messageID)msg, err := c.GetMessage(ctx, messageID)GetMessage 查询的是消息存储副本,只要消息尚未被清理,无论是否已消费完成,都可以返回结果。
服务端启用了 gRPC reflection,可以直接使用 grpcurl 调试接口:
grpcurl -plaintext localhost:9090 list
grpcurl -plaintext -d '{"name":"test"}' \
localhost:9090 mq.MQService/CreateTopic
grpcurl -plaintext localhost:9090 mq.MQService/ListTopics
grpcurl -plaintext -d '{"topic":"test","messages":[{"payload":"aGVsbG8="}]}' \
localhost:9090 mq.MQService/Publish消费链路示例:
grpcurl -plaintext -d '{"name":"sub-1","topic":"test","ack_deadline_seconds":30}' \
localhost:9090 mq.MQService/CreateSubscription
grpcurl -plaintext -d '{"subscription":"sub-1","max_messages":10}' \
localhost:9090 mq.MQService/Pull- 消费语义当前为
at-least-once ACK超时后消息会重新投递,消息 ID 不变,DeliverCount会增加- 延时消息会在到期后进入可消费状态
- 不同
Subscription之间相互独立 - 同一
Subscription下的多个消费者共享同一消费队列
mq/
├── api/mq/ # gRPC 生成代码
├── cmd/
│ ├── example/ # 示例程序
│ └── server/ # 服务入口
├── config/ # 启动配置
├── internal/
│ ├── broker/ # 单机 broker、订阅与消息存储
│ ├── cluster/ # 分片、Raft 与节点转发
│ └── service/ # gRPC 服务实现
├── pkg/client/ # Go 客户端 SDK
└── proto/ # protobuf 定义
初次阅读代码时,建议优先查看以下文件:
internal/broker/subscription.gointernal/broker/store.gointernal/cluster/node.gointernal/cluster/shard.gointernal/service/service.go
常用命令:
make proto
make fmt
make vet
make test
make server
make example修改 proto 后可直接执行:
make proto如果需要手动生成:
protoc --go_out=. --go_opt=module=github.com/Rehtt/mq2 \
--go-grpc_out=. --go-grpc_opt=module=github.com/Rehtt/mq2 \
proto/mq/mq.proto
protoc --go_out=. --go_opt=module=github.com/Rehtt/mq2 \
proto/internal/persist.proto
protoc --go_out=. --go_opt=module=github.com/Rehtt/mq2 \
proto/internal/command.proto