-
Notifications
You must be signed in to change notification settings - Fork 84
/
auth.go
1970 lines (1749 loc) · 68 KB
/
auth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.
package auth
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"math"
"sync"
"time"
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/dex/wait"
"decred.org/dcrdex/server/account"
"decred.org/dcrdex/server/asset"
"decred.org/dcrdex/server/comms"
"decred.org/dcrdex/server/db"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa"
)
const (
cancelThreshWindow = 100 // spec
scoringMatchLimit = 60 // last N matches (success or at-fault fail) to be considered in swap inaction scoring
scoringOrderLimit = 40 // last N orders to be considered in preimage miss scoring
maxIDsPerOrderStatusRequest = 10_000
)
var (
ErrUserNotConnected = dex.ErrorKind("user not connected")
)
func unixMsNow() time.Time {
return time.Now().Truncate(time.Millisecond).UTC()
}
// Storage updates and fetches account-related data from what is presumably a
// database.
type Storage interface {
// Account retrieves account info for the specified account ID and lock time
// threshold, which determines when a bond is considered expired.
Account(account.AccountID, time.Time) (acct *account.Account, bonds []*db.Bond, legacy, legacyPaid bool)
CreateAccountWithBond(acct *account.Account, bond *db.Bond) error
AddBond(acct account.AccountID, bond *db.Bond) error
DeleteBond(assetID uint32, coinID []byte) error
CreateAccount(acct *account.Account, feeAsset uint32, feeAddr string) error // DEPRECATED
AccountRegAddr(account.AccountID) (addr string, asset uint32, err error) // DEPRECATED
PayAccount(account.AccountID, []byte) error // DEPRECATED
AccountInfo(aid account.AccountID) (*db.Account, error)
UserOrderStatuses(aid account.AccountID, base, quote uint32, oids []order.OrderID) ([]*db.OrderStatus, error)
ActiveUserOrderStatuses(aid account.AccountID) ([]*db.OrderStatus, error)
CompletedUserOrders(aid account.AccountID, N int) (oids []order.OrderID, compTimes []int64, err error)
ExecutedCancelsForUser(aid account.AccountID, N int) ([]*db.CancelRecord, error)
CompletedAndAtFaultMatchStats(aid account.AccountID, lastN int) ([]*db.MatchOutcome, error)
ForgiveMatchFail(mid order.MatchID) (bool, error)
PreimageStats(user account.AccountID, lastN int) ([]*db.PreimageResult, error)
AllActiveUserMatches(aid account.AccountID) ([]*db.MatchData, error)
MatchStatuses(aid account.AccountID, base, quote uint32, matchIDs []order.MatchID) ([]*db.MatchStatus, error)
}
// Signer signs messages. The message must be a 32-byte hash.
type Signer interface {
Sign(hash []byte) *ecdsa.Signature
PubKey() *secp256k1.PublicKey
}
// FeeChecker is a function for retrieving the details for a fee payment txn.
type FeeChecker func(assetID uint32, coinID []byte) (addr string, val uint64, confs int64, err error)
// BondCoinChecker is a function for locating an unspent bond, and extracting
// the amount, lockTime, and account ID. The confirmations of the bond
// transaction are also provided.
type BondCoinChecker func(ctx context.Context, assetID uint32, ver uint16,
coinID []byte) (amt, lockTime, confs int64, acct account.AccountID, err error)
// BondTxParser parses a dex fidelity bond transaction and the redeem script of
// the first output of the transaction, which must be the actual bond output.
// The returned account ID is from the second output. This will become a
// multi-asset checker.
//
// NOTE: For DCR, and possibly all assets, the bond script is reconstructed from
// the null data output, and it is verified that the bond output pays to this
// script. As such, there is no provided bondData (redeem script for UTXO
// assets), but this may need for other assets.
type BondTxParser func(assetID uint32, ver uint16, rawTx []byte) (bondCoinID []byte,
amt int64, lockTime int64, acct account.AccountID, err error)
// TxDataSource retrieves the raw transaction for a coin ID.
type TxDataSource func(coinID []byte) (rawTx []byte, err error)
// A respHandler is the handler for the response to a DEX-originating request. A
// respHandler has a time associated with it so that old unused handlers can be
// detected and deleted.
type respHandler struct {
f func(comms.Link, *msgjson.Message)
expire *time.Timer
}
// clientInfo represents a DEX client, including account information and last
// known comms.Link.
type clientInfo struct {
acct *account.Account
conn comms.Link
mtx sync.Mutex
respHandlers map[uint64]*respHandler
tier int64
bonds []*db.Bond // only confirmed and active, not pending
legacyFeePaid bool // deprecated with bonds
}
// not thread-safe
func (client *clientInfo) bondTier() (bondTier int64) {
for _, bi := range client.bonds {
bondTier += int64(bi.Strength)
}
return
}
// not thread-safe
func (client *clientInfo) addBond(bond *db.Bond) (bondTier int64) {
var dup bool
for _, bi := range client.bonds {
bondTier += int64(bi.Strength)
dup = dup || (bi.AssetID == bond.AssetID && bytes.Equal(bi.CoinID, bond.CoinID))
}
if !dup { // idempotent
client.bonds = append(client.bonds, bond)
bondTier += int64(bond.Strength)
}
return
}
// not thread-safe
func (client *clientInfo) pruneBonds(lockTimeThresh int64) (pruned []*db.Bond, bondTier int64) {
if len(client.bonds) == 0 {
return
}
var n int
for _, bond := range client.bonds {
if bond.LockTime >= lockTimeThresh { // not expired
if len(pruned) > 0 /* n < i */ { // a prior bond was removed, must move this element up in the slice
client.bonds[n] = bond
}
n++
bondTier += int64(bond.Strength)
continue
}
log.Infof("Expiring user %v bond %v (%s)", client.acct.ID,
coinIDString(bond.AssetID, bond.CoinID), dex.BipIDSymbol(bond.AssetID))
pruned = append(pruned, bond)
// n not incremented, next live bond shifts up
}
client.bonds = client.bonds[:n] // no-op if none expired
return
}
func (client *clientInfo) rmHandler(id uint64) bool {
client.mtx.Lock()
defer client.mtx.Unlock()
_, found := client.respHandlers[id]
if found {
delete(client.respHandlers, id)
}
return found
}
// logReq associates the specified response handler with the message ID.
func (client *clientInfo) logReq(id uint64, f func(comms.Link, *msgjson.Message), expireTime time.Duration, expire func()) {
client.mtx.Lock()
defer client.mtx.Unlock()
doExpire := func() {
// Delete the response handler, and call the provided expire function if
// (*clientInfo).respHandler has not already retrieved the handler
// function for execution.
if client.rmHandler(id) {
expire()
}
}
client.respHandlers[id] = &respHandler{
f: f,
expire: time.AfterFunc(expireTime, doExpire),
}
}
// respHandler extracts the response handler from the respHandlers map. If the
// handler is found, it is also deleted from the map before being returned, and
// the expiration Timer is stopped.
func (client *clientInfo) respHandler(id uint64) *respHandler {
client.mtx.Lock()
defer client.mtx.Unlock()
handler := client.respHandlers[id]
if handler == nil {
return nil
}
// Stop the expiration Timer. If the Timer fired after respHandler was
// called, but we found the response handler in the map, clientInfo.expire
// is waiting for the lock and will return false, thus preventing the
// registered expire func from executing.
handler.expire.Stop()
delete(client.respHandlers, id)
return handler
}
// AuthManager handles authentication-related tasks, including validating client
// signatures, maintaining association between accounts and `comms.Link`s, and
// signing messages with the DEX's private key. AuthManager manages requests to
// the 'connect' route.
type AuthManager struct {
wg sync.WaitGroup
storage Storage
signer Signer
checkFee FeeChecker // legacy fee confs, amt, and address
parseBondTx BondTxParser
checkBond BondCoinChecker // fidelity bond amount, lockTime, acct, and confs
miaUserTimeout time.Duration
unbookFun func(account.AccountID)
bondExpiry time.Duration // a bond is expired when time.Until(lockTime) < bondExpiry
bondAssets map[uint32]*msgjson.BondAsset
feeAddress func(assetID uint32) string // DEPRECATED (V0PURGE)
feeAssets map[uint32]*msgjson.FeeAsset // DEPRECATED (V0PURGE)
freeCancels bool
banScore uint32
cancelThresh float64
initTakerLotLimit int64
absTakerLotLimit int64
// latencyQ is a queue for fee coin waiters to deal with latency.
latencyQ *wait.TickerQueue
feeWaiterMtx sync.Mutex // DEPRECATED (V0PURGE)
feeWaiterIdx map[account.AccountID]struct{} // DEPRECATED
bondWaiterMtx sync.Mutex
bondWaiterIdx map[string]struct{}
connMtx sync.RWMutex
users map[account.AccountID]*clientInfo
conns map[uint64]*clientInfo
unbookers map[account.AccountID]*time.Timer
violationMtx sync.Mutex
matchOutcomes map[account.AccountID]*latestMatchOutcomes
preimgOutcomes map[account.AccountID]*latestPreimageOutcomes
orderOutcomes map[account.AccountID]*latestOrders // cancel/complete, was in clientInfo.recentOrders
txDataSources map[uint32]TxDataSource
}
// violation badness
const (
// preimage miss
preimageMissScore = 2 // book spoof, no match, no stuck funds
// failure to act violations
noSwapAsMakerScore = 4 // book spoof, match with taker order affected, no stuck funds
noSwapAsTakerScore = 11 // maker has contract stuck for 20 hrs
noRedeemAsMakerScore = 7 // taker has contract stuck for 8 hrs
noRedeemAsTakerScore = 1 // just dumb, counterparty not inconvenienced
// cancel rate exceeds threshold
excessiveCancels = 5
successScore = -1 // offsets the violations
defaultBanScore = 20
)
// Violation represents a specific infraction. For example, not broadcasting a
// swap contract transaction by the deadline as the maker.
type Violation int32
const (
ViolationInvalid Violation = iota - 2
ViolationForgiven
ViolationSwapSuccess
ViolationPreimageMiss
ViolationNoSwapAsMaker
ViolationNoSwapAsTaker
ViolationNoRedeemAsMaker
ViolationNoRedeemAsTaker
ViolationCancelRate
)
var violations = map[Violation]struct {
score int32
desc string
}{
ViolationSwapSuccess: {successScore, "swap success"},
ViolationForgiven: {-1, "forgiveness"},
ViolationPreimageMiss: {preimageMissScore, "preimage miss"},
ViolationNoSwapAsMaker: {noSwapAsMakerScore, "no swap as maker"},
ViolationNoSwapAsTaker: {noSwapAsTakerScore, "no swap as taker"},
ViolationNoRedeemAsMaker: {noRedeemAsMakerScore, "no redeem as maker"},
ViolationNoRedeemAsTaker: {noRedeemAsTakerScore, "no redeem as taker"},
ViolationCancelRate: {excessiveCancels, "excessive cancels"},
ViolationInvalid: {0, "invalid violation"},
}
// Score returns the Violation's score, which is a representation of the
// relative severity of the infraction.
func (v Violation) Score() int32 {
return violations[v].score
}
// String returns a description of the Violation.
func (v Violation) String() string {
return violations[v].desc
}
// NoActionStep is the action that the user failed to take. This is used to
// define valid inputs to the Inaction method.
type NoActionStep uint8
const (
SwapSuccess NoActionStep = iota // success included for accounting purposes
NoSwapAsMaker
NoSwapAsTaker
NoRedeemAsMaker
NoRedeemAsTaker
)
// Violation returns the corresponding Violation for the misstep represented by
// the NoActionStep.
func (step NoActionStep) Violation() Violation {
switch step {
case SwapSuccess:
return ViolationSwapSuccess
case NoSwapAsMaker:
return ViolationNoSwapAsMaker
case NoSwapAsTaker:
return ViolationNoSwapAsTaker
case NoRedeemAsMaker:
return ViolationNoRedeemAsMaker
case NoRedeemAsTaker:
return ViolationNoRedeemAsTaker
default:
return ViolationInvalid
}
}
// String returns the description of the NoActionStep's corresponding Violation.
func (step NoActionStep) String() string {
return step.Violation().String()
}
// Config is the configuration settings for the AuthManager, and the only
// argument to its constructor.
type Config struct {
// Storage is an interface for storing and retrieving account-related info.
Storage Storage
// Signer is an interface that signs messages. In practice, Signer is
// satisfied by a secp256k1.PrivateKey.
Signer Signer
// BondExpiry is the time in seconds left until a bond's LockTime is reached
// that defines when a bond is considered expired.
BondExpiry uint64
// BondAssets indicates the supported bond assets and parameters.
BondAssets map[string]*msgjson.BondAsset
// BondTxParser performs rudimentary validation of a raw time-locked
// fidelity bond transaction. e.g. dcr.ParseBondTx
BondTxParser BondTxParser
// BondChecker locates an unspent bond, and extracts the amount, lockTime,
// and account ID, plus txn confirmations.
BondChecker BondCoinChecker
// FeeAddress retrieves a fresh registration fee address for an asset. It
// should return an empty string for an unsupported asset.
FeeAddress func(assetID uint32) string
// FeeAssets specifies the registration fee parameters for assets supported
// for registration.
FeeAssets map[string]*msgjson.FeeAsset
// FeeChecker is a method for getting the registration fee output info.
FeeChecker FeeChecker
// TxDataSources are sources of tx data for a coin ID.
TxDataSources map[uint32]TxDataSource
// UserUnbooker is a function for unbooking all of a user's orders.
UserUnbooker func(account.AccountID)
// MiaUserTimeout is how long after a user disconnects until UserUnbooker is
// called for that user.
MiaUserTimeout time.Duration
CancelThreshold float64
FreeCancels bool
// BanScore defines the penalty score when an account gets closed.
BanScore uint32
// InitTakerLotLimit is the number of lots per-market a new user is
// permitted to have in active orders and swaps.
InitTakerLotLimit uint32
// AbsTakerLotLimit is a cap on the per-market taker lot limit regardless of
// how good the user's swap history is.
AbsTakerLotLimit uint32
}
const (
defaultInitTakerLotLimit = 6
defaultAbsTakerLotLimit = 375
)
// NewAuthManager is the constructor for an AuthManager.
func NewAuthManager(cfg *Config) *AuthManager {
// A ban score of 0 is not sensible, so have a default.
banScore := cfg.BanScore
if banScore == 0 {
banScore = defaultBanScore
}
initTakerLotLimit := int64(cfg.InitTakerLotLimit)
if initTakerLotLimit == 0 {
initTakerLotLimit = defaultInitTakerLotLimit
}
absTakerLotLimit := int64(cfg.AbsTakerLotLimit)
if absTakerLotLimit == 0 {
absTakerLotLimit = defaultAbsTakerLotLimit
}
// Re-key the maps for efficiency in AuthManager methods.
feeAssets := make(map[uint32]*msgjson.FeeAsset, len(cfg.FeeAssets))
for _, asset := range cfg.FeeAssets {
feeAssets[asset.ID] = asset
}
bondAssets := make(map[uint32]*msgjson.BondAsset, len(cfg.BondAssets))
for _, asset := range cfg.BondAssets {
bondAssets[asset.ID] = asset
}
auth := &AuthManager{
storage: cfg.Storage,
signer: cfg.Signer,
bondAssets: bondAssets,
bondExpiry: time.Duration(cfg.BondExpiry) * time.Second,
checkFee: cfg.FeeChecker, // e.g. dcr's FeeCoin
parseBondTx: cfg.BondTxParser, // e.g. dcr's ParseBondTx
checkBond: cfg.BondChecker, // e.g. dcr's BondCoin
miaUserTimeout: cfg.MiaUserTimeout,
unbookFun: cfg.UserUnbooker,
feeAddress: cfg.FeeAddress,
feeAssets: feeAssets,
freeCancels: cfg.FreeCancels,
banScore: banScore,
cancelThresh: cfg.CancelThreshold,
initTakerLotLimit: initTakerLotLimit,
absTakerLotLimit: absTakerLotLimit,
latencyQ: wait.NewTickerQueue(recheckInterval),
users: make(map[account.AccountID]*clientInfo),
conns: make(map[uint64]*clientInfo),
unbookers: make(map[account.AccountID]*time.Timer),
feeWaiterIdx: make(map[account.AccountID]struct{}),
bondWaiterIdx: make(map[string]struct{}),
matchOutcomes: make(map[account.AccountID]*latestMatchOutcomes),
preimgOutcomes: make(map[account.AccountID]*latestPreimageOutcomes),
orderOutcomes: make(map[account.AccountID]*latestOrders),
txDataSources: cfg.TxDataSources,
}
comms.Route(msgjson.ConnectRoute, auth.handleConnect)
comms.Route(msgjson.RegisterRoute, auth.handleRegister) // DEPRECATED (V0PURGE)
comms.Route(msgjson.NotifyFeeRoute, auth.handleNotifyFee) // DEPRECATED (V0PURGE)
comms.Route(msgjson.PostBondRoute, auth.handlePostBond)
comms.Route(msgjson.PreValidateBondRoute, auth.handlePreValidateBond)
comms.Route(msgjson.MatchStatusRoute, auth.handleMatchStatus)
comms.Route(msgjson.OrderStatusRoute, auth.handleOrderStatus)
return auth
}
func (auth *AuthManager) unbookUserOrders(user account.AccountID) {
log.Tracef("Unbooking all orders for user %v", user)
auth.unbookFun(user)
auth.connMtx.Lock()
delete(auth.unbookers, user)
auth.connMtx.Unlock()
}
// ExpectUsers specifies which users are expected to connect within a certain
// time or have their orders unbooked (revoked). This should be run prior to
// starting the AuthManager. This is not part of the constructor since it is
// convenient to obtain this information from the Market's Books, and Market
// requires the AuthManager. The same information could be pulled from storage,
// but the Market is the authoritative book. The AuthManager should be started
// via Run immediately after calling ExpectUsers so the users can connect.
func (auth *AuthManager) ExpectUsers(users map[account.AccountID]struct{}, within time.Duration) {
log.Debugf("Expecting %d users with booked orders to connect within %v", len(users), within)
for user := range users {
user := user // bad go
auth.unbookers[user] = time.AfterFunc(within, func() { auth.unbookUserOrders(user) })
}
}
// GraceLimit returns the number of initial orders allowed for a new user before
// the cancellation rate threshold is enforced.
func (auth *AuthManager) GraceLimit() int {
// Grace period if: total/(1+total) <= thresh OR total <= thresh/(1-thresh).
return int(math.Round(1e8*auth.cancelThresh/(1-auth.cancelThresh))) / 1e8
}
// RecordCancel records a user's executed cancel order, including the canceled
// order ID, and the time when the cancel was executed.
func (auth *AuthManager) RecordCancel(user account.AccountID, oid, target order.OrderID, epochGap int32, t time.Time) {
score := auth.recordOrderDone(user, oid, &target, epochGap, t.UnixMilli())
tier, bondTier, changed := auth.computeUserTier(user, score)
log.Debugf("RecordCancel: user %v strikes %d, bond tier %v => trading tier %v",
user, score, bondTier, tier)
// If their tier sinks below 1, unbook their orders and send a note.
if tier < 1 {
details := fmt.Sprintf("excessive cancellation rate, new tier = %d", tier)
auth.Penalize(user, account.CancellationRate, details)
}
if changed {
go auth.sendTierChanged(user, tier, "excessive, cancellation rate")
}
}
// RecordCompletedOrder records a user's completed order, where completed means
// a swap involving the order was successfully completed and the order is no
// longer on the books if it ever was.
func (auth *AuthManager) RecordCompletedOrder(user account.AccountID, oid order.OrderID, t time.Time) {
score := auth.recordOrderDone(user, oid, nil, db.EpochGapNA, t.UnixMilli())
tier, bondTier, changed := auth.computeUserTier(user, score) // may raise tier
if changed {
log.Tracef("RecordCompletedOrder: tier changed for user %v strikes %d, bond tier %v => trading tier %v",
user, score, bondTier, tier)
go auth.sendTierChanged(user, tier, "successful order completion")
}
}
// recordOrderDone records that an order has finished processing. This can be a
// cancel order, which matched and unbooked another order, or a trade order that
// completed the swap negotiation. Note that in the case of a cancel, oid refers
// to the ID of the cancel order itself, while target is non-nil for cancel
// orders. The user's new score is returned, which can be used to compute the
// user's tier with computeUserTier.
func (auth *AuthManager) recordOrderDone(user account.AccountID, oid order.OrderID, target *order.OrderID, epochGap int32, tMS int64) (score int32) {
auth.violationMtx.Lock()
if orderOutcomes, found := auth.orderOutcomes[user]; found {
orderOutcomes.add(&oidStamped{
OrderID: oid,
time: tMS,
target: target,
epochGap: epochGap,
})
score = auth.userScore(user)
auth.violationMtx.Unlock()
log.Debugf("Recorded order %v that has finished processing: user=%v, time=%v, target=%v",
oid, user, tMS, target)
return
}
auth.violationMtx.Unlock()
// The user is currently not connected and authenticated. When the user logs
// back in, their history will be reloaded (loadUserScore) and their tier
// recomputed, but compute their score now from DB for the caller.
var err error
score, err = auth.loadUserScore(user)
if err != nil {
log.Errorf("Failed to load order and match outcomes for user %v: %v", user, err)
return 0
}
return
}
// Run runs the AuthManager until the context is canceled. Satisfies the
// dex.Runner interface.
func (auth *AuthManager) Run(ctx context.Context) {
log.Infof("Allowing %d settling + taker order lots per market for new users.", auth.initTakerLotLimit)
log.Infof("Allowing up to %d settling + taker order lots per market for established users.", auth.absTakerLotLimit)
auth.wg.Add(1)
go func() {
defer auth.wg.Done()
t := time.NewTicker(20 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
auth.checkBonds()
case <-ctx.Done():
return
}
}
}()
auth.wg.Add(1)
go func() {
defer auth.wg.Done()
auth.latencyQ.Run(ctx)
}()
<-ctx.Done()
auth.connMtx.Lock()
defer auth.connMtx.Unlock()
for user, ub := range auth.unbookers {
ub.Stop()
delete(auth.unbookers, user)
}
// Wait for latencyQ and checkBonds.
auth.wg.Wait()
// TODO: wait for running comms route handlers and other DB writers.
}
// Route wraps the comms.Route function, storing the response handler with the
// associated clientInfo, and sending the message on the current comms.Link for
// the client.
func (auth *AuthManager) Route(route string, handler func(account.AccountID, *msgjson.Message) *msgjson.Error) {
comms.Route(route, func(conn comms.Link, msg *msgjson.Message) *msgjson.Error {
client := auth.conn(conn)
if client == nil {
return &msgjson.Error{
Code: msgjson.UnauthorizedConnection,
Message: "cannot use route '" + route + "' on an unauthorized connection",
}
}
msgErr := handler(client.acct.ID, msg)
if msgErr != nil {
log.Debugf("Handling of '%s' request for user %v failed: %v", route, client.acct.ID, msgErr)
}
return msgErr
})
}
// Message signing and signature verification.
// checkSigS256 checks that the message's signature was created with the
// private key for the provided secp256k1 public key.
func checkSigS256(msg, sig []byte, pubKey *secp256k1.PublicKey) error {
signature, err := ecdsa.ParseDERSignature(sig)
if err != nil {
return fmt.Errorf("error decoding secp256k1 Signature from bytes: %w", err)
}
hash := sha256.Sum256(msg)
if !signature.Verify(hash[:], pubKey) {
return fmt.Errorf("secp256k1 signature verification failed")
}
return nil
}
// Auth validates the signature/message pair with the users public key.
func (auth *AuthManager) Auth(user account.AccountID, msg, sig []byte) error {
client := auth.user(user)
if client == nil {
return dex.NewError(ErrUserNotConnected, user.String())
}
return checkSigS256(msg, sig, client.acct.PubKey)
}
// SignMsg signs the message with the DEX private key, returning the DER encoded
// signature. SHA256 is used to hash the message before signing it.
func (auth *AuthManager) SignMsg(msg []byte) []byte {
hash := sha256.Sum256(msg)
return auth.signer.Sign(hash[:]).Serialize()
}
// Sign signs the msgjson.Signables with the DEX private key.
func (auth *AuthManager) Sign(signables ...msgjson.Signable) {
for _, signable := range signables {
sig := auth.SignMsg(signable.Serialize())
signable.SetSig(sig)
}
}
// Response and notification (non-request) messages
// Send sends the non-Request-type msgjson.Message to the client identified by
// the specified account ID. The message is sent asynchronously, so an error is
// only generated if the specified user is not connected and authorized, if the
// message fails marshalling, or if the link is in a failing state. See
// dex/ws.(*WSLink).Send for more information.
func (auth *AuthManager) Send(user account.AccountID, msg *msgjson.Message) error {
client := auth.user(user)
if client == nil {
log.Debugf("Send requested for disconnected user %v", user)
return dex.NewError(ErrUserNotConnected, user.String())
}
err := client.conn.Send(msg)
if err != nil {
log.Debugf("error sending on link: %v", err)
// Remove client assuming connection is broken, requiring reconnect.
auth.removeClient(client)
// client.conn.Disconnect() // async removal
}
return err
}
// Notify sends a message to a client. The message should be a notification.
// See msgjson.NewNotification.
func (auth *AuthManager) Notify(acctID account.AccountID, msg *msgjson.Message) {
if err := auth.Send(acctID, msg); err != nil {
log.Infof("Failed to send notification to user %s: %v", acctID, err)
}
}
// Requests
// DefaultRequestTimeout is the default timeout for requests to wait for
// responses from connected users after the request is successfully sent.
const DefaultRequestTimeout = 30 * time.Second
func (auth *AuthManager) request(user account.AccountID, msg *msgjson.Message, f func(comms.Link, *msgjson.Message),
expireTimeout time.Duration, expire func()) error {
client := auth.user(user)
if client == nil {
log.Debugf("Send requested for disconnected user %v", user)
return dex.NewError(ErrUserNotConnected, user.String())
}
// log.Tracef("Registering '%s' request ID %d for user %v (auth clientInfo)", msg.Route, msg.ID, user)
client.logReq(msg.ID, f, expireTimeout, expire)
// auth.handleResponse checks clientInfo map and the found client's request
// handler map, where the expire function should be found for msg.ID.
err := client.conn.Request(msg, auth.handleResponse, expireTimeout, func() {})
if err != nil {
log.Debugf("error sending request ID %d: %v", msg.ID, err)
// Remove the responseHandler registered by logReq and stop the expire
// timer so that it does not eventually fire and run the expire func.
// The caller receives a non-nil error to deal with it.
client.respHandler(msg.ID) // drop the removed handler
// Remove client assuming connection is broken, requiring reconnect.
auth.removeClient(client)
// client.conn.Disconnect() // async removal
}
return err
}
// Request sends the Request-type msgjson.Message to the client identified by
// the specified account ID. The user must respond within DefaultRequestTimeout
// of the request. Late responses are not handled.
func (auth *AuthManager) Request(user account.AccountID, msg *msgjson.Message, f func(comms.Link, *msgjson.Message)) error {
return auth.request(user, msg, f, DefaultRequestTimeout, func() {})
}
// RequestWithTimeout sends the Request-type msgjson.Message to the client
// identified by the specified account ID. If the user responds within
// expireTime of the request, the response handler is called, otherwise the
// expire function is called. If the response handler is called, it is
// guaranteed that the request Message.ID is equal to the response Message.ID
// (see handleResponse).
func (auth *AuthManager) RequestWithTimeout(user account.AccountID, msg *msgjson.Message, f func(comms.Link, *msgjson.Message),
expireTimeout time.Duration, expire func()) error {
return auth.request(user, msg, f, expireTimeout, expire)
}
// userSwapAmountHistory retrieves the summary of recent swap amounts for the
// given user and market. The user should be connected.
func (auth *AuthManager) userSwapAmountHistory(user account.AccountID, base, quote uint32) *SwapAmounts {
auth.violationMtx.Lock()
defer auth.violationMtx.Unlock()
if outcomes, found := auth.matchOutcomes[user]; found {
return outcomes.mktSwapAmounts(base, quote)
}
return new(SwapAmounts)
}
const (
// These coefficients are used to compute a user's swap limit adjustment via
// UserOrderLimitAdjustment based on the cumulative amounts in the different
// match outcomes.
successWeight int64 = 3
stuckLongWeight int64 = -5
stuckShortWeight int64 = -3
spoofedWeight int64 = -1
)
// UserSettlingLimit returns a user's settling amount limit for the given market
// in units of the base asset. The limit may be negative for accounts with poor
// swap history.
func (auth *AuthManager) UserSettlingLimit(user account.AccountID, mkt *dex.MarketInfo) int64 {
currentLotSize := int64(mkt.LotSize)
base := currentLotSize * auth.initTakerLotLimit
sa := auth.userSwapAmountHistory(user, mkt.Base, mkt.Quote)
limit := base + sa.Swapped*successWeight + sa.StuckLong*stuckLongWeight + sa.StuckShort*stuckShortWeight + sa.Spoofed*spoofedWeight
if limit/currentLotSize >= auth.absTakerLotLimit {
limit = auth.absTakerLotLimit * currentLotSize
}
return limit
}
func (auth *AuthManager) integrateOutcomes(matchOutcomes *latestMatchOutcomes, preimgOutcomes *latestPreimageOutcomes, orderOutcomes *latestOrders) (score, successCount, piMissCount int32) {
if matchOutcomes != nil {
matchCounts := matchOutcomes.binViolations()
for v, count := range matchCounts {
score += v.Score() * int32(count)
}
successCount = int32(matchCounts[ViolationSwapSuccess])
}
if preimgOutcomes != nil {
piMissCount = preimgOutcomes.misses()
score += ViolationPreimageMiss.Score() * piMissCount
}
if !auth.freeCancels {
totalOrds, cancels := orderOutcomes.counts() // completions := totalOrds - cancels
if totalOrds > auth.GraceLimit() {
cancelRate := float64(cancels) / float64(totalOrds)
if cancelRate > auth.cancelThresh {
score += ViolationCancelRate.Score()
}
}
}
return
}
// userScore computes an authenticated user's score from their recent order and
// match outcomes. They must have entries in the outcome maps. Use loadUserScore
// to compute score from history in DB. This must be called with the
// violationMtx locked.
func (auth *AuthManager) userScore(user account.AccountID) (score int32) {
score, _, _ = auth.integrateOutcomes(auth.matchOutcomes[user], auth.preimgOutcomes[user], auth.orderOutcomes[user])
return score
}
func (auth *AuthManager) UserScore(user account.AccountID) (score int32) {
auth.violationMtx.Lock()
if _, found := auth.matchOutcomes[user]; found {
score = auth.userScore(user)
auth.violationMtx.Unlock()
return
}
auth.violationMtx.Unlock()
// The user is currently not connected and authenticated. When the user logs
// back in, their history will be reloaded (loadUserScore) and their tier
// recomputed, but compute their score now from DB for the caller.
var err error
score, err = auth.loadUserScore(user)
if err != nil {
log.Errorf("Failed to load order and match outcomes for user %v: %v", user, err)
return 0
}
return
}
// tier computes a user's tier from their conduct score and bond tier.
func (auth *AuthManager) tier(bondTier int64, score int32, legacyFeePaid bool) int64 {
tierAdj := int64(score) / int64(auth.banScore)
if tierAdj < 0 && bondTier == 0 {
tierAdj = 0 // no bonus tiers unless bonded
}
if legacyFeePaid {
bondTier++
}
return bondTier - tierAdj
}
// computeUserTier computes the user's tier given the provided score weighed
// against known active bonds. Note that bondTier is not a specific asset, and
// is just for logging, and it may be removed or changed to a map by asset ID.
// For online users, this will also indicate if the tier changed; this will
// always return false for offline users.
func (auth *AuthManager) computeUserTier(user account.AccountID, score int32) (tier, bondTier int64, changed bool) {
client := auth.user(user)
if client == nil {
// Offline. Load active bonds and legacyFeePaid flag from DB.
lockTimeThresh := time.Now().Add(auth.bondExpiry)
_, bonds, _, legacyFeePaid := auth.storage.Account(user, lockTimeThresh)
for _, bond := range bonds {
bondTier += int64(bond.Strength)
}
tier = auth.tier(bondTier, score, legacyFeePaid)
return
}
client.mtx.Lock()
defer client.mtx.Unlock()
wasTier := client.tier
bondTier = client.bondTier()
client.tier = auth.tier(bondTier, score, client.legacyFeePaid)
tier = client.tier
changed = wasTier != tier
return
}
// ComputeUserTier computes the user's tier from their active bonds and conduct
// score. The bondTier is also returned. The DB is always consulted for
// computing the conduct score. Summing bond amounts may access the DB if the
// user is not presently connected. The tier for an unknown user is -1.
func (auth *AuthManager) ComputeUserTier(user account.AccountID) (tier, bondTier int64) {
score, err := auth.loadUserScore(user)
if err != nil {
log.Errorf("failed to load user score: %v", err)
return -1, -1
}
tier, bondTier, _ = auth.computeUserTier(user, score)
return
}
func (auth *AuthManager) registerMatchOutcome(user account.AccountID, misstep NoActionStep, mmid db.MarketMatchID, value uint64, refTime time.Time) (score int32) {
violation := misstep.Violation()
auth.violationMtx.Lock()
if matchOutcomes, found := auth.matchOutcomes[user]; found {
matchOutcomes.add(&matchOutcome{
time: refTime.UnixMilli(),
mid: mmid.MatchID,
outcome: violation,
value: value,
base: mmid.Base,
quote: mmid.Quote,
})
score = auth.userScore(user)
auth.violationMtx.Unlock()
return
}
auth.violationMtx.Unlock()
// The user is currently not connected and authenticated. When the user logs
// back in, their history will be reloaded (loadUserScore) and their tier
// recomputed, but compute their score now from DB for the caller.
score, err := auth.loadUserScore(user)
if err != nil {
log.Errorf("Failed to load order and match outcomes for user %v: %v", user, err)
return 0
}
return
}
// SwapSuccess registers the successful completion of a swap by the given user.
// TODO: provide lots instead of value, or convert to lots somehow. But, Swapper
// has no clue about lot size, and neither does DB!
func (auth *AuthManager) SwapSuccess(user account.AccountID, mmid db.MarketMatchID, value uint64, redeemTime time.Time) {
score := auth.registerMatchOutcome(user, SwapSuccess, mmid, value, redeemTime)
tier, bondTier, changed := auth.computeUserTier(user, score) // may raise tier
log.Debugf("Match success for user %v: strikes %d, bond tier %v => tier %v",
user, score, bondTier, tier)
if changed {
log.Infof("SwapSuccess: tier change for user %v, strikes %d, bond tier %v => trading tier %v",
user, score, bondTier, tier)
go auth.sendTierChanged(user, tier, "successful swap completion")
}
}
// Inaction registers an inaction violation by the user at the given step. The
// refTime is time to which the at-fault user's inaction deadline for the match
// is referenced. e.g. For a swap that failed in TakerSwapCast, refTime would be
// the maker's redeem time, which is recorded in the DB when the server
// validates the maker's redemption and informs the taker, and is roughly when
// the actor was first able to take the missed action.
// TODO: provide lots instead of value, or convert to lots somehow. But, Swapper
// has no clue about lot size, and neither does DB!
func (auth *AuthManager) Inaction(user account.AccountID, misstep NoActionStep, mmid db.MarketMatchID, matchValue uint64, refTime time.Time, oid order.OrderID) {
violation := SwapSuccess.Violation()
if violation == ViolationInvalid {
log.Errorf("Invalid inaction step %d", misstep)
return
}
score := auth.registerMatchOutcome(user, misstep, mmid, matchValue, refTime)
// Recompute tier.
tier, bondTier, changed := auth.computeUserTier(user, score)
log.Infof("Match failure for user %v: %q (badness %v), strikes %d, bond tier %v => trading tier %v",
user, violation, violation.Score(), score, bondTier, tier)
// If their tier sinks below 1, unbook their orders and send a note.
if tier < 1 {
details := fmt.Sprintf("swap %v failure (%v) for order %v, new tier = %d",
mmid.MatchID, misstep, oid, tier)
auth.Penalize(user, account.FailureToAct, details)
}
if changed {
reason := fmt.Sprintf("swap failure for match %v order %v: %v", mmid.MatchID, oid, misstep)
go auth.sendTierChanged(user, tier, reason)
}
}
func (auth *AuthManager) registerPreimageOutcome(user account.AccountID, miss bool, oid order.OrderID, refTime time.Time) (score int32) {
auth.violationMtx.Lock()
piOutcomes, found := auth.preimgOutcomes[user]
if found {
piOutcomes.add(&preimageOutcome{
time: refTime.UnixMilli(),
oid: oid,
miss: miss,
})