Skip to content

Commit

Permalink
fix race/inconsistency in leaderboard rank cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Kuznetsov committed Mar 14, 2024
1 parent 676ebb7 commit fcc177a
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 229 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr

## [Unreleased]
### Changes
- Reduce number of memory allocations in leaderboard cache
- Reduce number of memory allocations in leaderboard cache.
- Fix leaderboard rank cache inconsistency/race that could arise under heavy load.

### Fixed
- Prevent players from requesting duplicate joins to the same party.
Expand Down
109 changes: 109 additions & 0 deletions internal/skiplist/skiplist_chaos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package skiplist

import (
"fmt"
"math/rand"
"sort"
"testing"

"github.com/stretchr/testify/require"
)

// randString draws 16 bytes from rnd and renders them as upper-case hex in a
// UUID-like 8-4-4-4-12 layout. The test is failed immediately if the read
// reports an error.
func randString(t *testing.T, rnd *rand.Rand) string {
	var raw [16]byte

	_, readErr := rnd.Read(raw[:])
	require.NoError(t, readErr)

	// Byte offsets 4, 6, 8 and 10 split the buffer into the five groups.
	return fmt.Sprintf(
		"%X-%X-%X-%X-%X",
		raw[:4], raw[4:6], raw[6:8], raw[8:10], raw[10:],
	)
}

// testInterface is the element type stored in the lists under test: an
// identifier paired with an integer value.
type testInterface struct {
	id  string // identifier; used as the tie-breaker when values are equal
	val int    // primary ordering key
}

// Less reports whether ti orders strictly before other. Entries are compared
// by val first and, when the values are equal, by id. other must hold a
// testInterface; any other dynamic type makes the type assertion panic.
func (ti testInterface) Less(other interface{}) bool {
	rhs := other.(testInterface)

	if ti.val != rhs.val {
		return ti.val < rhs.val
	}

	return ti.id < rhs.id
}

// TestSkiplistChaos is a randomized, deterministically seeded consistency
// check of the ranks reported by SkipList.
//
// Every iteration generates a pool of user ids and a batch of operations
// (user id plus a value in [1, 999]). The same batch is then replayed, in a
// freshly shuffled order, into several independent lists. When a user already
// has an entry in a list, that entry is deleted and re-inserted with the
// accumulated value. Finally each list is compared against a sorted reference:
// the 1-based position of every surviving entry in the sorted slice must equal
// the rank the list returns for it.
func TestSkiplistChaos(t *testing.T) {
	// Fixed seed: every run exercises the same scenarios, so failures are
	// reproducible.
	rnd := rand.New(rand.NewSource(0))

	const iterations = 20

	for iter := 0; iter < iterations; iter++ {
		numUsers := rnd.Intn(2000) + 100
		users := make([]string, numUsers)
		for u := range users {
			users[u] = randString(t, rnd)
		}

		numOps := rnd.Intn(100_000) + 100
		ops := make([]testInterface, numOps)
		for o := range ops {
			// Draw the user first and the value second; the order of the
			// random draws determines the generated scenario.
			id := users[rnd.Intn(numUsers)]
			ops[o] = testInterface{
				id:  id,
				val: rnd.Intn(999) + 1,
			}
		}

		numLists := rnd.Intn(47) + 3
		lists := make([]*SkipList, numLists)
		userOps := make([]map[string]testInterface, numLists)

		// t.Logf ties the progress line to this test and keeps it quiet unless
		// the test fails or -v is given.
		t.Logf("** iteration=%v, users=%v, ops=%v, lists=%v", iter, numUsers, numOps, numLists)

		// Populate lists: each one sees the same operations in its own order.
		for l := 0; l < numLists; l++ {
			rnd.Shuffle(len(ops), func(a, b int) {
				ops[a], ops[b] = ops[b], ops[a]
			})

			userOps[l] = make(map[string]testInterface, numUsers)
			lists[l] = New()

			for _, op := range ops {
				// op is a per-iteration copy, so accumulating into it does not
				// modify the shared ops slice.
				if prev, seen := userOps[l][op.id]; seen {
					lists[l].Delete(prev)
					op.val += prev.val
				}

				lists[l].Insert(op)
				userOps[l][op.id] = op
			}
		}

		// Now verify every list against a sorted reference slice.
		for l, sl := range lists {
			// One surviving entry per user, so size by the map rather than by
			// the much larger operation count.
			listOps := make([]testInterface, 0, len(userOps[l]))
			for _, op := range userOps[l] {
				listOps = append(listOps, op)
			}

			sort.Slice(listOps, func(a, b int) bool {
				return listOps[a].Less(listOps[b])
			})

			for idx, op := range listOps {
				rank := idx + 1 // ranks are 1-based

				listRank := sl.GetRank(op)
				require.Equal(t, rank, listRank, "list %d, unexpected rank for op %+v", l, op)
			}
		}
	}
}
4 changes: 2 additions & 2 deletions server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func NewConsoleLogger(output *os.File, verbose bool) *zap.Logger {
}

