-
Notifications
You must be signed in to change notification settings - Fork 18
Transfer.StackExchangeRedis
licas.li edited this page Jun 23, 2026
·
2 revisions
NuGet:Shashlik.EventBus.StackExchangeRedis
Shashlik.EventBus.StackExchangeRedis 使用 StackExchange.Redis 作为底层 Redis 客户端,提供:
-
AddStackExchangeRedisMQ:基于 Redis Streams 的消息传输 -
AddStackExchangeRedisWorkerId:基于 Redis 分布式协调的雪花 ID WorkerId 自动分配
两个扩展方法共用同一个 EventBusStackExchangeRedisOptions 实例和同一个 IConnectionMultiplexer 连接。
与 FreeRedis 版本 功能完全等价,仅底层客户端库不同。推荐场景:项目已注册
IConnectionMultiplexer、使用 StackExchange.Redis 集群/Sentinel/SSL 高级特性,或希望选用社区更活跃的标准库。
// 注册 IConnectionMultiplexer 单例
services.AddSingleton<IConnectionMultiplexer>(_ =>
ConnectionMultiplexer.Connect("localhost:6379,abortConnect=false"));
services.AddEventBus()
.AddStackExchangeRedisMQ(r =>
{
r.MaxLength = 10000;
r.ConsumerPoolSize = 4;
})
.AddStackExchangeRedisWorkerId(r =>
{
r.AppName = "OrderService";
});services.AddEventBus()
.AddStackExchangeRedisMQ();AddStackExchangeRedisMQ 默认从 DI 容器中获取 IConnectionMultiplexer 实例。
services.AddEventBus()
.AddStackExchangeRedisWorkerId(r =>
{
r.AppName = "OrderService";
});services.AddEventBus()
.AddStackExchangeRedisMQ(r =>
{
r.ConnectionMultiplexerFactory = sp =>
{
var options = ConfigurationOptions.Parse("localhost:6379,abortConnect=false");
options.Password = "your-password";
options.ConnectTimeout = 5000;
return ConnectionMultiplexer.Connect(options);
};
});| 配置项 | 默认值 | 适用 | 说明 |
|---|---|---|---|
ConnectionMultiplexerFactory |
sp => sp.GetService<IConnectionMultiplexer>() |
MQ + WorkerId |
IConnectionMultiplexer 实例获取工厂,MQ 与 WorkerId 共享 |
AppName |
"DEFAULT" |
WorkerId | WorkerId 分配的应用标识,不同 AppName 互不干扰 |
MaxLength |
10000 |
MQ | 每个 Stream 消息堆积最大数量,防止内存溢出 |
ConsumerPoolSize |
4 |
MQ | 每个 EventHandler 的并发消费者数量 |
MaxLengthFactory |
null |
MQ | 消息堆积最大数量动态配置器,优先级高于 MaxLength
|
- 基于 Redis Streams 的 Consumer Group 机制
- 同一 EventHandler 对应一个 Consumer Group,组内多消费者自动分担消息
- 启动时自动创建 Consumer Group(已存在则忽略)
- 消息处理成功后自动确认(
XAck) -
MaxLength通过StreamAddAsync的maxLength + useApproximateMaxLength限制 Stream 长度
通过 Redis 协调多实例自动获得唯一 WorkerId,免去手动管理 WORKER_ID 环境变量。详见 配置参考 - ID 生成器。
核心行为:
- 启动时自动从 Redis 抢占空闲 WorkerId(0~1023)
- 后台定时续约避免租约过期
- 进程退出时主动释放占用的 WorkerId
- 不同
AppName之间互不干扰 - 所有 WorkerId 都被占用时启动失败
services.AddSingleton<IConnectionMultiplexer>(sp =>
{
var config = ConfigurationOptions.Parse("localhost:6379,abortConnect=false");
config.Password = Configuration["Redis:Password"];
return ConnectionMultiplexer.Connect(config);
});
services.AddEventBus(options =>
{
options.Environment = "Production";
})
.AddStackExchangeRedisMQ(r =>
{
r.AppName = "OrderService"; // 与 WorkerId 共享
r.MaxLength = 50000;
r.ConsumerPoolSize = 8;
r.MaxLengthFactory = msg =>
msg.EventName.Contains("HighVolume") ? 100000 : 10000;
})
.AddStackExchangeRedisWorkerId(r =>
{
r.AppName = "OrderService"; // 与 MQ 共享
})
.AddRelationDb(opt => opt.UseConnection(DataType.MySql, "..."));| 场景 | 推荐 |
|---|---|
| 已使用 FreeRedis | Shashlik.EventBus.Redis |
已使用 StackExchange.Redis(已注册 IConnectionMultiplexer) |
Shashlik.EventBus.StackExchangeRedis |
| 需要 Redis 集群、Sentinel、SSL、Streams 高级特性 | Shashlik.EventBus.StackExchangeRedis |
| 新项目,无历史包袱 |
Shashlik.EventBus.StackExchangeRedis(社区更活跃、API 更标准) |