From 53fea8d69f1e4bd4cb674dfa70b1833d53124288 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 20 May 2020 01:54:19 +0000 Subject: [PATCH 01/10] multi: handle trade suspension messages. This adds the trade suspension handler to the client core. --- client/core/core.go | 42 +++++++++++++++++++++++++++++++++++++++- client/core/core_test.go | 32 ++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/client/core/core.go b/client/core/core.go index cf97f1b5f1..525d6fdb64 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2422,6 +2422,46 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro return tracker.db.UpdateOrder(metaOrder) } +// handleTradeSuspensionMsg is called when a trade suspension notification is received. +func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error { + var suspension msgjson.TradeSuspension + err := msg.Unmarshal(&suspension) + if err != nil { + return fmt.Errorf("trade suspension unmarshal error: %v", err) + } + + go func(ctx context.Context, suspension *msgjson.TradeSuspension) { + nowMilli := encode.UnixMilliU(time.Now()) + + var duration time.Duration + if nowMilli < suspension.SuspendTime { + milliDiff := suspension.SuspendTime - nowMilli + duration = time.Duration(milliDiff * 1e6) + } + + timer := time.NewTimer(duration) + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + switch suspension.Persist { + case true: + // TODO: persist outstanding client orders. + return + + case false: + // TODO: purge outstanding client orders. + return + } + } + } + }(c.ctx, &suspension) + + return nil +} + // routeHandler is a handler for a message from the DEX. type routeHandler func(*Core, *dexConnection, *msgjson.Message) error @@ -2431,7 +2471,6 @@ var reqHandlers = map[string]routeHandler{ msgjson.AuditRoute: handleAuditRoute, msgjson.RedemptionRoute: handleRedemptionRoute, msgjson.RevokeMatchRoute: handleRevokeMatchMsg, - msgjson.SuspensionRoute: nil, } var noteHandlers = map[string]routeHandler{ @@ -2440,6 +2479,7 @@ var noteHandlers = map[string]routeHandler{ msgjson.EpochOrderRoute: handleEpochOrderMsg, msgjson.UnbookOrderRoute: handleUnbookOrderMsg, msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg, + msgjson.SuspensionRoute: handleTradeSuspensionMsg, } // listen monitors the DEX websocket connection for server requests and diff --git a/client/core/core_test.go b/client/core/core_test.go index 10d79a9913..9afa5d6c17 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -2942,3 +2942,35 @@ func TestAssetCounter(t *testing.T) { t.Fatalf("absorbed counts not combined correctly") } } + +func TestHandleTradeSuspensionMsg(t *testing.T) { + rig := newTestRig() + payload := &msgjson.TradeSuspension{ + MarketID: "dcr_btc", + FinalEpoch: 100, + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Hour * 24)), + Persist: false, + } + + req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err := handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + form := &TradeForm{ + DEX: tDexUrl, + IsLimit: true, + Sell: true, + Base: tDCR.ID, + Quote: tBTC.ID, + Qty: tDCR.LotSize * 10, + Rate: tBTC.RateStep * 1000, + TifNow: false, + } + + _, err = rig.core.Trade(tPW, form) + if err == nil { + t.Fatalf("expected a trade suspension set error") + } +} From 9c51dd83794fcf4f3bbfae61a84af268619ce7e0 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Fri, 22 May 2020 18:43:12 +0000 Subject: [PATCH 02/10] multi: handle persist behaviour. --- client/core/core.go | 37 ++++++++++++++++++++++++++++++++----- client/order/orderbook.go | 23 +++++++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index 525d6fdb64..d6a5a2a3af 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2442,18 +2442,45 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) timer := time.NewTimer(duration) for { select { - case <-ctx.Done(): + case <-c.ctx.Done(): return case <-timer.C: + dc.booksMtx.Lock() + defer dc.booksMtx.Unlock() + + bookie, ok := dc.books[suspension.MarketID] + if !ok { + log.Errorf("no bookie found with market id %s", + suspension.MarketID) + } + switch suspension.Persist { case true: - // TODO: persist outstanding client orders. - return + bookie.Reset() case false: - // TODO: purge outstanding client orders. - return + dc.tradeMtx.Lock() + toDelete := make([]*trackedTrade, 0) + for _, trade := range dc.trades { + if trade.mktID == suspension.MarketID { + // Remove unmatched trades + status := trade.metaData.Status + if status == order.OrderStatusBooked || + status == order.OrderStatusEpoch { + toDelete = append(toDelete, trade) + } + } + } + + for _, trade := range toDelete { + // TODO: delete the trade from the db, yet to figure + // out how to do that. + delete(dc.trades, trade.ID()) + } + dc.tradeMtx.Unlock() + + bookie.Reset() } } } diff --git a/client/order/orderbook.go b/client/order/orderbook.go index 8016ae0759..984782e4aa 100644 --- a/client/order/orderbook.go +++ b/client/order/orderbook.go @@ -437,6 +437,29 @@ func (ob *OrderBook) ResetEpoch() { ob.epochQueue.Reset() } +// Reset clears the orderbook and its associated epoch queue. This should be +// called when a trade suspension does not persist the orderbook. +func (ob *OrderBook) Reset() { + ob.seqMtx.Lock() + ob.seq = 0 + ob.seqMtx.Unlock() + + ob.marketID = "" + + ob.noteQueueMtx.Lock() + ob.noteQueue = make([]*cachedOrderNote, 0) + ob.noteQueueMtx.Unlock() + + ob.ordersMtx.Lock() + ob.orders = make(map[order.OrderID]*Order) + ob.ordersMtx.Unlock() + + ob.buys = NewBookSide(descending) + ob.sells = NewBookSide(ascending) + + ob.ResetEpoch() +} + // Enqueue appends the provided order note to the orderbook's epoch queue. func (ob *OrderBook) Enqueue(note *msgjson.EpochOrderNote) error { ob.setSeq(note.Seq) From 2934cea699beecb5a658bbf3df6793beb059652c Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 27 May 2020 21:12:52 +0000 Subject: [PATCH 03/10] multi: resolve review issues (1 of x). --- client/core/core.go | 157 ++++++++++++++++++++++++-------------- client/core/core_test.go | 18 +++-- client/core/types.go | 1 + client/db/bolt/db.go | 13 ++++ client/db/bolt/db_test.go | 33 ++++++++ client/db/interface.go | 3 + 6 files changed, 161 insertions(+), 64 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index d6a5a2a3af..4548c325bf 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -76,6 +76,34 @@ type dexConnection struct { regConfMtx sync.RWMutex regConfirms uint32 + + pendingSuspendsMtx sync.Mutex + pendingSuspends map[string]*time.Timer +} + +// suspended returns the suspended status of the provided market. +func (dc *dexConnection) suspended(mkt string) bool { + dc.marketMtx.Lock() + defer dc.marketMtx.Unlock() + suspended, ok := dc.marketMap[mkt] + if !ok { + return false + } + return suspended.Suspended +} + +// suspend halts trading for the provided market. +func (dc *dexConnection) suspend(mkt string) { + dc.marketMtx.Lock() + dc.marketMap[mkt].Suspended = true + dc.marketMtx.Unlock() +} + +// resume commences trading for the provided market. +func (dc *dexConnection) resume(mkt string) { + dc.marketMtx.Lock() + dc.marketMap[mkt].Suspended = false + dc.marketMtx.Unlock() } // refreshMarkets rebuilds, saves, and returns the market map. The map itself @@ -98,6 +126,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market { EpochLen: mkt.EpochLen, StartEpoch: mkt.StartEpoch, MarketBuyBuffer: mkt.MarketBuyBuffer, + Suspended: dc.suspended(mkt.Name), } mid := market.marketName() dc.tradeMtx.RLock() @@ -1467,6 +1496,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) { return nil, fmt.Errorf("order placed for unknown market") } + // Proceed with the order if there is no trade suspension + // scheduled for the market. + if dc.suspended(mktID) { + return nil, fmt.Errorf("suspended market") + } + rate, qty := form.Rate, form.Qty if form.IsLimit && rate == 0 { return nil, fmt.Errorf("zero-rate order not allowed") @@ -2310,17 +2345,18 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { // Create the dexConnection and listen for incoming messages. dc := &dexConnection{ - WsConn: conn, - connMaster: connMaster, - assets: assets, - cfg: dexCfg, - books: make(map[string]*bookie), - acct: newDEXAccount(acctInfo), - marketMap: marketMap, - trades: make(map[order.OrderID]*trackedTrade), - notify: c.notify, - epoch: epochMap, - connected: true, + WsConn: conn, + connMaster: connMaster, + assets: assets, + cfg: dexCfg, + books: make(map[string]*bookie), + acct: newDEXAccount(acctInfo), + marketMap: marketMap, + trades: make(map[order.OrderID]*trackedTrade), + notify: c.notify, + epoch: epochMap, + connected: true, + pendingSuspends: make(map[string]*time.Timer), } dc.refreshMarkets() @@ -2424,67 +2460,72 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro // handleTradeSuspensionMsg is called when a trade suspension notification is received. func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error { - var suspension msgjson.TradeSuspension - err := msg.Unmarshal(&suspension) + var sp msgjson.TradeSuspension + err := msg.Unmarshal(&sp) if err != nil { return fmt.Errorf("trade suspension unmarshal error: %v", err) } - go func(ctx context.Context, suspension *msgjson.TradeSuspension) { - nowMilli := encode.UnixMilliU(time.Now()) + dc.pendingSuspendsMtx.Lock() + defer dc.pendingSuspendsMtx.Unlock() - var duration time.Duration - if nowMilli < suspension.SuspendTime { - milliDiff := suspension.SuspendTime - nowMilli - duration = time.Duration(milliDiff * 1e6) + // Attempt to cancel and remove pending suspends. + if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { + if !sched.Stop() { + // Drain the channel. + <-sched.C + return fmt.Errorf("market %s for dex %s is already suspended", + sp.MarketID, dc.acct.url) } + delete(dc.pendingSuspends, sp.MarketID) + } - timer := time.NewTimer(duration) - for { - select { - case <-c.ctx.Done(): - return + // Set the new scheduled suspend. + duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) + dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { + dc.pendingSuspendsMtx.Lock() + dc.suspend(sp.MarketID) + if bookie := dc.books[sp.MarketID]; bookie != nil { + bookie.Reset() + } + if !sp.Persist { + assets := strings.Split(sp.MarketID, "_") + base, ok := dex.BipSymbolID(assets[0]) + if !ok { + log.Errorf("no ID for base BIP symbol: %v", assets[0]) + } - case <-timer.C: - dc.booksMtx.Lock() - defer dc.booksMtx.Unlock() + quote, ok := dex.BipSymbolID(assets[1]) + if !ok { + log.Errorf("no ID for quote BIP symbol: %v", assets[1]) + } - bookie, ok := dc.books[suspension.MarketID] - if !ok { - log.Errorf("no bookie found with market id %s", - suspension.MarketID) + orders, err := c.db.ActiveDexMarketOrders([]byte(dc.acct.url), + encode.Uint32Bytes(base), encode.Uint32Bytes(quote)) + if err != nil { + log.Errorf("unable to fetch active dex marker orders: %v", err) + return + } + + // Revoke all active orders of the suspended market for the dex. + for _, entry := range orders { + md := entry.MetaData + md.Status = order.OrderStatusRevoked + metaOrder := &db.MetaOrder{ + MetaData: md, + Order: entry.Order, } - switch suspension.Persist { - case true: - bookie.Reset() - - case false: - dc.tradeMtx.Lock() - toDelete := make([]*trackedTrade, 0) - for _, trade := range dc.trades { - if trade.mktID == suspension.MarketID { - // Remove unmatched trades - status := trade.metaData.Status - if status == order.OrderStatusBooked || - status == order.OrderStatusEpoch { - toDelete = append(toDelete, trade) - } - } - } - - for _, trade := range toDelete { - // TODO: delete the trade from the db, yet to figure - // out how to do that. - delete(dc.trades, trade.ID()) - } - dc.tradeMtx.Unlock() - - bookie.Reset() + err := c.db.UpdateOrder(metaOrder) + if err != nil { + log.Errorf("unable to revoke order: %v", err) + return } } } - }(c.ctx, &suspension) + delete(dc.pendingSuspends, sp.MarketID) + dc.pendingSuspendsMtx.Unlock() + }) return nil } diff --git a/client/core/core_test.go b/client/core/core_test.go index 9afa5d6c17..1b3e438306 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -150,6 +150,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { QuoteSymbol: tBTC.Symbol, EpochLen: 60000, MarketBuyBuffer: 1.1, + Suspended: false, } return &dexConnection{ WsConn: conn, @@ -178,10 +179,11 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { }, Fee: tFee, }, - notify: func(Notification) {}, - marketMap: map[string]*Market{tDcrBtcMktName: mkt}, - trades: make(map[order.OrderID]*trackedTrade), - epoch: map[string]uint64{tDcrBtcMktName: 0}, + notify: func(Notification) {}, + marketMap: map[string]*Market{tDcrBtcMktName: mkt}, + trades: make(map[order.OrderID]*trackedTrade), + epoch: map[string]uint64{tDcrBtcMktName: 0}, + pendingSuspends: make(map[string]*time.Timer), }, conn, acct } @@ -268,6 +270,10 @@ func (tdb *TDB) ActiveOrders() ([]*db.MetaOrder, error) { return nil, nil } +func (tdb *TDB) ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*db.MetaOrder, error) { + return []*db.MetaOrder{}, nil +} + func (tdb *TDB) AccountOrders(dex string, n int, since uint64) ([]*db.MetaOrder, error) { return nil, nil } @@ -2948,8 +2954,8 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { payload := &msgjson.TradeSuspension{ MarketID: "dcr_btc", FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Hour * 24)), - Persist: false, + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), + Persist: true, } req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) diff --git a/client/core/types.go b/client/core/types.go index 54c4bd530d..fc32305837 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -158,6 +158,7 @@ type Market struct { StartEpoch uint64 `json:"startepoch"` MarketBuyBuffer float64 `json:"buybuffer"` Orders []*Order `json:"orders"` + Suspended bool `json:"suspended"` } // Display returns an ID string suitable for displaying in a UI. diff --git a/client/db/bolt/db.go b/client/db/bolt/db.go index 19a9231f7e..63aaa3f392 100644 --- a/client/db/bolt/db.go +++ b/client/db/bolt/db.go @@ -305,6 +305,19 @@ func (db *boltDB) ActiveOrders() ([]*dexdb.MetaOrder, error) { }) } +// ActiveDexMarketOrders retrieves active orders associated with the +// provided dex, base and quote assets. +func (db *boltDB) ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*dexdb.MetaOrder, error) { + return db.filteredOrders(func(oBkt *bbolt.Bucket) bool { + statusB := oBkt.Get(statusKey) + dexB := oBkt.Get(dexKey) + baseB := oBkt.Get(baseKey) + quoteB := oBkt.Get(quoteKey) + return bEqual(base, baseB) && bEqual(quote, quoteB) && + bEqual(dex, dexB) && (bEqual(statusB, byteEpoch) || bEqual(statusB, byteBooked)) + }) +} + // AccountOrders retrieves all orders associated with the specified DEX. n = 0 // applies no limit on number of orders returned. since = 0 is equivalent to // disabling the time filter, since no orders were created before before 1970. diff --git a/client/db/bolt/db_test.go b/client/db/bolt/db_test.go index 807a0e9c67..f6fd9d73ff 100644 --- a/client/db/bolt/db_test.go +++ b/client/db/bolt/db_test.go @@ -12,6 +12,7 @@ import ( "decred.org/dcrdex/client/db" dbtest "decred.org/dcrdex/client/db/test" + "decred.org/dcrdex/dex/encode" "decred.org/dcrdex/dex/order" ordertest "decred.org/dcrdex/dex/order/test" "github.com/decred/slog" @@ -533,6 +534,38 @@ func TestOrders(t *testing.T) { if err == nil { t.Fatalf("no error encountered for updating unknown order change coin") } + + numToDo = 5 + numActiveDex := numToDo + acct := acct1 + base, quote := uint32(42), uint32(0) // base - dcr, quote - btc. + nTimes(numToDo, func(i int) { + status := order.OrderStatusBooked + ord := randOrderForMarket(base, quote) + order := &db.MetaOrder{ + MetaData: &db.OrderMetaData{ + Status: status, + DEX: acct.URL, + Proof: db.OrderProof{DEXSig: randBytes(73)}, + }, + Order: ord, + } + + err := boltdb.UpdateOrder(order) + if err != nil { + t.Fatalf("error inserting order: %v", err) + } + }) + + activeDexOrders, err := boltdb.ActiveDexMarketOrders([]byte(acct.URL), + encode.Uint32Bytes(base), encode.Uint32Bytes(quote)) + if err != nil { + t.Fatalf("error retrieving active dex orders: %v", err) + } + if len(activeDexOrders) != numActiveDex { + t.Fatalf("expected %d active dex orders, got %d", + numActiveDex, len(activeDexOrders)) + } } func TestMatches(t *testing.T) { diff --git a/client/db/interface.go b/client/db/interface.go index 3327c58d76..0a6202605e 100644 --- a/client/db/interface.go +++ b/client/db/interface.go @@ -37,6 +37,9 @@ type DB interface { // ActiveOrders retrieves all orders which appear to be in an active state, // which is either in the epoch queue or in the order book. ActiveOrders() ([]*MetaOrder, error) + // ActiveDexMarketOrders retrieves active orders associated with the + // provided dex, base and quote assets. + ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*MetaOrder, error) // AccountOrders retrieves all orders associated with the specified DEX. The // order count can be limited by supplying a non-zero n value. In that case // the newest n orders will be returned. The orders can be additionally From b1878d2229bf5597ebf424cc06143709e6ca8481 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 27 May 2020 23:32:07 +0000 Subject: [PATCH 04/10] core: avoid draining timer channel. --- client/core/core.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index 4548c325bf..ac12309e69 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2472,8 +2472,6 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) // Attempt to cancel and remove pending suspends. if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { if !sched.Stop() { - // Drain the channel. - <-sched.C return fmt.Errorf("market %s for dex %s is already suspended", sp.MarketID, dc.acct.url) } From 328681d99b733f4973558987a47e556ab9e8951b Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Thu, 28 May 2020 22:43:57 +0000 Subject: [PATCH 05/10] multi: resolve review issues (2 of x). --- client/core/core.go | 56 ++++++++++++++++++--------------------- client/core/core_test.go | 6 +---- client/db/bolt/db.go | 13 --------- client/db/bolt/db_test.go | 33 ----------------------- client/db/interface.go | 3 --- 5 files changed, 27 insertions(+), 84 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index ac12309e69..f9984617ae 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -85,11 +85,11 @@ type dexConnection struct { func (dc *dexConnection) suspended(mkt string) bool { dc.marketMtx.Lock() defer dc.marketMtx.Unlock() - suspended, ok := dc.marketMap[mkt] + market, ok := dc.marketMap[mkt] if !ok { return false } - return suspended.Suspended + return market.Suspended } // suspend halts trading for the provided market. @@ -2473,7 +2473,7 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { if !sched.Stop() { return fmt.Errorf("market %s for dex %s is already suspended", - sp.MarketID, dc.acct.url) + sp.MarketID, dc.acct.host) } delete(dc.pendingSuspends, sp.MarketID) } @@ -2482,47 +2482,43 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { dc.pendingSuspendsMtx.Lock() + defer dc.pendingSuspendsMtx.Unlock() dc.suspend(sp.MarketID) if bookie := dc.books[sp.MarketID]; bookie != nil { bookie.Reset() } if !sp.Persist { - assets := strings.Split(sp.MarketID, "_") - base, ok := dex.BipSymbolID(assets[0]) + mkt, ok := dc.marketMap[sp.MarketID] if !ok { - log.Errorf("no ID for base BIP symbol: %v", assets[0]) + log.Errorf("no market found with ID %s", sp.MarketID) } - quote, ok := dex.BipSymbolID(assets[1]) - if !ok { - log.Errorf("no ID for quote BIP symbol: %v", assets[1]) - } - - orders, err := c.db.ActiveDexMarketOrders([]byte(dc.acct.url), - encode.Uint32Bytes(base), encode.Uint32Bytes(quote)) - if err != nil { - log.Errorf("unable to fetch active dex marker orders: %v", err) - return - } + dc.tradeMtx.Lock() + defer dc.tradeMtx.Unlock() // Revoke all active orders of the suspended market for the dex. - for _, entry := range orders { - md := entry.MetaData - md.Status = order.OrderStatusRevoked - metaOrder := &db.MetaOrder{ - MetaData: md, - Order: entry.Order, - } - - err := c.db.UpdateOrder(metaOrder) - if err != nil { - log.Errorf("unable to revoke order: %v", err) - return + for _, tracker := range dc.trades { + if tracker.Order.Base() == mkt.BaseID && + tracker.Order.Quote() == mkt.QuoteID && + tracker.metaData.Host == dc.acct.host && + (tracker.metaData.Status == order.OrderStatusEpoch || + tracker.metaData.Status == order.OrderStatusBooked) { + + md := tracker.metaData + md.Status = order.OrderStatusRevoked + metaOrder := &db.MetaOrder{ + MetaData: md, + Order: tracker.Order, + } + + err = tracker.db.UpdateOrder(metaOrder) + if err != nil { + log.Errorf("unable to update order: %v", err) + } } } } delete(dc.pendingSuspends, sp.MarketID) - dc.pendingSuspendsMtx.Unlock() }) return nil diff --git a/client/core/core_test.go b/client/core/core_test.go index 1b3e438306..d70e9ec7e0 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -270,10 +270,6 @@ func (tdb *TDB) ActiveOrders() ([]*db.MetaOrder, error) { return nil, nil } -func (tdb *TDB) ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*db.MetaOrder, error) { - return []*db.MetaOrder{}, nil -} - func (tdb *TDB) AccountOrders(dex string, n int, since uint64) ([]*db.MetaOrder, error) { return nil, nil } @@ -2965,7 +2961,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { } form := &TradeForm{ - DEX: tDexUrl, + Host: tDexHost, IsLimit: true, Sell: true, Base: tDCR.ID, diff --git a/client/db/bolt/db.go b/client/db/bolt/db.go index 63aaa3f392..19a9231f7e 100644 --- a/client/db/bolt/db.go +++ b/client/db/bolt/db.go @@ -305,19 +305,6 @@ func (db *boltDB) ActiveOrders() ([]*dexdb.MetaOrder, error) { }) } -// ActiveDexMarketOrders retrieves active orders associated with the -// provided dex, base and quote assets. -func (db *boltDB) ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*dexdb.MetaOrder, error) { - return db.filteredOrders(func(oBkt *bbolt.Bucket) bool { - statusB := oBkt.Get(statusKey) - dexB := oBkt.Get(dexKey) - baseB := oBkt.Get(baseKey) - quoteB := oBkt.Get(quoteKey) - return bEqual(base, baseB) && bEqual(quote, quoteB) && - bEqual(dex, dexB) && (bEqual(statusB, byteEpoch) || bEqual(statusB, byteBooked)) - }) -} - // AccountOrders retrieves all orders associated with the specified DEX. n = 0 // applies no limit on number of orders returned. since = 0 is equivalent to // disabling the time filter, since no orders were created before before 1970. diff --git a/client/db/bolt/db_test.go b/client/db/bolt/db_test.go index f6fd9d73ff..807a0e9c67 100644 --- a/client/db/bolt/db_test.go +++ b/client/db/bolt/db_test.go @@ -12,7 +12,6 @@ import ( "decred.org/dcrdex/client/db" dbtest "decred.org/dcrdex/client/db/test" - "decred.org/dcrdex/dex/encode" "decred.org/dcrdex/dex/order" ordertest "decred.org/dcrdex/dex/order/test" "github.com/decred/slog" @@ -534,38 +533,6 @@ func TestOrders(t *testing.T) { if err == nil { t.Fatalf("no error encountered for updating unknown order change coin") } - - numToDo = 5 - numActiveDex := numToDo - acct := acct1 - base, quote := uint32(42), uint32(0) // base - dcr, quote - btc. - nTimes(numToDo, func(i int) { - status := order.OrderStatusBooked - ord := randOrderForMarket(base, quote) - order := &db.MetaOrder{ - MetaData: &db.OrderMetaData{ - Status: status, - DEX: acct.URL, - Proof: db.OrderProof{DEXSig: randBytes(73)}, - }, - Order: ord, - } - - err := boltdb.UpdateOrder(order) - if err != nil { - t.Fatalf("error inserting order: %v", err) - } - }) - - activeDexOrders, err := boltdb.ActiveDexMarketOrders([]byte(acct.URL), - encode.Uint32Bytes(base), encode.Uint32Bytes(quote)) - if err != nil { - t.Fatalf("error retrieving active dex orders: %v", err) - } - if len(activeDexOrders) != numActiveDex { - t.Fatalf("expected %d active dex orders, got %d", - numActiveDex, len(activeDexOrders)) - } } func TestMatches(t *testing.T) { diff --git a/client/db/interface.go b/client/db/interface.go index 0a6202605e..3327c58d76 100644 --- a/client/db/interface.go +++ b/client/db/interface.go @@ -37,9 +37,6 @@ type DB interface { // ActiveOrders retrieves all orders which appear to be in an active state, // which is either in the epoch queue or in the order book. ActiveOrders() ([]*MetaOrder, error) - // ActiveDexMarketOrders retrieves active orders associated with the - // provided dex, base and quote assets. - ActiveDexMarketOrders(dex []byte, base []byte, quote []byte) ([]*MetaOrder, error) // AccountOrders retrieves all orders associated with the specified DEX. The // order count can be limited by supplying a non-zero n value. In that case // the newest n orders will be returned. The orders can be additionally From 0424c6ce7159643f13599f121a0a3ed82fbcd202 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Fri, 29 May 2020 23:35:43 +0000 Subject: [PATCH 06/10] multi: resolve review issues (3 of x). --- client/core/core.go | 20 ++++++++++++++------ client/order/bookside.go | 9 +++++++++ client/order/orderbook.go | 28 +++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index f9984617ae..99f4632c0e 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2481,29 +2481,32 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) // Set the new scheduled suspend. duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { - dc.pendingSuspendsMtx.Lock() - defer dc.pendingSuspendsMtx.Unlock() dc.suspend(sp.MarketID) + + dc.booksMtx.RLock() if bookie := dc.books[sp.MarketID]; bookie != nil { bookie.Reset() } + dc.booksMtx.RUnlock() + if !sp.Persist { + dc.marketMtx.RLock() mkt, ok := dc.marketMap[sp.MarketID] if !ok { log.Errorf("no market found with ID %s", sp.MarketID) + dc.marketMtx.Unlock() + return } - - dc.tradeMtx.Lock() - defer dc.tradeMtx.Unlock() + dc.marketMtx.RUnlock() // Revoke all active orders of the suspended market for the dex. + dc.tradeMtx.RLock() for _, tracker := range dc.trades { if tracker.Order.Base() == mkt.BaseID && tracker.Order.Quote() == mkt.QuoteID && tracker.metaData.Host == dc.acct.host && (tracker.metaData.Status == order.OrderStatusEpoch || tracker.metaData.Status == order.OrderStatusBooked) { - md := tracker.metaData md.Status = order.OrderStatusRevoked metaOrder := &db.MetaOrder{ @@ -2517,8 +2520,13 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } } } + dc.tradeMtx.RUnlock() } + + // Remove suspend after execution. + dc.pendingSuspendsMtx.Lock() delete(dc.pendingSuspends, sp.MarketID) + dc.pendingSuspendsMtx.Unlock() }) return nil diff --git a/client/order/bookside.go b/client/order/bookside.go index 96047e7956..20195eff34 100644 --- a/client/order/bookside.go +++ b/client/order/bookside.go @@ -131,6 +131,15 @@ func (d *bookSide) orders() []*Order { return orders } +// Reset clears out the book side. +func (d *bookSide) Reset() { + d.mtx.Lock() + defer d.mtx.Unlock() + + d.bins = make(map[uint64][]*Order) + d.rateIndex = NewRateIndex() +} + // BestNOrders returns the best N orders of the book side. func (d *bookSide) BestNOrders(n int) ([]*Order, bool) { d.mtx.RLock() diff --git a/client/order/orderbook.go b/client/order/orderbook.go index 984782e4aa..53e6fb0039 100644 --- a/client/order/orderbook.go +++ b/client/order/orderbook.go @@ -46,6 +46,7 @@ type cachedOrderNote struct { type OrderBook struct { seqMtx sync.Mutex seq uint64 + idMtx sync.Mutex marketID string noteQueue []*cachedOrderNote noteQueueMtx sync.Mutex @@ -180,7 +181,10 @@ func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { ob.seq = snapshot.Seq ob.seqMtx.Unlock() + ob.idMtx.Lock() ob.marketID = snapshot.MarketID + ob.idMtx.Unlock() + ob.orders = make(map[order.OrderID]*Order) ob.buys = NewBookSide(descending) ob.sells = NewBookSide(ascending) @@ -232,7 +236,11 @@ func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { // book is the workhorse of the exported Book function. It allows booking // cached and uncached order notes. func (ob *OrderBook) book(note *msgjson.BookOrderNote, cached bool) error { - if ob.marketID != note.MarketID { + ob.idMtx.Lock() + marketID := ob.marketID + ob.idMtx.Unlock() + + if marketID != note.MarketID { return fmt.Errorf("invalid note market id %s", note.MarketID) } @@ -287,7 +295,11 @@ func (ob *OrderBook) Book(note *msgjson.BookOrderNote) error { // updateRemaining is the workhorse of the exported UpdateRemaining function. It // allows updating cached and uncached orders. func (ob *OrderBook) updateRemaining(note *msgjson.UpdateRemainingNote, cached bool) error { - if ob.marketID != note.MarketID { + ob.idMtx.Lock() + marketID := ob.marketID + ob.idMtx.Unlock() + + if marketID != note.MarketID { return fmt.Errorf("invalid update_remaining note market id %s", note.MarketID) } @@ -329,7 +341,11 @@ func (ob *OrderBook) UpdateRemaining(note *msgjson.UpdateRemainingNote) error { // unbook is the workhorse of the exported Unbook function. It allows unbooking // cached and uncached order notes. func (ob *OrderBook) unbook(note *msgjson.UnbookOrderNote, cached bool) error { - if ob.marketID != note.MarketID { + ob.idMtx.Lock() + marketID := ob.marketID + ob.idMtx.Unlock() + + if marketID != note.MarketID { return fmt.Errorf("invalid note market id %s", note.MarketID) } @@ -444,7 +460,9 @@ func (ob *OrderBook) Reset() { ob.seq = 0 ob.seqMtx.Unlock() + ob.idMtx.Lock() ob.marketID = "" + ob.idMtx.Unlock() ob.noteQueueMtx.Lock() ob.noteQueue = make([]*cachedOrderNote, 0) @@ -454,8 +472,8 @@ func (ob *OrderBook) Reset() { ob.orders = make(map[order.OrderID]*Order) ob.ordersMtx.Unlock() - ob.buys = NewBookSide(descending) - ob.sells = NewBookSide(ascending) + ob.buys.Reset() + ob.sells.Reset() ob.ResetEpoch() } From bd13112e63ee986392f91d282bd3996cf03f6ab0 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 1 Jun 2020 14:35:13 +0000 Subject: [PATCH 07/10] core: resolve review issues (4 of x). --- client/core/core.go | 64 +++++++++++++++++++++++++++------------- client/core/core_test.go | 40 ++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 22 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index 99f4632c0e..aef2a56199 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -93,17 +93,31 @@ func (dc *dexConnection) suspended(mkt string) bool { } // suspend halts trading for the provided market. -func (dc *dexConnection) suspend(mkt string) { +func (dc *dexConnection) suspend(mkt string) error { dc.marketMtx.Lock() - dc.marketMap[mkt].Suspended = true - dc.marketMtx.Unlock() + defer dc.marketMtx.Unlock() + + market, ok := dc.marketMap[mkt] + if !ok { + return fmt.Errorf("no market found with ID %s", mkt) + } + + market.Suspended = true + return nil } // resume commences trading for the provided market. -func (dc *dexConnection) resume(mkt string) { +func (dc *dexConnection) resume(mkt string) error { dc.marketMtx.Lock() - dc.marketMap[mkt].Suspended = false - dc.marketMtx.Unlock() + defer dc.marketMtx.Unlock() + + market, ok := dc.marketMap[mkt] + if !ok { + return fmt.Errorf("no market found with ID %s", mkt) + } + + market.Suspended = false + return nil } // refreshMarkets rebuilds, saves, and returns the market map. The map itself @@ -2469,20 +2483,37 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) dc.pendingSuspendsMtx.Lock() defer dc.pendingSuspendsMtx.Unlock() - // Attempt to cancel and remove pending suspends. + // Ensure the provided market exists for the dex. + dc.marketMtx.RLock() + mkt, ok := dc.marketMap[sp.MarketID] + dc.marketMtx.RUnlock() + if !ok { + return fmt.Errorf("no market found with ID %s", sp.MarketID) + } + + // Ensure the market is not already suspended. + if mkt.Suspended { + return fmt.Errorf("market %s for dex %s is already suspended", + sp.MarketID, dc.acct.host) + } + + // Remove pending suspends for the market. if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { - if !sched.Stop() { - return fmt.Errorf("market %s for dex %s is already suspended", - sp.MarketID, dc.acct.host) - } + sched.Stop() delete(dc.pendingSuspends, sp.MarketID) } // Set the new scheduled suspend. duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { - dc.suspend(sp.MarketID) + // Update the market as suspended. + err := dc.suspend(sp.MarketID) + if err != nil { + log.Error(err) + return + } + // Clear the order book associated with the suspended market. dc.booksMtx.RLock() if bookie := dc.books[sp.MarketID]; bookie != nil { bookie.Reset() @@ -2490,15 +2521,6 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) dc.booksMtx.RUnlock() if !sp.Persist { - dc.marketMtx.RLock() - mkt, ok := dc.marketMap[sp.MarketID] - if !ok { - log.Errorf("no market found with ID %s", sp.MarketID) - dc.marketMtx.Unlock() - return - } - dc.marketMtx.RUnlock() - // Revoke all active orders of the suspended market for the dex. dc.tradeMtx.RLock() for _, tracker := range dc.trades { diff --git a/client/core/core_test.go b/client/core/core_test.go index d70e9ec7e0..4a06cc75ee 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -2947,8 +2947,10 @@ func TestAssetCounter(t *testing.T) { func TestHandleTradeSuspensionMsg(t *testing.T) { rig := newTestRig() + + // Ensure a non-existent market cannot be suspended. payload := &msgjson.TradeSuspension{ - MarketID: "dcr_btc", + MarketID: "dcr_dcr", FinalEpoch: 100, SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), Persist: true, @@ -2956,10 +2958,46 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) err := handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err == nil { + t.Fatal("[handleTradeSuspensionMsg] expected a market " + + "ID not found error: %v") + } + + mkt := "dcr_btc" + + // Ensure an already suspended market cannot be suspended again. + err = rig.dc.suspend(mkt) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + payload = &msgjson.TradeSuspension{ + MarketID: mkt, + FinalEpoch: 100, + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), + Persist: true, + } + + req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err = handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err == nil { + t.Fatal("[handleTradeSuspensionMsg] expected a market " + + "suspended error: %v") + } + + // Suspend a market. + err = rig.dc.resume(mkt) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err = handleTradeSuspensionMsg(rig.core, rig.dc, req) if err != nil { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } + // Ensure trades for a suspended market generate an error. form := &TradeForm{ Host: tDexHost, IsLimit: true, From fc088dbefb27151ea5c1d80f2a46a2f80879b83c Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Sat, 6 Jun 2020 03:18:48 +0000 Subject: [PATCH 08/10] multi: resolve review issues (5 of x). --- client/core/core.go | 16 ++++++++----- client/order/orderbook.go | 47 +++------------------------------------ 2 files changed, 13 insertions(+), 50 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index aef2a56199..ee50a1d349 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -2499,8 +2499,12 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) // Remove pending suspends for the market. if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { - sched.Stop() - delete(dc.pendingSuspends, sp.MarketID) + if !sched.Stop() { + // TODO: too late, timer already fired. Need to request the + // current configuration for the market at this point. + return fmt.Errorf("unable to stop previously scheduled "+ + "suspend for market %s on dex %s", sp.MarketID, dc.acct.host) + } } // Set the new scheduled suspend. @@ -2513,12 +2517,12 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) return } - // Clear the order book associated with the suspended market. - dc.booksMtx.RLock() + // Clear the bookie associated with the suspended market. + dc.booksMtx.Lock() if bookie := dc.books[sp.MarketID]; bookie != nil { - bookie.Reset() + dc.books[sp.MarketID] = newBookie(func() { c.unsub(dc, sp.MarketID) }) } - dc.booksMtx.RUnlock() + dc.booksMtx.Unlock() if !sp.Persist { // Revoke all active orders of the suspended market for the dex. diff --git a/client/order/orderbook.go b/client/order/orderbook.go index 53e6fb0039..8016ae0759 100644 --- a/client/order/orderbook.go +++ b/client/order/orderbook.go @@ -46,7 +46,6 @@ type cachedOrderNote struct { type OrderBook struct { seqMtx sync.Mutex seq uint64 - idMtx sync.Mutex marketID string noteQueue []*cachedOrderNote noteQueueMtx sync.Mutex @@ -181,10 +180,7 @@ func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { ob.seq = snapshot.Seq ob.seqMtx.Unlock() - ob.idMtx.Lock() ob.marketID = snapshot.MarketID - ob.idMtx.Unlock() - ob.orders = make(map[order.OrderID]*Order) ob.buys = NewBookSide(descending) ob.sells = NewBookSide(ascending) @@ -236,11 +232,7 @@ func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { // book is the workhorse of the exported Book function. It allows booking // cached and uncached order notes. func (ob *OrderBook) book(note *msgjson.BookOrderNote, cached bool) error { - ob.idMtx.Lock() - marketID := ob.marketID - ob.idMtx.Unlock() - - if marketID != note.MarketID { + if ob.marketID != note.MarketID { return fmt.Errorf("invalid note market id %s", note.MarketID) } @@ -295,11 +287,7 @@ func (ob *OrderBook) Book(note *msgjson.BookOrderNote) error { // updateRemaining is the workhorse of the exported UpdateRemaining function. It // allows updating cached and uncached orders. func (ob *OrderBook) updateRemaining(note *msgjson.UpdateRemainingNote, cached bool) error { - ob.idMtx.Lock() - marketID := ob.marketID - ob.idMtx.Unlock() - - if marketID != note.MarketID { + if ob.marketID != note.MarketID { return fmt.Errorf("invalid update_remaining note market id %s", note.MarketID) } @@ -341,11 +329,7 @@ func (ob *OrderBook) UpdateRemaining(note *msgjson.UpdateRemainingNote) error { // unbook is the workhorse of the exported Unbook function. It allows unbooking // cached and uncached order notes. func (ob *OrderBook) unbook(note *msgjson.UnbookOrderNote, cached bool) error { - ob.idMtx.Lock() - marketID := ob.marketID - ob.idMtx.Unlock() - - if marketID != note.MarketID { + if ob.marketID != note.MarketID { return fmt.Errorf("invalid note market id %s", note.MarketID) } @@ -453,31 +437,6 @@ func (ob *OrderBook) ResetEpoch() { ob.epochQueue.Reset() } -// Reset clears the orderbook and its associated epoch queue. This should be -// called when a trade suspension does not persist the orderbook. -func (ob *OrderBook) Reset() { - ob.seqMtx.Lock() - ob.seq = 0 - ob.seqMtx.Unlock() - - ob.idMtx.Lock() - ob.marketID = "" - ob.idMtx.Unlock() - - ob.noteQueueMtx.Lock() - ob.noteQueue = make([]*cachedOrderNote, 0) - ob.noteQueueMtx.Unlock() - - ob.ordersMtx.Lock() - ob.orders = make(map[order.OrderID]*Order) - ob.ordersMtx.Unlock() - - ob.buys.Reset() - ob.sells.Reset() - - ob.ResetEpoch() -} - // Enqueue appends the provided order note to the orderbook's epoch queue. func (ob *OrderBook) Enqueue(note *msgjson.EpochOrderNote) error { ob.setSeq(note.Seq) From 872e9642ed41d7318d80fde36f4b0b0f61e2d0a8 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Thu, 11 Jun 2020 14:46:36 +0000 Subject: [PATCH 09/10] core: resolve review issues (6 of x). --- client/core/core.go | 81 +++++++++++++++++----------------------- client/core/core_test.go | 50 ++++++++++++++++++------- client/core/types.go | 19 +++++++++- 3 files changed, 89 insertions(+), 61 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index ee50a1d349..ad9118a0d7 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -76,47 +76,45 @@ type dexConnection struct { regConfMtx sync.RWMutex regConfirms uint32 - - pendingSuspendsMtx sync.Mutex - pendingSuspends map[string]*time.Timer } // suspended returns the suspended status of the provided market. func (dc *dexConnection) suspended(mkt string) bool { dc.marketMtx.Lock() defer dc.marketMtx.Unlock() + market, ok := dc.marketMap[mkt] if !ok { return false } - return market.Suspended + return market.GetSuspended() } // suspend halts trading for the provided market. func (dc *dexConnection) suspend(mkt string) error { dc.marketMtx.Lock() - defer dc.marketMtx.Unlock() - market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", mkt) } - market.Suspended = true + market.SetSuspended(true) + return nil } // resume commences trading for the provided market. func (dc *dexConnection) resume(mkt string) error { dc.marketMtx.Lock() - defer dc.marketMtx.Unlock() - market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", mkt) } - market.Suspended = false + market.SetSuspended(false) + return nil } @@ -140,7 +138,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market { EpochLen: mkt.EpochLen, StartEpoch: mkt.StartEpoch, MarketBuyBuffer: mkt.MarketBuyBuffer, - Suspended: dc.suspended(mkt.Name), + suspended: dc.suspended(mkt.Name), } mid := market.marketName() dc.tradeMtx.RLock() @@ -2359,18 +2357,17 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { // Create the dexConnection and listen for incoming messages. dc := &dexConnection{ - WsConn: conn, - connMaster: connMaster, - assets: assets, - cfg: dexCfg, - books: make(map[string]*bookie), - acct: newDEXAccount(acctInfo), - marketMap: marketMap, - trades: make(map[order.OrderID]*trackedTrade), - notify: c.notify, - epoch: epochMap, - connected: true, - pendingSuspends: make(map[string]*time.Timer), + WsConn: conn, + connMaster: connMaster, + assets: assets, + cfg: dexCfg, + books: make(map[string]*bookie), + acct: newDEXAccount(acctInfo), + marketMap: marketMap, + trades: make(map[order.OrderID]*trackedTrade), + notify: c.notify, + epoch: epochMap, + connected: true, } dc.refreshMarkets() @@ -2480,26 +2477,26 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) return fmt.Errorf("trade suspension unmarshal error: %v", err) } - dc.pendingSuspendsMtx.Lock() - defer dc.pendingSuspendsMtx.Unlock() - // Ensure the provided market exists for the dex. - dc.marketMtx.RLock() + dc.marketMtx.Lock() mkt, ok := dc.marketMap[sp.MarketID] - dc.marketMtx.RUnlock() + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", sp.MarketID) } // Ensure the market is not already suspended. - if mkt.Suspended { + mkt.mtx.Lock() + defer mkt.mtx.Unlock() + + if mkt.suspended { return fmt.Errorf("market %s for dex %s is already suspended", sp.MarketID, dc.acct.host) } - // Remove pending suspends for the market. - if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { - if !sched.Stop() { + // Stop the current pending suspend. + if mkt.pendingSuspend != nil { + if !mkt.pendingSuspend.Stop() { // TODO: too late, timer already fired. Need to request the // current configuration for the market at this point. return fmt.Errorf("unable to stop previously scheduled "+ @@ -2507,9 +2504,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } } - // Set the new scheduled suspend. + // Set a new scheduled suspend. duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) - dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { + mkt.pendingSuspend = time.AfterFunc(duration, func() { // Update the market as suspended. err := dc.suspend(sp.MarketID) if err != nil { @@ -2533,14 +2530,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) tracker.metaData.Host == dc.acct.host && (tracker.metaData.Status == order.OrderStatusEpoch || tracker.metaData.Status == order.OrderStatusBooked) { - md := tracker.metaData - md.Status = order.OrderStatusRevoked - metaOrder := &db.MetaOrder{ - MetaData: md, - Order: tracker.Order, - } - - err = tracker.db.UpdateOrder(metaOrder) + metaOrder := tracker.metaOrder() + metaOrder.MetaData.Status = order.OrderStatusRevoked + err := tracker.db.UpdateOrder(metaOrder) if err != nil { log.Errorf("unable to update order: %v", err) } @@ -2548,11 +2540,6 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } dc.tradeMtx.RUnlock() } - - // Remove suspend after execution. - dc.pendingSuspendsMtx.Lock() - delete(dc.pendingSuspends, sp.MarketID) - dc.pendingSuspendsMtx.Unlock() }) return nil diff --git a/client/core/core_test.go b/client/core/core_test.go index 4a06cc75ee..b6e850117e 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -150,7 +150,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { QuoteSymbol: tBTC.Symbol, EpochLen: 60000, MarketBuyBuffer: 1.1, - Suspended: false, + suspended: false, } return &dexConnection{ WsConn: conn, @@ -179,11 +179,10 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { }, Fee: tFee, }, - notify: func(Notification) {}, - marketMap: map[string]*Market{tDcrBtcMktName: mkt}, - trades: make(map[order.OrderID]*trackedTrade), - epoch: map[string]uint64{tDcrBtcMktName: 0}, - pendingSuspends: make(map[string]*time.Timer), + notify: func(Notification) {}, + marketMap: map[string]*Market{tDcrBtcMktName: mkt}, + trades: make(map[order.OrderID]*trackedTrade), + epoch: map[string]uint64{tDcrBtcMktName: 0}, }, conn, acct } @@ -2948,12 +2947,34 @@ func TestAssetCounter(t *testing.T) { func TestHandleTradeSuspensionMsg(t *testing.T) { rig := newTestRig() + tCore := rig.core + dcrWallet, _ := newTWallet(tDCR.ID) + tCore.wallets[tDCR.ID] = dcrWallet + dcrWallet.address = "DsVmA7aqqWeKWy461hXjytbZbgCqbB8g2dq" + dcrWallet.Unlock(wPW, time.Hour) + + btcWallet, _ := newTWallet(tBTC.ID) + tCore.wallets[tBTC.ID] = btcWallet + btcWallet.address = "12DXGkvxFjuq5btXYkwWfBZaz1rVwFgini" + btcWallet.Unlock(wPW, time.Hour) + + handleLimit := func(msg *msgjson.Message, f msgFunc) error { + // Need to stamp and sign the message with the server's key. + msgOrder := new(msgjson.LimitOrder) + err := msg.Unmarshal(msgOrder) + if err != nil { + t.Fatalf("unmarshal error: %v", err) + } + lo := convertMsgLimitOrder(msgOrder) + f(orderResponse(msg.ID, msgOrder, lo, false, false, false)) + return nil + } + + rig.ws.queueResponse(msgjson.LimitRoute, handleLimit) + // Ensure a non-existent market cannot be suspended. payload := &msgjson.TradeSuspension{ - MarketID: "dcr_dcr", - FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), - Persist: true, + MarketID: "dcr_dcr", } req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) @@ -2963,7 +2984,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { "ID not found error: %v") } - mkt := "dcr_btc" + mkt := tDcrBtcMktName // Ensure an already suspended market cannot be suspended again. err = rig.dc.suspend(mkt) @@ -2974,7 +2995,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { payload = &msgjson.TradeSuspension{ MarketID: mkt, FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)), Persist: true, } @@ -2997,6 +3018,9 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } + // Wait for the suspend to execute. + time.Sleep(time.Millisecond * 40) + // Ensure trades for a suspended market generate an error. form := &TradeForm{ Host: tDexHost, @@ -3011,6 +3035,6 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { _, err = rig.core.Trade(tPW, form) if err == nil { - t.Fatalf("expected a trade suspension set error") + t.Fatalf("expected a suspension market error") } } diff --git a/client/core/types.go b/client/core/types.go index fc32305837..d5fa243db9 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "sync" + "time" "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/db" @@ -158,7 +159,9 @@ type Market struct { StartEpoch uint64 `json:"startepoch"` MarketBuyBuffer float64 `json:"buybuffer"` Orders []*Order `json:"orders"` - Suspended bool `json:"suspended"` + pendingSuspend *time.Timer + suspended bool + mtx sync.Mutex } // Display returns an ID string suitable for displaying in a UI. @@ -171,6 +174,20 @@ func (m *Market) marketName() string { return marketName(m.BaseID, m.QuoteID) } +// GetSuspended returns the suspended state of the market. +func (m *Market) GetSuspended() bool { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.suspended +} + +// SetSuspended states suspended state of the market. +func (m *Market) SetSuspended(state bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.suspended = state +} + // Exchange represents a single DEX with any number of markets. type Exchange struct { Host string `json:"host"` From dd16a0450aa2ba160322d953252ff82c6aa3e7ae Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Sun, 14 Jun 2020 18:51:34 +0000 Subject: [PATCH 10/10] multi: resolve review issues (7 of x). --- client/core/core.go | 8 ++++---- client/core/core_test.go | 18 +----------------- client/core/types.go | 8 ++++---- client/order/bookside.go | 9 --------- 4 files changed, 9 insertions(+), 34 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index ad9118a0d7..01c10bbb44 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -87,7 +87,7 @@ func (dc *dexConnection) suspended(mkt string) bool { if !ok { return false } - return market.GetSuspended() + return market.Suspended() } // suspend halts trading for the provided market. @@ -99,7 +99,7 @@ func (dc *dexConnection) suspend(mkt string) error { return fmt.Errorf("no market found with ID %s", mkt) } - market.SetSuspended(true) + market.setSuspended(true) return nil } @@ -113,7 +113,7 @@ func (dc *dexConnection) resume(mkt string) error { return fmt.Errorf("no market found with ID %s", mkt) } - market.SetSuspended(false) + market.setSuspended(false) return nil } @@ -1510,7 +1510,7 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) { // Proceed with the order if there is no trade suspension // scheduled for the market. - if dc.suspended(mktID) { + if mkt.Suspended() { return nil, fmt.Errorf("suspended market") } diff --git a/client/core/core_test.go b/client/core/core_test.go index b6e850117e..e37061b0e9 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -2950,28 +2950,12 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { tCore := rig.core dcrWallet, _ := newTWallet(tDCR.ID) tCore.wallets[tDCR.ID] = dcrWallet - dcrWallet.address = "DsVmA7aqqWeKWy461hXjytbZbgCqbB8g2dq" dcrWallet.Unlock(wPW, time.Hour) btcWallet, _ := newTWallet(tBTC.ID) tCore.wallets[tBTC.ID] = btcWallet - btcWallet.address = "12DXGkvxFjuq5btXYkwWfBZaz1rVwFgini" btcWallet.Unlock(wPW, time.Hour) - handleLimit := func(msg *msgjson.Message, f msgFunc) error { - // Need to stamp and sign the message with the server's key. - msgOrder := new(msgjson.LimitOrder) - err := msg.Unmarshal(msgOrder) - if err != nil { - t.Fatalf("unmarshal error: %v", err) - } - lo := convertMsgLimitOrder(msgOrder) - f(orderResponse(msg.ID, msgOrder, lo, false, false, false)) - return nil - } - - rig.ws.queueResponse(msgjson.LimitRoute, handleLimit) - // Ensure a non-existent market cannot be suspended. payload := &msgjson.TradeSuspension{ MarketID: "dcr_dcr", @@ -2986,7 +2970,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { mkt := tDcrBtcMktName - // Ensure an already suspended market cannot be suspended again. + // Ensure a suspended market cannot be resuspended. err = rig.dc.suspend(mkt) if err != nil { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) diff --git a/client/core/types.go b/client/core/types.go index d5fa243db9..16b607154f 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -174,15 +174,15 @@ func (m *Market) marketName() string { return marketName(m.BaseID, m.QuoteID) } -// GetSuspended returns the suspended state of the market. -func (m *Market) GetSuspended() bool { +// suspended returns the market's suspended state. +func (m *Market) Suspended() bool { m.mtx.Lock() defer m.mtx.Unlock() return m.suspended } -// SetSuspended states suspended state of the market. -func (m *Market) SetSuspended(state bool) { +// setSuspended sets the market's suspended state. +func (m *Market) setSuspended(state bool) { m.mtx.Lock() defer m.mtx.Unlock() m.suspended = state diff --git a/client/order/bookside.go b/client/order/bookside.go index 20195eff34..96047e7956 100644 --- a/client/order/bookside.go +++ b/client/order/bookside.go @@ -131,15 +131,6 @@ func (d *bookSide) orders() []*Order { return orders } -// Reset clears out the book side. -func (d *bookSide) Reset() { - d.mtx.Lock() - defer d.mtx.Unlock() - - d.bins = make(map[uint64][]*Order) - d.rateIndex = NewRateIndex() -} - // BestNOrders returns the best N orders of the book side. func (d *bookSide) BestNOrders(n int) ([]*Order, bool) { d.mtx.RLock()