func NewDB(t *testing.T) *sql.DB {
db, err := sql.Open("pgx", "postgresql://root@127.0.0.1:26257/nakama?sslmode=disable")
//db, err := sql.Open("pgx", "postgresql://postgres@127.0.0.1:5432/nakama?sslmode=disable")
//db, err := sql.Open("pgx", "postgresql://root@127.0.0.1:26257/nakama?sslmode=disable")
db, err := sql.Open("pgx", "postgresql://postgres@127.0.0.1:5432/nakama?sslmode=disable")
if err != nil {
t.Fatal("Error connecting to database", err)
}
Expand Down
54 changes: 16 additions & 38 deletions server/core_leaderboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ WHERE leaderboard_id = $1 AND expiry_time = $2 AND owner_id = ANY($3)`
sort.Slice(ownerRecords, sortFn)

// Bulk fill in the ranks of any owner records requested.
rankCount := rankCache.Fill(leaderboardId, leaderboard.SortOrder, expiryTime, ownerRecords)
rankCount := rankCache.Fill(leaderboardId, expiryTime, ownerRecords)

return &api.LeaderboardRecordList{
Records: records,
Expand Down Expand Up @@ -491,7 +491,7 @@ func LeaderboardRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
VALUES ($1, $2, $3, $4, $5, COALESCE($6, '{}'::JSONB), $7)
ON CONFLICT (owner_id, leaderboard_id, expiry_time)
DO UPDATE SET ` + opSQL + `, num_score = leaderboard_record.num_score + 1, metadata = COALESCE($6, leaderboard_record.metadata), username = COALESCE($3, leaderboard_record.username), update_time = now()` + filterSQL + `
RETURNING username, score, subscore, num_score, max_num_score, metadata, create_time, update_time, (SELECT (score,subscore) FROM leaderboard_record WHERE leaderboard_id=$1 AND owner_id=$2 AND expiry_time=$7)`
RETURNING username, score, subscore, num_score, max_num_score, metadata, create_time, update_time`

params := make([]interface{}, 0, 9)
params = append(params, leaderboardId, ownerID)
Expand All @@ -517,14 +517,15 @@ func LeaderboardRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
var dbUsername sql.NullString
var dbScore int64
var dbSubscore int64
var dbOldScores int64Tuple
var dbNumScore int32
var dbMaxNumScore int32
var dbMetadata string
var dbCreateTime pgtype.Timestamptz
var dbUpdateTime pgtype.Timestamptz

if err := db.QueryRowContext(ctx, query, params...).Scan(&dbUsername, &dbScore, &dbSubscore, &dbNumScore, &dbMaxNumScore, &dbMetadata, &dbCreateTime, &dbUpdateTime, &dbOldScores); err != nil {
err := db.QueryRowContext(ctx, query, params...).Scan(&dbUsername, &dbScore, &dbSubscore, &dbNumScore, &dbMaxNumScore, &dbMetadata, &dbCreateTime, &dbUpdateTime)

if err != nil {
var pgErr *pgconn.PgError
if err != sql.ErrNoRows && !(errors.As(err, &pgErr) && pgErr.Code == dbErrorUniqueViolation && strings.Contains(pgErr.Message, "leaderboard_record_pkey")) {
logger.Error("Error writing leaderboard record", zap.Error(err))
Expand All @@ -547,18 +548,10 @@ func LeaderboardRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,

var rank int64
if unchanged {
rank = rankCache.Get(leaderboardId, leaderboard.SortOrder, dbScore, dbSubscore, expiryTime, uuid.Must(uuid.FromString(ownerID)))
rank = rankCache.Get(leaderboardId, expiryTime, uuid.Must(uuid.FromString(ownerID)))
} else {
var oldScore *int64
var oldSubscore *int64

if dbOldScores.Valid && len(dbOldScores.Tuple) == 2 {
oldScore = &dbOldScores.Tuple[0]
oldSubscore = &dbOldScores.Tuple[1]
}

// Ensure we have the latest dbscore, dbsubscore if there was an update.
rank = rankCache.Insert(leaderboardId, leaderboard.SortOrder, dbScore, dbSubscore, oldScore, oldSubscore, expiryTime, uuid.Must(uuid.FromString(ownerID)))
rank = rankCache.Insert(leaderboardId, leaderboard.SortOrder, dbScore, dbSubscore, dbNumScore, expiryTime, uuid.Must(uuid.FromString(ownerID)))
}

record := &api.LeaderboardRecord{
Expand Down Expand Up @@ -598,22 +591,15 @@ func LeaderboardRecordDelete(ctx context.Context, logger *zap.Logger, db *sql.DB
expiryTime = leaderboard.ResetSchedule.Next(time.Now().UTC()).UTC().Unix()
}

var score sql.NullInt64
var subscore sql.NullInt64

query := "DELETE FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3 RETURNING score, subscore"
err := db.QueryRowContext(
ctx, query, leaderboardId, ownerID, time.Unix(expiryTime, 0).UTC()).
Scan(&score, &subscore)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
query := "DELETE FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3"
_, err := db.ExecContext(
ctx, query, leaderboardId, ownerID, time.Unix(expiryTime, 0).UTC())
if err != nil {
logger.Error("Error deleting leaderboard record", zap.Error(err))
return err
}

if score.Valid && subscore.Valid {
rankCache.Delete(leaderboardId, leaderboard.SortOrder,
score.Int64, subscore.Int64, expiryTime, uuid.Must(uuid.FromString(ownerID)))
}
rankCache.Delete(leaderboardId, expiryTime, uuid.Must(uuid.FromString(ownerID)))

return nil
}
Expand All @@ -631,7 +617,7 @@ func LeaderboardRecordReadAll(ctx context.Context, logger *zap.Logger, db *sql.D
}

func LeaderboardRecordsDeleteAll(ctx context.Context, logger *zap.Logger, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, tx *sql.Tx, userID uuid.UUID, currentTime int64) error {
query := "DELETE FROM leaderboard_record WHERE owner_id = $1 RETURNING leaderboard_id, expiry_time, score, subscore"
query := "DELETE FROM leaderboard_record WHERE owner_id = $1 RETURNING leaderboard_id, expiry_time"
rows, err := tx.QueryContext(ctx, query, userID.String())
if err != nil {
logger.Error("Error deleting all leaderboard records for user", zap.String("user_id", userID.String()), zap.Error(err))
Expand All @@ -640,10 +626,8 @@ func LeaderboardRecordsDeleteAll(ctx context.Context, logger *zap.Logger, leader

var leaderboardId string
var expiryTime pgtype.Timestamptz
var score int64
var subscore int64
for rows.Next() {
if err := rows.Scan(&leaderboardId, &expiryTime, &score, &subscore); err != nil {
if err := rows.Scan(&leaderboardId, &expiryTime); err != nil {
_ = rows.Close()
logger.Error("Error deleting all leaderboard records for user, failed to scan", zap.String("user_id", userID.String()), zap.Error(err))
return err
Expand All @@ -655,13 +639,7 @@ func LeaderboardRecordsDeleteAll(ctx context.Context, logger *zap.Logger, leader
continue
}

leaderboard := leaderboardCache.Get(leaderboardId)
if leaderboard == nil {
continue
}

leaderboardRankCache.Delete(
leaderboardId, leaderboard.SortOrder, score, subscore, expiryUnix, userID)
leaderboardRankCache.Delete(leaderboardId, expiryUnix, userID)
}
_ = rows.Close()

Expand Down Expand Up @@ -877,7 +855,7 @@ func getLeaderboardRecordsHaystack(ctx context.Context, logger *zap.Logger, db *
}

records = records[start:end]
rankCount := rankCache.Fill(leaderboardId, sortOrder, expiryTime.Unix(), records)
rankCount := rankCache.Fill(leaderboardId, expiryTime.Unix(), records)

var prevCursorStr string
if setPrevCursor {
Expand Down
41 changes: 12 additions & 29 deletions server/core_tournament.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ ON CONFLICT(owner_id, leaderboard_id, expiry_time) DO NOTHING`

// Ensure new tournament joiner is included in the rank cache.
if isNewJoin {
_ = rankCache.Insert(leaderboard.Id, leaderboard.SortOrder, 0, 0, nil, nil, expiryTime, ownerID)
_ = rankCache.Insert(leaderboard.Id, leaderboard.SortOrder, 0, 0, 0, expiryTime, ownerID)
}

logger.Info("Joined tournament.", zap.String("tournament_id", tournamentId), zap.String("owner", ownerID.String()), zap.String("username", username))
Expand Down Expand Up @@ -508,14 +508,11 @@ func TournamentRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
params = append(params, metadata)
}

