Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: handle trade suspension messages. #269

Merged
merged 10 commits into from
Jun 15, 2020
125 changes: 124 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,46 @@ type dexConnection struct {
regConfirms uint32
}

// suspended returns the suspended status of the provided market.
func (dc *dexConnection) suspended(mkt string) bool {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()

market, ok := dc.marketMap[mkt]
if !ok {
return false
}
return market.Suspended()
}

// suspend halts trading for the provided market.
func (dc *dexConnection) suspend(mkt string) error {
dc.marketMtx.Lock()
market, ok := dc.marketMap[mkt]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.setSuspended(true)

return nil
}

// resume commences trading for the provided market.
func (dc *dexConnection) resume(mkt string) error {
dc.marketMtx.Lock()
market, ok := dc.marketMap[mkt]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.setSuspended(false)

return nil
}

// refreshMarkets rebuilds, saves, and returns the market map. The map itself
// should be treated as read only. A new map is constructed and is assigned to
// dc.marketMap under lock, and can be safely accessed with
Expand All @@ -98,6 +138,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market {
EpochLen: mkt.EpochLen,
StartEpoch: mkt.StartEpoch,
MarketBuyBuffer: mkt.MarketBuyBuffer,
suspended: dc.suspended(mkt.Name),
}
mid := market.marketName()
dc.tradeMtx.RLock()
Expand Down Expand Up @@ -1467,6 +1508,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
return nil, fmt.Errorf("order placed for unknown market")
}

// Proceed with the order if there is no trade suspension
// scheduled for the market.
if mkt.Suspended() {
return nil, fmt.Errorf("suspended market")
}

rate, qty := form.Rate, form.Qty
if form.IsLimit && rate == 0 {
return nil, fmt.Errorf("zero-rate order not allowed")
Expand Down Expand Up @@ -2422,6 +2469,82 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
return tracker.db.UpdateOrder(metaOrder)
}

// handleTradeSuspensionMsg is called when a trade suspension notification is received.
func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error {
var sp msgjson.TradeSuspension
err := msg.Unmarshal(&sp)
if err != nil {
return fmt.Errorf("trade suspension unmarshal error: %v", err)
}

// Ensure the provided market exists for the dex.
dc.marketMtx.Lock()
mkt, ok := dc.marketMap[sp.MarketID]
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", sp.MarketID)
}

// Ensure the market is not already suspended.
mkt.mtx.Lock()
defer mkt.mtx.Unlock()

if mkt.suspended {
return fmt.Errorf("market %s for dex %s is already suspended",
sp.MarketID, dc.acct.host)
}

// Stop the current pending suspend.
if mkt.pendingSuspend != nil {
if !mkt.pendingSuspend.Stop() {
// TODO: too late, timer already fired. Need to request the
// current configuration for the market at this point.
return fmt.Errorf("unable to stop previously scheduled "+
"suspend for market %s on dex %s", sp.MarketID, dc.acct.host)
}
}

// Set a new scheduled suspend.
duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime)))
mkt.pendingSuspend = time.AfterFunc(duration, func() {
// Update the market as suspended.
err := dc.suspend(sp.MarketID)
if err != nil {
log.Error(err)
return
}

// Clear the bookie associated with the suspended market.
dc.booksMtx.Lock()
if bookie := dc.books[sp.MarketID]; bookie != nil {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
dc.books[sp.MarketID] = newBookie(func() { c.unsub(dc, sp.MarketID) })
}
dc.booksMtx.Unlock()

if !sp.Persist {
// Revoke all active orders of the suspended market for the dex.
dc.tradeMtx.RLock()
for _, tracker := range dc.trades {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
if tracker.Order.Base() == mkt.BaseID &&
tracker.Order.Quote() == mkt.QuoteID &&
tracker.metaData.Host == dc.acct.host &&
(tracker.metaData.Status == order.OrderStatusEpoch ||
tracker.metaData.Status == order.OrderStatusBooked) {
metaOrder := tracker.metaOrder()
metaOrder.MetaData.Status = order.OrderStatusRevoked
err := tracker.db.UpdateOrder(metaOrder)
if err != nil {
log.Errorf("unable to update order: %v", err)
}
}
}
dc.tradeMtx.RUnlock()
}
})

