From 872e9642ed41d7318d80fde36f4b0b0f61e2d0a8 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Thu, 11 Jun 2020 14:46:36 +0000 Subject: [PATCH] core: resolve review issues (6 of x). --- client/core/core.go | 81 +++++++++++++++++----------------------- client/core/core_test.go | 50 ++++++++++++++++++------- client/core/types.go | 19 +++++++++- 3 files changed, 89 insertions(+), 61 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index ee50a1d349..ad9118a0d7 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -76,47 +76,45 @@ type dexConnection struct { regConfMtx sync.RWMutex regConfirms uint32 - - pendingSuspendsMtx sync.Mutex - pendingSuspends map[string]*time.Timer } // suspended returns the suspended status of the provided market. func (dc *dexConnection) suspended(mkt string) bool { dc.marketMtx.Lock() defer dc.marketMtx.Unlock() + market, ok := dc.marketMap[mkt] if !ok { return false } - return market.Suspended + return market.GetSuspended() } // suspend halts trading for the provided market. func (dc *dexConnection) suspend(mkt string) error { dc.marketMtx.Lock() - defer dc.marketMtx.Unlock() - market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", mkt) } - market.Suspended = true + market.SetSuspended(true) + return nil } // resume commences trading for the provided market. func (dc *dexConnection) resume(mkt string) error { dc.marketMtx.Lock() - defer dc.marketMtx.Unlock() - market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", mkt) } - market.Suspended = false + market.SetSuspended(false) + return nil } @@ -140,7 +138,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market { EpochLen: mkt.EpochLen, StartEpoch: mkt.StartEpoch, MarketBuyBuffer: mkt.MarketBuyBuffer, - Suspended: dc.suspended(mkt.Name), + suspended: dc.suspended(mkt.Name), } mid := market.marketName() dc.tradeMtx.RLock() @@ -2359,18 +2357,17 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { // Create the dexConnection and listen for incoming messages. dc := &dexConnection{ - WsConn: conn, - connMaster: connMaster, - assets: assets, - cfg: dexCfg, - books: make(map[string]*bookie), - acct: newDEXAccount(acctInfo), - marketMap: marketMap, - trades: make(map[order.OrderID]*trackedTrade), - notify: c.notify, - epoch: epochMap, - connected: true, - pendingSuspends: make(map[string]*time.Timer), + WsConn: conn, + connMaster: connMaster, + assets: assets, + cfg: dexCfg, + books: make(map[string]*bookie), + acct: newDEXAccount(acctInfo), + marketMap: marketMap, + trades: make(map[order.OrderID]*trackedTrade), + notify: c.notify, + epoch: epochMap, + connected: true, } dc.refreshMarkets() @@ -2480,26 +2477,26 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) return fmt.Errorf("trade suspension unmarshal error: %v", err) } - dc.pendingSuspendsMtx.Lock() - defer dc.pendingSuspendsMtx.Unlock() - // Ensure the provided market exists for the dex. - dc.marketMtx.RLock() + dc.marketMtx.Lock() mkt, ok := dc.marketMap[sp.MarketID] - dc.marketMtx.RUnlock() + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", sp.MarketID) } // Ensure the market is not already suspended. - if mkt.Suspended { + mkt.mtx.Lock() + defer mkt.mtx.Unlock() + + if mkt.suspended { return fmt.Errorf("market %s for dex %s is already suspended", sp.MarketID, dc.acct.host) } - // Remove pending suspends for the market. - if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { - if !sched.Stop() { + // Stop the current pending suspend. + if mkt.pendingSuspend != nil { + if !mkt.pendingSuspend.Stop() { // TODO: too late, timer already fired. Need to request the // current configuration for the market at this point. return fmt.Errorf("unable to stop previously scheduled "+ @@ -2507,9 +2504,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } } - // Set the new scheduled suspend. + // Set a new scheduled suspend. duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) - dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { + mkt.pendingSuspend = time.AfterFunc(duration, func() { // Update the market as suspended. err := dc.suspend(sp.MarketID) if err != nil { @@ -2533,14 +2530,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) tracker.metaData.Host == dc.acct.host && (tracker.metaData.Status == order.OrderStatusEpoch || tracker.metaData.Status == order.OrderStatusBooked) { - md := tracker.metaData - md.Status = order.OrderStatusRevoked - metaOrder := &db.MetaOrder{ - MetaData: md, - Order: tracker.Order, - } - - err = tracker.db.UpdateOrder(metaOrder) + metaOrder := tracker.metaOrder() + metaOrder.MetaData.Status = order.OrderStatusRevoked + err := tracker.db.UpdateOrder(metaOrder) if err != nil { log.Errorf("unable to update order: %v", err) } @@ -2548,11 +2540,6 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } dc.tradeMtx.RUnlock() } - - // Remove suspend after execution. - dc.pendingSuspendsMtx.Lock() - delete(dc.pendingSuspends, sp.MarketID) - dc.pendingSuspendsMtx.Unlock() }) return nil diff --git a/client/core/core_test.go b/client/core/core_test.go index 4a06cc75ee..b6e850117e 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -150,7 +150,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { QuoteSymbol: tBTC.Symbol, EpochLen: 60000, MarketBuyBuffer: 1.1, - Suspended: false, + suspended: false, } return &dexConnection{ WsConn: conn, @@ -179,11 +179,10 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { }, Fee: tFee, }, - notify: func(Notification) {}, - marketMap: map[string]*Market{tDcrBtcMktName: mkt}, - trades: make(map[order.OrderID]*trackedTrade), - epoch: map[string]uint64{tDcrBtcMktName: 0}, - pendingSuspends: make(map[string]*time.Timer), + notify: func(Notification) {}, + marketMap: map[string]*Market{tDcrBtcMktName: mkt}, + trades: make(map[order.OrderID]*trackedTrade), + epoch: map[string]uint64{tDcrBtcMktName: 0}, }, conn, acct } @@ -2948,12 +2947,34 @@ func TestAssetCounter(t *testing.T) { func TestHandleTradeSuspensionMsg(t *testing.T) { rig := newTestRig() + tCore := rig.core + dcrWallet, _ := newTWallet(tDCR.ID) + tCore.wallets[tDCR.ID] = dcrWallet + dcrWallet.address = "DsVmA7aqqWeKWy461hXjytbZbgCqbB8g2dq" + dcrWallet.Unlock(wPW, time.Hour) + + btcWallet, _ := newTWallet(tBTC.ID) + tCore.wallets[tBTC.ID] = btcWallet + btcWallet.address = "12DXGkvxFjuq5btXYkwWfBZaz1rVwFgini" + btcWallet.Unlock(wPW, time.Hour) + + handleLimit := func(msg *msgjson.Message, f msgFunc) error { + // Need to stamp and sign the message with the server's key. + msgOrder := new(msgjson.LimitOrder) + err := msg.Unmarshal(msgOrder) + if err != nil { + t.Fatalf("unmarshal error: %v", err) + } + lo := convertMsgLimitOrder(msgOrder) + f(orderResponse(msg.ID, msgOrder, lo, false, false, false)) + return nil + } + + rig.ws.queueResponse(msgjson.LimitRoute, handleLimit) + // Ensure a non-existent market cannot be suspended. payload := &msgjson.TradeSuspension{ - MarketID: "dcr_dcr", - FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), - Persist: true, + MarketID: "dcr_dcr", } req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) @@ -2963,7 +2984,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { "ID not found error: %v") } - mkt := "dcr_btc" + mkt := tDcrBtcMktName // Ensure an already suspended market cannot be suspended again. err = rig.dc.suspend(mkt) @@ -2974,7 +2995,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { payload = &msgjson.TradeSuspension{ MarketID: mkt, FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)), Persist: true, } @@ -2997,6 +3018,9 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } + // Wait for the suspend to execute. + time.Sleep(time.Millisecond * 40) + // Ensure trades for a suspended market generate an error. form := &TradeForm{ Host: tDexHost, @@ -3011,6 +3035,6 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { _, err = rig.core.Trade(tPW, form) if err == nil { - t.Fatalf("expected a trade suspension set error") + t.Fatalf("expected a suspension market error") } } diff --git a/client/core/types.go b/client/core/types.go index fc32305837..d5fa243db9 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "sync" + "time" "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/db" @@ -158,7 +159,9 @@ type Market struct { StartEpoch uint64 `json:"startepoch"` MarketBuyBuffer float64 `json:"buybuffer"` Orders []*Order `json:"orders"` - Suspended bool `json:"suspended"` + pendingSuspend *time.Timer + suspended bool + mtx sync.Mutex } // Display returns an ID string suitable for displaying in a UI. @@ -171,6 +174,20 @@ func (m *Market) marketName() string { return marketName(m.BaseID, m.QuoteID) } +// GetSuspended returns the suspended state of the market. +func (m *Market) GetSuspended() bool { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.suspended +} + +// SetSuspended states suspended state of the market. +func (m *Market) SetSuspended(state bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.suspended = state +} + // Exchange represents a single DEX with any number of markets. type Exchange struct { Host string `json:"host"`