-
Notifications
You must be signed in to change notification settings - Fork 18
Architecture
licas.li edited this page Jun 18, 2026
·
1 revision
先持久化,后发送。 消息必须先写入本地存储并随业务事务一起提交,才会被真正发送到消息中间件。进程崩溃、网络中断等异常情况下,由重试器负责兜底。
事务隔离级别最低要求 读已提交 (RC)。
系统分为三个正交维度,可独立组合选用:
| 维度 | 接口 | 实现 |
|---|---|---|
| 消息传输 |
IMessageSender + IEventSubscriber
|
RabbitMQ / Kafka / Pulsar / Redis Stream / MemoryQueue |
| 消息存储 | IMessageStorage |
RelationDbStorage (MySQL/PG/SqlServer/Sqlite/Oracle) / MongoDb / MemoryStorage |
| 事务集成 | ITransactionContext |
EF Core / SqlSugar / FreeSql / XA (TransactionScope) |
业务代码 → IEventPublisher.PublishAsync()
│
├─ 1. 生成全局唯一 MsgId (IMsgIdGenerator)
├─ 2. 解析事件名称 (IEventNameRuler),注入元数据 (msg-id/event-name/send-at/delay-at)
├─ 3. 持久化 MessageStorageModel 到存储 (状态: SCHEDULED)
│ └─ 若提供 ITransactionContext,消息插入与业务操作在同一事务内
├─ 4. 异步等待事务提交 (轮询 ITransactionContext.IsDone(),超时: TransactionCommitTimeout)
├─ 5. 确认消息已提交 (IsCommittedAsync),回滚则忽略
├─ 6. 调用 IPublishHandler → IMessageSender.SendAsync 发送到中间件
│ └─ 最多即时重试 5 次
└─ 7. 超过即时重试次数后放弃,交给 IPublishedMessageRetryProvider 兜底
关键设计:发送使用 IHostedStopToken 而非调用方 CancellationToken,避免 HTTP 请求结束导致已持久化消息未发送。
消息中间件 → IEventSubscriber → IMessageListener.OnReceiveAsync()
│
├─ 1. 解析 EventHandler,保存接收消息到存储 (状态: SCHEDULED)
├─ 2a. 非延迟消息:立即在 Task.Run 中执行处理器
│ └─ 最多即时重试 5 次,失败交给 IReceivedMessageRetryProvider
└─ 2b. 延迟消息:通过 TimerHelper.SetTimeout 在指定时间调度执行
IEventHandlerInvoker 负责调用事件处理器:
- 在新的
IServiceScope中创建处理器实例(支持 scoped 依赖注入) - 优先使用启动时编译好的委托(
EventHandlerDescriptor.ExecuteDelegate),回退到反射调用 - 自动解包
TargetInvocationException,暴露真实异常堆栈
SCHEDULED ──→ Succeeded (发送/消费成功,设置过期时间)
│
└──→ Failed (发送/消费失败)
│
└──→ SCHEDULED (重试器重新拾起,RetryCount + 1)
│
└──→ ... 直到 RetryCount >= RetryFailedMax,不再重试,等待人工介入
-
SCHEDULED:初始状态,等待发送或消费 -
Succeeded:成功,设置ExpireTime,到期后由过期清理器删除 -
Failed:失败,等待重试器介入
两个独立的重试提供者:
-
IPublishedMessageRetryProvider:重试发送失败的消息 -
IReceivedMessageRetryProvider:重试消费失败的消息
两者都:
- 启动时立即执行一次,之后按
RetryInterval间隔定时执行 - 查询存储中状态为
SCHEDULED/FAILED、RetryCount < RetryFailedMax且创建时间早于StartRetryAfter的消息 - 使用
SemaphoreSlim+Task.WhenAll并发执行(RetryMaxDegreeOfParallelism控制并行度) - 执行前通过
TryLockPublishedAsync/TryLockReceivedAsync获取乐观锁,防止多实例重复处理
IExpiredMessageProvider 每小时执行一次:
- 删除状态为
SUCCEEDED且已过期的消息 - 删除状态为
FAILED且重试次数达到RetryFailedMax的消息
EventBusStartup 作为 IHostedService 在应用启动时自动执行:
-
IMessageStorageInitializer.InitializeAsync— 初始化存储(建表等) -
IEventHandlerFindProvider.FindAll— 扫描程序集中所有IEventHandler<T>实现 -
IEventSubscriber.SubscribeAsync— 对每个处理器注册消息订阅 -
IPublishedMessageRetryProvider.StartupAsync— 启动发布重试器 -
IReceivedMessageRetryProvider.StartupAsync— 启动消费重试器 -
IExpiredMessageProvider.DoDeleteAsync— 启动过期清理
应用停止时,IHostedStopToken 发出取消信号,所有后台任务优雅退出。