var oldScore, oldSubscore *int64

if leaderboard.JoinRequired {
var dbOldScore, dbOldSubscore sql.NullInt64

// If join is required then the user must already have a record to update.
// There's also no need to increment the number of records tracked for this tournament.
err := db.QueryRowContext(ctx, "SELECT score, subscore FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3", leaderboard.Id, ownerId, expiryTime).Scan(&dbOldScore, &dbOldSubscore)
var exists int
err := db.QueryRowContext(ctx, "SELECT 1 FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3", leaderboard.Id, ownerId, expiryTime).Scan(&exists)
if err != nil {
if err == sql.ErrNoRows {
// Tournament required join but no row was found to update.
Expand All @@ -525,11 +522,6 @@ func TournamentRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
return nil, err
}

if dbOldScore.Valid && dbOldSubscore.Valid {
oldScore = &dbOldScore.Int64
oldSubscore = &dbOldSubscore.Int64
}

query := `UPDATE leaderboard_record
SET ` + opSQL + `, num_score = leaderboard_record.num_score + 1, metadata = COALESCE($7, leaderboard_record.metadata), username = COALESCE($3, leaderboard_record.username), update_time = now()
WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $4 AND (max_num_score = 0 OR num_score < max_num_score)`
Expand All @@ -545,7 +537,7 @@ func TournamentRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
query := `INSERT INTO leaderboard_record (leaderboard_id, owner_id, username, score, subscore, metadata, expiry_time, max_num_score)
VALUES ($1, $2, $3, $9, $10, COALESCE($7, '{}'::JSONB), $4, $8)
ON CONFLICT (owner_id, leaderboard_id, expiry_time)
DO UPDATE SET ` + opSQL + `, num_score = leaderboard_record.num_score + 1, metadata = COALESCE($7, leaderboard_record.metadata), username = COALESCE($3, leaderboard_record.username), update_time = now() RETURNING (SELECT (score,subscore,num_score,max_num_score) FROM leaderboard_record WHERE leaderboard_id=$1 AND owner_id=$2 AND expiry_time=$4)`
DO UPDATE SET ` + opSQL + `, num_score = leaderboard_record.num_score + 1, metadata = COALESCE($7, leaderboard_record.metadata), username = COALESCE($3, leaderboard_record.username), update_time = now() RETURNING (SELECT (num_score,max_num_score) FROM leaderboard_record WHERE leaderboard_id=$1 AND owner_id=$2 AND expiry_time=$4)`
params = append(params, leaderboard.MaxNumScore, scoreAbs, subscoreAbs)

if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
Expand All @@ -561,11 +553,9 @@ func TournamentRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,

var dbNumScore int64
var dbMaxNumScore int64
if dbOldScores.Valid && len(dbOldScores.Tuple) == 4 {
oldScore = &dbOldScores.Tuple[0]
oldSubscore = &dbOldScores.Tuple[1]
dbNumScore = dbOldScores.Tuple[2] + 1
dbMaxNumScore = dbOldScores.Tuple[3]
if dbOldScores.Valid && len(dbOldScores.Tuple) == 2 {
dbNumScore = dbOldScores.Tuple[0] + 1
dbMaxNumScore = dbOldScores.Tuple[1]
} else {
// There was no previous score.
dbNumScore = 1
Expand Down Expand Up @@ -637,7 +627,7 @@ func TournamentRecordWrite(ctx context.Context, logger *zap.Logger, db *sql.DB,
}

// Enrich the return record with rank data.
record.Rank = rankCache.Insert(leaderboard.Id, leaderboard.SortOrder, record.Score, record.Subscore, oldScore, oldSubscore, expiryUnix, ownerId)
record.Rank = rankCache.Insert(leaderboard.Id, leaderboard.SortOrder, record.Score, record.Subscore, dbNumScore, expiryUnix, ownerId)

return record, nil
}
Expand All @@ -656,23 +646,16 @@ func TournamentRecordDelete(ctx context.Context, logger *zap.Logger, db *sql.DB,
now := time.Now().UTC()
_, _, expiryUnix := calculateTournamentDeadlines(tournament.StartTime, tournament.EndTime, int64(tournament.Duration), tournament.ResetSchedule, now)

query := "DELETE FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3 RETURNING score, subscore"

var score sql.NullInt64
var subscore sql.NullInt64
query := "DELETE FROM leaderboard_record WHERE leaderboard_id = $1 AND owner_id = $2 AND expiry_time = $3"

err := db.QueryRowContext(
ctx, query, tournamentID, ownerID, time.Unix(expiryUnix, 0).UTC()).
Scan(&score, &subscore)
_, err := db.ExecContext(
ctx, query, tournamentID, ownerID, time.Unix(expiryUnix, 0).UTC())
if err != nil {
logger.Error("Error deleting tournament record", zap.Error(err))
return err
}

if score.Valid && subscore.Valid {
rankCache.Delete(tournamentID, tournament.SortOrder, score.Int64,
subscore.Int64, expiryUnix, uuid.Must(uuid.FromString(ownerID)))
}
rankCache.Delete(tournamentID, expiryUnix, uuid.Must(uuid.FromString(ownerID)))

return nil
}
Expand Down
Loading

0 comments on commit fcc177a

Please sign in to comment.