Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: handle trade suspension messages. #269

Merged
merged 10 commits into from
Jun 15, 2020
160 changes: 148 additions & 12 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,48 @@ type dexConnection struct {

regConfMtx sync.RWMutex
regConfirms uint32

pendingSuspendsMtx sync.Mutex
pendingSuspends map[string]*time.Timer
dnldd marked this conversation as resolved.
Show resolved Hide resolved
}

// suspended returns the suspended status of the provided market.
func (dc *dexConnection) suspended(mkt string) bool {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()
market, ok := dc.marketMap[mkt]
if !ok {
return false
}
return market.Suspended
}

// suspend halts trading for the provided market.
func (dc *dexConnection) suspend(mkt string) error {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()

market, ok := dc.marketMap[mkt]
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.Suspended = true
return nil
}

// resume commences trading for the provided market.
func (dc *dexConnection) resume(mkt string) error {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()

market, ok := dc.marketMap[mkt]
if !ok {
return fmt.Errorf("no market found with ID %s", mkt)
}

market.Suspended = false
return nil
}

// refreshMarkets rebuilds, saves, and returns the market map. The map itself
Expand All @@ -98,6 +140,7 @@ func (dc *dexConnection) refreshMarkets() map[string]*Market {
EpochLen: mkt.EpochLen,
StartEpoch: mkt.StartEpoch,
MarketBuyBuffer: mkt.MarketBuyBuffer,
Suspended: dc.suspended(mkt.Name),
}
mid := market.marketName()
dc.tradeMtx.RLock()
Expand Down Expand Up @@ -1467,6 +1510,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
return nil, fmt.Errorf("order placed for unknown market")
}

// Proceed with the order if there is no trade suspension
// scheduled for the market.
if dc.suspended(mktID) {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("suspended market")
}

rate, qty := form.Rate, form.Qty
if form.IsLimit && rate == 0 {
return nil, fmt.Errorf("zero-rate order not allowed")
Expand Down Expand Up @@ -2310,17 +2359,18 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {

// Create the dexConnection and listen for incoming messages.
dc := &dexConnection{
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
connected: true,
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
connected: true,
pendingSuspends: make(map[string]*time.Timer),
}

dc.refreshMarkets()
Expand Down Expand Up @@ -2422,6 +2472,92 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
return tracker.db.UpdateOrder(metaOrder)
}

// handleTradeSuspensionMsg is called when a trade suspension notification is received.
func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error {
var sp msgjson.TradeSuspension
err := msg.Unmarshal(&sp)
if err != nil {
return fmt.Errorf("trade suspension unmarshal error: %v", err)
}

dc.pendingSuspendsMtx.Lock()
defer dc.pendingSuspendsMtx.Unlock()

// Ensure the provided market exists for the dex.
dc.marketMtx.RLock()
mkt, ok := dc.marketMap[sp.MarketID]
dc.marketMtx.RUnlock()
if !ok {
return fmt.Errorf("no market found with ID %s", sp.MarketID)
}

// Ensure the market is not already suspended.
if mkt.Suspended {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("market %s for dex %s is already suspended",
sp.MarketID, dc.acct.host)
}

// Remove pending suspends for the market.
if sched := dc.pendingSuspends[sp.MarketID]; sched != nil {
if !sched.Stop() {
// TODO: too late, timer already fired. Need to request the
// current configuration for the market at this point.
return fmt.Errorf("unable to stop previously scheduled "+
"suspend for market %s on dex %s", sp.MarketID, dc.acct.host)
}
}

// Set the new scheduled suspend.
duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime)))
dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() {
// Update the market as suspended.
err := dc.suspend(sp.MarketID)
if err != nil {
log.Error(err)
return
}

// Clear the bookie associated with the suspended market.
dc.booksMtx.Lock()
if bookie := dc.books[sp.MarketID]; bookie != nil {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
dc.books[sp.MarketID] = newBookie(func() { c.unsub(dc, sp.MarketID) })
}
dc.booksMtx.Unlock()

if !sp.Persist {
// Revoke all active orders of the suspended market for the dex.
dc.tradeMtx.RLock()
for _, tracker := range dc.trades {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
if tracker.Order.Base() == mkt.BaseID &&
tracker.Order.Quote() == mkt.QuoteID &&
tracker.metaData.Host == dc.acct.host &&
(tracker.metaData.Status == order.OrderStatusEpoch ||
tracker.metaData.Status == order.OrderStatusBooked) {
md := tracker.metaData
md.Status = order.OrderStatusRevoked
metaOrder := &db.MetaOrder{
MetaData: md,
Order: tracker.Order,
}

err = tracker.db.UpdateOrder(metaOrder)
dnldd marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("unable to update order: %v", err)
}
}
}
dc.tradeMtx.RUnlock()
}

