-
Notifications
You must be signed in to change notification settings - Fork 89
/
orders.go
1802 lines (1580 loc) · 56.2 KB
/
orders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.
package pg
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"sort"
"time"
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/server/account"
"decred.org/dcrdex/server/db"
"decred.org/dcrdex/server/db/driver/pg/internal"
"github.com/lib/pq"
)
// dbCoins wraps a slice of order.CoinID so that the slice can implement a
// custom sql.Scanner and driver.Valuer (see the Scan and Value methods).
type dbCoins []order.CoinID
// Value implements the sql/driver.Valuer interface. The coin IDs are encoded as
// L0|ID0|L1|ID1|... where | is simple concatenation, Ln is the length of the
// nth coin ID stored in a single byte, and IDn is the bytes of the nth coin ID.
// A nil or empty slice is encoded as an empty, non-nil byte slice.
//
// Since each length prefix occupies exactly one byte, a coin ID longer than 255
// bytes cannot be represented. An error is returned for such a coin ID rather
// than silently truncating its length prefix, which would produce an encoding
// that Scan decodes incorrectly.
func (coins dbCoins) Value() (driver.Value, error) {
	if len(coins) == 0 {
		return []byte{}, nil
	}

	// Largest coin ID length that fits in the one-byte length prefix.
	const maxCoinIDLen = 0xff

	// As an initial guess that's likely accurate for most coins, allocate as if
	// each coin ID is the same length.
	lenGuess := len(coins[0])
	b := make([]byte, 0, len(coins)*(lenGuess+1))
	for i, coin := range coins {
		if len(coin) > maxCoinIDLen {
			return nil, fmt.Errorf("coin ID %d is %d bytes long, maximum is %d",
				i, len(coin), maxCoinIDLen)
		}
		b = append(b, byte(len(coin)))
		b = append(b, coin...)
	}
	return b, nil
}
// Scan implements the sql.Scanner interface. It decodes the L0|ID0|L1|ID1|...
// encoding produced by Value, where each Ln is a one-byte length prefix for the
// coin ID bytes that follow it.
//
// src must be a []byte or nil. A nil src (SQL NULL) or an empty slice results
// in an empty, non-nil dbCoins. Any other source type is an error. An error is
// also returned if a length prefix is zero or indicates more bytes than remain.
// On error, *coins is left unmodified.
func (coins *dbCoins) Scan(src interface{}) error {
	var b []byte
	switch v := src.(type) {
	case []byte:
		b = v
	case nil:
		// NULL column value. Treat it the same as an empty encoding instead
		// of panicking on a failed type assertion.
	default:
		return fmt.Errorf("cannot scan coin IDs from type %T", src)
	}

	if len(b) == 0 {
		*coins = dbCoins{}
		return nil
	}

	// Guess the number of coins by assuming they all have the same length as
	// the first one.
	lenGuess := int(b[0])
	if lenGuess == 0 {
		return fmt.Errorf("zero-length coin ID indicated")
	}
	c := make(dbCoins, 0, len(b)/(lenGuess+1))
	for len(b) > 0 {
		cLen := int(b[0])
		if cLen == 0 {
			return fmt.Errorf("zero-length coin ID indicated")
		}
		if len(b) < cLen+1 {
			return fmt.Errorf("too many bytes indicated")
		}

		// Deep copy the coin ID (a slice) since the backing buffer may be
		// reused.
		bc := make([]byte, cLen)
		copy(bc, b[1:cLen+1])
		c = append(c, bc)

		b = b[cLen+1:]
	}

	*coins = c
	return nil
}
// Compile-time assertion that *Archiver satisfies the db.OrderArchiver
// interface.
var _ db.OrderArchiver = (*Archiver)(nil)
// Order loads the order with the given OrderID from the market identified by
// the base and quote assets. A non-nil error is returned if the market is not
// recognized. When no order with this ID exists, the returned error is
// ErrUnknownOrder and the status is order.OrderStatusUnknown. Only market,
// limit, and cancel orders are recognized.
//
// The order type is not known in advance, so the trade orders (market and
// limit) are searched first, and the cancel orders are only consulted when the
// trade lookup reports ErrUnknownOrder.
func (a *Archiver) Order(oid order.OrderID, base, quote uint32) (order.Order, order.OrderStatus, error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return nil, order.OrderStatusUnknown, err
	}

	trade, tradeStatus, err := loadTrade(a.db, a.dbName, marketSchema, oid)
	if err == nil {
		// Found a market or limit order. Fill in the market's assets.
		pfx := trade.Prefix()
		pfx.BaseAsset, pfx.QuoteAsset = base, quote
		return trade, pgToMarketStatus(tradeStatus), nil
	}

	// Anything other than an ArchiveError with code ErrUnknownOrder is final.
	var archErr db.ArchiveError
	if !errors.As(err, &archErr) || archErr.Code != db.ErrUnknownOrder {
		return nil, order.OrderStatusUnknown, err
	}

	// Not a trade, so try the cancel orders. There are no other order types to
	// try presently.
	cancel, cancelStatus, err := loadCancelOrder(a.db, a.dbName, marketSchema, oid)
	if err != nil {
		return nil, order.OrderStatusUnknown, err // includes ErrUnknownOrder
	}
	cancel.BaseAsset, cancel.QuoteAsset = base, quote
	return cancel, pgToMarketStatus(cancelStatus), nil
}
// pgOrderStatus is the order status representation that this package passes to
// its SQL statements. Use marketToPgStatus and pgToMarketStatus to convert from
// and to order.OrderStatus.
type pgOrderStatus int16
// The recognized pgOrderStatus values. orderStatusFailed has no distinct
// order.OrderStatus counterpart; pgToMarketStatus reports it as executed. The
// negated value -orderStatusRevoked is also handled by pgToMarketStatus and by
// (pgOrderStatus).active.
const (
orderStatusUnknown pgOrderStatus = iota
orderStatusEpoch
orderStatusBooked
orderStatusExecuted
orderStatusFailed // failed helps distinguish matched from unmatched executed cancel orders
orderStatusCanceled
orderStatusRevoked // indicates a trade order was revoked, or in the cancels table that the cancel is server-generated
)
// marketToPgStatus converts an order.OrderStatus into the corresponding
// pgOrderStatus. Any status without a counterpart maps to orderStatusUnknown.
func marketToPgStatus(status order.OrderStatus) pgOrderStatus {
	pgStatus := orderStatusUnknown
	switch status {
	case order.OrderStatusEpoch:
		pgStatus = orderStatusEpoch
	case order.OrderStatusBooked:
		pgStatus = orderStatusBooked
	case order.OrderStatusExecuted:
		pgStatus = orderStatusExecuted
	case order.OrderStatusCanceled:
		pgStatus = orderStatusCanceled
	case order.OrderStatusRevoked:
		pgStatus = orderStatusRevoked
	}
	return pgStatus
}
// pgToMarketStatus converts a pgOrderStatus into the order.OrderStatus seen by
// the market. Unrecognized values map to order.OrderStatusUnknown.
func pgToMarketStatus(status pgOrderStatus) order.OrderStatus {
	switch status {
	case orderStatusRevoked, -orderStatusRevoked:
		// A negative revoke status means a forgiven preimage miss.
		return order.OrderStatusRevoked
	case orderStatusCanceled:
		return order.OrderStatusCanceled
	case orderStatusFailed, orderStatusExecuted:
		// Failed is executed as far as the market is concerned.
		return order.OrderStatusExecuted
	case orderStatusBooked:
		return order.OrderStatusBooked
	case orderStatusEpoch:
		return order.OrderStatusEpoch
	default:
		return order.OrderStatusUnknown
	}
}
// String returns "failed" for orderStatusFailed. For every other value it
// returns the string form of the equivalent order.OrderStatus.
func (status pgOrderStatus) String() string {
	if status == orderStatusFailed {
		return "failed"
	}
	return pgToMarketStatus(status).String()
}
// active reports whether the status is an active one: epoch or booked. All
// other recognized statuses, including the negated revoked status, are not
// active. It panics for a value that is not a recognized status, which
// indicates a programmer error.
func (status pgOrderStatus) active() bool {
	if status == orderStatusEpoch || status == orderStatusBooked {
		return true
	}

	switch status {
	case orderStatusUnknown,
		orderStatusExecuted, orderStatusFailed,
		orderStatusCanceled,
		orderStatusRevoked, -orderStatusRevoked:
		return false
	}

	panic("unknown order status!") // programmer error
}
// NewEpochOrder stores the given order with epoch status (orderStatusEpoch)
// for the epoch identified by epochIdx and epochDur. It is like StoreOrder with
// OrderStatusEpoch, except that the caller supplies the epoch gap, whereas
// StoreOrder always records db.EpochGapNA.
func (a *Archiver) NewEpochOrder(ord order.Order, epochIdx, epochDur int64, epochGap int32) error {
return a.storeOrder(ord, epochIdx, epochDur, epochGap, orderStatusEpoch)
}
// NewArchivedCancel stores a cancel order directly in the executed state, with
// the epoch gap recorded as db.EpochGapNA. This is used for orders that are
// canceled when the market is suspended, and therefore do not need to be
// matched. An error is returned if the order's market is not recognized, if the
// insert fails, or if the insert does not affect exactly one row.
func (a *Archiver) NewArchivedCancel(ord *order.CancelOrder, epochID, epochDur int64) error {
	marketSchema, err := a.marketSchema(ord.Base(), ord.Quote())
	if err != nil {
		return err
	}

	const status = orderStatusExecuted
	tableName := fullCancelOrderTableName(a.dbName, marketSchema, status.active())
	rowsAffected, err := storeCancelOrder(a.db, tableName, ord, status, epochID, epochDur, db.EpochGapNA)
	switch {
	case err != nil:
		a.fatalBackendErr(err)
		return fmt.Errorf("storeCancelOrder failed: %w", err)
	case rowsAffected != 1:
		return fmt.Errorf("failed to store order %v: %d rows affected, expected 1",
			ord.UID(), rowsAffected)
	}
	return nil
}
// makePseudoCancel creates a server-generated cancel order that records the
// server's action of revoking the target order. The given time stamp is used
// for both the client and server times. The Commitment is left at its zero
// value, which is stored as NULL. See (Commitment).Value.
func makePseudoCancel(target order.OrderID, user account.AccountID, base, quote uint32, timeStamp time.Time) *order.CancelOrder {
	co := new(order.CancelOrder)
	co.TargetOrderID = target
	co.P = order.Prefix{
		AccountID:  user,
		BaseAsset:  base,
		QuoteAsset: quote,
		OrderType:  order.CancelOrderType,
		ClientTime: timeStamp,
		ServerTime: timeStamp,
	}
	return co
}
// FlushBook revokes all booked orders for a market. The IDs of the affected
// sell orders and buy orders are returned separately.
//
// Within a single database transaction, the internal.PurgeBook statement is run
// against orders with booked status, and then one server-generated (pseudo)
// cancel order with revoked status is inserted for each order that statement
// reports. If anything fails after the transaction has begun, the transaction
// is rolled back, the error is passed to a.fatalBackendErr, and nil slices are
// returned along with the error. An error is also returned if the market is not
// recognized.
func (a *Archiver) FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) {
var marketSchema string
marketSchema, err = a.marketSchema(base, quote)
if err != nil {
return
}
// Booked orders (active) are made revoked (archived).
srcTableName := fullOrderTableName(a.dbName, marketSchema, orderStatusBooked.active())
dstTableName := fullOrderTableName(a.dbName, marketSchema, orderStatusRevoked.active())
// A single time stamp is shared by all of the pseudo-cancel orders.
timeStamp := time.Now().Truncate(time.Millisecond).UTC()
var dbTx *sql.Tx
dbTx, err = a.db.Begin()
if err != nil {
err = fmt.Errorf("failed to begin database transaction: %w", err)
return
}
// fail clears the results, reports the current value of err (captured by
// reference, so it must be set before calling fail) as a backend error, and
// rolls back the transaction. The rollback error is deliberately ignored.
fail := func() {
sellsRemoved, buysRemoved = nil, nil
a.fatalBackendErr(err)
_ = dbTx.Rollback()
}
// Change all booked orders to revoked.
stmt := fmt.Sprintf(internal.PurgeBook, srcTableName, orderStatusRevoked, dstTableName)
var rows *sql.Rows
rows, err = dbTx.Query(stmt, orderStatusBooked)
if err != nil {
fail()
return
}
defer rows.Close()
// Collect the affected orders, creating a pseudo-cancel order for each.
var cos []*order.CancelOrder
for rows.Next() {
var oid order.OrderID
var sell bool
var aid account.AccountID
if err = rows.Scan(&oid, &sell, &aid); err != nil {
fail()
return
}
cos = append(cos, makePseudoCancel(oid, aid, base, quote, timeStamp))
if sell {
sellsRemoved = append(sellsRemoved, oid)
} else {
buysRemoved = append(buysRemoved, oid)
}
}
if err = rows.Err(); err != nil {
fail()
return
}
// Insert the pseudo-cancel orders.
cancelTable := fullCancelOrderTableName(a.dbName, marketSchema, orderStatusRevoked.active())
stmt = fmt.Sprintf(internal.InsertCancelOrder, cancelTable)
for _, co := range cos {
// Special values for this server-generated cancel order:
// - Pass nil instead of the zero value Commitment to save a comparison
// in (Commitment).Value with the zero value.
// - Set epoch idx to exemptEpochIdx (-1) and dur to dummyEpochDur (1),
// consistent with revokeOrder(..., exempt=true).
_, err = dbTx.Exec(stmt, co.ID(), co.AccountID, co.ClientTime,
co.ServerTime, nil, co.TargetOrderID, orderStatusRevoked, exemptEpochIdx, dummyEpochDur, db.EpochGapNA)
if err != nil {
// fail must see the unwrapped error, so wrap only afterwards.
fail()
err = fmt.Errorf("failed to store pseudo-cancel order: %w", err)
return
}
}
if err = dbTx.Commit(); err != nil {
fail()
err = fmt.Errorf("failed to commit transaction: %w", err)
return
}
return
}
// BookOrders loads every order with booked status for the specified market.
// This is used to repopulate a market's book when the market is constructed.
// A loaded order that is not a limit order is logged as an error and omitted
// from the result.
func (a *Archiver) BookOrders(base, quote uint32) ([]*order.LimitOrder, error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return nil, err
	}

	// Booked orders always live in the active orders table.
	const active = true
	tableName := fullOrderTableName(a.dbName, marketSchema, active)

	// There is no query timeout here, only explicit cancellation via a.ctx.
	booked, err := ordersByStatusFromTable(a.ctx, a.db, tableName, base, quote, orderStatusBooked)
	if err != nil {
		return nil, err
	}

	// Keep only the limit orders, as concrete *LimitOrder values.
	limits := make([]*order.LimitOrder, 0, len(booked))
	for _, ord := range booked {
		if lo, isLimit := ord.(*order.LimitOrder); isLimit {
			limits = append(limits, lo)
			continue
		}
		log.Errorf("loaded book order %v that was not a limit order", ord.ID())
	}
	return limits, nil
}
// EpochOrders retrieves all epoch orders for the specified market and returns
// them as a single slice of order.Order: the limit orders first, then the
// market orders, and finally the cancel orders.
func (a *Archiver) EpochOrders(base, quote uint32) ([]order.Order, error) {
	limits, markets, cancels, err := a.epochOrders(base, quote)
	if err != nil {
		return nil, err
	}

	all := make([]order.Order, 0, len(limits)+len(markets)+len(cancels))
	for i := range limits {
		all = append(all, limits[i])
	}
	for i := range markets {
		all = append(all, markets[i])
	}
	for i := range cancels {
		all = append(all, cancels[i])
	}
	return all, nil
}
// epochOrders retrieves all orders with epoch status for the specified market,
// separated by type into limit, market, and cancel orders. Trade orders are
// read from the active orders table and cancel orders from the active cancel
// orders table. A loaded trade order that is neither a limit nor a market order
// is logged as an error and skipped.
func (a *Archiver) epochOrders(base, quote uint32) ([]*order.LimitOrder, []*order.MarketOrder, []*order.CancelOrder, error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return nil, nil, nil, err
	}

	// Epoch status is an active status, so use the active tables. There are no
	// query timeouts here, only explicit cancellation via a.ctx.
	const active = true
	tradesTable := fullOrderTableName(a.dbName, marketSchema, active)
	trades, err := ordersByStatusFromTable(a.ctx, a.db, tradesTable, base, quote, orderStatusEpoch)
	if err != nil {
		return nil, nil, nil, err
	}

	// Sort the loaded trade orders into the slice for their concrete type.
	var (
		limits  []*order.LimitOrder
		markets []*order.MarketOrder
	)
	for _, ord := range trades {
		if lo, ok := ord.(*order.LimitOrder); ok {
			limits = append(limits, lo)
			continue
		}
		if mo, ok := ord.(*order.MarketOrder); ok {
			markets = append(markets, mo)
			continue
		}
		log.Errorf("loaded epoch order %v that was not a limit or market order: %T", ord.ID(), ord)
	}

	cancelsTable := fullCancelOrderTableName(a.dbName, marketSchema, active)
	cancels, err := cancelOrdersByStatusFromTable(a.ctx, a.db, cancelsTable, base, quote, orderStatusEpoch)
	if err != nil {
		return nil, nil, nil, err
	}

	return limits, markets, cancels, nil
}
// ActiveOrderCoins retrieves a CoinID slice for each order in the active orders
// table of the specified market. The coins of sell orders are returned in
// baseCoins and the coins of buy orders in quoteCoins, both keyed by order ID.
// When the query succeeds, both maps are non-nil even if there are no active
// orders. On any error, both maps are nil.
func (a *Archiver) ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return nil, nil, err
	}

	tableName := fullOrderTableName(a.dbName, marketSchema, true) // active (true)
	stmt := fmt.Sprintf(internal.SelectOrderCoinIDs, tableName)
	rows, err := a.db.Query(stmt)
	if err != nil {
		// A multi-row Query is not expected to report sql.ErrNoRows, but if
		// it ever does, there are no rows to iterate and rows must not be
		// touched since it is nil. Report it as an empty result.
		if errors.Is(err, sql.ErrNoRows) {
			return make(map[order.OrderID][]order.CoinID),
				make(map[order.OrderID][]order.CoinID), nil
		}
		return nil, nil, err
	}
	defer rows.Close()

	baseCoins = make(map[order.OrderID][]order.CoinID)
	quoteCoins = make(map[order.OrderID][]order.CoinID)

	for rows.Next() {
		var oid order.OrderID
		var coins dbCoins
		var sell bool
		if err = rows.Scan(&oid, &sell, &coins); err != nil {
			return nil, nil, err
		}

		if sell {
			// Sell orders lock base asset coins.
			baseCoins[oid] = coins
		} else {
			// Buy orders lock quote asset coins.
			quoteCoins[oid] = coins
		}
	}

	if err = rows.Err(); err != nil {
		return nil, nil, err
	}

	return baseCoins, quoteCoins, nil
}
// BookOrder updates the stored status of the given LimitOrder to booked. The
// stored filled amount is also updated from the order's current filled amount
// (see updateOrderStatus).
func (a *Archiver) BookOrder(lo *order.LimitOrder) error {
return a.updateOrderStatus(lo, orderStatusBooked)
}
// ExecuteOrder updates the stored status of the given Order to executed. For
// orders that are not cancel orders, the stored filled amount is also updated
// from the order's current filled amount (see updateOrderStatus).
func (a *Archiver) ExecuteOrder(ord order.Order) error {
return a.updateOrderStatus(ord, orderStatusExecuted)
}
// CancelOrder updates the stored status of a LimitOrder to canceled, and the
// stored filled amount to the order's current filled amount. If the order does
// not exist in the Archiver, CancelOrder returns ErrUnknownOrder. To store a
// new limit order with canceled status, use StoreOrder.
func (a *Archiver) CancelOrder(lo *order.LimitOrder) error {
return a.updateOrderStatus(lo, orderStatusCanceled)
}
// RevokeOrder updates an Order with revoked status, which is used for
// DEX-revoked orders rather than orders matched with a user's CancelOrder. If
// the order does not exist in the Archiver, RevokeOrder returns
// ErrUnknownOrder. This may change orders with status executed to revoked,
// which may be unexpected.
//
// A server-generated cancel order targeting ord is also stored, using
// countedEpochIdx as its epoch index. The ID of that cancel order and its time
// stamp are returned. See revokeOrder.
func (a *Archiver) RevokeOrder(ord order.Order) (cancelID order.OrderID, timeStamp time.Time, err error) {
return a.revokeOrder(ord, false)
}
// RevokeOrderUncounted is like RevokeOrder except that the generated cancel
// order will not be counted against the user. i.e. ExecutedCancelsForUser
// should not return the cancel orders created this way. The generated cancel
// order is stored with exemptEpochIdx as its epoch index. See revokeOrder.
func (a *Archiver) RevokeOrderUncounted(ord order.Order) (cancelID order.OrderID, timeStamp time.Time, err error) {
return a.revokeOrder(ord, true)
}
// Epoch index and duration values stored with server-generated (pseudo) cancel
// orders. revokeOrder stores exemptEpochIdx when the revocation is exempt and
// countedEpochIdx otherwise. FlushBook always stores exemptEpochIdx.
const (
exemptEpochIdx int64 = -1
countedEpochIdx int64 = 0
dummyEpochDur int64 = 1 // for idx*duration math
)
// revokeOrder sets the status of the given order to revoked and then stores a
// server-generated cancel order targeting it. The cancel order is stored with
// status orderStatusRevoked, duration dummyEpochDur, and an epoch index of
// exemptEpochIdx if exempt is true or countedEpochIdx otherwise, as indicators
// that it is a revocation. The ID and time stamp of the generated cancel order
// are returned. If updating the targeted order fails, no cancel order is
// stored and the zero ID and time are returned with the error.
func (a *Archiver) revokeOrder(ord order.Order, exempt bool) (cancelID order.OrderID, timeStamp time.Time, err error) {
	// Revoke the targeted order first.
	if err = a.updateOrderStatus(ord, orderStatusRevoked); err != nil {
		return cancelID, timeStamp, err
	}

	// Record the revocation with a pseudo-cancel order.
	timeStamp = time.Now().Truncate(time.Millisecond).UTC()
	pseudoCancel := makePseudoCancel(ord.ID(), ord.User(), ord.Base(), ord.Quote(), timeStamp)
	cancelID = pseudoCancel.ID()

	var epochIdx int64
	if exempt {
		epochIdx = exemptEpochIdx
	} else {
		epochIdx = countedEpochIdx
	}

	err = a.storeOrder(pseudoCancel, epochIdx, dummyEpochDur, db.EpochGapNA, orderStatusRevoked)
	return cancelID, timeStamp, err
}
// FailCancelOrder updates the stored status of the given CancelOrder to failed.
// The order must already be stored: the update goes through
// updateOrderStatusByID, which returns the lookup error if the existing order
// cannot be located. To update a CancelOrder with executed status, use
// ExecuteOrder.
func (a *Archiver) FailCancelOrder(co *order.CancelOrder) error {
return a.updateOrderStatus(co, orderStatusFailed)
}
// validateOrder reports whether the order is sensible for the given status and
// market. The failed status is only valid for cancel orders. Everything else is
// decided by db.ValidateOrder using the equivalent order.OrderStatus.
func validateOrder(ord order.Order, status pgOrderStatus, mkt *dex.MarketInfo) bool {
	failedNonCancel := status == orderStatusFailed && ord.Type() != order.CancelOrderType
	return !failedNonCancel && db.ValidateOrder(ord, pgToMarketStatus(status), mkt)
}
// StoreOrder stores an order for the specified epoch ID (idx:dur) with the
// provided status. The epoch gap is recorded as db.EpochGapNA. The market is
// determined from the Order. A non-nil error will be returned if the market is
// not recognized. All orders are validated via server/db.ValidateOrder to
// ensure only sensible orders reach persistent storage. Updating orders should
// be done via one of the update functions such as UpdateOrderStatus.
func (a *Archiver) StoreOrder(ord order.Order, epochIdx, epochDur int64, status order.OrderStatus) error {
return a.storeOrder(ord, epochIdx, epochDur, db.EpochGapNA, marketToPgStatus(status))
}
// storeOrder inserts the order into the table matching its type (trade or
// cancel) and the given status (active or archived) in the order's market.
//
// Before inserting, the order is validated for the status and market
// (ErrInvalidOrder on failure), and its commitment is checked against existing
// orders (ErrReusedCommit if already used). An insert failure, or an insert
// that does not affect exactly one row, is reported to a.fatalBackendErr and
// returned. It panics if the order is not a cancel, market, or limit order,
// which validation should have caught.
func (a *Archiver) storeOrder(ord order.Order, epochIdx, epochDur int64, epochGap int32, status pgOrderStatus) error {
	marketSchema, err := a.marketSchema(ord.Base(), ord.Quote())
	if err != nil {
		return err
	}

	mkt := a.markets[marketSchema]
	if !validateOrder(ord, status, mkt) {
		return db.ArchiveError{
			Code: db.ErrInvalidOrder,
			Detail: fmt.Sprintf("invalid order %v for status %v and market %v",
				ord.UID(), status, mkt),
		}
	}

	// Reject duplicate order commitments. Since the commitment is part of the
	// order serialization, this covers duplicate order IDs as well. The check
	// spans ALL markets, which may be excessive. It may be more appropriate in
	// the caller, or it may be replaced by a different check depending on
	// where preimages are stored. If reused commitments were to be allowed
	// provided the preimages are only revealed once, the unique constraint on
	// the commit column in the orders tables would need to be removed.
	// IDEA: Do not apply this constraint to server-generated cancel orders,
	// which we may wish to have a zero value commitment and status revoked.
	// if _, isCancel := ord.(*order.CancelOrder); !isCancel || status != orderStatusRevoked {
	commit := ord.Commitment()
	// No query timeouts in storeOrder, only explicit cancellation.
	found, prevOid, err := a.OrderWithCommit(a.ctx, commit)
	if err != nil {
		return err
	}
	if found {
		return db.ArchiveError{
			Code: db.ErrReusedCommit,
			Detail: fmt.Sprintf("order %v reuses commit %v from previous order %v",
				ord.UID(), commit, prevOid),
		}
	}

	// Insert with the store function for the concrete order type, remembering
	// its name for the error message.
	var (
		rowsAffected int64
		storeFunc    string
	)
	switch ot := ord.(type) {
	case *order.LimitOrder:
		storeFunc = "storeLimitOrder"
		tableName := fullOrderTableName(a.dbName, marketSchema, status.active())
		rowsAffected, err = storeLimitOrder(a.db, tableName, ot, status, epochIdx, epochDur)
	case *order.MarketOrder:
		storeFunc = "storeMarketOrder"
		tableName := fullOrderTableName(a.dbName, marketSchema, status.active())
		rowsAffected, err = storeMarketOrder(a.db, tableName, ot, status, epochIdx, epochDur)
	case *order.CancelOrder:
		storeFunc = "storeCancelOrder"
		tableName := fullCancelOrderTableName(a.dbName, marketSchema, status.active())
		rowsAffected, err = storeCancelOrder(a.db, tableName, ot, status, epochIdx, epochDur, epochGap)
	default:
		panic("ValidateOrder should have caught this")
	}
	if err != nil {
		a.fatalBackendErr(err)
		return fmt.Errorf("%s failed: %w", storeFunc, err)
	}

	if rowsAffected != 1 {
		err = fmt.Errorf("failed to store order %v: %d rows affected, expected 1",
			ord.UID(), rowsAffected)
		a.fatalBackendErr(err)
		return err
	}
	return nil
}
// orderTableName looks up the stored status and type of the given order, and
// returns the full name of the table that holds it along with that status. The
// table is chosen by order type (trade or cancel) and by whether the status is
// active. On error, the returned table name is empty.
func (a *Archiver) orderTableName(ord order.Order) (string, pgOrderStatus, error) {
	status, orderType, _, err := a.orderStatus(ord)
	if err != nil {
		return "", status, err
	}

	marketSchema, err := a.marketSchema(ord.Base(), ord.Quote())
	if err != nil {
		return "", status, err
	}

	switch orderType {
	case order.CancelOrderType:
		return fullCancelOrderTableName(a.dbName, marketSchema, status.active()), status, nil
	case order.MarketOrderType, order.LimitOrderType:
		return fullOrderTableName(a.dbName, marketSchema, status.active()), status, nil
	}
	return "", status, fmt.Errorf("unrecognized order type %v", orderType)
}
// OrderPreimage retrieves the preimage stored for the given order. The order's
// table is located with orderTableName. If that fails, or if the query or scan
// fails, the error is returned.
func (a *Archiver) OrderPreimage(ord order.Order) (order.Preimage, error) {
	var preimage order.Preimage

	tableName, _, err := a.orderTableName(ord)
	if err != nil {
		return preimage, err
	}

	row := a.db.QueryRow(fmt.Sprintf(internal.SelectOrderPreimage, tableName), ord.ID())
	err = row.Scan(&preimage)
	return preimage, err
}
// StorePreimage stores the preimage associated with an existing order. An
// error is returned if the order's table cannot be determined, if the update
// fails, or if the update does not affect exactly one row.
func (a *Archiver) StorePreimage(ord order.Order, pi order.Preimage) error {
	tableName, status, err := a.orderTableName(ord)
	if err != nil {
		return err
	}

	// Preimages are stored during epoch processing, specifically after users
	// have responded with their preimages but before swap negotiation begins.
	// The order is therefore expected to be "active", i.e. not in an archived
	// orders table. An archived order is only warned about, not rejected.
	if archived := !status.active(); archived {
		log.Warnf("Attempting to set preimage for archived order %v", ord.UID())
	}

	rowsUpdated, err := sqlExec(a.db, fmt.Sprintf(internal.SetOrderPreimage, tableName), pi, ord.ID())
	if err != nil {
		a.fatalBackendErr(err)
		return err
	}
	if rowsUpdated == 1 {
		return nil
	}
	return fmt.Errorf("failed to update 1 order's preimage, updated %d", rowsUpdated)
}
// SetOrderCompleteTime sets the successful swap completion time for an existing
// order. It is an error if the order is not in executed status.
//
// Apart from a failure to look up the order's status, which is returned as is,
// all errors are db.ArchiveError values: ErrOrderNotExecuted when the stored
// status is not executed, ErrInvalidOrder for an unknown market or order type,
// ErrGeneralFailure when the update fails, and ErrUpdateCount when the update
// does not affect exactly one row.
func (a *Archiver) SetOrderCompleteTime(ord order.Order, compTimeMs int64) error {
	status, orderType, _, err := a.orderStatus(ord)
	if err != nil {
		return err
	}

	// complete_time is only set for executed orders, not canceled or revoked.
	if status != orderStatusExecuted {
		log.Warnf("Attempting to set swap completion time for order %v in status %v, not executed",
			ord.UID(), status)
		detail := fmt.Sprintf("unable to set completed time for order %v in status %v, not executed",
			ord.UID(), status)
		return db.ArchiveError{Code: db.ErrOrderNotExecuted, Detail: detail}
	}

	marketSchema, err := a.marketSchema(ord.Base(), ord.Quote())
	if err != nil {
		detail := fmt.Sprintf("unknown market (%d, %d) for order %v",
			ord.Base(), ord.Quote(), ord.UID())
		return db.ArchiveError{Code: db.ErrInvalidOrder, Detail: detail}
	}

	// Pick the trade or cancel order table for the order's status.
	var tableName string
	switch orderType {
	case order.CancelOrderType:
		tableName = fullCancelOrderTableName(a.dbName, marketSchema, status.active())
	case order.MarketOrderType, order.LimitOrderType:
		tableName = fullOrderTableName(a.dbName, marketSchema, status.active())
	default:
		detail := fmt.Sprintf("unknown type for order %v: %v", ord.UID(), orderType)
		return db.ArchiveError{Code: db.ErrInvalidOrder, Detail: detail}
	}

	rowsUpdated, err := sqlExec(a.db, fmt.Sprintf(internal.SetOrderCompleteTime, tableName),
		compTimeMs, ord.ID())
	if err != nil {
		a.fatalBackendErr(err)
		return db.ArchiveError{
			Code:   db.ErrGeneralFailure,
			Detail: "SetOrderCompleteTime failed:" + err.Error(),
		}
	}
	if rowsUpdated != 1 {
		detail := fmt.Sprintf("failed to update 1 order's completion time, updated %d", rowsUpdated)
		return db.ArchiveError{Code: db.ErrUpdateCount, Detail: detail}
	}
	return nil
}
// orderCompStamped pairs an order ID with a completion time stamp. It is the
// element type produced by completedUserOrders and sorted by
// CompletedUserOrders.
type orderCompStamped struct {
oid order.OrderID
t int64 // completion time as scanned by completedUserOrders
}
// CompletedUserOrders retrieves the N most recently completed orders for a user
// across all markets. The order IDs and their completion times are returned in
// matching positions, latest completion first. Up to N orders are requested
// from the archived orders table of every market, each with its own query
// timeout, and the combined results are sorted and cut down to at most N.
func (a *Archiver) CompletedUserOrders(aid account.AccountID, N int) (oids []order.OrderID, compTimes []int64, err error) {
	var stamped []orderCompStamped
	for schema := range a.markets {
		tableName := fullOrderTableName(a.dbName, schema, false) // NOT active table
		ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
		mktOrds, queryErr := completedUserOrders(ctx, a.db, tableName, aid, N)
		cancel()
		if queryErr != nil {
			return nil, nil, queryErr
		}
		stamped = append(stamped, mktOrds...)
	}

	// Descending by completion time, so the latest completed order is first.
	sort.Slice(stamped, func(i, j int) bool {
		return stamped[j].t < stamped[i].t
	})

	count := N
	if count > len(stamped) {
		count = len(stamped)
	}

	for _, ord := range stamped[:count] {
		oids = append(oids, ord.oid)
		compTimes = append(compTimes, ord.t)
	}
	return oids, compTimes, nil
}
// completedUserOrders runs the internal.RetrieveCompletedOrdersForAccount
// statement against the given table with the account ID and N as arguments, and
// returns the order ID and completion time of each resulting row. A NULL
// completion time is recorded as 0. On error, a nil slice is returned.
func completedUserOrders(ctx context.Context, dbe *sql.DB, tableName string, aid account.AccountID, N int) (oids []orderCompStamped, err error) {
	rows, err := dbe.QueryContext(ctx, fmt.Sprintf(internal.RetrieveCompletedOrdersForAccount, tableName), aid, N)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			oid          order.OrderID
			acct         account.AccountID
			completeTime sql.NullInt64
		)
		if err = rows.Scan(&oid, &acct, &completeTime); err != nil {
			return nil, err
		}
		oids = append(oids, orderCompStamped{oid: oid, t: completeTime.Int64})
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}
	return oids, nil
}
// PreimageStats retrieves results of the N most recent preimage requests for
// the user across all markets. For every market, both the archived trade
// orders table and the archived cancel orders table are queried, each query
// with its own timeout. The combined results are sorted by time, latest first,
// and cut down to at most lastN entries.
func (a *Archiver) PreimageStats(user account.AccountID, lastN int) ([]*db.PreimageResult, error) {
	var results []*db.PreimageResult

	// collect runs one statement and appends its rows to results.
	collect := func(stmt string) error {
		ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
		defer cancel()

		rows, err := a.db.QueryContext(ctx, stmt, user, lastN, orderStatusRevoked)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			var (
				oid    order.OrderID
				missed bool
				stamp  int64
			)
			if err = rows.Scan(&oid, &missed, &stamp); err != nil {
				return err
			}
			results = append(results, &db.PreimageResult{
				Miss: missed,
				Time: stamp,
				ID:   oid,
			})
		}

		return rows.Err()
	}

	for schema := range a.markets {
		// archived trade orders
		tradeStmt := fmt.Sprintf(internal.PreimageResultsLastN, fullOrderTableName(a.dbName, schema, false))
		if err := collect(tradeStmt); err != nil {
			return nil, err
		}

		// archived cancel orders
		cancelStmt := fmt.Sprintf(internal.CancelPreimageResultsLastN, fullCancelOrderTableName(a.dbName, schema, false))
		if err := collect(cancelStmt); err != nil {
			return nil, err
		}
	}

	// Descending by time.
	sort.Slice(results, func(i, j int) bool {
		return results[j].Time < results[i].Time
	})
	if lastN < len(results) {
		results = results[:lastN]
	}
	return results, nil
}
// OrderStatusByID gets the status, type, and filled amount of the order with
// the given OrderID in the market specified by a base and quote asset. See also
// OrderStatus. If the order is not found, the error value is ErrUnknownOrder,
// the status is order.OrderStatusUnknown, and the type is
// order.UnknownOrderType. For cancel orders the filled amount is -1. The
// database status is converted with pgToMarketStatus, so a failed cancel order
// is reported as executed.
func (a *Archiver) OrderStatusByID(oid order.OrderID, base, quote uint32) (order.OrderStatus, order.OrderType, int64, error) {
pgStatus, orderType, filled, err := a.orderStatusByID(oid, base, quote)
return pgToMarketStatus(pgStatus), orderType, filled, err
}
// orderStatusByID gets the database status, type, and filled amount of the
// order with the given ID in the specified market. The trade orders are checked
// first, and the cancel orders are checked only if the order is unknown among
// the trades. For a cancel order, the filled amount is -1. If the order is not
// found at all, the result is orderStatusUnknown, order.UnknownOrderType, and
// -1 together with the unknown order error. A cancel order lookup failure other
// than an unknown order is reported to a.fatalBackendErr.
func (a *Archiver) orderStatusByID(oid order.OrderID, base, quote uint32) (pgOrderStatus, order.OrderType, int64, error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return orderStatusUnknown, order.UnknownOrderType, -1, err
	}

	status, orderType, filled, err := orderStatus(a.db, oid, a.dbName, marketSchema)
	if !db.IsErrOrderUnknown(err) {
		// Either a trade order was found, or the lookup failed for a reason
		// other than an unknown order.
		return status, orderType, filled, err
	}

	cancelStatus, err := cancelOrderStatus(a.db, oid, a.dbName, marketSchema)
	if err == nil {
		return cancelStatus, order.CancelOrderType, -1, nil
	}

	// The severity of an unknown order is up to the caller.
	if !db.IsErrOrderUnknown(err) {
		a.fatalBackendErr(err)
	}
	return orderStatusUnknown, order.UnknownOrderType, -1, err // includes ErrUnknownOrder
}
// OrderStatus gets the status, type, and filled amount of the given order. The
// order is located by its ID and its base and quote assets. See also
// OrderStatusByID.
func (a *Archiver) OrderStatus(ord order.Order) (order.OrderStatus, order.OrderType, int64, error) {
return a.OrderStatusByID(ord.ID(), ord.Base(), ord.Quote())
}
// orderStatus is like OrderStatus, but it returns the pgOrderStatus without
// converting it to an order.OrderStatus. See orderStatusByID.
func (a *Archiver) orderStatus(ord order.Order) (pgOrderStatus, order.OrderType, int64, error) {
return a.orderStatusByID(ord.ID(), ord.Base(), ord.Quote())
}
// UpdateOrderStatusByID updates the status and filled amount of the order with
// the given OrderID in the market specified by a base and quote asset. If
// filled is -1, the filled amount is unchanged. For cancel orders, the filled
// amount is ignored. The existing order is located first (see
// orderStatusByID). If the order is not found, the error value is
// ErrUnknownOrder. An order in an archived status may not be changed to an
// active status. See also UpdateOrderStatus.
func (a *Archiver) UpdateOrderStatusByID(oid order.OrderID, base, quote uint32, status order.OrderStatus, filled int64) error {
return a.updateOrderStatusByID(oid, base, quote, marketToPgStatus(status), filled)
}
// updateOrderStatusByID sets the status and filled amount of the order with the
// given ID in the specified market. A filled value of -1 keeps the stored
// filled amount. The filled amount is not used for cancel orders.
//
// Nothing is done if both the status and filled amount already match the stored
// values. If the new status changes whether the order is active, the order is
// moved between the active and archived tables. Otherwise it is updated in
// place. Moving an order from an archived status to an active status is an
// error, while changing the status of an archived order to another archived
// status is only logged.
func (a *Archiver) updateOrderStatusByID(oid order.OrderID, base, quote uint32, status pgOrderStatus, filled int64) error {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return err
	}

	initStatus, orderType, initFilled, err := a.orderStatusByID(oid, base, quote)
	if err != nil {
		return err
	}

	if filled == initFilled && status == initStatus {
		log.Tracef("Not updating order with no status or filled amount change: %v.", oid)
		return nil
	}
	// -1 is the sentinel for leaving the filled amount as it is.
	if filled == -1 {
		filled = initFilled
	}

	isActive := status.active()
	wasActive := initStatus.active()
	tableChange := isActive != wasActive

	if !wasActive {
		if tableChange {
			return fmt.Errorf("Moving an order from an archived to active status: Order %s (%s -> %s)",
				oid, initStatus, status)
		}
		log.Infof("Archived order is changing status: Order %s (%s -> %s)",
			oid, initStatus, status)
	}

	switch orderType {
	case order.CancelOrderType:
		srcTableName := fullCancelOrderTableName(a.dbName, marketSchema, wasActive)
		if !tableChange {
			// No table move, just update the order.
			return updateCancelOrderStatus(a.db, srcTableName, oid, status)
		}
		dstTableName := fullCancelOrderTableName(a.dbName, marketSchema, isActive)
		return a.moveCancelOrder(oid, srcTableName, dstTableName, status)

	case order.LimitOrderType, order.MarketOrderType:
		srcTableName := fullOrderTableName(a.dbName, marketSchema, wasActive)
		if !tableChange {
			// No table move, just update the order.
			return updateOrderStatusAndFilledAmt(a.db, srcTableName, oid, status, uint64(filled))
		}
		dstTableName := fullOrderTableName(a.dbName, marketSchema, isActive)
		return a.moveOrder(oid, srcTableName, dstTableName, status, filled)
	}

	return fmt.Errorf("unsupported order type: %v", orderType)
}
// UpdateOrderStatus updates the status and filled amount of the given order.
// Both the market and new filled amount are determined from the Order. For
// cancel orders, a filled amount of 0 is passed on, which is ignored. The
// existing order is located by its ID. See also UpdateOrderStatusByID.
func (a *Archiver) UpdateOrderStatus(ord order.Order, status order.OrderStatus) error {
return a.updateOrderStatus(ord, marketToPgStatus(status))
}
// updateOrderStatus updates the stored status of the given order, identified by
// its ID and market, using updateOrderStatusByID. For trade orders, the order's
// current filled amount is passed along as the new filled amount. For cancel
// orders, a filled amount of 0 is passed.
func (a *Archiver) updateOrderStatus(ord order.Order, status pgOrderStatus) error {
	filled := int64(0)
	if isTrade := ord.Type() != order.CancelOrderType; isTrade {
		filled = int64(ord.Trade().Filled())
	}
	return a.updateOrderStatusByID(ord.ID(), ord.Base(), ord.Quote(), status, filled)
}
// moveOrder moves the trade order with the given ID from the source table to
// the destination table, setting its status and filled amount in the process.
// A failure of the underlying move is reported to a.fatalBackendErr and
// returned. An error is also returned if the move reports that no order was
// moved.
func (a *Archiver) moveOrder(oid order.OrderID, srcTableName, dstTableName string, status pgOrderStatus, filled int64) error {
	moved, err := moveOrder(a.db, srcTableName, dstTableName, oid, status, uint64(filled))
	switch {
	case err != nil:
		a.fatalBackendErr(err)
		return err
	case !moved:
		return fmt.Errorf("order %s not moved from %s to %s", oid, srcTableName, dstTableName)
	default:
		return nil
	}
}
func (a *Archiver) moveCancelOrder(oid order.OrderID, srcTableName, dstTableName string, status pgOrderStatus) error {
// Move the order, updating status and filled amount.
moved, err := moveCancelOrder(a.db, srcTableName, dstTableName, oid,
status)
if err != nil {
a.fatalBackendErr(err)
return err
}