Skip to content

Commit

Permalink
fix: remove big transaction in handleEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
liqiankun1111 committed Apr 25, 2024
1 parent eb0f0ef commit e1ab495
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
35 changes: 24 additions & 11 deletions eventbus/mysql/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gorm.io/gorm/clause"

"github.com/bytedance/dddfirework"
lockdb "github.com/bytedance/dddfirework/lock/db"
"github.com/bytedance/dddfirework/logger"
"github.com/bytedance/dddfirework/logger/stdr"
)
Expand Down Expand Up @@ -155,6 +156,7 @@ type EventBus struct {

cb dddfirework.DomainEventHandler
cleanCron *cron.Cron
lock *lockdb.DBLock
once sync.Once
}

Expand All @@ -168,7 +170,8 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
}

opt := Options{
QueueLimit: defaultQueueLimit,
QueueLimit: defaultQueueLimit,
// Note: a single handleEvents run must never take longer than RunInterval.
RunInterval: runInterval,
CleanCron: cleanCron,
RetentionTime: retentionTime,
Expand Down Expand Up @@ -196,7 +199,6 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
} else {
strategy = &IntervalRetry{Interval: retryInterval, Limit: retryLimit}
}

eb := &EventBus{
serviceName: serviceName,
db: db,
Expand All @@ -205,6 +207,11 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
opt: opt,
txKey: contextKey(fmt.Sprintf("eventbus_tx_%d", time.Now().Unix())),
cleanCron: cron.New(),
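// With multiple instances, the event handlers / handleEvents must run on only one instance at a time
// (each event's execution result is stored in the service record rather than in the event table, and
// retries are best triggered by a single instance), so a distributed lock is used.
// The lock's validity period is set to RunInterval so that it does not expire too early, which would
// let instance 2 start handleEvents while instance 1 has not yet finished its own handleEvents run.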
lock: lockdb.NewDBLock(db, opt.RunInterval, func(opt *lockdb.Options) {
opt.Retry = false
}),
}
_ = eb.initService()
return eb
Expand Down Expand Up @@ -329,7 +336,8 @@ func (e *EventBus) initService() error {

func (e *EventBus) lockService(tx *gorm.DB) (*ServicePO, error) {
service := &ServicePO{}
if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
//if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("name = ?", e.serviceName).
First(service).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -462,18 +470,19 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai

func (e *EventBus) handleEvents() error {
e.logger.V(logger.LevelDebug).Info("handle events")

return e.db.Transaction(func(tx *gorm.DB) error {
ctx := context.Background()
service, err := e.lockService(tx)
ctx := context.Background()
fn := func(ctx context.Context) error {
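// Do not wrap the code below in one big transaction to get consistency: a single run may process
// e.g. 100 events and take a long time, the event handlers may touch all kinds of data, and every
// lock is held until the transaction commits.
// Such a transaction would be long-lived and lock too much data, which in turn hurts business code.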
service, err := e.lockService(e.db)
if err != nil {
return err
}
scanEvents, err := e.getScanEvents(tx, service)
scanEvents, err := e.getScanEvents(e.db, service)
if err != nil {
return err
}
retryEvents, remainIDs, err := e.getRetryEvents(tx, service)
retryEvents, remainIDs, err := e.getRetryEvents(e.db, service)
if err != nil {
return err
}
Expand All @@ -496,8 +505,12 @@ func (e *EventBus) handleEvents() error {
last := scanEvents[len(scanEvents)-1]
service.Offset = last.ID
}
return tx.Save(service).Error
})
// If Save(service) fails, events may be consumed more than once; a simple event bus can hardly guarantee eventual consistency.
return e.db.Save(service).Error
}
// Make sure that only one instance runs handleEvents in a multi-instance deployment; lock.Run also supports automatic lock renewal.
err := e.lock.Run(ctx, fmt.Sprintf("eventbus-service-%s", e.serviceName), fn)
return err
}

func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) {
Expand Down
7 changes: 7 additions & 0 deletions eventbus/mysql/po.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (o *EventPO) TableName() string {
return "ddd_domain_event"
}

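/*
Purpose of ddd_event_transaction:
1. A command may send events while it executes. Transactional events guarantee that the command and the sending of its events either both succeed or both fail (i.e. no event is sent when the command fails).
2. If the event bus does not implement ITransactionEventBus, or the event being sent does not have type SendTypeTransaction, the guarantee above does not apply.
Implementation-wise, transactional events are not recorded in the event table but in ddd_event_transaction.events: when a command sends an event it is first recorded there, and the events in ddd_event_transaction.events are sent only after the command has executed successfully.
*/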

// Transaction
/*
CREATE TABLE `ddd_event_transaction` (
Expand Down
6 changes: 3 additions & 3 deletions lock/db/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (r *DBLock) update(ctx context.Context, keyLock interface{}) error {
return nil
}

func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Context)) error {
func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Context) error) error {
locker, err := r.Lock(ctx, key)
if err != nil {
return err
Expand All @@ -184,6 +184,6 @@ func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Contex
}
}()

fn(subCtx)
return nil
err = fn(subCtx)
return err
}

0 comments on commit e1ab495

Please sign in to comment.