Skip to content

Commit

Permalink
fix: remove big transaction in handleEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
liqiankun1111 committed Apr 11, 2024
1 parent 7626586 commit 6ef9e2d
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions eventbus/mysql/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gorm.io/gorm/clause"

"github.com/bytedance/dddfirework"
lockdb "github.com/bytedance/dddfirework/lock/db"
"github.com/bytedance/dddfirework/logger"
"github.com/bytedance/dddfirework/logger/stdr"
)
Expand Down Expand Up @@ -155,6 +156,7 @@ type EventBus struct {

cb dddfirework.DomainEventHandler
cleanCron *cron.Cron
lock *lockdb.DBLock
once sync.Once
}

Expand All @@ -168,7 +170,8 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
}

opt := Options{
QueueLimit: defaultQueueLimit,
QueueLimit: defaultQueueLimit,
// Note: a single handleEvents run should never take longer than RunInterval.
RunInterval: runInterval,
CleanCron: cleanCron,
RetentionTime: retentionTime,
Expand Down Expand Up @@ -196,7 +199,6 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
} else {
strategy = &IntervalRetry{Interval: retryInterval, Limit: retryLimit}
}

eb := &EventBus{
serviceName: serviceName,
db: db,
Expand All @@ -205,6 +207,11 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
opt: opt,
txKey: contextKey(fmt.Sprintf("eventbus_tx_%d", time.Now().Unix())),
cleanCron: cron.New(),
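// In multi-instance deployments the event handlers / handleEvents must run on a single instance at a time (each event's execution result is stored in the service record rather than in the event table, and retries etc. are best triggered by one instance), so a distributed lock is used.
// The lock's validity period is set to the run interval (opt.RunInterval) so that it does not expire too early and let instance 2 start handleEvents while instance 1 has not yet finished its own run.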
lock: lockdb.NewDBLock(db, opt.RunInterval, func(opt *lockdb.Options) {
opt.Retry = false
}),
}
_ = eb.initService()
return eb
Expand Down Expand Up @@ -329,7 +336,8 @@ func (e *EventBus) initService() error {

func (e *EventBus) lockService(tx *gorm.DB) (*ServicePO, error) {
service := &ServicePO{}
if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
//if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("name = ?", e.serviceName).
First(service).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -463,41 +471,52 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai
// handleEvents performs one processing pass for this service: it loads the
// service record via lockService, collects newly scanned events and events due
// for retry, dispatches them, applies the retry strategy to the failures,
// records panicked events as failed, advances service.Offset to the last
// scanned event ID and saves the service record.
//
// This span is a diff rendering of the commit "fix: remove big transaction in
// handleEvents": the removed body (wrapped in e.db.Transaction and using tx)
// and the added body (guarded by e.lock and using e.db directly) appear
// interleaved, without +/- markers.
//
// Returns the first error from acquiring the lock, loading the service,
// querying events, or saving the service record; nil when there is nothing to
// process.
func (e *EventBus) handleEvents() error {
e.logger.V(logger.LevelDebug).Info("handle events")

return e.db.Transaction(func(tx *gorm.DB) error {
ctx := context.Background()
service, err := e.lockService(tx)
if err != nil {
return err
}
scanEvents, err := e.getScanEvents(tx, service)
if err != nil {
return err
}
retryEvents, remainIDs, err := e.getRetryEvents(tx, service)
if err != nil {
return err
}
events := make([]*EventPO, 0)
events = append(events, scanEvents...)
events = append(events, retryEvents...)
if len(events) == 0 {
return nil
ctx := context.Background()
// Ensure that in multi-instance deployments only one instance can run handleEvents.
keyLock, err := e.lock.Lock(ctx, fmt.Sprintf("eventbus-service-%s", e.serviceName))
if err != nil {
return err
}
// NOTE(review): the deferred closure assigns to the enclosing err with `=`.
// The function result is unnamed, so the returned value is not affected, but
// a closure-local `:=` would make that intent explicit.
defer func() {
if err = e.lock.UnLock(ctx, keyLock); err != nil {
e.logger.Error(err, "failed to unlock")
}
}()
// Do not wrap the code below in one big transaction to guarantee consistency: a single run may process e.g. 100 events and take a long time, the event handlers may touch all kinds of data, and every lock is only released once the transaction commits.
// That would make the big transaction long-lived and lock too much data, which in turn affects business code.
// NOTE(review): lockService is now called with e.db rather than a transaction
// handle; whether its FOR UPDATE row lock is held beyond that single
// statement should be confirmed.
service, err := e.lockService(e.db)
if err != nil {
return err
}
scanEvents, err := e.getScanEvents(e.db, service)
if err != nil {
return err
}
retryEvents, remainIDs, err := e.getRetryEvents(e.db, service)
if err != nil {
return err
}
events := make([]*EventPO, 0)
events = append(events, scanEvents...)
events = append(events, retryEvents...)
if len(events) == 0 {
return nil
}

failedIDs, panicIDs := e.dispatchEvents(ctx, events)
retry, failed := e.doRetryStrategy(service, remainIDs, failedIDs)
failed = append(service.Failed, failed...)
for _, id := range panicIDs {
failed = append(failed, &RetryInfo{ID: id})
}
service.Retry, service.Failed = e.evictEvents(retry, failed)
failedIDs, panicIDs := e.dispatchEvents(ctx, events)
retry, failed := e.doRetryStrategy(service, remainIDs, failedIDs)
failed = append(service.Failed, failed...)
for _, id := range panicIDs {
failed = append(failed, &RetryInfo{ID: id})
}
service.Retry, service.Failed = e.evictEvents(retry, failed)

if len(scanEvents) > 0 {
last := scanEvents[len(scanEvents)-1]
service.Offset = last.ID
}
return tx.Save(service).Error
})
if len(scanEvents) > 0 {
last := scanEvents[len(scanEvents)-1]
service.Offset = last.ID
}
// If Save(service) fails, events may be consumed more than once; a simple event bus can hardly guarantee eventual consistency.
return e.db.Save(service).Error
}

func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) {
Expand Down

0 comments on commit 6ef9e2d

Please sign in to comment.