Skip to content

Commit

Permalink
multi: handle trade suspension messages
Browse files Browse the repository at this point in the history
This adds the trade suspension handler to the client core.
  • Loading branch information
dnldd committed Jun 15, 2020
1 parent eac1b9c commit 93971d5
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 1 deletion.
125 changes: 124 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,46 @@ type dexConnection struct {
regConfirms *uint32 // nil regConfirms means no pending registration.
}

// suspended returns the suspended status of the provided market.
func (dc *dexConnection) suspended(mkt string) bool {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()

market, ok := dc.marketMap[mkt]
if !ok {
return false
}
return market.Suspended()
}

// suspend halts trading for the provided market.
func (dc *dexConnection) suspend(mkt string) error {
dc.marketMtx.Lock()
market, ok := dc.marketMap[mkt]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.setSuspended(true)

return nil
}

// resume commences trading for the provided market.
func (dc *dexConnection) resume(mkt string) error {
dc.marketMtx.Lock()
market, ok := dc.marketMap[mkt]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.setSuspended(false)

return nil
}

// refreshMarkets rebuilds, saves, and returns the market map. The map itself
// should be treated as read only. A new map is constructed and is assigned to
// dc.marketMap under lock, and can be safely accessed with
Expand All @@ -98,6 +138,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market {
EpochLen: mkt.EpochLen,
StartEpoch: mkt.StartEpoch,
MarketBuyBuffer: mkt.MarketBuyBuffer,
suspended: dc.suspended(mkt.Name),
}
mid := market.marketName()
dc.tradeMtx.RLock()
Expand Down Expand Up @@ -1495,6 +1536,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
return nil, fmt.Errorf("order placed for unknown market")
}

// Proceed with the order if there is no trade suspension
// scheduled for the market.
if mkt.Suspended() {
return nil, fmt.Errorf("suspended market")
}

rate, qty := form.Rate, form.Qty
if form.IsLimit && rate == 0 {
return nil, fmt.Errorf("zero-rate order not allowed")
Expand Down Expand Up @@ -2442,6 +2489,82 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
return tracker.db.UpdateMatch(&revokedMatch.MetaMatch)
}

// handleTradeSuspensionMsg is called when a trade suspension notification is received.
func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error {
var sp msgjson.TradeSuspension
err := msg.Unmarshal(&sp)
if err != nil {
return fmt.Errorf("trade suspension unmarshal error: %v", err)
}

// Ensure the provided market exists for the dex.
dc.marketMtx.Lock()
mkt, ok := dc.marketMap[sp.MarketID]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", sp.MarketID)
}

// Ensure the market is not already suspended.
mkt.mtx.Lock()
defer mkt.mtx.Unlock()

if mkt.suspended {
return fmt.Errorf("market %s for dex %s is already suspended",
sp.MarketID, dc.acct.host)
}

// Stop the current pending suspend.
if mkt.pendingSuspend != nil {
if !mkt.pendingSuspend.Stop() {
// TODO: too late, timer already fired. Need to request the
// current configuration for the market at this point.
return fmt.Errorf("unable to stop previously scheduled "+
"suspend for market %s on dex %s", sp.MarketID, dc.acct.host)
}
}

// Set a new scheduled suspend.
duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime)))
mkt.pendingSuspend = time.AfterFunc(duration, func() {
// Update the market as suspended.
err := dc.suspend(sp.MarketID)
if err != nil {
log.Error(err)
return
}

// Clear the bookie associated with the suspended market.
dc.booksMtx.Lock()
if bookie := dc.books[sp.MarketID]; bookie != nil {
dc.books[sp.MarketID] = newBookie(func() { c.unsub(dc, sp.MarketID) })
}
dc.booksMtx.Unlock()

if !sp.Persist {
// Revoke all active orders of the suspended market for the dex.
dc.tradeMtx.RLock()
for _, tracker := range dc.trades {
if tracker.Order.Base() == mkt.BaseID &&
tracker.Order.Quote() == mkt.QuoteID &&
tracker.metaData.Host == dc.acct.host &&
(tracker.metaData.Status == order.OrderStatusEpoch ||
tracker.metaData.Status == order.OrderStatusBooked) {
metaOrder := tracker.metaOrder()
metaOrder.MetaData.Status = order.OrderStatusRevoked
err := tracker.db.UpdateOrder(metaOrder)
if err != nil {
log.Errorf("unable to update order: %v", err)
}
}
}
dc.tradeMtx.RUnlock()
}
})

