-
Notifications
You must be signed in to change notification settings - Fork 84
/
orderrouter.go
925 lines (826 loc) · 32 KB
/
orderrouter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.
package market
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"time"
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/calc"
"decred.org/dcrdex/dex/encode"
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/dex/wait"
"decred.org/dcrdex/server/account"
"decred.org/dcrdex/server/asset"
"decred.org/dcrdex/server/comms"
"decred.org/dcrdex/server/matcher"
)
// The AuthManager handles client-related actions, including authorization and
// communications. The OrderRouter uses it to register the order routes, verify
// order signatures, check for account suspension, and send responses to users.
// The remaining methods are part of the interface but are not called from the
// code visible in this file.
type AuthManager interface {
// Route registers handler as the handler for messages on the named route.
Route(route string, handler func(account.AccountID, *msgjson.Message) *msgjson.Error)
// Auth checks the signature sig of msg for the given user. A non-nil error
// is treated by verifyAccount as a signature failure.
Auth(user account.AccountID, msg, sig []byte) error
// Suspended reports whether the user's account was found, and whether it is
// suspended. Suspended accounts may not submit trade orders.
Suspended(user account.AccountID) (found, suspended bool)
Sign(...msgjson.Signable)
// Send sends a message to the user. respondError treats an error here as
// the user being disconnected.
Send(account.AccountID, *msgjson.Message) error
Request(account.AccountID, *msgjson.Message, func(comms.Link, *msgjson.Message)) error
RequestWithTimeout(account.AccountID, *msgjson.Message, func(comms.Link, *msgjson.Message), time.Duration, func()) error
PreimageSuccess(user account.AccountID, refTime time.Time, oid order.OrderID)
MissedPreimage(user account.AccountID, refTime time.Time, oid order.OrderID)
RecordCancel(user account.AccountID, oid, target order.OrderID, t time.Time)
RecordCompletedOrder(user account.AccountID, oid order.OrderID, t time.Time)
UserSettlingLimit(user account.AccountID, mkt *dex.MarketInfo) int64
}
const (
// maxClockOffset is the limit on the absolute difference between the server
// clock and an order's client timestamp. checkTimes rejects any order whose
// offset is equal to or greater than this value.
maxClockOffset = 600_000 // milliseconds => 600 sec => 10 minutes
// fundingTxWait is how long the router keeps retrying the lookup of an
// order's funding coins before reporting them as undiscovered.
fundingTxWait = time.Minute
// ZeroConfFeeRateThreshold is multiplied by the last known fee rate for an
// asset to attain a minimum fee rate acceptable for zero-conf funding
// coins.
ZeroConfFeeRateThreshold = 0.9
)
// MarketTunnel is a connection to a market. The OrderRouter holds one tunnel
// per market name and uses it to validate and submit incoming orders.
type MarketTunnel interface {
// SubmitOrder submits the order to the market for insertion into the epoch
// queue.
SubmitOrder(*orderRecord) error
// MidGap returns the mid-gap market rate, which is the rate halfway between
// the best buy order and the best sell order in the order book.
MidGap() uint64
// MarketBuyBuffer is a coefficient that when multiplied by the market's lot
// size specifies the minimum required amount for a market buy order.
MarketBuyBuffer() float64
// LotSize is the market's lot size in units of the base asset.
LotSize() uint64
// RateStep is the market's rate step in units of the quote asset.
RateStep() uint64
// CoinLocked should return true if the CoinID is currently a funding Coin
// for an active DEX order. This is required for Coin validation to prevent
// a user from submitting multiple orders spending the same Coin. This
// method will likely need to check all orders currently in the epoch queue,
// the order book, and the swap monitor, since UTXOs will still be unspent
// according to the asset backends until the client broadcasts their
// initialization transaction.
//
// DRAFT NOTE: This function could also potentially be handled by persistent
// storage, since active orders and active matches are tracked there.
CoinLocked(assetID uint32, coinID order.CoinID) bool
// Cancelable determines whether an order is cancelable. A cancelable order
// is a limit order with time-in-force standing either in the epoch queue or
// in the order book.
Cancelable(order.OrderID) bool
// Suspend suspends the market as soon as a given time, returning the final
// epoch index and the time at which that epoch closes.
Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time)
// Running indicates if the market is accepting new orders. This will return
// false when suspended, but false does not necessarily mean Run has stopped
// since a start epoch may be set.
Running() bool
// CheckUnfilled checks a user's unfilled book orders that are funded by
// coins for a given asset to ensure that their funding coins are not spent.
// If any of an unfilled order's funding coins are spent, the order is
// unbooked (removed from the in-memory book, revoked in the DB, a
// cancellation marked against the user, coins unlocked, and orderbook
// subscribers notified). See Unbook for details.
CheckUnfilled(assetID uint32, user account.AccountID) (unbooked []*order.LimitOrder)
}
// orderRecord contains the information necessary to respond to an order
// request. The route handlers build one per request and hand it to
// MarketTunnel.SubmitOrder.
type orderRecord struct {
// order is the limit, market, or cancel order constructed from the request.
order order.Order
// req is the decoded request payload the order was built from.
req msgjson.Stampable
// msgID is the ID of the originating request message.
msgID uint64
}
// assetSet is pointers to two different assets, but with 4 ways of addressing
// them: by market role (base and quote) and by order role (funding and
// receiving). newAssetSet assigns the order roles from the order's side.
type assetSet struct {
// funding is the asset whose coins fund the order.
funding *asset.BackedAsset
// receiving is the asset the order's address must be valid for.
receiving *asset.BackedAsset
// base is the market's base asset.
base *asset.BackedAsset
// quote is the market's quote asset.
quote *asset.BackedAsset
}
// newAssetSet builds the assetSet for an order on the base/quote market. A
// sell order is funded with the base asset and receives the quote asset; a buy
// order is funded with the quote asset and receives the base asset.
func newAssetSet(base, quote *asset.BackedAsset, sell bool) *assetSet {
	// Start from the buy-side assignment and swap the roles for a sell.
	funding, receiving := quote, base
	if sell {
		funding, receiving = base, quote
	}
	return &assetSet{
		funding:   funding,
		receiving: receiving,
		base:      base,
		quote:     quote,
	}
}
// FeeSource is a source of the last reported tx fee rate estimate for an asset.
// checkZeroConfs treats a returned rate of zero as "no estimate" and skips the
// fee rate check in that case.
type FeeSource interface {
// LastRate returns the last known fee rate for the asset with the given ID.
LastRate(assetID uint32) (feeRate uint64)
}
// OrderRouter handles the 'limit', 'market', and 'cancel' DEX routes. These
// are authenticated routes used for placing and canceling orders.
type OrderRouter struct {
// auth registers the routes, verifies signatures and delivers responses.
auth AuthManager
// assets holds the backed assets, keyed by asset ID.
assets map[uint32]*asset.BackedAsset
// tunnels holds the market tunnels, keyed by market name.
tunnels map[string]MarketTunnel
// latencyQ retries the funding coin checks for new orders until the coins
// are found or fundingTxWait has elapsed.
latencyQ *wait.TickerQueue
// feeSource supplies the last known fee rate used to vet zero-conf coins.
feeSource FeeSource
}
// OrderRouterConfig is the configuration settings for an OrderRouter.
// NewOrderRouter copies each field into the corresponding OrderRouter field.
type OrderRouterConfig struct {
// AuthManager is the auth manager the order routes are registered with.
AuthManager AuthManager
// Assets holds the backed assets, keyed by asset ID.
Assets map[uint32]*asset.BackedAsset
// Markets holds the market tunnels, keyed by market name.
Markets map[string]MarketTunnel
// FeeSource supplies the last known fee rate for each asset.
FeeSource FeeSource
}
// NewOrderRouter constructs an OrderRouter from cfg and registers its limit,
// market and cancel handlers with the configured AuthManager. The returned
// router's latency queue is created with a 2 second interval but is not
// started until Run is called.
func NewOrderRouter(cfg *OrderRouterConfig) *OrderRouter {
	authMgr := cfg.AuthManager
	r := &OrderRouter{
		auth:      authMgr,
		assets:    cfg.Assets,
		tunnels:   cfg.Markets,
		latencyQ:  wait.NewTickerQueue(2 * time.Second),
		feeSource: cfg.FeeSource,
	}

	// Hook up the authenticated order routes.
	authMgr.Route(msgjson.LimitRoute, r.handleLimit)
	authMgr.Route(msgjson.MarketRoute, r.handleMarket)
	authMgr.Route(msgjson.CancelRoute, r.handleCancel)
	return r
}
// Run runs the router's latency queue with the given context. The limit and
// market handlers queue their funding coin checks on this queue, so Run must
// be called for those orders to be processed.
// NOTE(review): presumably blocks until ctx is canceled -- confirm against
// wait.TickerQueue.Run.
func (r *OrderRouter) Run(ctx context.Context) {
r.latencyQ.Run(ctx)
}
// respondError sends msgErr to the user as the error response to the request
// with ID reqID. Nothing is returned: a failure to build or send the response
// is only logged.
func (r *OrderRouter) respondError(reqID uint64, user account.AccountID, msgErr *msgjson.Error) {
	log.Debugf("Error going to user %v: %s", user, msgErr)
	msg, err := msgjson.NewResponse(reqID, nil, msgErr)
	if err != nil {
		// Log the error payload that could not be wrapped. msg is the result
		// of the failed call and carries no useful information here.
		log.Errorf("Failed to create error response with message '%s': %v", msgErr, err)
		return // this should not be possible, but don't pass nil msg to Send
	}
	if err := r.auth.Send(user, msg); err != nil {
		log.Infof("Failed to send %s error response (msg = %s) to disconnected user %v: %q",
			msg.Route, msgErr, user, err)
	}
}
// fundingCoin asks the asset backend for the funding coin identified by coinID
// and redeemScript, allowing the request at most one second to complete.
func fundingCoin(backend asset.Backend, coinID []byte, redeemScript []byte) (asset.FundingCoin, error) {
	reqCtx, done := context.WithTimeout(context.Background(), time.Second)
	defer done()
	coin, err := backend.FundingCoin(reqCtx, coinID, redeemScript)
	return coin, err
}
// coinConfirmations requests the coin's confirmation count, allowing the
// request at most one second to complete.
func coinConfirmations(coin asset.Coin) (int64, error) {
	reqCtx, done := context.WithTimeout(context.Background(), time.Second)
	defer done()
	confs, err := coin.Confirmations(reqCtx)
	return confs, err
}
// handleLimit is the handler for the 'limit' route. This route accepts a
// msgjson.LimitOrder payload, validates the information, constructs an
// order.LimitOrder and submits it to the epoch queue.
//
// Validation that can be done immediately (payload decoding, account and
// signature, suspension, market state, order parameters, coin locks and coin
// ID format) is reported through the returned *msgjson.Error. When those
// checks pass, nil is returned and the funding coins are located and verified
// on the latency queue, retrying for up to fundingTxWait. Any failure from
// that point on, including rejection by the market, is delivered to the user
// with respondError instead of the return value.
func (r *OrderRouter) handleLimit(user account.AccountID, msg *msgjson.Message) *msgjson.Error {
	limit := new(msgjson.LimitOrder)
	err := msg.Unmarshal(&limit)
	if err != nil || limit == nil {
		return msgjson.NewError(msgjson.RPCParseError, "error decoding 'limit' payload")
	}

	rpcErr := r.verifyAccount(user, limit.AccountID, limit)
	if rpcErr != nil {
		return rpcErr
	}

	if _, suspended := r.auth.Suspended(user); suspended {
		return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account %v may not submit trade orders", user)
	}

	tunnel, coins, sell, rpcErr := r.extractMarketDetails(&limit.Prefix, &limit.Trade)
	if rpcErr != nil {
		return rpcErr
	}

	// Spare some resources if the market is closed now. Any orders that make it
	// through to a closed market will receive a similar error from SubmitOrder.
	if !tunnel.Running() {
		return msgjson.NewError(msgjson.MarketNotRunningError, "market closed to new orders")
	}

	// Check that OrderType is set correctly
	if limit.OrderType != msgjson.LimitOrderNum {
		return msgjson.NewError(msgjson.OrderParameterError, "wrong order type set for limit order. wanted %d, got %d", msgjson.LimitOrderNum, limit.OrderType)
	}

	// Check that the rate is non-zero and obeys the rate step interval.
	if limit.Rate == 0 {
		return msgjson.NewError(msgjson.OrderParameterError, "rate = 0 not allowed")
	}
	if rateStep := tunnel.RateStep(); limit.Rate%rateStep != 0 {
		return msgjson.NewError(msgjson.OrderParameterError, "rate (%d) not a multiple of ratestep (%d)",
			limit.Rate, rateStep)
	}

	lotSize := tunnel.LotSize()
	rpcErr = r.checkPrefixTrade(coins, lotSize, &limit.Prefix, &limit.Trade, true)
	if rpcErr != nil {
		return rpcErr
	}

	// Check time-in-force
	var force order.TimeInForce
	switch limit.TiF {
	case msgjson.StandingOrderNum:
		force = order.StandingTiF
	case msgjson.ImmediateOrderNum:
		force = order.ImmediateTiF
	default:
		return msgjson.NewError(msgjson.OrderParameterError, "unknown time-in-force")
	}

	// Commitment
	if len(limit.Commit) != order.CommitmentSize {
		return msgjson.NewError(msgjson.OrderParameterError, "invalid commitment")
	}
	var commit order.Commitment
	copy(commit[:], limit.Commit)

	fundingAsset := coins.funding
	coinIDs := make([]order.CoinID, 0, len(limit.Trade.Coins))
	coinStrs := make([]string, 0, len(limit.Trade.Coins))
	for _, coin := range limit.Trade.Coins {
		// Check that the outpoint isn't locked.
		coinID := order.CoinID(coin.ID)
		if tunnel.CoinLocked(coins.funding.ID, coinID) {
			// Values are passed as format arguments, never as the format
			// itself, so a '%' in them cannot be misread as a verb.
			return msgjson.NewError(msgjson.FundingError, "coin %s is locked", fmtCoinID(coins.funding.Symbol, coinID))
		}
		coinStr, err := fundingAsset.Backend.ValidateCoinID(coinID)
		if err != nil {
			return msgjson.NewError(msgjson.FundingError, "invalid coin ID %v: %v", coinID, err)
		}
		coinStrs = append(coinStrs, coinStr)
		coinIDs = append(coinIDs, coinID)
	}

	// Create the limit order.
	lo := &order.LimitOrder{
		P: order.Prefix{
			AccountID:  user,
			BaseAsset:  limit.Base,
			QuoteAsset: limit.Quote,
			OrderType:  order.LimitOrderType,
			ClientTime: encode.UnixTimeMilli(int64(limit.ClientTime)),
			//ServerTime set in epoch queue processing pipeline.
			Commit: commit,
		},
		T: order.Trade{
			Coins:    coinIDs,
			Sell:     sell,
			Quantity: limit.Quantity,
			Address:  limit.Address,
		},
		Rate:  limit.Rate,
		Force: force,
	}

	// NOTE: ServerTime is not yet set, so the order's ID, which is computed
	// from the serialized order, is not yet valid. The Market will stamp the
	// order on receipt, and the order ID will be valid.
	oRecord := &orderRecord{
		order: lo,
		req:   limit,
		msgID: msg.ID,
	}

	// The amount to be swapped is the quantity for a sell order, and the
	// quantity converted to the quote asset at the limit rate for a buy.
	swapVal := limit.Quantity
	lots := swapVal / lotSize
	if !sell {
		swapVal = matcher.BaseToQuote(limit.Rate, limit.Quantity)
	}

	// valSum and spendSize accumulate across retries of checkCoins. Coins are
	// removed from neededCoins once validated, so each coin is counted once.
	var valSum uint64
	var spendSize uint32
	neededCoins := make(map[int]*msgjson.Coin, len(limit.Trade.Coins))
	for i, coin := range limit.Trade.Coins {
		neededCoins[i] = coin
	}

	// checkCoins validates the coins still in neededCoins. It returns
	// tryAgain = true when a coin is not yet found or the backend request
	// timed out, a non-nil msgErr when the order must be rejected, and
	// (false, nil) when all coins are valid and cover the required funds.
	checkCoins := func() (tryAgain bool, msgErr *msgjson.Error) {
		for key, coin := range neededCoins {
			// Get the coin from the backend and validate it.
			dexCoin, err := fundingCoin(fundingAsset.Backend, coin.ID, coin.Redeem)
			if err != nil {
				if errors.Is(err, asset.CoinNotFoundError) {
					return true, nil
				}
				if errors.Is(err, asset.ErrRequestTimeout) {
					log.Errorf("Deadline exceeded attempting to verify funding coin %v (%s). Will try again.",
						coin.ID, fundingAsset.Symbol)
					return true, nil
				}
				log.Errorf("Error retreiving limit order funding coin ID %s. user = %s: %v", coin.ID, user, err)
				return false, msgjson.NewError(msgjson.FundingError, "error retrieving coin ID %v", coin.ID)
			}

			// Verify that the user controls the funding coins.
			err = dexCoin.Auth(msgBytesToBytes(coin.PubKeys), msgBytesToBytes(coin.Sigs), coin.ID)
			if err != nil {
				log.Debugf("Auth error for %s coin %s: %v", fundingAsset.Symbol, dexCoin, err)
				return false, msgjson.NewError(msgjson.CoinAuthError, "failed to authorize coin %v", dexCoin)
			}

			msgErr := r.checkZeroConfs(dexCoin, fundingAsset)
			if msgErr != nil {
				return false, msgErr
			}

			delete(neededCoins, key) // don't check this coin again
			valSum += dexCoin.Value()
			// NOTE: Summing like this is actually not quite sufficient to
			// estimate the size associated with the input, because if it's a
			// BTC segwit output, we would also have to account for the marker
			// and flag weight, but only once per tx. The weight would add
			// either 0 or 1 byte to the tx virtual size, so we have a chance of
			// under-estimating by 1 byte to the advantage of the client. It
			// won't ever cause issues though, because we also require funding
			// for a change output in the final swap, which is actually not
			// needed, so there's some buffer.
			spendSize += dexCoin.SpendSize()
		}

		// Calculate the fees and check that the utxo sum is enough.
		reqVal := calc.RequiredOrderFunds(swapVal, uint64(spendSize), lots, &fundingAsset.Asset)
		if valSum < reqVal {
			return false, msgjson.NewError(msgjson.FundingError,
				"not enough funds. need at least %d, got %d", reqVal, valSum)
		}

		return false, nil
	}

	log.Tracef("Searching for %s coins %v for new limit order", fundingAsset.Symbol, coinStrs)
	r.latencyQ.Wait(&wait.Waiter{
		Expiration: time.Now().Add(fundingTxWait),
		TryFunc: func() bool {
			tryAgain, msgErr := checkCoins()
			if tryAgain {
				return wait.TryAgain
			}
			if msgErr != nil {
				r.respondError(msg.ID, user, msgErr)
				return wait.DontTryAgain
			}

			// Send the order to the epoch queue where it will be time stamped.
			log.Tracef("Found and validated %s coins %v for new limit order", fundingAsset.Symbol, coinStrs)
			if err := tunnel.SubmitOrder(oRecord); err != nil {
				code := msgjson.UnknownMarketError
				switch {
				case errors.Is(err, ErrInternalServer):
					log.Errorf("Market failed to SubmitOrder: %v", err)
				case errors.Is(err, ErrQuantityTooHigh):
					code = msgjson.OrderQuantityTooHigh
					fallthrough
				default:
					log.Debugf("Market failed to SubmitOrder: %v", err)
				}
				// The error text is data, not a format string.
				r.respondError(msg.ID, user, msgjson.NewError(code, "%v", err))
			}
			return wait.DontTryAgain
		},
		ExpireFunc: func() {
			// Tell them to broadcast again or check their node before broadcast
			// timeout is reached and the match is revoked.
			r.respondError(msg.ID, user, msgjson.NewError(msgjson.TransactionUndiscovered,
				"failed to find funding coins %v", coinStrs))
		},
	})

	return nil
}
// handleMarket is the handler for the 'market' route. This route accepts a
// msgjson.MarketOrder payload, validates the information, constructs an
// order.MarketOrder and submits it to the epoch queue.
//
// Validation that can be done immediately is reported through the returned
// *msgjson.Error. When those checks pass, nil is returned and the funding
// coins are located and verified on the latency queue, retrying for up to
// fundingTxWait. Any failure from that point on, including rejection by the
// market, is delivered to the user with respondError instead of the return
// value.
func (r *OrderRouter) handleMarket(user account.AccountID, msg *msgjson.Message) *msgjson.Error {
	market := new(msgjson.MarketOrder)
	err := msg.Unmarshal(&market)
	if err != nil || market == nil {
		return msgjson.NewError(msgjson.RPCParseError, "error decoding 'market' payload")
	}

	rpcErr := r.verifyAccount(user, market.AccountID, market)
	if rpcErr != nil {
		return rpcErr
	}

	if _, suspended := r.auth.Suspended(user); suspended {
		return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account %v may not submit trade orders", user)
	}

	tunnel, assets, sell, rpcErr := r.extractMarketDetails(&market.Prefix, &market.Trade)
	if rpcErr != nil {
		return rpcErr
	}

	if !tunnel.Running() {
		// The error is ignored because extractMarketDetails has already
		// resolved the market name for this base/quote pair.
		mktName, _ := dex.MarketName(market.Base, market.Quote)
		return msgjson.NewError(msgjson.MarketNotRunningError, "market %s closed to new orders", mktName)
	}

	// Check that OrderType is set correctly
	if market.OrderType != msgjson.MarketOrderNum {
		return msgjson.NewError(msgjson.OrderParameterError, "wrong order type set for market order")
	}

	// Passing sell as the checkLot parameter causes the lot size check to be
	// ignored for market buy orders.
	lotSize := tunnel.LotSize()
	rpcErr = r.checkPrefixTrade(assets, lotSize, &market.Prefix, &market.Trade, sell)
	if rpcErr != nil {
		return rpcErr
	}

	// Commitment.
	if len(market.Commit) != order.CommitmentSize {
		return msgjson.NewError(msgjson.OrderParameterError, "invalid commitment")
	}
	var commit order.Commitment
	copy(commit[:], market.Commit)

	fundingAsset := assets.funding
	coinIDs := make([]order.CoinID, 0, len(market.Trade.Coins))
	coinStrs := make([]string, 0, len(market.Trade.Coins))
	for _, coin := range market.Trade.Coins {
		// Check that the outpoint isn't locked.
		coinID := order.CoinID(coin.ID)
		if tunnel.CoinLocked(fundingAsset.ID, coinID) {
			// Values are passed as format arguments, never as the format
			// itself, so a '%' in them cannot be misread as a verb.
			return msgjson.NewError(msgjson.FundingError, "coin %s is locked", fmtCoinID(fundingAsset.Symbol, coinID))
		}
		coinStr, err := fundingAsset.Backend.ValidateCoinID(coinID)
		if err != nil {
			return msgjson.NewError(msgjson.FundingError, "invalid coin ID %v: %v", coinID, err)
		}
		coinStrs = append(coinStrs, coinStr)
		coinIDs = append(coinIDs, coinID)
	}

	// Create the market order
	mo := &order.MarketOrder{
		P: order.Prefix{
			AccountID:  user,
			BaseAsset:  market.Base,
			QuoteAsset: market.Quote,
			OrderType:  order.MarketOrderType,
			ClientTime: encode.UnixTimeMilli(int64(market.ClientTime)),
			//ServerTime set in epoch queue processing pipeline.
			Commit: commit,
		},
		T: order.Trade{
			Coins:    coinIDs,
			Sell:     sell,
			Quantity: market.Quantity,
			Address:  market.Address,
		},
	}

	// Send the order to the epoch queue.
	oRecord := &orderRecord{
		order: mo,
		req:   market,
		msgID: msg.ID,
	}

	// valSum and spendSize accumulate across retries of checkCoins. Coins are
	// removed from neededCoins once validated, so each coin is counted once.
	var valSum uint64
	var spendSize uint32
	neededCoins := make(map[int]*msgjson.Coin, len(market.Trade.Coins))
	for i, coin := range market.Trade.Coins {
		neededCoins[i] = coin
	}

	// checkCoins validates the coins still in neededCoins. It returns
	// tryAgain = true when a coin is not yet found or the backend request
	// timed out, a non-nil msgErr when the order must be rejected, and
	// (false, nil) when all coins are valid and cover the required funds.
	checkCoins := func() (tryAgain bool, msgErr *msgjson.Error) {
		for key, coin := range neededCoins {
			// Get the coin from the backend and validate it.
			dexCoin, err := fundingCoin(fundingAsset.Backend, coin.ID, coin.Redeem)
			if err != nil {
				if errors.Is(err, asset.CoinNotFoundError) {
					return true, nil
				}
				if errors.Is(err, asset.ErrRequestTimeout) {
					log.Errorf("Deadline exceeded attempting to verify funding coin %v (%s). Will try again.",
						coin.ID, fundingAsset.Symbol)
					return true, nil
				}
				log.Errorf("Error retreiving market order funding coin ID %s. user = %s: %v", coin.ID, user, err)
				return false, msgjson.NewError(msgjson.FundingError, "error retrieving coin ID %v", coin.ID)
			}

			// Verify that the user controls the funding coins.
			err = dexCoin.Auth(msgBytesToBytes(coin.PubKeys), msgBytesToBytes(coin.Sigs), coin.ID)
			if err != nil {
				log.Debugf("Auth error for %s coin %s: %v", fundingAsset.Symbol, dexCoin, err)
				return false, msgjson.NewError(msgjson.CoinAuthError, "failed to authorize coin %v", dexCoin)
			}

			msgErr := r.checkZeroConfs(dexCoin, fundingAsset)
			if msgErr != nil {
				return false, msgErr
			}

			delete(neededCoins, key) // don't check this coin again
			valSum += dexCoin.Value()
			// Summing per-coin spend sizes may under-estimate the size of a
			// BTC segwit spend by one byte, since the marker and flag weight
			// is not counted.
			spendSize += dexCoin.SpendSize()
		}

		if valSum == 0 {
			return false, msgjson.NewError(msgjson.FundingError, "zero value funding coins not permitted")
		}

		// Calculate the fees and check that the utxo sum is enough.
		var reqVal uint64
		if sell {
			lots := market.Quantity / lotSize
			reqVal = calc.RequiredOrderFunds(market.Quantity, uint64(spendSize), lots, &assets.funding.Asset)
		} else {
			// This is a market buy order, so the quantity gets special handling.
			// 1. The quantity is in units of the quote asset.
			// 2. The quantity has to satisfy the market buy buffer.
			midGap := tunnel.MidGap()
			if midGap == 0 {
				// With no mid-gap rate available, fall back to the rate step.
				midGap = tunnel.RateStep()
			}
			buyBuffer := tunnel.MarketBuyBuffer()
			lotWithBuffer := uint64(float64(lotSize) * buyBuffer)
			minReq := matcher.BaseToQuote(midGap, lotWithBuffer)
			reqVal = calc.RequiredOrderFunds(minReq, uint64(spendSize), 1, &assets.funding.Asset)

			if market.Quantity < minReq {
				return false, msgjson.NewError(msgjson.FundingError,
					"order quantity does not satisfy market buy buffer. %d < %d. midGap = %d",
					market.Quantity, minReq, midGap)
			}
		}
		if valSum < reqVal {
			return false, msgjson.NewError(msgjson.FundingError,
				"not enough funds. need at least %d, got %d", reqVal, valSum)
		}

		return false, nil
	}

	log.Tracef("Searching for %s coins %v for new market order", fundingAsset.Symbol, coinStrs)
	r.latencyQ.Wait(&wait.Waiter{
		Expiration: time.Now().Add(fundingTxWait),
		TryFunc: func() bool {
			tryAgain, msgErr := checkCoins()
			if tryAgain {
				return wait.TryAgain
			}
			if msgErr != nil {
				r.respondError(msg.ID, user, msgErr)
				return wait.DontTryAgain
			}

			// Send the order to the epoch queue where it will be time stamped.
			log.Tracef("Found and validated %s coins %v for new market order", fundingAsset.Symbol, coinStrs)
			if err := tunnel.SubmitOrder(oRecord); err != nil {
				if errors.Is(err, ErrInternalServer) {
					log.Errorf("Market failed to SubmitOrder: %v", err)
				} else {
					log.Debugf("Market failed to SubmitOrder: %v", err)
				}
				// The error text is data, not a format string.
				r.respondError(msg.ID, user, msgjson.NewError(msgjson.UnknownMarketError, "%v", err))
			}
			return wait.DontTryAgain
		},
		ExpireFunc: func() {
			// Tell them to broadcast again or check their node before broadcast
			// timeout is reached and the match is revoked.
			r.respondError(msg.ID, user, msgjson.NewError(msgjson.TransactionUndiscovered,
				"failed to find funding coins %v", coinStrs))
		},
	})

	return nil
}
// checkZeroConfs vets the fee rate of an unconfirmed funding coin. A coin with
// one or more confirmations always passes. Otherwise the coin's fee rate must
// be at least ZeroConfFeeRateThreshold times the last known fee rate for the
// funding asset (rounded to the nearest integer). The fee rate check is
// skipped when the last known rate is zero. A FundingError is returned if the
// confirmations cannot be determined or the fee rate is too low.
func (r *OrderRouter) checkZeroConfs(dexCoin asset.FundingCoin, fundingAsset *asset.BackedAsset) *msgjson.Error {
	confs, err := coinConfirmations(dexCoin)
	switch {
	case err != nil:
		log.Debugf("Confirmations error for %s coin %s: %v", fundingAsset.Symbol, dexCoin, err)
		return msgjson.NewError(msgjson.FundingError, fmt.Sprintf("failed to verify coin %v", dexCoin))
	case confs > 0:
		// Confirmed coins are not subject to the fee rate requirement.
		return nil
	}

	lastRate := r.feeSource.LastRate(fundingAsset.ID) // MaxFeeRate applied inside feeSource
	minRate := uint64(math.Round(float64(lastRate) * ZeroConfFeeRateThreshold))
	coinRate := dexCoin.FeeRate()
	if lastRate == 0 || coinRate >= minRate {
		return nil
	}

	log.Debugf("Fee rate too low %s coin %s: %d < %d", fundingAsset.Symbol, dexCoin, coinRate, minRate)
	return msgjson.NewError(msgjson.FundingError,
		fmt.Sprintf("fee rate for %s is too low. %d < %d", dexCoin, coinRate, minRate))
}
// handleCancel is the handler for the 'cancel' route. This route accepts a
// msgjson.CancelOrder payload, validates the information, constructs an
// order.CancelOrder and submits it to the epoch queue.
//
// Unlike the limit and market handlers, there are no funding coins to check,
// so the order is submitted to the market before returning, and any error,
// including rejection by the market, is reported through the return value.
func (r *OrderRouter) handleCancel(user account.AccountID, msg *msgjson.Message) *msgjson.Error {
	cancel := new(msgjson.CancelOrder)
	err := msg.Unmarshal(&cancel)
	if err != nil || cancel == nil {
		return msgjson.NewError(msgjson.RPCParseError, "error decoding 'cancel' payload")
	}

	rpcErr := r.verifyAccount(user, cancel.AccountID, cancel)
	if rpcErr != nil {
		return rpcErr
	}

	// NOTE: Allow suspended accounts to submit cancel orders.

	tunnel, rpcErr := r.extractMarket(&cancel.Prefix)
	if rpcErr != nil {
		return rpcErr
	}

	if len(cancel.TargetID) != order.OrderIDSize {
		return msgjson.NewError(msgjson.OrderParameterError, "invalid target ID format")
	}
	var targetID order.OrderID
	copy(targetID[:], cancel.TargetID)

	if !tunnel.Cancelable(targetID) {
		return msgjson.NewError(msgjson.OrderParameterError, "target order not known: %v", targetID)
	}

	// Check that OrderType is set correctly
	if cancel.OrderType != msgjson.CancelOrderNum {
		return msgjson.NewError(msgjson.OrderParameterError, "wrong order type set for cancel order")
	}

	rpcErr = checkTimes(&cancel.Prefix)
	if rpcErr != nil {
		return rpcErr
	}

	// Commitment.
	if len(cancel.Commit) != order.CommitmentSize {
		return msgjson.NewError(msgjson.OrderParameterError, "invalid commitment")
	}
	var commit order.Commitment
	copy(commit[:], cancel.Commit)

	// Create the cancel order
	co := &order.CancelOrder{
		P: order.Prefix{
			AccountID:  user,
			BaseAsset:  cancel.Base,
			QuoteAsset: cancel.Quote,
			OrderType:  order.CancelOrderType,
			ClientTime: encode.UnixTimeMilli(int64(cancel.ClientTime)),
			//ServerTime set in epoch queue processing pipeline.
			Commit: commit,
		},
		TargetOrderID: targetID,
	}

	// Send the order to the epoch queue.
	oRecord := &orderRecord{
		order: co,
		req:   cancel,
		msgID: msg.ID,
	}
	if err := tunnel.SubmitOrder(oRecord); err != nil {
		if errors.Is(err, ErrInternalServer) {
			log.Errorf("Market failed to SubmitOrder: %v", err)
		}
		// The error text is data, not a format string, so a '%' in it cannot
		// be misread as a verb.
		return msgjson.NewError(msgjson.UnknownMarketError, "%v", err)
	}
	return nil
}
// verifyAccount checks that the submitted order squares with the submitting
// user. msgAcct is the account ID from the order payload and must equal the
// authenticated user's ID. signable is the payload itself, whose signature is
// verified against its serialization. An OrderParameterError is returned on an
// account mismatch and a SignatureError on a bad signature.
func (r *OrderRouter) verifyAccount(user account.AccountID, msgAcct msgjson.Bytes, signable msgjson.Signable) *msgjson.Error {
	// Verify account ID matches.
	if !bytes.Equal(user[:], msgAcct) {
		return msgjson.NewError(msgjson.OrderParameterError, "account ID mismatch")
	}
	// Check the client's signature of the order.
	sigMsg := signable.Serialize()
	err := r.auth.Auth(user, sigMsg, signable.SigBytes())
	if err != nil {
		// Pass the error as an argument, not as part of the format string, so
		// a '%' in its text cannot be misread as a verb.
		return msgjson.NewError(msgjson.SignatureError, "signature error: %v", err)
	}
	return nil
}
// extractMarket finds the MarketTunnel for the provided prefix. The market
// name is derived from the prefix's base and quote asset IDs. An
// UnknownMarketError is returned if the name cannot be resolved or no tunnel
// is registered under it.
func (r *OrderRouter) extractMarket(prefix *msgjson.Prefix) (MarketTunnel, *msgjson.Error) {
	mktName, err := dex.MarketName(prefix.Base, prefix.Quote)
	if err != nil {
		// Values are passed as format arguments, never concatenated into the
		// format, so a '%' in them cannot be misread as a verb.
		return nil, msgjson.NewError(msgjson.UnknownMarketError, "asset lookup error: %v", err)
	}
	tunnel, found := r.tunnels[mktName]
	if !found {
		return nil, msgjson.NewError(msgjson.UnknownMarketError, "unknown market %s", mktName)
	}
	return tunnel, nil
}
// SuspendEpoch holds the index and end time of final epoch marking the
// suspension of a market.
type SuspendEpoch struct {
// Idx is the final epoch index returned by MarketTunnel.Suspend.
Idx int64
// End is the time at which the final epoch closes.
End time.Time
}
// SuspendMarket schedules a suspension of the named market no sooner than
// asSoonAs, optionally persisting the booked orders (otherwise the book is
// purged automatically on market shutdown). The scheduled final epoch and its
// end time are returned, or nil if the market name is not known.
//
// OrderRouter only proxies this request to the Market, since the router is the
// entry point for new orders into the market. TODO: track running, suspended,
// and scheduled-suspended markets, appropriately blocking order submission
// according to the schedule rather than just checking Market.Running prior to
// submitting incoming orders to the Market.
func (r *OrderRouter) SuspendMarket(mktName string, asSoonAs time.Time, persistBooks bool) *SuspendEpoch {
	tunnel, known := r.tunnels[mktName]
	if !known {
		return nil
	}

	finalEpoch := new(SuspendEpoch)
	finalEpoch.Idx, finalEpoch.End = tunnel.Suspend(asSoonAs, persistBooks)
	return finalEpoch
}
// Suspend schedules a suspension of every known market, as SuspendMarket does
// for a single market. The result maps each market name to its final epoch.
//
// No locking of the incoming order handlers is needed. MarketTunnel.Running
// reports false once a market closes (and true again if it reopens), and any
// order that still reaches a Market with no active epoch is rejected there.
func (r *OrderRouter) Suspend(asSoonAs time.Time, persistBooks bool) map[string]*SuspendEpoch {
	finalEpochs := make(map[string]*SuspendEpoch, len(r.tunnels))
	for mktName, tunnel := range r.tunnels {
		epoch := new(SuspendEpoch)
		epoch.Idx, epoch.End = tunnel.Suspend(asSoonAs, persistBooks)
		finalEpochs[mktName] = epoch
	}
	return finalEpochs
}
// extractMarketDetails resolves the market for the provided prefix and returns
// its MarketTunnel, the assetSet arranged for the trade's side, and whether
// the trade is a sell. An error is returned for an unknown market or a side
// that is neither buy nor sell. It panics if a known market references an
// asset that is missing from the router's asset map.
func (r *OrderRouter) extractMarketDetails(prefix *msgjson.Prefix, trade *msgjson.Trade) (MarketTunnel, *assetSet, bool, *msgjson.Error) {
	// Check that assets are for a valid market.
	tunnel, rpcErr := r.extractMarket(prefix)
	if rpcErr != nil {
		return nil, nil, false, rpcErr
	}

	// Side must be one of buy or sell.
	side := trade.Side
	if side != msgjson.BuyOrderNum && side != msgjson.SellOrderNum {
		return nil, nil, false, msgjson.NewError(msgjson.OrderParameterError,
			fmt.Sprintf("invalid side value %d", trade.Side))
	}
	sell := side == msgjson.SellOrderNum

	quoteAsset, ok := r.assets[prefix.Quote]
	if !ok {
		panic("missing quote asset for known market should be impossible")
	}
	baseAsset, ok := r.assets[prefix.Base]
	if !ok {
		panic("missing base asset for known market should be impossible")
	}
	return tunnel, newAssetSet(baseAsset, quoteAsset, sell), sell, nil
}
// checkTimes validates the timestamps in an order prefix. The client time must
// be within maxClockOffset milliseconds of the server's current time, else a
// ClockRangeError is returned. The server time must be unset (zero), else an
// OrderParameterError is returned.
func checkTimes(prefix *msgjson.Prefix) *msgjson.Error {
	offset := encode.UnixMilli(time.Now()) - int64(prefix.ClientTime)
	if offset < 0 {
		offset = -offset
	}
	// The subtraction above wraps for absurd client times. Every wrapped
	// result is caught by the upper bound except math.MinInt64, whose
	// negation is itself. That leaves offset negative, so reject a negative
	// offset explicitly; without this such a client time would pass.
	if offset < 0 || offset >= maxClockOffset {
		return msgjson.NewError(msgjson.ClockRangeError, fmt.Sprintf(
			"clock offset of %d ms is larger than maximum allowed, %d ms",
			offset, maxClockOffset,
		))
	}
	// Server time should be unset.
	if prefix.ServerTime != 0 {
		return msgjson.NewError(msgjson.OrderParameterError, "non-zero server time not allowed")
	}
	return nil
}
// checkPrefixTrade validates the prefix and trade portions of an order: the
// timestamps, the receiving address, the quantity (non-zero, and a multiple of
// lotSize when checkLot is set), and the presence of funding coins with
// matching signature and pubkey counts. When everything checks out, every
// market is asked to check the user's unfilled book orders funded by the
// funding asset, and any orders unbooked as a result are logged.
func (r *OrderRouter) checkPrefixTrade(assets *assetSet, lotSize uint64, prefix *msgjson.Prefix,
	trade *msgjson.Trade, checkLot bool) *msgjson.Error {
	// The client's timestamp must still be valid.
	if rpcErr := checkTimes(prefix); rpcErr != nil {
		return rpcErr
	}

	// The address must be valid for the asset being received.
	if addrOK := assets.receiving.Backend.CheckAddress(trade.Address); !addrOK {
		return msgjson.NewError(msgjson.OrderParameterError, "address doesn't check")
	}

	// Quantity cannot be zero, and must be an integral multiple of the lot
	// size when the lot check is requested.
	switch {
	case trade.Quantity == 0:
		return msgjson.NewError(msgjson.OrderParameterError, "zero quantity not allowed")
	case checkLot && trade.Quantity%lotSize != 0:
		return msgjson.NewError(msgjson.OrderParameterError, "order quantity not a multiple of lot size")
	}

	// Validate UTXOs: there must be at least one coin, and each coin needs one
	// or more signatures with an equal number of pubkeys.
	if len(trade.Coins) == 0 {
		return msgjson.NewError(msgjson.FundingError, "order must specify utxos")
	}
	for i, coin := range trade.Coins {
		nSigs, nKeys := len(coin.Sigs), len(coin.PubKeys)
		switch {
		case nSigs == 0:
			return msgjson.NewError(msgjson.SignatureError, fmt.Sprintf("no signature for coin %d", i))
		case nKeys != nSigs:
			return msgjson.NewError(msgjson.OrderParameterError, fmt.Sprintf(
				"pubkey count %d not equal to signature count %d for coin %d",
				len(coin.PubKeys), nSigs, i,
			))
		}
	}

	// Verify all of the user's unfilled book orders have unspent funding coins,
	// unbooking them as necessary.
	var user account.AccountID
	copy(user[:], prefix.AccountID)
	fundingID := assets.funding.ID
	for mktName, tunnel := range r.tunnels {
		for _, badLo := range tunnel.CheckUnfilled(fundingID, user) {
			log.Infof("Unbooked unfunded order %v from market %s for user %v", badLo, mktName, user)
		}
	}

	return nil
}
// msgBytesToBytes converts a []msgjson.Bytes to a [][]byte. The byte slices
// themselves are not copied; the result shares their backing arrays. A non-nil
// slice is returned even for empty input.
func msgBytesToBytes(msgBs []msgjson.Bytes) [][]byte {
	out := make([][]byte, len(msgBs))
	for i := range msgBs {
		out[i] = msgBs[i]
	}
	return out
}
// fmtCoinID formats the coin ID for the asset with the given symbol. If the
// coin ID cannot be decoded, the hex encoding of the raw bytes is returned with
// an "unparsed:" prefix.
func fmtCoinID(symbol string, coinID []byte) string {
	if decoded, err := asset.DecodeCoinID(symbol, coinID); err == nil {
		return decoded
	}
	return "unparsed:" + hex.EncodeToString(coinID)
}