Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove big transaction in handleEvents #26

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions eventbus/mysql/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gorm.io/gorm/clause"

"github.com/bytedance/dddfirework"
lockdb "github.com/bytedance/dddfirework/lock/db"
"github.com/bytedance/dddfirework/logger"
"github.com/bytedance/dddfirework/logger/stdr"
)
Expand Down Expand Up @@ -155,6 +156,7 @@ type EventBus struct {

cb dddfirework.DomainEventHandler
cleanCron *cron.Cron
lock *lockdb.DBLock
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个感觉强制绑定dblock了,一定需要引入个dblock么

once sync.Once
}

Expand All @@ -168,7 +170,8 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
}

opt := Options{
QueueLimit: defaultQueueLimit,
QueueLimit: defaultQueueLimit,
// 注意,任何时候,handleEvents 执行耗时不应超过 RunInterval
RunInterval: runInterval,
CleanCron: cleanCron,
RetentionTime: retentionTime,
Expand Down Expand Up @@ -196,7 +199,6 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
} else {
strategy = &IntervalRetry{Interval: retryInterval, Limit: retryLimit}
}

eb := &EventBus{
serviceName: serviceName,
db: db,
Expand All @@ -205,6 +207,11 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
opt: opt,
txKey: contextKey(fmt.Sprintf("eventbus_tx_%d", time.Now().Unix())),
cleanCron: cron.New(),
// 多实例场景下 event handler/handleEvents 必须单实例执行(因为每个event的执行结果保存在 service里而不是event表里,且重试等最好由单实例触发),所以要借助分布式锁
// 有效期为锁间隔,避免时间太短锁过期,在实例1 handleEvents 未执行完成时,实例2 开始执行handleEvents
lock: lockdb.NewDBLock(db, opt.RunInterval, func(opt *lockdb.Options) {
opt.Retry = false
}),
}
_ = eb.initService()
return eb
Expand Down Expand Up @@ -329,7 +336,8 @@ func (e *EventBus) initService() error {

func (e *EventBus) lockService(tx *gorm.DB) (*ServicePO, error) {
service := &ServicePO{}
if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
//if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("name = ?", e.serviceName).
First(service).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -462,18 +470,19 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (fai

func (e *EventBus) handleEvents() error {
e.logger.V(logger.LevelDebug).Info("handle events")

return e.db.Transaction(func(tx *gorm.DB) error {
ctx := context.Background()
service, err := e.lockService(tx)
ctx := context.Background()
fn := func(ctx context.Context) error {
// 以下代码不要用大事务来保证一致性,因为一次可能处理比如100个event,耗时较久,event handler 里可能处理各种数据,且都得等事务提交后才释放锁
// 这会导致这个大事务持续时间长,且锁住了过多数据进而影响业务代码
service, err := e.lockService(e.db)
if err != nil {
return err
}
scanEvents, err := e.getScanEvents(tx, service)
scanEvents, err := e.getScanEvents(e.db, service)
if err != nil {
return err
}
retryEvents, remainIDs, err := e.getRetryEvents(tx, service)
retryEvents, remainIDs, err := e.getRetryEvents(e.db, service)
if err != nil {
return err
}
Expand All @@ -496,8 +505,12 @@ func (e *EventBus) handleEvents() error {
last := scanEvents[len(scanEvents)-1]
service.Offset = last.ID
}
return tx.Save(service).Error
})
// 如果save(service) 执行失败,则可能引起 event 被重复消费,简单的eventbus 很难保证最终一致性
return e.db.Save(service).Error
}
// 确保在多实例场景下,只有一个实例可以执行handleEvents, lock.Run 还支持自动续期
err := e.lock.Run(ctx, fmt.Sprintf("eventbus-service-%s", e.serviceName), fn)
return err
}

func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) {
Expand Down
7 changes: 7 additions & 0 deletions eventbus/mysql/po.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (o *EventPO) TableName() string {
return "ddd_domain_event"
}

/*
ddd_event_transaction 作用
1. 一个command 操作中可能发event,事务event保证了 command 操作+发event 要么都成功要么都失败(也就是command 失败了不发event)
2. 如果eventbus 不是 ITransactionEventBus 实现或者 发的event.type 不是 SendTypeTransaction ,则不保证上述能力。
从实现上看,对于事务event,没有记录在event 表中,而是记录在了 ddd_event_transaction.events 里,command发送event 时先记录在这里,command 执行成功,再发送ddd_event_transaction.events
*/

// Transaction
/*
CREATE TABLE `ddd_event_transaction` (
Expand Down
6 changes: 3 additions & 3 deletions lock/db/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (r *DBLock) update(ctx context.Context, keyLock interface{}) error {
return nil
}

func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Context)) error {
func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Context) error) error {
locker, err := r.Lock(ctx, key)
if err != nil {
return err
Expand All @@ -184,6 +184,6 @@ func (r *DBLock) Run(ctx context.Context, key string, fn func(ctx context.Contex
}
}()

fn(subCtx)
return nil
err = fn(subCtx)
return err
}