From de509f472c3f843c07811c61820bf3427e4a372d Mon Sep 17 00:00:00 2001 From: qiankunli Date: Mon, 20 May 2024 10:18:06 +0800 Subject: [PATCH] feat: enhance eventbus with service_event (#36) Co-authored-by: liqiankun.1111 --- event.go | 3 +- eventbus/mysql/eventbus.go | 433 +++++++++++++++++--------------- eventbus/mysql/eventbus_test.go | 221 ++++++++++------ eventbus/mysql/po.go | 62 ++++- logger/stdr/stdr.go | 2 +- 5 files changed, 436 insertions(+), 285 deletions(-) diff --git a/event.go b/event.go index a46ab54..6624c12 100644 --- a/event.go +++ b/event.go @@ -35,7 +35,8 @@ type SendType string const ( SendTypeNormal SendType = "normal" // 普通事件 - SendTypeFIFO SendType = "FIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行 + SendTypeFIFO SendType = "FIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行,且前序事件执行失败时会阻塞后续事件执行 + SendTypeLaxFIFO SendType = "LaxFIFO" // 保序事件,即事件以 Sender 的发送时间顺序被消费执行,但前序事件的执行成败不影响后续事件 SendTypeTransaction SendType = "transaction" // 事务事件 SendTypeDelay SendType = "delay" // 延时发送 ) diff --git a/eventbus/mysql/eventbus.go b/eventbus/mysql/eventbus.go index 5d6cd9d..6aa27f8 100644 --- a/eventbus/mysql/eventbus.go +++ b/eventbus/mysql/eventbus.go @@ -37,7 +37,6 @@ import ( const retryInterval = time.Second * 3 const retryLimit = 5 const runInterval = time.Millisecond * 100 -const defaultQueueLimit = 1000 // 每天凌晨两点执行clean const cleanCron = "0 2 * * *" @@ -52,6 +51,7 @@ const txCheckTimeout = time.Minute var ErrInvalidDB = fmt.Errorf("invalid db") var ErrNoTransaction = fmt.Errorf("no transaction") var ErrServiceNotCreate = fmt.Errorf("service not create") +var ErrPrecedingEventNotReady = fmt.Errorf("preceding event not ready") var defaultLogger = stdr.NewStdr("mysql_eventbus") var eventBusMu sync.Mutex @@ -126,7 +126,6 @@ type Options struct { RetryLimit int // 重试次数 RetryInterval time.Duration // 重试间隔 CustomRetry []time.Duration // 自定义重试间隔 - QueueLimit int // 重试/失败队列长度 DefaultOffset *int64 // 默认起始 offset RunInterval time.Duration // 默认轮询间隔 @@ -168,7 +167,6 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus { } opt := Options{ - QueueLimit: defaultQueueLimit, RunInterval: runInterval, CleanCron: cleanCron, RetentionTime: retentionTime, @@ -327,107 +325,83 @@ func (e *EventBus) initService() error { return e.db.Where(ServicePO{Name: e.serviceName}).FirstOrCreate(service).Error } -func (e *EventBus) lockService(tx *gorm.DB) (*ServicePO, error) { - service := &ServicePO{} - if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}). - Where("name = ?", e.serviceName). - First(service).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, ErrServiceNotCreate +// getScanEvents 扫描待处理的event:无service_event的event; status=init and 到达下次可执行时间的event +func (e *EventBus) getScanEvents() ([]*EventPO, error) { + // 从service_event 视角查询最早的 需要的处理的event。其实仅凭下面的联表查询即可一步做到 返回待处理的event,但联表查询可能是性能瓶颈,因此通过计算 eventOffset来减少联表查询的数据量 + eventOffset := int64(0) + retryableServiceEvent := &ServiceEventPO{} + if err := e.db.Where("service = ?", e.serviceName). + Where("status = ?", ServiceEventStatusInit). + Order("event_id").First(retryableServiceEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err } - return nil, err } - return service, nil -} - -func (e *EventBus) getScanEvents(db *gorm.DB, service *ServicePO) ([]*EventPO, error) { - if service.Offset == 0 { - if e.opt.DefaultOffset != nil { - service.Offset = *e.opt.DefaultOffset - } else { - lastEvent := &EventPO{} - if err := db.Order("id").Last(lastEvent).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, nil - } + if retryableServiceEvent.EventID > 0 { + eventOffset = retryableServiceEvent.EventID + } else { + finishedServiceEvent := &ServiceEventPO{} + if err := e.db.Where("service = ?", e.serviceName). + Order("event_id").Last(finishedServiceEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { return nil, err } - // 不指定 offset 的情况下,默认从新的一条事件开始,且包含该条 - service.Offset = lastEvent.ID - 1 + } + // 找不到init的service_event 说明上一批的event 都被处理了,找到最大的service_event 即为本次开始处理的event offset + if finishedServiceEvent.EventID > 0 { + eventOffset = finishedServiceEvent.EventID + 1 } } eventPOs := make([]*EventPO, 0) - query := db.Where("id > ?", service.Offset) - if err := query.Order("id").Limit(e.opt.LimitPerRun).Find(&eventPOs).Error; err != nil { + if err := e.db. + // 只抓取当前service的 service_event参与 left join + Joins("left join ddd_domain_service_event ON ddd_domain_event.id = ddd_domain_service_event.event_id and ddd_domain_service_event.service = ?", e.serviceName). + Where("ddd_domain_event.event_created_at >= ?", time.Now().Add(-scanStartTime)). + Where("ddd_domain_event.id >= ?", eventOffset). + Where( + // event_id 为null 表示event 还未被当前service service_event引用 + e.db.Where("ddd_domain_service_event.event_id is null"). + // 被当前service service_event 引用但未处理成功且到了新的重试时间的event + Or("ddd_domain_service_event.status = ? and ddd_domain_service_event.next_time <= ?", ServiceEventStatusInit, time.Now())). + Order("ddd_domain_event.event_created_at, ddd_domain_event.id"). + Limit(e.opt.LimitPerRun).Find(&eventPOs).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 没找到就是没有event 要处理 + return nil, nil + } return nil, err } - return eventPOs, nil } -func (e *EventBus) getRetryEvents(db *gorm.DB, service *ServicePO) ([]*EventPO, []int64, error) { - now := time.Now() - retryIDs := make([]int64, 0) - remainIDs := make([]int64, 0) - for _, info := range service.Retry { - if info.RetryTime.Before(now) { - retryIDs = append(retryIDs, info.ID) - } else { - // 保留未到重试时间的事件 - remainIDs = append(remainIDs, info.ID) - } - } - if len(retryIDs) == 0 { - return nil, remainIDs, nil - } - - eventPOs := make([]*EventPO, 0) - if err := db.Where("id in ?", retryIDs).Find(&eventPOs).Error; err != nil { - return nil, nil, err - } - return eventPOs, remainIDs, nil -} - -func (e *EventBus) doRetryStrategy(service *ServicePO, remainIDs, failedIDs []int64) (retry, failed []*RetryInfo) { - retryInfos := make(map[int64]*RetryInfo) - for _, info := range service.Retry { - retryInfos[info.ID] = info - } - - for _, id := range remainIDs { - retry = append(retry, retryInfos[id]) - } - +func (e *EventBus) doRetryStrategy(spo *ServiceEventPO) { // 没有定义重试策略,默认不重试直接失败 if e.retryStrategy == nil { - for _, id := range failedIDs { - failed = append(failed, &RetryInfo{ID: id}) - } + spo.Status = ServiceEventStatusFailed return } - - for _, id := range failedIDs { - info := retryInfos[id] - if info == nil { - info = &RetryInfo{ID: id} - } - - newInfo := e.retryStrategy.Next(info) - if newInfo != nil { - retry = append(retry, newInfo) - } else { - failed = append(failed, info) - } + info := &RetryInfo{ + ID: spo.EventID, + RetryCount: spo.RetryCount, + RetryTime: time.Now(), } + newInfo := e.retryStrategy.Next(info) + if newInfo != nil { + spo.RetryCount = newInfo.RetryCount + spo.NextTime = newInfo.RetryTime + } else { + spo.Status = ServiceEventStatusFailed + } + return } -func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (failed, panics []int64) { - events := make(chan *EventPO, len(eventPOs)) +func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) { + eventChan := make(chan *EventPO, len(eventPOs)) for _, e := range eventPOs { - events <- e + eventChan <- e } - close(events) + close(eventChan) wg := sync.WaitGroup{} for i := 0; i < e.opt.ConsumeConcurrent; i++ { @@ -436,22 +410,11 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai defer wg.Done() cb := func(ctx context.Context, po *EventPO) { - defer func() { - if r := recover(); r != nil { - e.logger.Error(fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())), fmt.Sprintf("panic while handling event(%s)", po.EventID)) - eventBusMu.Lock() - defer eventBusMu.Unlock() - panics = append(panics, po.ID) - } - }() - if err := e.cb(ctx, po.Event); err != nil { - // slice 线程不安全,需要加锁 - eventBusMu.Lock() - defer eventBusMu.Unlock() - failed = append(failed, po.ID) - } + // 某个event 的处理成败,不影响后续事件的处理 + // 对于保序事件,因为前序事件没有执行,后面同sender event也不用继续了。但毕竟后续还有非保序事件要处理,所以也不会因为ErrPrecedingEventNotReady 就终止事件消费流程 + _ = e.handleEvent(ctx, po) } - for po := range events { + for po := range eventChan { cb(ctx, po) } }() @@ -460,69 +423,150 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai return } -func (e *EventBus) handleEvents() error { - e.logger.V(logger.LevelDebug).Info("handle events") - - return e.db.Transaction(func(tx *gorm.DB) error { - ctx := context.Background() - service, err := e.lockService(tx) - if err != nil { +func (e *EventBus) checkPrecedingEvent(tx *gorm.DB, spo *ServiceEventPO, eventPO *EventPO) error { + // 保序event 要求sender 不能为空 + if len(eventPO.Sender) == 0 { + err := fmt.Errorf("event sender can not be empty when event type is fifo") + e.logger.Error(err, "event_id", spo.EventID) + return err + } + // 找到前序event + precedingEvent := &EventPO{} + if err := tx.Where("sender = ?", eventPO.Sender). + // event_created_at 是最权威的前序,但是时间精度问题导致可能前序event.event_created_at可能跟当前event一样,再用event_id 明确下 + Where("event_created_at <= ?", eventPO.EventCreatedAt). + Where("id < ?", eventPO.ID). + Order("event_created_at desc, event_id desc").First(precedingEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + e.logger.V(logger.LevelInfo).Info("find preceding event error", "current event_id", spo.EventID, "err", err) return err } - scanEvents, err := e.getScanEvents(tx, service) - if err != nil { + } + // 找不到前序event,则当前 event 可能是同sender的第一个event + if precedingEvent.ID == 0 { + e.logger.V(logger.LevelInfo).Info("can not find preceding event, see it as first event", "current event_id", spo.EventID) + return nil + } + // 找到前序service_event + precedingServiceEvent := &ServiceEventPO{} + if err := tx.Where("service = ?", e.serviceName). + // event_created_at 是最权威的前序,但是时间精度问题导致可能前序event可能跟当前event一样,再用event_id 明确下 + Where("event_id = ?", precedingEvent.ID).First(precedingServiceEvent).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + e.logger.V(logger.LevelInfo).Info("find preceding service_event error", "current event_id", spo.EventID, "err", err) return err } - retryEvents, remainIDs, err := e.getRetryEvents(tx, service) - if err != nil { - return err + } + // 找不到precedingServiceEvent,则说明前序event 还未被处理 + if precedingServiceEvent.ID == 0 { + return ErrPrecedingEventNotReady + } + if eventPO.Event.SendType == dddfirework.SendTypeFIFO { + if precedingServiceEvent.Status != ServiceEventStatusSuccess && precedingServiceEvent.Status != ServiceEventStatusExpired { + // 前序event 未执行、失败、未过期,则不执行当前event + e.logger.V(logger.LevelInfo).Info("preceding service_event is not success or expired, ignore", "current event_id", spo.EventID) + return ErrPrecedingEventNotReady } - events := make([]*EventPO, 0) - events = append(events, scanEvents...) - events = append(events, retryEvents...) - if len(events) == 0 { - return nil + } else if eventPO.Event.SendType == dddfirework.SendTypeLaxFIFO { + // 当期的策略是,哪怕前序event 在重试中,也还是init,除非超过重试次数status=failed。若想忽略重试这一点,可以check下 precedingServiceEvent.retryCount + if precedingServiceEvent.Status == ServiceEventStatusInit { + e.logger.V(logger.LevelInfo).Info("find preceding service_event has not been run", "current event_id", spo.EventID) + // 前序event 未执行,则不执行当前event + return ErrPrecedingEventNotReady } + } + e.logger.V(logger.LevelInfo).Info("preceding event is ready", "current event_id", spo.EventID, + "preceding event_id", precedingServiceEvent.EventID, + "precedingServiceEvent status", precedingServiceEvent.Status) + return nil +} - failedIDs, panicIDs := e.dispatchEvents(ctx, events) - retry, failed := e.doRetryStrategy(service, remainIDs, failedIDs) - failed = append(service.Failed, failed...) - for _, id := range panicIDs { - failed = append(failed, &RetryInfo{ID: id}) +// handleEvent, 多实例并发场景下先锁住event,check service_event状态,如果是保序event,则check前序event状态,check通过后,执行event handler +// 执行成功,则保存执行结果,执行失败则根据RetryStrategy更新retryCount/nextTime/failedMessage +func (e *EventBus) handleEvent(ctx context.Context, po *EventPO) error { + // 下面的db 操作一定要全部使用 tx + fn := func(tx *gorm.DB) (err error) { + // 找到并锁住event + currentEvent := &EventPO{} + if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).First(currentEvent, po.ID).Error; err != nil { + e.logger.Info("find and lock current event error", "current event_id", po.ID, "err", err) + return err + } + // 找到service_event + spo := &ServiceEventPO{ + Service: e.serviceName, + EventID: po.ID, + Status: ServiceEventStatusInit, + EventCreatedAt: po.EventCreatedAt, + // 初始化时给一个尽量早的可执行时间,表示创建后就可以执行了 + NextTime: po.EventCreatedAt, } - service.Retry, service.Failed = e.evictEvents(retry, failed) + if err := tx.Where("service = ?", e.serviceName).Where("event_id = ?", po.ID). + First(spo).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + e.logger.Info("find current service_event error", "current event_id", spo.EventID, "err", err) + return err + } + } + // 如果service_event 已存在,确认service_event 还未被执行 + if spo.ID > 0 { + if spo.Status != ServiceEventStatusInit { + e.logger.V(logger.LevelInfo).Info("current service_event'status it not init, ignore", "event_id", spo.EventID) + return nil + } + } + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())) + e.logger.Error(err, fmt.Sprintf("panic while handling event(%d)", spo.EventID)) + e.doRetryStrategy(spo) + spo.FailedMessage = fmt.Sprintf("%v", err) + // 65535 是FailedMessage 字段的db类型长度 + if len(spo.FailedMessage) > 65535 { + spo.FailedMessage = spo.FailedMessage[:65535] + } + } + // 插入或更新 service_event + if err = tx.Save(spo).Error; err != nil { + e.logger.Error(err, "create or update service_event error", "current event_id", spo.EventID) + return + } + }() - if len(scanEvents) > 0 { - last := scanEvents[len(scanEvents)-1] - service.Offset = last.ID + // 如果是保序event,则校验前序event的执行结果 + if po.Event.SendType == dddfirework.SendTypeFIFO || po.Event.SendType == dddfirework.SendTypeLaxFIFO { + if err = e.checkPrecedingEvent(tx, spo, po); err != nil { + return err + } } - return tx.Save(service).Error - }) + e.logger.V(logger.LevelInfo).Info("eventbus handle event", "event db id", spo.EventID, "event id", po.EventID) + err = e.cb(ctx, po.Event) + spo.RunAt = time.Now() + if err != nil { + // 更新 retry_limit 以及 next_time,超出重试限制,则status置为失败。为了支持自定义RetryStrategy,所以通过NextTime 而不是RunAt来控制重试间隔 + e.doRetryStrategy(spo) + spo.FailedMessage = fmt.Sprintf("%v", err) + } else { + spo.Status = ServiceEventStatusSuccess + } + return nil + } + return e.db.Transaction(fn) } -func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) { - // 重试队列超出限制时,移动到失败队列并记录日志, FIFO - if len(retry) > e.opt.QueueLimit { - evictCount := len(retry) - e.opt.QueueLimit - evictEvents := retry[:evictCount] - failed = append(failed, evictEvents...) - evictedRetry = retry[evictCount:] - e.logger.V(logger.LevelWarn).Info("evict retry events", "count", evictCount, "events", evictEvents) - } else { - evictedRetry = retry +func (e *EventBus) handleEvents() error { + e.logger.V(logger.LevelDebug).Info("handle events") + ctx := context.Background() + scanEvents, err := e.getScanEvents() + if err != nil { + return err } - - // 失败队列超出限制时,丢弃并记录日志, FIFO - if len(failed) > e.opt.QueueLimit { - evictCount := len(failed) - e.opt.QueueLimit - evictEvents := failed[:evictCount] - evictedFailed = failed[evictCount:] - e.logger.V(logger.LevelWarn).Info("evict failed events", "count", evictCount, "events", evictEvents) - } else { - evictedFailed = failed + if len(scanEvents) == 0 { + e.logger.V(logger.LevelDebug).Info("find empty service_event, ignore") + return nil } - - return + e.dispatchEvents(ctx, scanEvents) + return nil } func (e *EventBus) checkTX(ctx context.Context, tx *Transaction) { @@ -558,58 +602,54 @@ func (e *EventBus) handleTransactions() error { }) } +// cleanEvents 先清理当前service的service_event,如果一个event 被所有service_event 都处理成功,则删除对应的event func (e *EventBus) cleanEvents() error { e.logger.Info("clean events") - // 查询failed 或重试中的event,不能清理 - // retry 重试的时候,重试成功,从retry 移除。重试次数超过设置,从retry 移除,加入failed - // failed event不能删,这个相当于死信队列,留给用户做人工判断 - var services []*ServicePO - err := e.db.Find(&services).Error - if err != nil { - return err - } - failedOrRetryingEventIDSet := map[int64]bool{} - for _, service := range services { - for _, si := range service.Retry { - failedOrRetryingEventIDSet[si.ID] = true - } - for _, si := range service.Failed { - failedOrRetryingEventIDSet[si.ID] = true - } - } - - // 查询消费最慢的service,其已经消费的event即为被所有service消费完的event。First 会自动加上主键排序所以使用Take - var slowestServicePO ServicePO - if err = e.db.Order("offset").Take(&slowestServicePO).Error; err != nil { - return err - } - if slowestServicePO.Offset == 0 { - // 最近消费时间没有赋值,可能还未触发消费,暂时不考虑清理 - return nil - } - lastEvent := &EventPO{} - if err := e.db.Where("id = ?", slowestServicePO.Offset).First(lastEvent).Error; err != nil { - return err - } - - // 遍历可以清理的event,加上for update 防止多实例场景下并发操作问题 - query := e.db.Model(&EventPO{}).Clauses(clause.Locking{Strength: "UPDATE"}) + // 遍历可以清理的service_event,加上for update 防止多实例场景下并发操作问题 + query := e.db.Model(&ServiceEventPO{}).Clauses(clause.Locking{Strength: "UPDATE"}) needCleanAt := time.Now().Add(-e.opt.RetentionTime) - // 可能因为服务未启动等原因,event 很长时间都未被清理 - if lastEvent.CreatedAt.Before(needCleanAt) { - needCleanAt = lastEvent.CreatedAt - } - // servicePO记录 EventConsumedID 的原因是:EventConsumedAt 只精确到毫秒,可能有多个事件event_created_at 一样。此时这些遗留的event等到下一个清理周期再被清理。 - query = query.Where("created_at < ?", needCleanAt) + // 清理成功service_event + query = query.Where("service = ?", e.serviceName).Where("status = ?", ServiceEventStatusSuccess).Where("event_created_at < ?", needCleanAt) rows, err := query.Rows() if err != nil { return err } - defer rows.Close() + defer func() { + _ = rows.Close() + }() + + type EventIDCount struct { + EventID int64 `json:"event_id"` + Count int `json:"count"` + } - // 批量删除event - deleteEvents := func(eventIDs []int64) error { - if err := e.db.Where("id in ?", eventIDs).Delete(EventPO{}).Error; err != nil { + // 批量删除service_event + deleteServiceEvents := func(eventIDs []int64) error { + if err := e.db.Where("service = ?", e.serviceName).Where("event_id in ?", eventIDs).Delete(ServiceEventPO{}).Error; err != nil { + return err + } + // 假设存在多个service,经过多个service的陆续操作,则已经成功的service_event 陆续被清理,如果service_event 已无该event_id 的记录,则清理该event记录 + var eventIDCounts []EventIDCount + if err := e.db.Model(&ServiceEventPO{}).Select("event_id, count(id) as count"). + Where("event_id in ?", eventIDs). + Group("event_id").Find(&eventIDCounts).Error; err != nil { + return err + } + // 能查到说明 event 还在被其它service_event引用 + reservedEventIDs := map[int64]bool{} + for _, eventIDCount := range eventIDCounts { + reservedEventIDs[eventIDCount.EventID] = true + } + deleteEventIDs := make([]int64, 0) + for _, eventID := range eventIDs { + if !reservedEventIDs[eventID] { + deleteEventIDs = append(deleteEventIDs, eventID) + } + } + if len(deleteEventIDs) == 0 { + return nil + } + if err := e.db.Where("id in ?", deleteEventIDs).Delete(EventPO{}).Error; err != nil { return err } return nil @@ -617,17 +657,14 @@ func (e *EventBus) cleanEvents() error { batch := 10 eventIDs := make([]int64, 0) for rows.Next() { - event := &EventPO{} - err = e.db.ScanRows(rows, event) + serviceEvent := &ServiceEventPO{} + err = e.db.ScanRows(rows, serviceEvent) if err != nil { return err } - if failedOrRetryingEventIDSet[event.ID] { - continue - } - eventIDs = append(eventIDs, event.ID) + eventIDs = append(eventIDs, serviceEvent.EventID) if len(eventIDs) >= batch { - if err = deleteEvents(eventIDs); err != nil { + if err = deleteServiceEvents(eventIDs); err != nil { return err } // 清空eventIDs @@ -635,7 +672,7 @@ func (e *EventBus) cleanEvents() error { } } if len(eventIDs) > 0 { - if err = deleteEvents(eventIDs); err != nil { + if err = deleteServiceEvents(eventIDs); err != nil { return err } } diff --git a/eventbus/mysql/eventbus_test.go b/eventbus/mysql/eventbus_test.go index 78cdbdc..e9f1162 100644 --- a/eventbus/mysql/eventbus_test.go +++ b/eventbus/mysql/eventbus_test.go @@ -65,7 +65,7 @@ func (t *testEvent) GetSender() string { } func initModel(db *gorm.DB) { - if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &Transaction{}); err != nil { + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &Transaction{}, &ServiceEventPO{}); err != nil { panic(err) } @@ -158,7 +158,7 @@ func TestEventBusConcurrent(t *testing.T) { wg.Wait() var eventCount int64 - db.Model(&EventPO{}).Count(&eventCount) + db.Model(&ServiceEventPO{}).Count(&eventCount) // 保证所有事件都能消费到 assert.Equal(t, eventCount, int64(len(ids))) curr := time.Time{} @@ -195,12 +195,13 @@ func TestEventBusConcurrentFailed(t *testing.T) { err := eventBus.handleEvents() assert.NoError(t, err) - service := &ServicePO{} + var count int64 err = db.Transaction(func(tx *gorm.DB) error { - return tx.Where("name = ?", "test_concurrent_failed").First(service).Error + return tx.Model(&ServiceEventPO{}).Where("service = ?", "test_concurrent_failed"). + Where("status = ?", ServiceEventStatusFailed).Count(&count).Error }) assert.NoError(t, err) - assert.Len(t, service.Failed, 100) + assert.Equal(t, int(count), 100) } @@ -236,7 +237,7 @@ func TestEventBusRetry(t *testing.T) { } var eventCount int64 - db.Model(&EventPO{}).Count(&eventCount) + db.Model(&ServiceEventPO{}).Count(&eventCount) assert.Equal(t, eventCount, int64(len(counts))) for id, count := range counts { if id == "0" { @@ -299,10 +300,11 @@ func TestEventBusRetryStrategy(t *testing.T) { } } - service := &ServicePO{} - err := tx.Where("name = ?", "test_retry_strategy").First(service).Error + var count int64 + err := tx.Model(&ServiceEventPO{}).Where("service = ?", "test_retry_strategy"). + Where("status = ?", ServiceEventStatusInit).Where("retry_count > 0").Count(&count).Error assert.NoError(t, err) - assert.Len(t, service.Retry, 1) + assert.Equal(t, int(count), 1) return err }) @@ -317,6 +319,7 @@ func TestEventBusFailed(t *testing.T) { opt.RetryStrategy = &LimitRetry{ Limit: 2, } + opt.ConsumeConcurrent = 1 }) eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { if evt.Type == "test_failed" { @@ -335,48 +338,11 @@ func TestEventBusFailed(t *testing.T) { assert.NoError(t, err) } - service := &ServicePO{} - err := db.Where("name = ?", "test_fail").First(service).Error - assert.NoError(t, err) - assert.Len(t, service.Failed, 10) -} - -func TestEventBusQueueLimit(t *testing.T) { - ctx := context.Background() - db := testsuit.InitMysql() - - queueLimit := 100 - eventCount := queueLimit + 100 - eventBus := NewEventBus("test_queue_limit", db, func(opt *Options) { - opt.RetryStrategy = &LimitRetry{ - Limit: -1, - } - opt.LimitPerRun = eventCount * 2 - opt.ConsumeConcurrent = 10 - opt.QueueLimit = queueLimit - }) - eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { - if evt.Type == "test_queue_limit" { - return fmt.Errorf("failed") - } - return nil - }) - - for i := 0; i < eventCount; i++ { - err := eventBus.Dispatch(ctx, dddfirework.NewDomainEvent(&testEvent{EType: "test_queue_limit", Data: "failed"})) - assert.NoError(t, err) - } - - err := eventBus.handleEvents() + var count int64 + err := db.Model(&ServiceEventPO{}).Where("service = ?", "test_fail"). + Where("status = ?", ServiceEventStatusFailed).Count(&count).Error assert.NoError(t, err) - - service := &ServicePO{} - err = db.Transaction(func(tx *gorm.DB) error { - return tx.Where("name = ?", "test_queue_limit").First(service).Error - }) - assert.NoError(t, err) - assert.Len(t, service.Failed, queueLimit) - + assert.Equal(t, int(count), 10) } func TestEngine(t *testing.T) { @@ -553,14 +519,15 @@ func TestRollback(t *testing.T) { func TestClean(t *testing.T) { ctx := context.Background() // 使用sqlite防止数据被其它单测影响 - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) assert.NoError(t, err) - err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}) + err = db.AutoMigrate(&ServiceEventPO{}, &EventPO{}, &ServicePO{}, &testPO{}) assert.NoError(t, err) eventBus := NewEventBus("test_clean", db, func(opt *Options) { // 消费完成的事件保留时间为0 opt.RetentionTime = 0 * time.Hour + opt.ConsumeConcurrent = 1 }) // 插入一些事件 num := 10 @@ -584,13 +551,16 @@ func TestClean(t *testing.T) { err = eventBus.cleanEvents() assert.NoError(t, err) - // 校验 lowestServicePO.offset 之前的event 其它的都删掉了 - var slowestServicePO ServicePO - err = db.Order("offset asc").Take(&slowestServicePO).Error + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - + eventIDs := make([]int64, 0) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) + } + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) assert.Equal(t, 0, int(count)) } @@ -599,9 +569,9 @@ func TestClean(t *testing.T) { func TestCleanFailed(t *testing.T) { ctx := context.Background() // 使用sqlite防止数据被其它单测影响 - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) assert.NoError(t, err) - err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}) + err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}) assert.NoError(t, err) eventBus := NewEventBus("test_clean", db, func(opt *Options) { @@ -634,22 +604,18 @@ func TestCleanFailed(t *testing.T) { assert.NoError(t, err) // 校验除了 failed 和 retry中的event 其它的都删掉了 - var servicePO ServicePO - err = db.Where("name = ?", "test_clean").Take(&servicePO).Error + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - eventIDs := make([]int64, 0) - for _, si := range servicePO.Retry { - eventIDs = append(eventIDs, si.ID) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) } - for _, si := range servicePO.Failed { - eventIDs = append(eventIDs, si.ID) - } - + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id in ?", eventIDs).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) - assert.Equal(t, len(eventIDs), int(count)) + assert.Equal(t, 0, int(count)) } func TestCleanConcurrent(t *testing.T) { @@ -658,7 +624,7 @@ func TestCleanConcurrent(t *testing.T) { // 使用独立database防止数据被其它单测影响 db := testsuit.InitMysql() db = testsuit.InitMysqlWithDatabase(db, "test_clean") - if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}); err != nil { + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { panic(err) } eventBus := NewEventBus("test_clean", db, func(opt *Options) { @@ -695,13 +661,116 @@ func TestCleanConcurrent(t *testing.T) { } wg.Wait() - // 校验 lowestServicePO.event_created_at 之前的event 其它的都删掉了 - var slowestServicePO ServicePO - err = db.Order("offset asc").Take(&slowestServicePO).Error + // 校验除了 failed 和 retry中的event 其它的都删掉了 + events := []*EventPO{} + err = db.Model(&EventPO{}).Find(&events).Error assert.NoError(t, err) - + eventIDs := make([]int64, 0) + for _, event := range events { + eventIDs = append(eventIDs, event.ID) + } + // 确认现存的event的已经没有成功的了 count := int64(0) - err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error + err = db.Model(&ServiceEventPO{}).Where("event_id in ?", eventIDs).Where("status = ?", ServiceEventStatusSuccess).Count(&count).Error assert.NoError(t, err) assert.Equal(t, 0, int(count)) } + +func TestFIFO(t *testing.T) { + ctx := context.Background() + db := testsuit.InitMysql() + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { + panic(err) + } + eventBus := NewEventBus("test_fifo", db, func(opt *Options) { + opt.ConsumeConcurrent = 5 + }) + // 插入一些事件 + num := 5 + for num > 0 { + evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_fifo", Data: "ttt"}, dddfirework.WithSendType(dddfirework.SendTypeFIFO)) + err := eventBus.Dispatch(ctx, evt) + time.Sleep(1 * time.Millisecond) + assert.NoError(t, err) + num-- + } + + events := make([]*dddfirework.DomainEvent, 0) + eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { + t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt) + // 必须停1ms,因为mysql datetime 只精确到1ms,这样每个event 的run_at 有明显的大小差别 + time.Sleep(1 * time.Millisecond) + if evt.Type == "test_fifo" { + events = append(events, evt) + } + return nil + }) + // 处理事件 + for i := 0; i < 10; i++ { + // 得等一下, 不然 scanEvents 时根据next_time < time.Now 可能小于service_event插入时的next_time + time.Sleep(100 * time.Millisecond) + err := eventBus.handleEvents() + assert.NoError(t, err) + } + + // 保证消费顺序一定是递增的 + curr := time.Time{} + for _, e := range events { + assert.GreaterOrEqual(t, e.CreatedAt, curr) + curr = e.CreatedAt + } + +} + +func TestLaxFIFO(t *testing.T) { + // 初始化随机数种子 + rand.Seed(time.Now().UnixNano()) + ctx := context.Background() + db := testsuit.InitMysql() + if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &ServiceEventPO{}); err != nil { + panic(err) + } + eventBus := NewEventBus("test_lax_fifo", db, func(opt *Options) { + opt.ConsumeConcurrent = 2 + opt.RetryLimit = -1 + opt.RetryInterval = 1 * time.Millisecond + }) + // 插入一些事件 + num := 5 + for num > 0 { + evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_lax_fifo", Data: "ttt"}, dddfirework.WithSendType(dddfirework.SendTypeLaxFIFO)) + err := eventBus.Dispatch(ctx, evt) + time.Sleep(1 * time.Millisecond) + assert.NoError(t, err) + num-- + } + events := make([]*dddfirework.DomainEvent, 0) + eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error { + t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt) + // 必须停1ms,因为mysql datetime 只精确到1ms,这样每个event 的run_at 有明显的大小差别 + time.Sleep(1 * time.Millisecond) + if evt.Type == "test_lax_fifo" { + events = append(events, evt) + } + randomNum := rand.Intn(10-1+1) + 1 + if randomNum%2 == 0 { + return fmt.Errorf("random error %d", randomNum) + } + return nil + }) + // 处理事件 + for i := 0; i < 20; i++ { + // 得等一下, 不然 scanEvents 时根据next_time < time.Now 可能小于service_event插入时的next_time + time.Sleep(100 * time.Millisecond) + err := eventBus.handleEvents() + assert.NoError(t, err) + } + + // 保证消费顺序一定是递增的 + curr := time.Time{} + for _, e := range events { + assert.GreaterOrEqual(t, e.CreatedAt, curr) + curr = e.CreatedAt + } + +} diff --git a/eventbus/mysql/po.go b/eventbus/mysql/po.go index 877d67d..cc09489 100644 --- a/eventbus/mysql/po.go +++ b/eventbus/mysql/po.go @@ -29,6 +29,18 @@ const ( EventStatusFailed EventStatus = 3 ) +type ServiceEventStatus int8 + +const ( + ServiceEventStatusInit ServiceEventStatus = 0 + ServiceEventStatusSuccess ServiceEventStatus = 10 + ServiceEventStatusFailed ServiceEventStatus = 21 + // ServiceEventStatusPrecedingFailed 前置event失败 + ServiceEventStatusPrecedingFailed ServiceEventStatus = 22 + // ServiceEventStatusExpired 过期,在保序事件中,一个event处理失败,会阻塞后序同名sender_id的处理,但不能永远阻塞,可以通过人工改db或其他策略将已经失败的几个event置为过期,不影响新的同名sender_id event的执行 + ServiceEventStatusExpired ServiceEventStatus = 31 +) + // EventPO 事件存储模型 /* CREATE TABLE `ddd_domain_event` ( @@ -40,6 +52,7 @@ CREATE TABLE `ddd_domain_event` ( `created_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `idx_ddd_domain_event_event_id` (`event_id`), + KEY `idx_ddd_domain_event_sender` (`sender`), KEY `idx_ddd_domain_event_trans_id` (`trans_id`), KEY `idx_ddd_domain_event_created_at` (`created_at`), KEY `idx_ddd_domain_event_event_created_at` (`event_created_at`) @@ -48,6 +61,7 @@ CREATE TABLE `ddd_domain_event` ( type EventPO struct { ID int64 `gorm:"primaryKey;autoIncrement"` EventID string `gorm:"column:event_id"` + Sender string `gorm:"type:varchar(255);column:sender;index"` // 事件发出实体 ID Event *dddfirework.DomainEvent `gorm:"serializer:json"` TransID int64 `gorm:"column:trans_id"` // 事务id EventCreatedAt time.Time `gorm:"index"` // 事件的创建时间 @@ -97,9 +111,6 @@ type FailedInfo struct { /* CREATE TABLE `ddd_eventbus_service` ( `name` varchar(30) NOT NULL, - `failed` text, - `retry` text, - `offset` bigint(20) DEFAULT NULL, `created_at` datetime(3) DEFAULT NULL, `updated_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`name`), @@ -108,12 +119,9 @@ CREATE TABLE `ddd_eventbus_service` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; */ type ServicePO struct { - Name string `gorm:"primaryKey"` - Retry []*RetryInfo `gorm:"serializer:json;type:text"` // 重试信息 - Failed []*RetryInfo `gorm:"serializer:json;type:text"` // 失败信息 - Offset int64 `gorm:"column:offset"` // 消费位置,等于最后一次消费的事件id - CreatedAt time.Time `gorm:"index"` // 记录创建时间 - UpdatedAt time.Time `gorm:"index"` // 记录的更新时间 + Name string `gorm:"primaryKey"` + CreatedAt time.Time `gorm:"index"` // 记录创建时间 + UpdatedAt time.Time `gorm:"index"` // 记录的更新时间 } func (o *ServicePO) GetID() string { @@ -127,7 +135,43 @@ func (o *ServicePO) TableName() string { func eventPersist(event *dddfirework.DomainEvent) (*EventPO, error) { return &EventPO{ EventID: event.ID, + Sender: event.Sender, Event: event, EventCreatedAt: event.CreatedAt, }, nil } + +/* +CREATE TABLE `ddd_domain_service_event` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `service` varchar(30) DEFAULT NULL, + `event_id` bigint NOT NULL, + `retry_count` int(11) DEFAULT NULL, + `status` tinyint DEFAULT NULL, + `failed_message` text, + `event_created_at` datetime(3) NOT NULL, + `next_time` datetime(3) NOT NULL, + `run_at` datetime(3) DEFAULT '1970-01-01 00:00:01', + PRIMARY KEY (`id`), + UNIQUE KEY `idx_service_event_id` (`service`,`event_id`), + KEY `idx_service_status_next_time` (`service`,`status`,`next_time`), + KEY `idx_ddd_domain_service_event_event_created_at` (`event_created_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +*/ + +// ServiceEventPO 记录service对event的处理情况 +type ServiceEventPO struct { + ID int64 `gorm:"primaryKey;autoIncrement"` + Service string `gorm:"type:varchar(30);column:service;uniqueIndex:idx_service_event_id;index:idx_service_status_next_time;"` + EventID int64 `gorm:"column:event_id;uniqueIndex:idx_service_event_id;not null"` + RetryCount int `gorm:"type:int(11);column:retry_count"` // 重试次数 + Status ServiceEventStatus `gorm:"type:tinyint;column:status;index:idx_service_status_next_time;"` // service event状态 + FailedMessage string `gorm:"type:text;column:failed_message"` // 失败详情 + EventCreatedAt time.Time `gorm:"type:datetime(3);index;not null"` // 事件的创建时间 + NextTime time.Time `gorm:"type:datetime(3);index:idx_service_status_next_time;not null"` // 事件可以运行的事件 + RunAt time.Time `gorm:"type:datetime(3);zeroValue:1970-01-01 00:00:01;default:'1970-01-01 00:00:01+00:00'"` // 事件真实运行时间,单纯记录下 +} + +func (o *ServiceEventPO) TableName() string { + return "ddd_domain_service_event" +} diff --git a/logger/stdr/stdr.go b/logger/stdr/stdr.go index 9a0e948..e293d2e 100644 --- a/logger/stdr/stdr.go +++ b/logger/stdr/stdr.go @@ -24,5 +24,5 @@ import ( ) func NewStdr(name string) logr.Logger { - return stdr.New(stdlog.New(os.Stderr, "", stdlog.LstdFlags|stdlog.Lshortfile)).WithName(name) + return stdr.New(stdlog.New(os.Stdout, "", stdlog.LstdFlags|stdlog.Lshortfile)).WithName(name) }