Skip to content

Commit

Permalink
Add tournament termination function and guarding against race conditi…
Browse files Browse the repository at this point in the history
…ons on End callback
  • Loading branch information
ftkg committed Nov 30, 2021
1 parent f4490d9 commit 2a6cdf9
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 24 deletions.
23 changes: 23 additions & 0 deletions server/core_tournament.go
Expand Up @@ -84,6 +84,29 @@ func TournamentDelete(ctx context.Context, cache LeaderboardCache, rankCache Lea
return nil
}

func TournamentTerminate(ctx context.Context, cache LeaderboardCache, rankCache LeaderboardRankCache, scheduler LeaderboardScheduler, leaderboardId string) error {
ts := time.Now().Unix()
tMinusOne := time.Unix(ts-1, 0).UTC()

if err := TournamentDelete(ctx, cache, rankCache, scheduler, leaderboardId); err != nil {
return err
}
leaderboard := cache.Get(leaderboardId)
if leaderboard == nil || !leaderboard.IsTournament() {
// If it does not exist treat it as success.
return nil
}

if leaderboard.EndTime > 0 && leaderboard.EndTime < ts {
// Tournament has ended, callback has already run.
return nil
}

scheduler.QueueCallback(&LeaderboardSchedulerCallback{id: leaderboardId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End})

return nil
}

func TournamentAddAttempt(ctx context.Context, logger *zap.Logger, db *sql.DB, cache LeaderboardCache, leaderboardId string, owner string, count int) error {
if count == 0 {
// No-op.
Expand Down
36 changes: 19 additions & 17 deletions server/leaderboard_cache.go
Expand Up @@ -43,23 +43,25 @@ const (
)

type Leaderboard struct {
Id string
Authoritative bool
SortOrder int
Operator int
ResetScheduleStr string
ResetSchedule *cronexpr.Expression
Metadata string
CreateTime int64
Category int
Description string
Duration int
EndTime int64
JoinRequired bool
MaxSize int
MaxNumScore int
Title string
StartTime int64
sync.Mutex // Guarding EndCallbackInvoked
Id string
Authoritative bool
SortOrder int
Operator int
ResetScheduleStr string
ResetSchedule *cronexpr.Expression
Metadata string
CreateTime int64
Category int
Description string
Duration int
EndTime int64
JoinRequired bool
MaxSize int
MaxNumScore int
Title string
StartTime int64
EndCallbackInvoked bool
}

func (l *Leaderboard) IsTournament() bool {
Expand Down
47 changes: 40 additions & 7 deletions server/leaderboard_scheduler.go
Expand Up @@ -26,11 +26,19 @@ import (
"go.uber.org/zap"
)

type CallbackType int

const (
Expiry CallbackType = iota // Reset
End
)

type LeaderboardSchedulerCallback struct {
id string
leaderboard *Leaderboard
ts int64
t time.Time
id string
leaderboard *Leaderboard
ts int64
t time.Time
callbackType CallbackType
}

type LeaderboardScheduler interface {
Expand All @@ -39,6 +47,7 @@ type LeaderboardScheduler interface {
Resume()
Stop()
Update()
QueueCallback(callback *LeaderboardSchedulerCallback)
}

type LocalLeaderboardScheduler struct {
Expand Down Expand Up @@ -279,6 +288,10 @@ func (ls *LocalLeaderboardScheduler) Update() {
ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds)))
}

func (ls *LocalLeaderboardScheduler) QueueCallback(c *LeaderboardSchedulerCallback) {
ls.queue <- c
}

func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) {
if ls.active.Load() != 1 {
// Not active.
Expand Down Expand Up @@ -311,8 +324,15 @@ func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []str
// Process the current set of tournament ends.
for _, id := range ids {
currentId := id

leaderboard := ls.cache.Get(id)
if leaderboard == nil {
// Cached entry was deleted before it reached the scheduler here.
continue
}

// Will block if the queue is full.
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne}
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End}
}
}()
}
Expand Down Expand Up @@ -357,7 +377,7 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string
continue
}
// Will block if queue is full.
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne}
ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: Expiry}
}
}()
}
Expand All @@ -368,7 +388,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() {
case <-ls.ctx.Done():
return
case callback := <-ls.queue:
if callback.leaderboard != nil {
if callback.callbackType == Expiry {
if callback.leaderboard.IsTournament() {
// Tournament, fetch most up to date info for size etc.
// Some processing is needed even if there is no runtime callback registered for tournament reset.
Expand Down Expand Up @@ -412,6 +432,11 @@ WHERE id = $1`
}
}
} else {
// Skip processing if there is no tournament end callback registered.
if ls.fnTournamentEnd == nil {
continue
}

query := `SELECT
id, sort_order, operator, reset_schedule, metadata, create_time,
category, description, duration, end_time, max_size, max_num_score, title, size, start_time
Expand All @@ -427,10 +452,18 @@ WHERE id = $1`
continue
}

callback.leaderboard.Lock()
if callback.leaderboard.EndCallbackInvoked {
callback.leaderboard.Unlock()
// already activated once
continue
}
// fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all.
if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil {
ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err))
}
callback.leaderboard.EndCallbackInvoked = true
callback.leaderboard.Unlock()
}
}
}
Expand Down

0 comments on commit 2a6cdf9

Please sign in to comment.