From 93971d51d9572be4da7b6d5ee8505884fc94a7a3 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 15 Jun 2020 20:06:22 +0000 Subject: [PATCH] multi: handle trade suspension messages This adds the trade suspension handler to the client core. --- client/core/core.go | 125 ++++++++++++++++++++++++++++++++++++++- client/core/core_test.go | 80 +++++++++++++++++++++++++ client/core/types.go | 18 ++++++ 3 files changed, 222 insertions(+), 1 deletion(-) diff --git a/client/core/core.go b/client/core/core.go index 1876081915..f506353aac 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -78,6 +78,46 @@ type dexConnection struct { regConfirms *uint32 // nil regConfirms means no pending registration. } +// suspended returns the suspended status of the provided market. +func (dc *dexConnection) suspended(mkt string) bool { + dc.marketMtx.Lock() + defer dc.marketMtx.Unlock() + + market, ok := dc.marketMap[mkt] + if !ok { + return false + } + return market.Suspended() +} + +// suspend halts trading for the provided market. +func (dc *dexConnection) suspend(mkt string) error { + dc.marketMtx.Lock() + market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() + if !ok { + return fmt.Errorf("no market found with ID %s", mkt) + } + + market.setSuspended(true) + + return nil +} + +// resume commences trading for the provided market. +func (dc *dexConnection) resume(mkt string) error { + dc.marketMtx.Lock() + market, ok := dc.marketMap[mkt] + dc.marketMtx.Unlock() + if !ok { + return fmt.Errorf("no market found with ID %s", mkt) + } + + market.setSuspended(false) + + return nil +} + // refreshMarkets rebuilds, saves, and returns the market map. The map itself // should be treated as read only. A new map is constructed and is assigned to // dc.marketMap under lock, and can be safely accessed with @@ -98,6 +138,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market { EpochLen: mkt.EpochLen, StartEpoch: mkt.StartEpoch, MarketBuyBuffer: mkt.MarketBuyBuffer, + suspended: dc.suspended(mkt.Name), } mid := market.marketName() dc.tradeMtx.RLock() @@ -1495,6 +1536,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) { return nil, fmt.Errorf("order placed for unknown market") } + // Proceed with the order if there is no trade suspension + // scheduled for the market. + if mkt.Suspended() { + return nil, fmt.Errorf("suspended market") + } + rate, qty := form.Rate, form.Qty if form.IsLimit && rate == 0 { return nil, fmt.Errorf("zero-rate order not allowed") @@ -2442,6 +2489,82 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro return tracker.db.UpdateMatch(&revokedMatch.MetaMatch) } +// handleTradeSuspensionMsg is called when a trade suspension notification is received. +func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error { + var sp msgjson.TradeSuspension + err := msg.Unmarshal(&sp) + if err != nil { + return fmt.Errorf("trade suspension unmarshal error: %v", err) + } + + // Ensure the provided market exists for the dex. + dc.marketMtx.Lock() + mkt, ok := dc.marketMap[sp.MarketID] + dc.marketMtx.Unlock() + if !ok { + return fmt.Errorf("no market found with ID %s", sp.MarketID) + } + + // Ensure the market is not already suspended. + mkt.mtx.Lock() + defer mkt.mtx.Unlock() + + if mkt.suspended { + return fmt.Errorf("market %s for dex %s is already suspended", + sp.MarketID, dc.acct.host) + } + + // Stop the current pending suspend. + if mkt.pendingSuspend != nil { + if !mkt.pendingSuspend.Stop() { + // TODO: too late, timer already fired. Need to request the + // current configuration for the market at this point. + return fmt.Errorf("unable to stop previously scheduled "+ + "suspend for market %s on dex %s", sp.MarketID, dc.acct.host) + } + } + + // Set a new scheduled suspend. + duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime))) + mkt.pendingSuspend = time.AfterFunc(duration, func() { + // Update the market as suspended. + err := dc.suspend(sp.MarketID) + if err != nil { + log.Error(err) + return + } + + // Clear the bookie associated with the suspended market. + dc.booksMtx.Lock() + if bookie := dc.books[sp.MarketID]; bookie != nil { + dc.books[sp.MarketID] = newBookie(func() { c.unsub(dc, sp.MarketID) }) + } + dc.booksMtx.Unlock() + + if !sp.Persist { + // Revoke all active orders of the suspended market for the dex. + dc.tradeMtx.RLock() + for _, tracker := range dc.trades { + if tracker.Order.Base() == mkt.BaseID && + tracker.Order.Quote() == mkt.QuoteID && + tracker.metaData.Host == dc.acct.host && + (tracker.metaData.Status == order.OrderStatusEpoch || + tracker.metaData.Status == order.OrderStatusBooked) { + metaOrder := tracker.metaOrder() + metaOrder.MetaData.Status = order.OrderStatusRevoked + err := tracker.db.UpdateOrder(metaOrder) + if err != nil { + log.Errorf("unable to update order: %v", err) + } + } + } + dc.tradeMtx.RUnlock() + } + }) + + return nil +} + // routeHandler is a handler for a message from the DEX. type routeHandler func(*Core, *dexConnection, *msgjson.Message) error @@ -2451,7 +2574,6 @@ var reqHandlers = map[string]routeHandler{ msgjson.AuditRoute: handleAuditRoute, msgjson.RedemptionRoute: handleRedemptionRoute, msgjson.RevokeMatchRoute: handleRevokeMatchMsg, - msgjson.SuspensionRoute: nil, } var noteHandlers = map[string]routeHandler{ @@ -2460,6 +2582,7 @@ var noteHandlers = map[string]routeHandler{ msgjson.EpochOrderRoute: handleEpochOrderMsg, msgjson.UnbookOrderRoute: handleUnbookOrderMsg, msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg, + msgjson.SuspensionRoute: handleTradeSuspensionMsg, } // listen monitors the DEX websocket connection for server requests and diff --git a/client/core/core_test.go b/client/core/core_test.go index bc2394dc7a..bac49da7c6 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -151,6 +151,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { QuoteSymbol: tBTC.Symbol, EpochLen: 60000, MarketBuyBuffer: 1.1, + suspended: false, } return &dexConnection{ WsConn: conn, @@ -3230,3 +3231,82 @@ func TestAssetCounter(t *testing.T) { t.Fatalf("absorbed counts not combined correctly") } } + +func TestHandleTradeSuspensionMsg(t *testing.T) { + rig := newTestRig() + + tCore := rig.core + dcrWallet, _ := newTWallet(tDCR.ID) + tCore.wallets[tDCR.ID] = dcrWallet + dcrWallet.Unlock(wPW, time.Hour) + + btcWallet, _ := newTWallet(tBTC.ID) + tCore.wallets[tBTC.ID] = btcWallet + btcWallet.Unlock(wPW, time.Hour) + + // Ensure a non-existent market cannot be suspended. + payload := &msgjson.TradeSuspension{ + MarketID: "dcr_dcr", + } + + req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err := handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err == nil { + t.Fatal("[handleTradeSuspensionMsg] expected a market " + + "ID not found error: %v") + } + + mkt := tDcrBtcMktName + + // Ensure a suspended market cannot be resuspended. + err = rig.dc.suspend(mkt) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + payload = &msgjson.TradeSuspension{ + MarketID: mkt, + FinalEpoch: 100, + SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)), + Persist: true, + } + + req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err = handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err == nil { + t.Fatal("[handleTradeSuspensionMsg] expected a market " + + "suspended error: %v") + } + + // Suspend a market. + err = rig.dc.resume(mkt) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload) + err = handleTradeSuspensionMsg(rig.core, rig.dc, req) + if err != nil { + t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err) + } + + // Wait for the suspend to execute. + time.Sleep(time.Millisecond * 40) + + // Ensure trades for a suspended market generate an error. + form := &TradeForm{ + Host: tDexHost, + IsLimit: true, + Sell: true, + Base: tDCR.ID, + Quote: tBTC.ID, + Qty: tDCR.LotSize * 10, + Rate: tBTC.RateStep * 1000, + TifNow: false, + } + + _, err = rig.core.Trade(tPW, form) + if err == nil { + t.Fatalf("expected a suspension market error") + } +} diff --git a/client/core/types.go b/client/core/types.go index a6db9cf847..fe75e8f950 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "sync" + "time" "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/db" @@ -148,6 +149,9 @@ type Market struct { StartEpoch uint64 `json:"startepoch"` MarketBuyBuffer float64 `json:"buybuffer"` Orders []*Order `json:"orders"` + pendingSuspend *time.Timer + suspended bool + mtx sync.Mutex } // Display returns an ID string suitable for displaying in a UI. @@ -160,6 +164,20 @@ func (m *Market) marketName() string { return marketName(m.BaseID, m.QuoteID) } +// suspended returns the market's suspended state. +func (m *Market) Suspended() bool { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.suspended +} + +// setSuspended sets the market's suspended state. +func (m *Market) setSuspended(state bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.suspended = state +} + // Exchange represents a single DEX with any number of markets. type Exchange struct { Host string `json:"host"`