From 2a6cdf9c22c85918062064aebe800829685fb0b6 Mon Sep 17 00:00:00 2001 From: Fernando Takagi Date: Tue, 30 Nov 2021 16:09:05 -0300 Subject: [PATCH] Add tournament termination function and guarding against race conditions on End callback --- server/core_tournament.go | 23 ++++++++++++++++ server/leaderboard_cache.go | 36 +++++++++++++------------ server/leaderboard_scheduler.go | 47 ++++++++++++++++++++++++++++----- 3 files changed, 82 insertions(+), 24 deletions(-) diff --git a/server/core_tournament.go b/server/core_tournament.go index 52a9577bc..45e7d43ed 100644 --- a/server/core_tournament.go +++ b/server/core_tournament.go @@ -84,6 +84,29 @@ func TournamentDelete(ctx context.Context, cache LeaderboardCache, rankCache Lea return nil } +func TournamentTerminate(ctx context.Context, cache LeaderboardCache, rankCache LeaderboardRankCache, scheduler LeaderboardScheduler, leaderboardId string) error { + ts := time.Now().Unix() + tMinusOne := time.Unix(ts-1, 0).UTC() + + if err := TournamentDelete(ctx, cache, rankCache, scheduler, leaderboardId); err != nil { + return err + } + leaderboard := cache.Get(leaderboardId) + if leaderboard == nil || !leaderboard.IsTournament() { + // If it does not exist treat it as success. + return nil + } + + if leaderboard.EndTime > 0 && leaderboard.EndTime < ts { + // Tournament has ended, callback has already run. + return nil + } + + scheduler.QueueCallback(&LeaderboardSchedulerCallback{id: leaderboardId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End}) + + return nil +} + func TournamentAddAttempt(ctx context.Context, logger *zap.Logger, db *sql.DB, cache LeaderboardCache, leaderboardId string, owner string, count int) error { if count == 0 { // No-op. diff --git a/server/leaderboard_cache.go b/server/leaderboard_cache.go index 98729b8ac..8ba839d4c 100644 --- a/server/leaderboard_cache.go +++ b/server/leaderboard_cache.go @@ -43,23 +43,25 @@ const ( ) type Leaderboard struct { - Id string - Authoritative bool - SortOrder int - Operator int - ResetScheduleStr string - ResetSchedule *cronexpr.Expression - Metadata string - CreateTime int64 - Category int - Description string - Duration int - EndTime int64 - JoinRequired bool - MaxSize int - MaxNumScore int - Title string - StartTime int64 + sync.Mutex // Guarding EndCallbackInvoked + Id string + Authoritative bool + SortOrder int + Operator int + ResetScheduleStr string + ResetSchedule *cronexpr.Expression + Metadata string + CreateTime int64 + Category int + Description string + Duration int + EndTime int64 + JoinRequired bool + MaxSize int + MaxNumScore int + Title string + StartTime int64 + EndCallbackInvoked bool } func (l *Leaderboard) IsTournament() bool { diff --git a/server/leaderboard_scheduler.go b/server/leaderboard_scheduler.go index adec8cbb3..5de415050 100644 --- a/server/leaderboard_scheduler.go +++ b/server/leaderboard_scheduler.go @@ -26,11 +26,19 @@ import ( "go.uber.org/zap" ) +type CallbackType int + +const ( + Expiry CallbackType = iota // Reset + End +) + type LeaderboardSchedulerCallback struct { - id string - leaderboard *Leaderboard - ts int64 - t time.Time + id string + leaderboard *Leaderboard + ts int64 + t time.Time + callbackType CallbackType } type LeaderboardScheduler interface { @@ -39,6 +47,7 @@ type LeaderboardScheduler interface { Resume() Stop() Update() + QueueCallback(callback *LeaderboardSchedulerCallback) } type LocalLeaderboardScheduler struct { @@ -279,6 +288,10 @@ func (ls *LocalLeaderboardScheduler) Update() { ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds))) } +func (ls *LocalLeaderboardScheduler) QueueCallback(c *LeaderboardSchedulerCallback) { + ls.queue <- c +} + func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) { if ls.active.Load() != 1 { // Not active. @@ -311,8 +324,15 @@ func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []str // Process the current set of tournament ends. for _, id := range ids { currentId := id + + leaderboard := ls.cache.Get(id) + if leaderboard == nil { + // Cached entry was deleted before it reached the scheduler here. + continue + } + // Will block if the queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne} + ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: End} } }() } @@ -357,7 +377,7 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string continue } // Will block if queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne} + ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne, callbackType: Expiry} } }() } @@ -368,7 +388,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() { case <-ls.ctx.Done(): return case callback := <-ls.queue: - if callback.leaderboard != nil { + if callback.callbackType == Expiry { if callback.leaderboard.IsTournament() { // Tournament, fetch most up to date info for size etc. // Some processing is needed even if there is no runtime callback registered for tournament reset. @@ -412,6 +432,11 @@ WHERE id = $1` } } } else { + // Skip processing if there is no tournament end callback registered. + if ls.fnTournamentEnd == nil { + continue + } + query := `SELECT id, sort_order, operator, reset_schedule, metadata, create_time, category, description, duration, end_time, max_size, max_num_score, title, size, start_time @@ -427,10 +452,18 @@ WHERE id = $1` continue } + callback.leaderboard.Lock() + if callback.leaderboard.EndCallbackInvoked { + callback.leaderboard.Unlock() + // already activated once + continue + } // fnTournamentEnd cannot be nil here, if it was the callback would not be queued at all. if err := ls.fnTournamentEnd(ls.ctx, tournament, int64(tournament.EndActive), int64(tournament.NextReset)); err != nil { ls.logger.Warn("Failed to invoke tournament end callback", zap.Error(err)) } + callback.leaderboard.EndCallbackInvoked = true + callback.leaderboard.Unlock() } } }