Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/auth: scoring for offline users must load history from DB #1083

Merged
merged 3 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 124 additions & 55 deletions server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,44 +657,73 @@ func (auth *AuthManager) UserSettlingLimit(user account.AccountID, mkt *dex.Mark
return limit
}

// userScore computes the user score from the user's recent match outcomes and
// preimage history. This must be called with the violationMtx locked.
func (auth *AuthManager) userScore(user account.AccountID) (score int32) {
if outcomes, found := auth.matchOutcomes[user]; found {
for v, count := range outcomes.binViolations() {
func integrateOutcomes(matchOutcomes *latestMatchOutcomes, preimgOutcomes *latestPreimageOutcomes) (score, successCount, piMissCount int32) {
if matchOutcomes != nil {
matchCounts := matchOutcomes.binViolations()
for v, count := range matchCounts {
score += v.Score() * int32(count)
}
successCount = int32(matchCounts[ViolationSwapSuccess])
}
if outcomes, found := auth.preimgOutcomes[user]; found {
score += ViolationPreimageMiss.Score() * outcomes.misses()
if preimgOutcomes != nil {
piMissCount = preimgOutcomes.misses()
score += ViolationPreimageMiss.Score() * piMissCount
}
return
}

func (auth *AuthManager) registerMatchOutcome(user account.AccountID, misstep NoActionStep, mmid db.MarketMatchID, value uint64, refTime time.Time) int32 {
// userScore computes an authenticated user's score from their recent match
// outcomes and preimage history. They must have entries in the outcome maps.
// Use loadUserScore to compute score from history in DB. This must be called
// with the violationMtx locked.
func (auth *AuthManager) userScore(user account.AccountID) (score int32) {
score, _, _ = integrateOutcomes(auth.matchOutcomes[user], auth.preimgOutcomes[user])
return score
}

func (auth *AuthManager) registerMatchOutcome(user account.AccountID, misstep NoActionStep, mmid db.MarketMatchID, value uint64, refTime time.Time) (score int32) {
violation := misstep.Violation()

auth.violationMtx.Lock()
defer auth.violationMtx.Unlock()
outcomes, found := auth.matchOutcomes[user]
if !found {
outcomes = newLatestMatchOutcomes(scoringMatchLimit)
auth.matchOutcomes[user] = outcomes
}
Comment on lines -680 to -683
Copy link
Member Author

@chappjc chappjc May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the gist of the bug. I suspect it was written (by me) with the thinking "no known outcomes, so this must be a new user with no previous outcomes" but the absence of an entry really means offline user.

outcomes.add(&matchOutcome{
time: encode.UnixMilli(refTime),
mid: mmid.MatchID,
outcome: violation,
value: value,
base: mmid.Base,
quote: mmid.Quote,
})
matchOutcomes, found := auth.matchOutcomes[user]
if found {
matchOutcomes.add(&matchOutcome{
time: encode.UnixMilli(refTime),
mid: mmid.MatchID,
outcome: violation,
value: value,
base: mmid.Base,
quote: mmid.Quote,
})
score = auth.userScore(user)
log.Debugf("Registering outcome %q (badness %d) for user %v, new score = %d",
violation.String(), violation.Score(), user, score)
return
}

// The user is currently not connected and authenticated. When the user logs
// back in, their history will be reloaded (loadUserScore) and their account
// will be suspended/restored as required, but compute their score now from
// DB so their orders may be unbooked if need.
matchOutcomes, piOutcomes, err := auth.loadUserOutcomes(user)
if err != nil {
log.Errorf("Failed to load swap and preimage outcomes for user %v: %v", user, err)
return 0
}

// Make outcome entries for the user to optimize subsequent outcomes calls
// while they are disconnected? This could lead to adding duplicate outcomes
// with a concurrent connect/login or subsequent outcomes while offline.
//
// auth.matchOutcomes[user] = matchOutcomes
// auth.preimgOutcomes[user] = piOutcomes
Comment on lines +715 to +720
Copy link
Member Author

@chappjc chappjc May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be safe with the new duplicate checks in the outcome structs' add methods, but it's a little messy and these map entries are assumed not to be present for a disconnected user. (The entries would remain in the map for eternity if they never logged back in.)

Even for the largest matches table so far, the loadUserOutcomes query is ~30ms on my (admittedly fast) machine and ~200ms on a closer-to-production-equivalent machine. Even the occasional string of a dozen back-to-back outcomes for offline users (match failures or preimage misses) should be tolerable. We can experiment with this optimization or even separate outcome maps if it becomes slow to hit the DB for each outcome for an offline user.


score := auth.userScore(user)
log.Debugf("Registering outcome %q (badness %d) for user %v, new score = %d",
score, _, _ = integrateOutcomes(matchOutcomes, piOutcomes)
log.Debugf("Registering outcome %q (badness %d) for user %v (offline), current score = %d",
violation.String(), violation.Score(), user, score)

return score
return
}

// SwapSuccess registers the successful completion of a swap by the given user.
Expand Down Expand Up @@ -728,26 +757,48 @@ func (auth *AuthManager) Inaction(user account.AccountID, misstep NoActionStep,
}
}

func (auth *AuthManager) registerPreimageOutcome(user account.AccountID, miss bool, oid order.OrderID, refTime time.Time) int32 {
func (auth *AuthManager) registerPreimageOutcome(user account.AccountID, miss bool, oid order.OrderID, refTime time.Time) (score int32) {
auth.violationMtx.Lock()
defer auth.violationMtx.Unlock()
outcomes, found := auth.preimgOutcomes[user]
if !found {
outcomes = newLatestPreimageOutcomes(scoringOrderLimit)
auth.preimgOutcomes[user] = outcomes
piOutcomes, found := auth.preimgOutcomes[user]
if found {
piOutcomes.add(&preimageOutcome{
time: encode.UnixMilli(refTime),
oid: oid,
miss: miss,
})
score = auth.userScore(user)
if miss {
log.Debugf("Registering outcome %q (badness %d) for user %v, new score = %d",
ViolationPreimageMiss.String(), ViolationPreimageMiss.Score(), user, score)
}
return
}
outcomes.add(&preimageOutcome{
time: encode.UnixMilli(refTime),
oid: oid,
miss: miss,
})

score := auth.userScore(user)
// The user is currently not connected and authenticated. When the user logs
// back in, their history will be reloaded (loadUserScore) and their account
// will be suspended/restored as required, but compute their score now from
// DB so their orders may be unbooked if need.
matchOutcomes, piOutcomes, err := auth.loadUserOutcomes(user)
if err != nil {
log.Errorf("Failed to load swap and preimage outcomes for user %v: %v", user, err)
return 0
}

// Make outcome entries for the user to optimize subsequent outcomes calls
// while they are disconnected? This could lead to adding duplicate outcomes
// with a concurrent connect/login or subsequent outcomes while offline.
//
// auth.matchOutcomes[user] = matchOutcomes
// auth.preimgOutcomes[user] = piOutcomes

score, _, _ = integrateOutcomes(matchOutcomes, piOutcomes)
if miss {
log.Debugf("Registering outcome %q (badness %d) for user %v, new score = %d",
log.Debugf("Registering outcome %q (badness %d) for user %v (offline), current score = %d",
ViolationPreimageMiss.String(), ViolationPreimageMiss.Score(), user, score)
}
return score

return
}

// PreimageSuccess registers an accepted preimage for the user.
Expand Down Expand Up @@ -944,15 +995,14 @@ func (auth *AuthManager) removeClient(client *clientInfo) {
auth.violationMtx.Unlock()
}

// loadUserScore computes the user's current score from order and swap data
// retrieved from the DB.
func (auth *AuthManager) loadUserScore(user account.AccountID) (int32, error) {
var successCount, piMissCount int32
// loadUserOutcomes returns user's latest match and preimage outcomes from order
// and swap data retrieved from the DB.
func (auth *AuthManager) loadUserOutcomes(user account.AccountID) (*latestMatchOutcomes, *latestPreimageOutcomes, error) {
// Load the N most recent matches resulting in success or an at-fault match
// revocation for the user.
matchOutcomes, err := auth.storage.CompletedAndAtFaultMatchStats(user, scoringMatchLimit)
if err != nil {
return 0, fmt.Errorf("CompletedAndAtFaultMatchStats: %w", err)
return nil, nil, fmt.Errorf("CompletedAndAtFaultMatchStats: %w", err)
}

matchStatusToViol := func(status order.MatchStatus) Violation {
Expand All @@ -975,22 +1025,16 @@ func (auth *AuthManager) loadUserScore(user account.AccountID) (int32, error) {
// Load the count of preimage misses in the N most recently placed orders.
piOutcomes, err := auth.storage.PreimageStats(user, scoringOrderLimit)
if err != nil {
return 0, fmt.Errorf("PreimageStats: %w", err)
return nil, nil, fmt.Errorf("PreimageStats: %w", err)
}

auth.violationMtx.Lock()
defer auth.violationMtx.Unlock()

latestMatches := newLatestMatchOutcomes(scoringMatchLimit)
auth.matchOutcomes[user] = latestMatches
for _, mo := range matchOutcomes {
// The Fail flag qualifies MakerRedeemed, which is always success for
// maker, but fail for taker if revoked.
v := ViolationSwapSuccess
if mo.Fail {
v = matchStatusToViol(mo.Status)
} else {
successCount++
}
latestMatches.add(&matchOutcome{
time: mo.Time,
Expand All @@ -1003,20 +1047,35 @@ func (auth *AuthManager) loadUserScore(user account.AccountID) (int32, error) {
}

latestPreimageResults := newLatestPreimageOutcomes(scoringOrderLimit)
auth.preimgOutcomes[user] = latestPreimageResults
for _, po := range piOutcomes {
if po.Miss {
piMissCount++
}
latestPreimageResults.add(&preimageOutcome{
time: po.Time,
oid: po.ID,
miss: po.Miss,
})
}

// Integrate the match and preimage outcomes.
score := auth.userScore(user)
return latestMatches, latestPreimageResults, nil
}

// loadUserScore computes the user's current score from order and swap data
// retrieved from the DB. The creates entries in the matchOutcomes and
// preimgOutcomes maps for the user.
func (auth *AuthManager) loadUserScore(user account.AccountID) (int32, error) {
// Load the N most recent matches resulting in success or an at-fault match
// revocation for the user.
latestMatches, latestPreimageResults, err := auth.loadUserOutcomes(user)
if err != nil {
return 0, err
}

score, successCount, piMissCount := integrateOutcomes(latestMatches, latestPreimageResults)

// Make outcome entries for the user.
auth.violationMtx.Lock()
auth.matchOutcomes[user] = latestMatches
auth.preimgOutcomes[user] = latestPreimageResults
auth.violationMtx.Unlock()

successScore := successCount * successScore // negative
piMissScore := piMissCount * preimageMissScore
Expand Down Expand Up @@ -1131,6 +1190,16 @@ func (auth *AuthManager) handleConnect(conn comms.Link, msg *msgjson.Message) *m
client.suspended = true
auth.storage.CloseAccount(user, account.FailureToAct)
log.Debugf("Suspended account %v (score = %d) connected.", acctInfo.ID, score)
} else if score < int32(auth.banScore) && !open {
// banScore is a configurable threshold that may have changed. This also
// assists account recovery in the event of an online accounting bug.
if err = auth.Unban(user); err == nil {
log.Warnf("Restoring suspended account %v (score = %d).", acctInfo.ID, score)
client.suspended = false
} else {
log.Errorf("Failed to restore suspended account %v (score = %d): %v.",
acctInfo.ID, score, err)
}
}

// Get the list of active orders for this user.
Expand Down
27 changes: 27 additions & 0 deletions server/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ func newPreimageResult(miss bool, t int64) *db.PreimageResult {
return &db.PreimageResult{
Miss: miss,
Time: t,
ID: randomOrderID(),
}
}

Expand Down Expand Up @@ -912,6 +913,11 @@ func TestAccountErrors(t *testing.T) {
}

// closed accounts allowed to connect

// Make a violation score above ban score reflected by the DB.
score := setViolations()
defer clearViolations()

rig.mgr.removeClient(rig.mgr.user(user.acctID)) // disconnect first, NOTE that link.Disconnect is async
user.conn = tNewRPCClient() // disconnect necessitates new conn ID
rig.storage.closed = true
Expand All @@ -928,6 +934,27 @@ func TestAccountErrors(t *testing.T) {
t.Errorf("client should have been in suspended state")
}

// Raise the ban score threshold to ensure automatic reinstatement.
initBanScore := rig.mgr.banScore
defer func() { rig.mgr.banScore = initBanScore }()
rig.mgr.banScore = uint32(score + 1)

rig.mgr.removeClient(rig.mgr.user(user.acctID)) // disconnect first, NOTE that link.Disconnect is async
user.conn = tNewRPCClient() // disconnect necessitates new conn ID
rig.storage.closed = true
rpcErr = rig.mgr.handleConnect(user.conn, connect)
rig.storage.closed = false
if rpcErr != nil {
t.Fatalf("should be no error for closed account")
}
client = rig.mgr.user(user.acctID)
if client == nil {
t.Fatalf("client not found")
}
if client.isSuspended() {
t.Errorf("client should have unbaned automatically")
}

}

func TestRoute(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions server/auth/latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func (la *latestMatchOutcomes) add(mo *matchOutcome) {
la.mtx.Lock()
defer la.mtx.Unlock()

// Do a dumb search for the match ID, without regard to time, so we can't
// insert a match twice.
for _, oc := range la.outcomes {
if oc.mid == mo.mid {
log.Warnf("(*latestMatchOutcomes).add: Rejecting duplicate match ID: %v", mo.mid)
return
}
}

// Use sort.Search and insert it at the right spot.
n := len(la.outcomes)
i := sort.Search(n, func(i int) bool {
Expand All @@ -68,6 +77,9 @@ func (la *latestMatchOutcomes) add(mo *matchOutcome) {
}

func (la *latestMatchOutcomes) binViolations() map[Violation]int64 {
la.mtx.Lock()
defer la.mtx.Unlock()

bins := make(map[Violation]int64)
for _, mo := range la.outcomes {
bins[mo.outcome]++
Expand Down Expand Up @@ -148,6 +160,15 @@ func (la *latestPreimageOutcomes) add(po *preimageOutcome) {
la.mtx.Lock()
defer la.mtx.Unlock()

// Do a dumb search for the order ID, without regard to time, so we can't
// insert an order twice.
for _, oc := range la.outcomes {
if oc.oid == po.oid {
log.Warnf("(*latestPreimageOutcomes).add: Rejecting duplicate order ID: %v", po.oid)
return
}
}

// Use sort.Search and insert it at the right spot.
n := len(la.outcomes)
i := sort.Search(n, func(i int) bool {
Expand All @@ -170,6 +191,9 @@ func (la *latestPreimageOutcomes) add(po *preimageOutcome) {
}

func (la *latestPreimageOutcomes) misses() (misses int32) {
la.mtx.Lock()
defer la.mtx.Unlock()

for _, th := range la.outcomes {
if th.miss {
misses++
Expand Down
4 changes: 2 additions & 2 deletions server/auth/latest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Test_latestMatchOutcomes(t *testing.T) {
times[i] = t
ordList.add(&matchOutcome{
time: t,
// zero OrderID
mid: randomMatchID(),
})
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func Test_latestPreimageOutcomes(t *testing.T) {
times[i] = t
ordList.add(&preimageOutcome{
time: t,
// zero OrderID
oid: randomOrderID(),
})
}

Expand Down
4 changes: 2 additions & 2 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -1707,8 +1707,6 @@ func (m *Market) prepEpoch(orders []order.Order, epochEnd time.Time) (cSum []byt
for _, ord := range misses {
log.Infof("No preimage received for order %v from user %v. Recording violation and revoking order.",
ord.ID(), ord.User())
// Register the preimage miss violation, adjusting the user's score.
m.auth.MissedPreimage(ord.User(), epochEnd, ord.ID())
// Unlock the order's coins locked in processOrder.
m.unlockOrderCoins(ord) // could also be done in processReadyEpoch
// Change the order status from orderStatusEpoch to orderStatusRevoked.
Expand All @@ -1719,6 +1717,8 @@ func (m *Market) prepEpoch(orders []order.Order, epochEnd time.Time) (cSum []byt
log.Errorf("Failed to revoke order %v with a new cancel order: %v",
ord.UID(), err)
}
// Register the preimage miss violation, adjusting the user's score.
m.auth.MissedPreimage(ord.User(), epochEnd, ord.ID())
Comment on lines +1720 to +1721
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved below the DB storage of the preimage miss.

}

// Register the preimage collection successes, potentially evicting preimage
Expand Down
Loading