return nil
}
dnldd marked this conversation as resolved.
Show resolved Hide resolved

// routeHandler is a handler for a message from the DEX.
type routeHandler func(*Core, *dexConnection, *msgjson.Message) error

Expand All @@ -2431,7 +2554,6 @@ var reqHandlers = map[string]routeHandler{
msgjson.AuditRoute: handleAuditRoute,
msgjson.RedemptionRoute: handleRedemptionRoute,
msgjson.RevokeMatchRoute: handleRevokeMatchMsg,
msgjson.SuspensionRoute: nil,
}

var noteHandlers = map[string]routeHandler{
Expand All @@ -2440,6 +2562,7 @@ var noteHandlers = map[string]routeHandler{
msgjson.EpochOrderRoute: handleEpochOrderMsg,
msgjson.UnbookOrderRoute: handleUnbookOrderMsg,
msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg,
msgjson.SuspensionRoute: handleTradeSuspensionMsg,
}

// listen monitors the DEX websocket connection for server requests and
Expand Down
80 changes: 80 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
QuoteSymbol: tBTC.Symbol,
EpochLen: 60000,
MarketBuyBuffer: 1.1,
suspended: false,
}
return &dexConnection{
WsConn: conn,
Expand Down Expand Up @@ -2942,3 +2943,82 @@ func TestAssetCounter(t *testing.T) {
t.Fatalf("absorbed counts not combined correctly")
}
}

func TestHandleTradeSuspensionMsg(t *testing.T) {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
rig := newTestRig()

tCore := rig.core
dcrWallet, _ := newTWallet(tDCR.ID)
tCore.wallets[tDCR.ID] = dcrWallet
dcrWallet.Unlock(wPW, time.Hour)

btcWallet, _ := newTWallet(tBTC.ID)
tCore.wallets[tBTC.ID] = btcWallet
btcWallet.Unlock(wPW, time.Hour)

// Ensure a non-existent market cannot be suspended.
payload := &msgjson.TradeSuspension{
MarketID: "dcr_dcr",
}

req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err := handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"ID not found error: %v")
}

mkt := tDcrBtcMktName

// Ensure a suspended market cannot be resuspended.
err = rig.dc.suspend(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

payload = &msgjson.TradeSuspension{
MarketID: mkt,
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)),
Persist: true,
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"suspended error: %v")
}

// Suspend a market.
err = rig.dc.resume(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

// Wait for the suspend to execute.
time.Sleep(time.Millisecond * 40)

// Ensure trades for a suspended market generate an error.
form := &TradeForm{
Host: tDexHost,
IsLimit: true,
Sell: true,
Base: tDCR.ID,
Quote: tBTC.ID,
Qty: tDCR.LotSize * 10,
Rate: tBTC.RateStep * 1000,
TifNow: false,
}

_, err = rig.core.Trade(tPW, form)
if err == nil {
t.Fatalf("expected a suspension market error")
}
}
18 changes: 18 additions & 0 deletions client/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/client/db"
Expand Down Expand Up @@ -158,6 +159,9 @@ type Market struct {
StartEpoch uint64 `json:"startepoch"`
MarketBuyBuffer float64 `json:"buybuffer"`
Orders []*Order `json:"orders"`
pendingSuspend *time.Timer
suspended bool
mtx sync.Mutex
}

// Display returns an ID string suitable for displaying in a UI.
Expand All @@ -170,6 +174,20 @@ func (m *Market) marketName() string {
return marketName(m.BaseID, m.QuoteID)
}

// suspended returns the market's suspended state.
func (m *Market) Suspended() bool {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.suspended
}

// setSuspended sets the market's suspended state.
func (m *Market) setSuspended(state bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.suspended = state
}

// Exchange represents a single DEX with any number of markets.
type Exchange struct {
Host string `json:"host"`
Expand Down