// Remove suspend after execution.
dc.pendingSuspendsMtx.Lock()
delete(dc.pendingSuspends, sp.MarketID)
dc.pendingSuspendsMtx.Unlock()
})

return nil
}
dnldd marked this conversation as resolved.
Show resolved Hide resolved

// routeHandler is a handler for a message from the DEX.
type routeHandler func(*Core, *dexConnection, *msgjson.Message) error

Expand All @@ -2431,7 +2567,6 @@ var reqHandlers = map[string]routeHandler{
msgjson.AuditRoute: handleAuditRoute,
msgjson.RedemptionRoute: handleRedemptionRoute,
msgjson.RevokeMatchRoute: handleRevokeMatchMsg,
msgjson.SuspensionRoute: nil,
}

var noteHandlers = map[string]routeHandler{
Expand All @@ -2440,6 +2575,7 @@ var noteHandlers = map[string]routeHandler{
msgjson.EpochOrderRoute: handleEpochOrderMsg,
msgjson.UnbookOrderRoute: handleUnbookOrderMsg,
msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg,
msgjson.SuspensionRoute: handleTradeSuspensionMsg,
}

// listen monitors the DEX websocket connection for server requests and
Expand Down
80 changes: 76 additions & 4 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
QuoteSymbol: tBTC.Symbol,
EpochLen: 60000,
MarketBuyBuffer: 1.1,
Suspended: false,
}
return &dexConnection{
WsConn: conn,
Expand Down Expand Up @@ -178,10 +179,11 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
},
Fee: tFee,
},
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
epoch: map[string]uint64{tDcrBtcMktName: 0},
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
epoch: map[string]uint64{tDcrBtcMktName: 0},
pendingSuspends: make(map[string]*time.Timer),
}, conn, acct
}

Expand Down Expand Up @@ -2942,3 +2944,73 @@ func TestAssetCounter(t *testing.T) {
t.Fatalf("absorbed counts not combined correctly")
}
}

func TestHandleTradeSuspensionMsg(t *testing.T) {
dnldd marked this conversation as resolved.
Show resolved Hide resolved
rig := newTestRig()

// Ensure a non-existent market cannot be suspended.
payload := &msgjson.TradeSuspension{
MarketID: "dcr_dcr",
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)),
Persist: true,
dnldd marked this conversation as resolved.
Show resolved Hide resolved
}

req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err := handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"ID not found error: %v")
}

mkt := "dcr_btc"
dnldd marked this conversation as resolved.
Show resolved Hide resolved

// Ensure an already suspended market cannot be suspended again.
dnldd marked this conversation as resolved.
Show resolved Hide resolved
err = rig.dc.suspend(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

payload = &msgjson.TradeSuspension{
MarketID: mkt,
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)),
Persist: true,
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err == nil {
t.Fatal("[handleTradeSuspensionMsg] expected a market " +
"suspended error: %v")
}

// Suspend a market.
err = rig.dc.resume(mkt)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

req, _ = msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err = handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

// Ensure trades for a suspended market generate an error.
form := &TradeForm{
Host: tDexHost,
IsLimit: true,
Sell: true,
Base: tDCR.ID,
Quote: tBTC.ID,
Qty: tDCR.LotSize * 10,
Rate: tBTC.RateStep * 1000,
TifNow: false,
}

_, err = rig.core.Trade(tPW, form)
if err == nil {
t.Fatalf("expected a trade suspension set error")
}
dnldd marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions client/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type Market struct {
StartEpoch uint64 `json:"startepoch"`
MarketBuyBuffer float64 `json:"buybuffer"`
Orders []*Order `json:"orders"`
Suspended bool `json:"suspended"`
}

// Display returns an ID string suitable for displaying in a UI.
Expand Down
9 changes: 9 additions & 0 deletions client/order/bookside.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ func (d *bookSide) orders() []*Order {
return orders
}

// Reset clears out the book side.
func (d *bookSide) Reset() {
d.mtx.Lock()
defer d.mtx.Unlock()

d.bins = make(map[uint64][]*Order)
d.rateIndex = NewRateIndex()
}

dnldd marked this conversation as resolved.
Show resolved Hide resolved
// BestNOrders returns the best N orders of the book side.
func (d *bookSide) BestNOrders(n int) ([]*Order, bool) {
d.mtx.RLock()
Expand Down