From 287ef59856441e05912dc5502973754673e2873b Mon Sep 17 00:00:00 2001 From: "liqiankun.1111" Date: Wed, 1 May 2024 20:54:46 +0800 Subject: [PATCH] feat: enhance eventbus concurrency --- event.go | 3 +- eventbus/mysql/eventbus.go | 450 +++++++++++++++++--------------- eventbus/mysql/eventbus_test.go | 194 ++++++++------ eventbus/mysql/po.go | 61 ++++- 4 files changed, 418 insertions(+), 290 deletions(-) diff --git a/event.go b/event.go index a46ab54..6624c12 100644 --- a/event.go +++ b/event.go @@ -35,7 +35,8 @@ type SendType string const ( SendTypeNormal SendType = "normal" // 普通事件 - SendTypeFIFO SendType = "FIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行 + SendTypeFIFO SendType = "FIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行,且前序事件执行失败时会阻塞后续事件执行 + SendTypeLaxFIFO SendType = "LaxFIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行,但前序事件的执行成败不影响后续事件 SendTypeTransaction SendType = "transaction" // 事务事件 SendTypeDelay SendType = "delay" // 延时发送 ) diff --git a/eventbus/mysql/eventbus.go b/eventbus/mysql/eventbus.go index 5d6cd9d..402c747 100644 --- a/eventbus/mysql/eventbus.go +++ b/eventbus/mysql/eventbus.go @@ -25,6 +25,7 @@ import ( "unicode/utf8" "github.com/go-logr/logr" + "github.com/go-sql-driver/mysql" "github.com/robfig/cron" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -37,7 +38,6 @@ import ( const retryInterval = time.Second * 3 const retryLimit = 5 const runInterval = time.Millisecond * 100 -const defaultQueueLimit = 1000 // 每天凌晨两点执行clean const cleanCron = "0 2 * * *" @@ -52,6 +52,7 @@ const txCheckTimeout = time.Minute var ErrInvalidDB = fmt.Errorf("invalid db") var ErrNoTransaction = fmt.Errorf("no transaction") var ErrServiceNotCreate = fmt.Errorf("service not create") +var ErrPrecedingEventNotReady = fmt.Errorf("preceding event not ready") var defaultLogger = stdr.NewStdr("mysql_eventbus") var eventBusMu sync.Mutex @@ -126,7 +127,6 @@ type Options struct { RetryLimit int // 重试次数 RetryInterval time.Duration // 重试间隔 CustomRetry []time.Duration // 自定义重试间隔 - QueueLimit int // 重试/失败队列长度 DefaultOffset *int64 // 默认起始 offset RunInterval time.Duration // 默认轮询间隔 @@ -168,7 +168,6 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus { } opt := Options{ - QueueLimit: defaultQueueLimit, RunInterval: runInterval, CleanCron: cleanCron, RetentionTime: retentionTime, @@ -327,131 +326,131 @@ func (e *EventBus) initService() error { return e.db.Where(ServicePO{Name: e.serviceName}).FirstOrCreate(service).Error } -func (e *EventBus) lockService(tx *gorm.DB) (*ServicePO, error) { - service := &ServicePO{} - if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}). - Where("name = ?", e.serviceName). - First(service).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, ErrServiceNotCreate - } - return nil, err - } - return service, nil -} - -func (e *EventBus) getScanEvents(db *gorm.DB, service *ServicePO) ([]*EventPO, error) { - if service.Offset == 0 { - if e.opt.DefaultOffset != nil { - service.Offset = *e.opt.DefaultOffset - } else { - lastEvent := &EventPO{} - if err := db.Order("id").Last(lastEvent).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, nil +// syncServiceEvents 将待消费的event 信息copy 一份到service_event +func (e *EventBus) syncServiceEvents() error { + fn := func(tx *gorm.DB) (err error) { + // 并发insert ignore into unique key 会有dead lock,因此copy event 到service_event 动作用竞争service锁的动作保护下 + service := &ServicePO{} + if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}). 
+ Where("name = ?", e.serviceName). + First(service).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrServiceNotCreate + } + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) { + // 竞争service 失败 只是放弃了syncServiceEvents,不影响去handleEvents + // Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set. + if mysqlErr.Number == 3572 { + return nil } - return nil, err } - // 不指定 offset 的情况下,默认从新的一条事件开始,且包含该条 - service.Offset = lastEvent.ID - 1 + return err } - } - eventPOs := make([]*EventPO, 0) - query := db.Where("id > ?", service.Offset) - if err := query.Order("id").Limit(e.opt.LimitPerRun).Find(&eventPOs).Error; err != nil { - return nil, err - } - - return eventPOs, nil -} - -func (e *EventBus) getRetryEvents(db *gorm.DB, service *ServicePO) ([]*EventPO, []int64, error) { - now := time.Now() - retryIDs := make([]int64, 0) - remainIDs := make([]int64, 0) - for _, info := range service.Retry { - if info.RetryTime.Before(now) { - retryIDs = append(retryIDs, info.ID) - } else { - // 保留未到重试时间的事件 - remainIDs = append(remainIDs, info.ID) + // 计算从哪个event开始copy。即当前service_event 的最大event_id 之后开始copy + eventOffset := int64(0) + lastServiceEvent := &ServiceEventPO{} + if err := tx.Where("service = ?", e.serviceName).Order("id").Last(lastServiceEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } } + if lastServiceEvent.EventID > 0 { + eventOffset = lastServiceEvent.ID + } + eventPOs := make([]*EventPO, 0) + // 除了service_event 里已有的event不同步,太早的event 不同步了,防止项目初始化场景下,处理过于早的event + query := tx.Where("id > ?", eventOffset).Where("event_created_at >= ?", time.Now().Add(-scanStartTime)) + if err := query.Order("id").Limit(e.opt.LimitPerRun).Find(&eventPOs).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 没找到就是没有event 要处理 + return nil + } + return err + } + if len(eventPOs) == 0 { + // 没有新的event产生 + return nil + } + serviceEvents := make([]*ServiceEventPO, 0) + for _, event := range eventPOs { + serviceEvent := &ServiceEventPO{ + Service: e.serviceName, + Sender: event.Event.Sender, + EventID: event.ID, + Status: int8(ServiceEventStatusInit), + EventCreatedAt: event.EventCreatedAt, + // 初始化时给一个尽量早的可执行时间,表示创建后就可以执行了 + NextTime: event.EventCreatedAt, + } + serviceEvents = append(serviceEvents, serviceEvent) + } + // 插入动作是并发的,因此使用insert ignore into(service+event_id有unique key)。 sqlite 不支持insert ignore into,插入时注意 + if err := tx.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(serviceEvents).Error; err != nil { + return err + } + return nil } - if len(retryIDs) == 0 { - return nil, remainIDs, nil - } - - eventPOs := make([]*EventPO, 0) - if err := db.Where("id in ?", retryIDs).Find(&eventPOs).Error; err != nil { - return nil, nil, err - } - return eventPOs, remainIDs, nil + return e.db.Transaction(fn) } -func (e *EventBus) doRetryStrategy(service *ServicePO, remainIDs, failedIDs []int64) (retry, failed []*RetryInfo) { - retryInfos := make(map[int64]*RetryInfo) - for _, info := range service.Retry { - retryInfos[info.ID] = info +// getScanEvents 获取当期待处理的service_event, 每次干活儿前顺带从event表copy 一些数量的event 到service_event +func (e *EventBus) getScanEvents() ([]*ServiceEventPO, error) { + if err := e.syncServiceEvents(); err != nil { + return nil, err } - - for _, id := range remainIDs { - retry = append(retry, retryInfos[id]) + scanServiceEvents := make([]*ServiceEventPO, 0) + scanQuery := e.db.Where("service = ? 
", e.serviceName).Where("status = ?", ServiceEventStatusInit).Where("next_time <= ?", time.Now()) + if err := scanQuery.Order("event_created_at, event_id").Limit(e.opt.LimitPerRun).Find(&scanServiceEvents).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 没找到就是没有event 要处理 + e.logger.V(logger.LevelInfo).Info("find empty service_event, ignore handling event") + return nil, nil + } + return nil, err } + return scanServiceEvents, nil +} +func (e *EventBus) doRetryStrategy(spo *ServiceEventPO) { // 没有定义重试策略,默认不重试直接失败 if e.retryStrategy == nil { - for _, id := range failedIDs { - failed = append(failed, &RetryInfo{ID: id}) - } + spo.Status = int8(ServiceEventStatusFailed) return } - - for _, id := range failedIDs { - info := retryInfos[id] - if info == nil { - info = &RetryInfo{ID: id} - } - - newInfo := e.retryStrategy.Next(info) - if newInfo != nil { - retry = append(retry, newInfo) - } else { - failed = append(failed, info) - } + info := &RetryInfo{ + ID: spo.EventID, + RetryCount: spo.RetryCount, + RetryTime: time.Now(), + } + newInfo := e.retryStrategy.Next(info) + if newInfo != nil { + spo.RetryCount = newInfo.RetryCount + spo.NextTime = newInfo.RetryTime + } else { + spo.Status = int8(ServiceEventStatusFailed) } + return } -func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (failed, panics []int64) { - events := make(chan *EventPO, len(eventPOs)) +func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*ServiceEventPO) { + serviceEventChan := make(chan *ServiceEventPO, len(eventPOs)) for _, e := range eventPOs { - events <- e + serviceEventChan <- e } - close(events) - + close(serviceEventChan) wg := sync.WaitGroup{} for i := 0; i < e.opt.ConsumeConcurrent; i++ { wg.Add(1) go func() { defer wg.Done() - - cb := func(ctx context.Context, po *EventPO) { - defer func() { - if r := recover(); r != nil { - e.logger.Error(fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())), fmt.Sprintf("panic while handling event(%s)", po.EventID)) - eventBusMu.Lock() - defer eventBusMu.Unlock() - panics = append(panics, po.ID) - } - }() - if err := e.cb(ctx, po.Event); err != nil { - // slice 线程不安全,需要加锁 - eventBusMu.Lock() - defer eventBusMu.Unlock() - failed = append(failed, po.ID) - } + cb := func(ctx context.Context, spo *ServiceEventPO) { + // 某个event 的处理成败,不影响后续事件的处理 + // 对于保序事件,因为前序事件没有执行,后面同sender event也不用继续了。但毕竟后续还有非保序事件要处理,所以也不会因为ErrPrecedingEventNotReady 就终止事件消费流程 + _ = e.handleEvent(ctx, spo) } - for po := range events { + for po := range serviceEventChan { cb(ctx, po) } }() @@ -460,69 +459,119 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai return } -func (e *EventBus) handleEvents() error { - e.logger.V(logger.LevelDebug).Info("handle events") - - return e.db.Transaction(func(tx *gorm.DB) error { - ctx := context.Background() - service, err := e.lockService(tx) - if err != nil { +func (e *EventBus) checkPrecedingEvent(tx *gorm.DB, spo *ServiceEventPO, eventPO *EventPO) error { + // 如果是保序event,则校验前序event的执行结果 + if len(spo.Sender) == 0 { + err := fmt.Errorf("event sender can not be empty when event type is fifo") + e.logger.Error(err, "event_id", spo.EventID) + return err + } + // 找到前序service_event + precedingServiceEvent := &ServiceEventPO{} + if err := tx.Where("service = ?", e.serviceName). + Where("sender = ?", spo.Sender). + // event_created_at 是最权威的前序,但是时间精度问题导致可能前序event可能跟当前event一样,再用event_id 明确下 + Where("event_created_at <= ?", spo.EventCreatedAt). + Where("event_id < ?", spo.EventID). 
+ Order("event_created_at desc, event_id desc").First(precedingServiceEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + e.logger.V(logger.LevelInfo).Info("find preceding service_event error", "current event_id", spo.EventID, "err", err) return err } - scanEvents, err := e.getScanEvents(tx, service) - if err != nil { - return err + } + // 找不到则当前 event 可能是同sender的第一个event + if precedingServiceEvent.ID == 0 { + return nil + } + if eventPO.Event.SendType == dddfirework.SendTypeFIFO { + if precedingServiceEvent.Status != int8(ServiceEventStatusSuccess) && precedingServiceEvent.Status != int8(ServiceEventStatusExpired) { + // 前序event 未执行、失败、未过期,则不执行当前event + e.logger.V(logger.LevelInfo).Info("preceding service_event is not success or expired, ignore", "current event_id", spo.EventID) + return ErrPrecedingEventNotReady } - retryEvents, remainIDs, err := e.getRetryEvents(tx, service) - if err != nil { - return err + } else if eventPO.Event.SendType == dddfirework.SendTypeLaxFIFO { + // 当期的策略是,哪怕前序event 在重试中,也还是init,除非超过重试次数status=failed。若想忽略重试这一点,可以check下 precedingServiceEvent.retryCount + if precedingServiceEvent.Status == int8(ServiceEventStatusInit) { + e.logger.V(logger.LevelInfo).Info("find preceding service_event has not been run", "current event_id", spo.EventID) + // 前序event 未执行,则不执行当前event + return ErrPrecedingEventNotReady + } + } + return nil +} + +// handleEvent, 多实例并发场景下先锁住service_event,check service_event状态,找到对应的event,如果是保序event,则check前序event状态,check通过后,执行event handler +// 执行成功,则保存执行结果,执行失败则根据RetryStrategy更新retryCount/nextTime/failedMessage +func (e *EventBus) handleEvent(ctx context.Context, spo *ServiceEventPO) error { + fn := func(tx *gorm.DB) (err error) { + // 下面的db 操作一定要全部使用 tx + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())) + e.logger.Error(err, fmt.Sprintf("panic while handling event(%d)", spo.EventID)) + e.doRetryStrategy(spo) + spo.FailedMessage = fmt.Sprintf("%v", err) + } + if err = tx.Model(spo).Updates(&spo).Error; err != nil { + e.logger.Error(err, "update service_event error", "current event_id", spo.EventID) + return + } + }() + // 找到并锁住service_event,确认service_event 还未被执行 + currentServiceEvent := &ServiceEventPO{} + if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}). + Where("service = ?", e.serviceName).Where("event_id = ?", spo.EventID). + First(currentServiceEvent).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + e.logger.Info("find current service_event error", "current event_id", spo.EventID, "err", err) + return err + } } - events := make([]*EventPO, 0) - events = append(events, scanEvents...) - events = append(events, retryEvents...) - if len(events) == 0 { + if currentServiceEvent.Status != int8(ServiceEventStatusInit) { + e.logger.V(logger.LevelInfo).Info("current service_event'status it not init, ignore", "event_id", spo.EventID) return nil } - - failedIDs, panicIDs := e.dispatchEvents(ctx, events) - retry, failed := e.doRetryStrategy(service, remainIDs, failedIDs) - failed = append(service.Failed, failed...) 
- for _, id := range panicIDs { - failed = append(failed, &RetryInfo{ID: id}) + // 找到event + eventPO := &EventPO{} + if err := tx.First(eventPO, spo.EventID).Error; err != nil { + e.logger.V(logger.LevelInfo).Info("find and lock event error", "event_id", spo.EventID, "err", err) + return err } - service.Retry, service.Failed = e.evictEvents(retry, failed) - if len(scanEvents) > 0 { - last := scanEvents[len(scanEvents)-1] - service.Offset = last.ID + // 如果是保序event,则校验前序event的执行结果 + if eventPO.Event.SendType == dddfirework.SendTypeFIFO || eventPO.Event.SendType == dddfirework.SendTypeLaxFIFO { + if err = e.checkPrecedingEvent(tx, spo, eventPO); err != nil { + return err + } } - return tx.Save(service).Error - }) + e.logger.V(logger.LevelInfo).Info("eventbus handle event", "event db id", spo.EventID, "event id", eventPO.EventID) + err = e.cb(ctx, eventPO.Event) + spo.RunAt = time.Now() + if err != nil { + // 更新 retry_limit 以及 next_time,超出重试限制,则status置为失败。为了支持自定义RetryStrategy,所以通过NextTime 而不是RunAt来控制重试间隔 + e.doRetryStrategy(spo) + spo.FailedMessage = fmt.Sprintf("%v", err) + } else { + spo.Status = int8(ServiceEventStatusSuccess) + } + return nil + } + return e.db.Transaction(fn) } -func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) { - // 重试队列超出限制时,移动到失败队列并记录日志, FIFO - if len(retry) > e.opt.QueueLimit { - evictCount := len(retry) - e.opt.QueueLimit - evictEvents := retry[:evictCount] - failed = append(failed, evictEvents...) - evictedRetry = retry[evictCount:] - e.logger.V(logger.LevelWarn).Info("evict retry events", "count", evictCount, "events", evictEvents) - } else { - evictedRetry = retry +func (e *EventBus) handleEvents() error { + e.logger.V(logger.LevelDebug).Info("handle events") + ctx := context.Background() + scanEvents, err := e.getScanEvents() + if err != nil { + return err } - - // 失败队列超出限制时,丢弃并记录日志, FIFO - if len(failed) > e.opt.QueueLimit { - evictCount := len(failed) - e.opt.QueueLimit - evictEvents := failed[:evictCount] - evictedFailed = failed[evictCount:] - e.logger.V(logger.LevelWarn).Info("evict failed events", "count", evictCount, "events", evictEvents) - } else { - evictedFailed = failed + if len(scanEvents) == 0 { + e.logger.V(logger.LevelInfo).Info("find empty service_event, ignore") + return nil } - - return + e.dispatchEvents(ctx, scanEvents) + return nil } func (e *EventBus) checkTX(ctx context.Context, tx *Transaction) { @@ -558,58 +607,54 @@ func (e *EventBus) handleTransactions() error { }) } +// cleanEvents 先清理当前service的service_event,如果一个event 被所有service 都处理成功,则删除对应的event func (e *EventBus) cleanEvents() error { e.logger.Info("clean events") - // 查询failed 或重试中的event,不能清理 - // retry 重试的时候,重试成功,从retry 移除。重试次数超过设置,从retry 移除,加入failed - // failed event不能删,这个相当于死信队列,留给用户做人工判断 - var services []*ServicePO - err := e.db.Find(&services).Error - if err != nil { - return err - } - failedOrRetryingEventIDSet := map[int64]bool{} - for _, service := range services { - for _, si := range service.Retry { - failedOrRetryingEventIDSet[si.ID] = true - } - for _, si := range service.Failed { - failedOrRetryingEventIDSet[si.ID] = true - } - } - - // 查询消费最慢的service,其已经消费的event即为被所有service消费完的event。First 会自动加上主键排序所以使用Take - var slowestServicePO ServicePO - if err = e.db.Order("offset").Take(&slowestServicePO).Error; err != nil { - return err - } - if slowestServicePO.Offset == 0 { - // 最近消费时间没有赋值,可能还未触发消费,暂时不考虑清理 - return nil - } - lastEvent := &EventPO{} - if err := e.db.Where("id = ?", 
slowestServicePO.Offset).First(lastEvent).Error; err != nil { - return err - } - - // 遍历可以清理的event,加上for update 防止多实例场景下并发操作问题 - query := e.db.Model(&EventPO{}).Clauses(clause.Locking{Strength: "UPDATE"}) + // 遍历可以清理的service_event,加上for update 防止多实例场景下并发操作问题 + query := e.db.Model(&ServiceEventPO{}).Clauses(clause.Locking{Strength: "UPDATE"}) needCleanAt := time.Now().Add(-e.opt.RetentionTime) - // 可能因为服务未启动等原因,event 很长时间都未被清理 - if lastEvent.CreatedAt.Before(needCleanAt) { - needCleanAt = lastEvent.CreatedAt - } - // servicePO记录 EventConsumedID 的原因是:EventConsumedAt 只精确到毫秒,可能有多个事件event_created_at 一样。此时这些遗留的event等到下一个清理周期再被清理。 - query = query.Where("created_at < ?", needCleanAt) + // 清理成功service_event + query = query.Where("service = ?", e.serviceName).Where("status = ?", ServiceEventStatusSuccess).Where("event_created_at < ?", needCleanAt) rows, err := query.Rows() if err != nil { return err } - defer rows.Close() + defer func() { + _ = rows.Close() + }() + + type EventIDCount struct { + EventID int64 `json:"event_id"` + Count int `json:"count"` + } - // 批量删除event - deleteEvents := func(eventIDs []int64) error { - if err := e.db.Where("id in ?", eventIDs).Delete(EventPO{}).Error; err != nil { + // 批量删除service_event + deleteServiceEvents := func(eventIDs []int64) error { + if err := e.db.Where("service = ?", e.serviceName).Where("event_id in ?", eventIDs).Delete(ServiceEventPO{}).Error; err != nil { + return err + } + // 假设存在多个service,经过多个service的陆续操作,则已经成功的service_event 陆续被清理,如果service_event 已无该event_id 的记录,则清理该event记录 + var eventIDCounts []EventIDCount + if err := e.db.Model(&ServiceEventPO{}).Select("event_id, count(id) as count"). + Where("event_id in ?", eventIDs). + Group("event_id").Find(&eventIDCounts).Error; err != nil { + return err + } + // 能查到说明 event 还在被其它service_event引用 + reservedEventIDs := map[int64]bool{} + for _, eventIDCount := range eventIDCounts { + reservedEventIDs[eventIDCount.EventID] = true + } + deleteEventIDs := make([]int64, 0) + for _, eventID := range eventIDs { + if !reservedEventIDs[eventID] { + deleteEventIDs = append(deleteEventIDs, eventID) + } + } + if len(deleteEventIDs) == 0 { + return nil + } + if err := e.db.Where("id in ?", deleteEventIDs).Delete(EventPO{}).Error; err != nil { return err } return nil @@ -617,17 +662,14 @@ func (e *EventBus) cleanEvents() error { batch := 10 eventIDs := make([]int64, 0) for rows.Next() { - event := &EventPO{} - err = e.db.ScanRows(rows, event) + serviceEvent := &ServiceEventPO{} + err = e.db.ScanRows(rows, serviceEvent) if err != nil { return err } - if failedOrRetryingEventIDSet[event.ID] { - continue - } - eventIDs = append(eventIDs, event.ID) + eventIDs = append(eventIDs, serviceEvent.EventID) if len(eventIDs) >= batch { - if err = deleteEvents(eventIDs); err != nil { + if err = deleteServiceEvents(eventIDs); err != nil { return err } // 清空eventIDs @@ -635,7 +677,7 @@ func (e *EventBus) cleanEvents() error { } } if len(eventIDs) > 0 { - if err = deleteEvents(eventIDs); err != nil { + if err = deleteServiceEvents(eventIDs); err != nil { return err } } diff --git a/eventbus/mysql/eventbus_test.go b/eventbus/mysql/eventbus_test.go index 78cdbdc..10ec73e 100644 --- a/eventbus/mysql/eventbus_test.go +++ b/eventbus/mysql/eventbus_test.go @@ -65,7 +65,7 @@ func (t *testEvent) GetSender() string { } func initModel(db *gorm.DB) { - if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &Transaction{}); err != nil { + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &Transaction{}, &ServiceEventPO{}); err 
!= nil { panic(err) } @@ -158,7 +158,7 @@ func TestEventBusConcurrent(t *testing.T) { wg.Wait() var eventCount int64 - db.Model(&EventPO{}).Count(&eventCount) + db.Model(&ServiceEventPO{}).Count(&eventCount) // 保证所有事件都能消费到 assert.Equal(t, eventCount, int64(len(ids))) curr := time.Time{} @@ -195,12 +195,13 @@ func TestEventBusConcurrentFailed(t *testing.T) { err := eventBus.handleEvents() assert.NoError(t, err) - service := &ServicePO{} + var count int64 err = db.Transaction(func(tx *gorm.DB) error { - return tx.Where("name = ?", "test_concurrent_failed").First(service).Error + return tx.Model(&ServiceEventPO{}).Where("service = ?", "test_concurrent_failed"). + Where("status = ?", ServiceEventStatusFailed).Count(&count).Error }) assert.NoError(t, err) - assert.Len(t, service.Failed, 100) + assert.Equal(t, int(count), 100) } @@ -236,7 +237,7 @@ func TestEventBusRetry(t *testing.T) { } var eventCount int64 - db.Model(&EventPO{}).Count(&eventCount) + db.Model(&ServiceEventPO{}).Count(&eventCount) assert.Equal(t, eventCount, int64(len(counts))) for id, count := range counts { if id == "0" { @@ -299,10 +300,11 @@ func TestEventBusRetryStrategy(t *testing.T) { } } - service := &ServicePO{} - err := tx.Where("name = ?", "test_retry_strategy").First(service).Error + var count int64 + err := tx.Model(&ServiceEventPO{}).Where("service = ?", "test_retry_strategy"). + Where("status = ?", ServiceEventStatusInit).Where("retry_count > 0").Count(&count).Error assert.NoError(t, err) - assert.Len(t, service.Retry, 1) + assert.Equal(t, int(count), 1) return err }) @@ -317,6 +319,7 @@ func TestEventBusFailed(t *testing.T) { opt.RetryStrategy = &LimitRetry{ Limit: 2, } + opt.ConsumeConcurrent = 1 }) eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { if evt.Type == "test_failed" { @@ -335,48 +338,11 @@ func TestEventBusFailed(t *testing.T) { assert.NoError(t, err) } - service := &ServicePO{} - err := db.Where("name = ?", "test_fail").First(service).Error + var count int64 + err := db.Model(&ServiceEventPO{}).Where("service = ?", "test_fail"). 
+ Where("status = ?", ServiceEventStatusFailed).Count(&count).Error assert.NoError(t, err) - assert.Len(t, service.Failed, 10) -} - -func TestEventBusQueueLimit(t *testing.T) { - ctx := context.Background() - db := testsuit.InitMysql() - - queueLimit := 100 - eventCount := queueLimit + 100 - eventBus := NewEventBus("test_queue_limit", db, func(opt *Options) { - opt.RetryStrategy = &LimitRetry{ - Limit: -1, - } - opt.LimitPerRun = eventCount * 2 - opt.ConsumeConcurrent = 10 - opt.QueueLimit = queueLimit - }) - eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { - if evt.Type == "test_queue_limit" { - return fmt.Errorf("failed") - } - return nil - }) - - for i := 0; i < eventCount; i++ { - err := eventBus.Dispatch(ctx, dddfirework.NewDomainEvent(&testEvent{EType: "test_queue_limit", Data: "failed"})) - assert.NoError(t, err) - } - - err := eventBus.handleEvents() - assert.NoError(t, err) - - service := &ServicePO{} - err = db.Transaction(func(tx *gorm.DB) error { - return tx.Where("name = ?", "test_queue_limit").First(service).Error - }) - assert.NoError(t, err) - assert.Len(t, service.Failed, queueLimit) - + assert.Equal(t, int(count), 10) } func TestEngine(t *testing.T) { @@ -553,14 +519,15 @@ func TestRollback(t *testing.T) { func TestClean(t *testing.T) { ctx := context.Background() // 使用sqlite防止数据被其它单测影响 - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) assert.NoError(t, err) - err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}) + err = db.AutoMigrate(&ServiceEventPO{}, &EventPO{}, &ServicePO{}, &testPO{}) assert.NoError(t, err) eventBus := NewEventBus("test_clean", db, func(opt *Options) { // 消费完成的事件保留时间为0 opt.RetentionTime = 0 * time.Hour + opt.ConsumeConcurrent = 1 }) // 插入一些事件 num := 10 @@ -584,13 +551,16 @@ func TestClean(t *testing.T) { err = eventBus.cleanEvents() assert.NoError(t, err) - // 校验 lowestServicePO.offset 之前的event 其它的都删掉了 - var slowestServicePO ServicePO - err = db.Order("offset asc").Take(&slowestServicePO).Error + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - + eventIDs := make([]int64, 0) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) + } + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) assert.Equal(t, 0, int(count)) } @@ -599,9 +569,9 @@ func TestClean(t *testing.T) { func TestCleanFailed(t *testing.T) { ctx := context.Background() // 使用sqlite防止数据被其它单测影响 - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) assert.NoError(t, err) - err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}) + err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}) assert.NoError(t, err) eventBus := NewEventBus("test_clean", db, func(opt *Options) { @@ -634,22 +604,18 @@ func TestCleanFailed(t *testing.T) { assert.NoError(t, err) // 校验除了 failed 和 retry中的event 其它的都删掉了 - var servicePO ServicePO - err = db.Where("name = ?", "test_clean").Take(&servicePO).Error + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - eventIDs := make([]int64, 0) - for _, si := range 
servicePO.Retry { - eventIDs = append(eventIDs, si.ID) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) } - for _, si := range servicePO.Failed { - eventIDs = append(eventIDs, si.ID) - } - + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id in ?", eventIDs).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) - assert.Equal(t, len(eventIDs), int(count)) + assert.Equal(t, 0, int(count)) } func TestCleanConcurrent(t *testing.T) { @@ -658,7 +624,7 @@ func TestCleanConcurrent(t *testing.T) { // 使用独立database防止数据被其它单测影响 db := testsuit.InitMysql() db = testsuit.InitMysqlWithDatabase(db, "test_clean") - if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}); err != nil { + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { panic(err) } eventBus := NewEventBus("test_clean", db, func(opt *Options) { @@ -695,13 +661,89 @@ func TestCleanConcurrent(t *testing.T) { } wg.Wait() - // 校验 lowestServicePO.event_created_at 之前的event 其它的都删掉了 - var slowestServicePO ServicePO - err = db.Order("offset asc").Take(&slowestServicePO).Error + // 校验除了 failed 和 retry中的event 其它的都删掉了 + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - + eventIDs := make([]int64, 0) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) + } + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) assert.Equal(t, 0, int(count)) } + +func TestFIFO(t *testing.T) { + ctx := context.Background() + db := testsuit.InitMysql() + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { + panic(err) + } + eventBus := NewEventBus("test_fifo", db, func(opt *Options) { + opt.ConsumeConcurrent = 2 + }) + // 插入一些事件 + num := 30 + for num > 0 { + evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_fifo", Data: "ttt"}, dddfirework.WithSendType(dddfirework.SendTypeFIFO)) + err := eventBus.Dispatch(ctx, evt) + time.Sleep(1 * time.Millisecond) + assert.NoError(t, err) + num-- + } + + eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { + t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt) + // 必须停1ms,因为mysql datetime 只精确到1ms,这样每个event 的run_at 有明显的大小差别 + time.Sleep(1 * time.Millisecond) + return nil + }) + // 得等一下, 不然 scanEvents 时根据next_time < time.Now 可能小于service_event插入时的next_time + time.Sleep(100 * time.Millisecond) + // 处理事件 + err := eventBus.handleEvents() + assert.NoError(t, err) + +} + +func TestLaxFIFO(t *testing.T) { + // 初始化随机数种子 + rand.Seed(time.Now().UnixNano()) + ctx := context.Background() + db := testsuit.InitMysql() + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { + panic(err) + } + eventBus := NewEventBus("test_fifo", db, func(opt *Options) { + opt.ConsumeConcurrent = 2 + }) + // 插入一些事件 + num := 30 + for num > 0 { + evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_fifo", Data: "ttt"}, dddfirework.WithSendType(dddfirework.SendTypeLaxFIFO)) + err := eventBus.Dispatch(ctx, evt) + time.Sleep(1 * time.Millisecond) + assert.NoError(t, err) + num-- + } + + 
eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { + t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt) + // 必须停1ms,因为mysql datetime 只精确到1ms,这样每个event 的run_at 有明显的大小差别 + time.Sleep(1 * time.Millisecond) + randomNum := rand.Intn(10-1+1) + 1 + if randomNum%2 == 0 { + return fmt.Errorf("random error %d", randomNum) + } + return nil + }) + // 得等一下, 不然 scanEvents 时根据next_time < time.Now 可能小于service_event插入时的next_time + time.Sleep(100 * time.Millisecond) + // 处理事件 + err := eventBus.handleEvents() + assert.NoError(t, err) + +} diff --git a/eventbus/mysql/po.go b/eventbus/mysql/po.go index 877d67d..439b3c2 100644 --- a/eventbus/mysql/po.go +++ b/eventbus/mysql/po.go @@ -29,6 +29,18 @@ const ( EventStatusFailed EventStatus = 3 ) +type ServiceEventStatus int8 + +const ( + ServiceEventStatusInit ServiceEventStatus = 0 + ServiceEventStatusSuccess ServiceEventStatus = 10 + ServiceEventStatusFailed ServiceEventStatus = 21 + // ServiceEventStatusPrecedingFailed 前置event失败 + ServiceEventStatusPrecedingFailed ServiceEventStatus = 22 + // ServiceEventStatusExpired 过期,在保序事件中,一个event处理失败,会阻塞后序同名sender_id的处理,但不能永远阻塞,可以通过人工改db或其他策略将已经失败的几个event置为过期,不影响新的同名sender_id event的执行 + ServiceEventStatusExpired ServiceEventStatus = 31 +) + // EventPO 事件存储模型 /* CREATE TABLE `ddd_domain_event` ( @@ -97,9 +109,6 @@ type FailedInfo struct { /* CREATE TABLE `ddd_eventbus_service` ( `name` varchar(30) NOT NULL, - `failed` text, - `retry` text, - `offset` bigint(20) DEFAULT NULL, `created_at` datetime(3) DEFAULT NULL, `updated_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`name`), @@ -108,12 +117,9 @@ CREATE TABLE `ddd_eventbus_service` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; */ type ServicePO struct { - Name string `gorm:"primaryKey"` - Retry []*RetryInfo `gorm:"serializer:json;type:text"` // 重试信息 - Failed []*RetryInfo `gorm:"serializer:json;type:text"` // 失败信息 - Offset int64 `gorm:"column:offset"` // 消费位置,等于最后一次消费的事件id - CreatedAt time.Time `gorm:"index"` // 记录创建时间 - UpdatedAt time.Time `gorm:"index"` // 记录的更新时间 + Name string `gorm:"primaryKey"` + CreatedAt time.Time `gorm:"index"` // 记录创建时间 + UpdatedAt time.Time `gorm:"index"` // 记录的更新时间 } func (o *ServicePO) GetID() string { @@ -131,3 +137,40 @@ func eventPersist(event *dddfirework.DomainEvent) (*EventPO, error) { EventCreatedAt: event.CreatedAt, }, nil } + +/* +CREATE TABLE `ddd_domain_service_event` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `service` varchar(30) DEFAULT NULL, + `sender` varchar(255) DEFAULT NULL, + `event_id` bigint NOT NULL, + `retry_count` bigint DEFAULT NULL, + `status` tinyint DEFAULT NULL, + `failed_message` text, + `event_created_at` datetime NOT NULL, + `next_time` datetime NOT NULL, + `run_at` datetime DEFAULT '1970-01-01 00:00:01', + PRIMARY KEY (`id`), + UNIQUE KEY `idx_service_event_id` (`service`,`event_id`), + KEY `idx_service_status_next_time` (`service`,`status`,`next_time`), + KEY `idx_ddd_domain_service_event_event_created_at` (`event_created_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +*/ + +// ServiceEventPO 记录service对event的处理情况 +type ServiceEventPO struct { + ID int64 `gorm:"primaryKey;autoIncrement"` + Service string `gorm:"type:varchar(30);column:service;uniqueIndex:idx_service_event_id;index:idx_service_status_next_time;"` + Sender string `gorm:"type:varchar(255);column:sender"` // 事件发出实体 ID + EventID int64 `gorm:"column:event_id;uniqueIndex:idx_service_event_id;not null"` + RetryCount int `gorm:"column:retry_count"` // 重试次数 + Status int8 
`gorm:"column:status;index:idx_service_status_next_time;"` // service event状态 + FailedMessage string `gorm:"type:text;column:failed_message"` // 失败详情 + EventCreatedAt time.Time `gorm:"type:datetime;index;not null"` // 事件的创建时间 + NextTime time.Time `gorm:"type:datetime;index:idx_service_status_next_time;not null"` // 事件可以运行的事件 + RunAt time.Time `gorm:"type:datetime;zeroValue:1970-01-01 00:00:01;default:'1970-01-01 00:00:01+00:00'"` // 事件真实运行时间,单纯记录下 +} + +func (o *ServiceEventPO) TableName() string { + return "ddd_domain_service_event" +}