diff --git a/client/core/core.go b/client/core/core.go index ee50a1d349..e8ec2a12ea 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -76,15 +76,13 @@ type dexConnection struct { regConfMtx sync.RWMutex regConfirms uint32 - - pendingSuspendsMtx sync.Mutex - pendingSuspends map[string]*time.Timer } // suspended returns the suspended status of the provided market. func (dc *dexConnection) suspended(mkt string) bool { dc.marketMtx.Lock() defer dc.marketMtx.Unlock() + market, ok := dc.marketMap[mkt] if !ok { return false @@ -2359,18 +2357,17 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { // Create the dexConnection and listen for incoming messages. dc := &dexConnection{ - WsConn: conn, - connMaster: connMaster, - assets: assets, - cfg: dexCfg, - books: make(map[string]*bookie), - acct: newDEXAccount(acctInfo), - marketMap: marketMap, - trades: make(map[order.OrderID]*trackedTrade), - notify: c.notify, - epoch: epochMap, - connected: true, - pendingSuspends: make(map[string]*time.Timer), + WsConn: conn, + connMaster: connMaster, + assets: assets, + cfg: dexCfg, + books: make(map[string]*bookie), + acct: newDEXAccount(acctInfo), + marketMap: marketMap, + trades: make(map[order.OrderID]*trackedTrade), + notify: c.notify, + epoch: epochMap, + connected: true, } dc.refreshMarkets() @@ -2480,26 +2477,26 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) return fmt.Errorf("trade suspension unmarshal error: %v", err) } - dc.pendingSuspendsMtx.Lock() - defer dc.pendingSuspendsMtx.Unlock() - // Ensure the provided market exists for the dex. - dc.marketMtx.RLock() + dc.marketMtx.Lock() mkt, ok := dc.marketMap[sp.MarketID] - dc.marketMtx.RUnlock() + dc.marketMtx.Unlock() if !ok { return fmt.Errorf("no market found with ID %s", sp.MarketID) } // Ensure the market is not already suspended. + mkt.mtx.Lock() + defer mkt.mtx.Unlock() + if mkt.Suspended { return fmt.Errorf("market %s for dex %s is already suspended", sp.MarketID, dc.acct.host) } - // Remove pending suspends for the market. - if sched := dc.pendingSuspends[sp.MarketID]; sched != nil { - if !sched.Stop() { + // Stop the current pending suspend. + if mkt.pendingSuspend != nil { + if !mkt.pendingSuspend.Stop() { // TODO: too late, timer already fired. Need to request the // current configuration for the market at this point. return fmt.Errorf("unable to stop previously scheduled "+ @@ -2507,9 +2504,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } } - // Set the new scheduled suspend. + // Set a new scheduled suspend. duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) - dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() { + mkt.pendingSuspend = time.AfterFunc(duration, func() { // Update the market as suspended. err := dc.suspend(sp.MarketID) if err != nil { @@ -2533,14 +2530,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) tracker.metaData.Host == dc.acct.host && (tracker.metaData.Status == order.OrderStatusEpoch || tracker.metaData.Status == order.OrderStatusBooked) { - md := tracker.metaData - md.Status = order.OrderStatusRevoked - metaOrder := &db.MetaOrder{ - MetaData: md, - Order: tracker.Order, - } - - err = tracker.db.UpdateOrder(metaOrder) + metaOrder := tracker.metaOrder() + metaOrder.MetaData.Status = order.OrderStatusRevoked + err := tracker.db.UpdateOrder(metaOrder) if err != nil { log.Errorf("unable to update order: %v", err) } @@ -2548,11 +2540,6 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) } dc.tradeMtx.RUnlock() } - - // Remove suspend after execution. - dc.pendingSuspendsMtx.Lock() - delete(dc.pendingSuspends, sp.MarketID) - dc.pendingSuspendsMtx.Unlock() }) return nil diff --git a/client/core/core_test.go b/client/core/core_test.go index 4a06cc75ee..5fe49afe11 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -179,11 +179,10 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { }, Fee: tFee, }, - notify: func(Notification) {}, - marketMap: map[string]*Market{tDcrBtcMktName: mkt}, - trades: make(map[order.OrderID]*trackedTrade), - epoch: map[string]uint64{tDcrBtcMktName: 0}, - pendingSuspends: make(map[string]*time.Timer), + notify: func(Notification) {}, + marketMap: map[string]*Market{tDcrBtcMktName: mkt}, + trades: make(map[order.OrderID]*trackedTrade), + epoch: map[string]uint64{tDcrBtcMktName: 0}, }, conn, acct } @@ -2948,12 +2947,34 @@ func TestAssetCounter(t *testing.T) { func TestHandleTradeSuspensionMsg(t *testing.T) { rig := newTestRig() + tCore := rig.core + dcrWallet, _ := newTWallet(tDCR.ID) + tCore.wallets[tDCR.ID] = dcrWallet + dcrWallet.address = "DsVmA7aqqWeKWy461hXjytbZbgCqbB8g2dq" + dcrWallet.Unlock(wPW, time.Hour) + + btcWallet, _ := newTWallet(tBTC.ID) + tCore.wallets[tBTC.ID] = btcWallet + btcWallet.address = "12DXGkvxFjuq5btXYkwWfBZaz1rVwFgini" + btcWallet.Unlock(wPW, time.Hour) + + handleLimit := func(msg *msgjson.Message, f msgFunc) error { + // Need to stamp and sign the message with the server's key. + msgOrder := new(msgjson.LimitOrder) + err := msg.Unmarshal(msgOrder) + if err != nil { + t.Fatalf("unmarshal error: %v", err) + } + lo := convertMsgLimitOrder(msgOrder) + f(orderResponse(msg.ID, msgOrder, lo, false, false, false)) + return nil + } + + rig.ws.queueResponse(msgjson.LimitRoute, handleLimit) + // Ensure a non-existent market cannot be suspended. payload := &msgjson.TradeSuspension{ - MarketID: "dcr_dcr", - FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), - Persist: true, + MarketID: "dcr_dcr", } req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) @@ -2963,7 +2984,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { "ID not found error: %v") } - mkt := "dcr_btc" + mkt := tDcrBtcMktName // Ensure an already suspended market cannot be suspended again. err = rig.dc.suspend(mkt) @@ -2974,7 +2995,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { payload = &msgjson.TradeSuspension{ MarketID: mkt, FinalEpoch: 100, - SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)), + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)), Persist: true, } @@ -2997,6 +3018,9 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) } + // Wait for the suspend to execute. + time.Sleep(time.Millisecond * 40) + // Ensure trades for a suspended market generate an error. form := &TradeForm{ Host: tDexHost, @@ -3011,6 +3035,6 @@ func TestHandleTradeSuspensionMsg(t *testing.T) { _, err = rig.core.Trade(tPW, form) if err == nil { - t.Fatalf("expected a trade suspension set error") + t.Fatalf("expected a suspension market error") } } diff --git a/client/core/types.go b/client/core/types.go index fc32305837..a9351240e4 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "sync" + "time" "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/db" @@ -159,6 +160,8 @@ type Market struct { MarketBuyBuffer float64 `json:"buybuffer"` Orders []*Order `json:"orders"` Suspended bool `json:"suspended"` + pendingSuspend *time.Timer + mtx sync.Mutex } // Display returns an ID string suitable for displaying in a UI.