return nil
}

// routeHandler is a handler for a message from the DEX.
type routeHandler func(*Core, *dexConnection, *msgjson.Message) error

Expand All @@ -2451,7 +2574,6 @@ var reqHandlers = map[string]routeHandler{
msgjson.AuditRoute: handleAuditRoute,
msgjson.RedemptionRoute: handleRedemptionRoute,
msgjson.RevokeMatchRoute: handleRevokeMatchMsg,
msgjson.SuspensionRoute: nil,
}

var noteHandlers = map[string]routeHandler{
Expand All @@ -2460,6 +2582,7 @@ var noteHandlers = map[string]routeHandler{
msgjson.EpochOrderRoute: handleEpochOrderMsg,
msgjson.UnbookOrderRoute: handleUnbookOrderMsg,
msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg,
msgjson.SuspensionRoute: handleTradeSuspensionMsg,
}

// listen monitors the DEX websocket connection for server requests and
Expand Down
80 changes: 80 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
QuoteSymbol: tBTC.Symbol,
EpochLen: 60000,
MarketBuyBuffer: 1.1,
suspended: false,
}
return &dexConnection{
WsConn: conn,
Expand Down Expand Up @@ -3230,3 +3231,82 @@ func TestAssetCounter(t *testing.T) {
t.Fatalf("absorbed counts not combined correctly")
}
}

func TestHandleTradeSuspensionMsg(t *testing.T) {
rig := newTestRig()

tCore := rig.core
dcrWallet, _ := newTWallet(tDCR.ID)
tCore.wallets[tDCR.ID] = dcrWallet
dcrWallet.Unlock(wPW, time.Hour)

btcWallet, _ := newTWallet(tBTC.ID)
tCore.wallets[tBTC.ID] = btcWallet
btcWallet.Unlock(wPW, time.Hour)

// Ensure a non-existent market cannot be suspended.
payload := &msgjson.TradeSuspension{
MarketID: "dcr_dcr",
}

req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err := handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"ID not found error: %v")
}

mkt := tDcrBtcMktName

// Ensure a suspended market cannot be resuspended.
err = rig.dc.suspend(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

payload = &msgjson.TradeSuspension{
MarketID: mkt,
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)),
Persist: true,
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"suspended error: %v")
}

// Suspend a market.
err = rig.dc.resume(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

// Wait for the suspend to execute.
time.Sleep(time.Millisecond * 40)

// Ensure trades for a suspended market generate an error.
form := &TradeForm{
Host: tDexHost,
IsLimit: true,
Sell: true,
Base: tDCR.ID,
Quote: tBTC.ID,
Qty: tDCR.LotSize * 10,
Rate: tBTC.RateStep * 1000,
TifNow: false,
}

_, err = rig.core.Trade(tPW, form)
if err == nil {
t.Fatalf("expected a suspension market error")
}
}
18 changes: 18 additions & 0 deletions client/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/client/db"
Expand Down Expand Up @@ -148,6 +149,9 @@ type Market struct {
StartEpoch uint64 `json:"startepoch"`
MarketBuyBuffer float64 `json:"buybuffer"`
Orders []*Order `json:"orders"`
pendingSuspend *time.Timer
suspended bool
mtx sync.Mutex
}

// Display returns an ID string suitable for displaying in a UI.
Expand All @@ -160,6 +164,20 @@ func (m *Market) marketName() string {
return marketName(m.BaseID, m.QuoteID)
}

// suspended returns the market's suspended state.
func (m *Market) Suspended() bool {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.suspended
}

// setSuspended sets the market's suspended state.
func (m *Market) setSuspended(state bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.suspended = state
}

// Exchange represents a single DEX with any number of markets.
type Exchange struct {
Host string `json:"host"`
Expand Down

0 comments on commit 93971d5

Please sign in to comment.