Skip to content

Commit

Permalink
multi: handle trade suspension messages.
Browse files Browse the repository at this point in the history
This adds the trade suspension handler to the client core.
  • Loading branch information
dnldd committed May 20, 2020
1 parent 73a3f78 commit fe0b31d
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 13 deletions.
103 changes: 93 additions & 10 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type dexConnection struct {

tradeMtx sync.RWMutex
trades map[order.OrderID]*trackedTrade

suspensionsMtx sync.RWMutex
suspensions map[string]*msgjson.TradeSuspension

resumptionsMtx sync.RWMutex
resumptions map[string]*msgjson.TradeResumption
}

// refreshMarkets rebuilds, saves, and returns the market map. The map itself
Expand Down Expand Up @@ -1144,6 +1150,37 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
return nil, fmt.Errorf("unknown DEX %s", form.DEX)
}

marketName, err := dex.MarketName(form.Base, form.Quote)
if err != nil {
return nil, fmt.Errorf("market name error %v", err)
}

dc.suspensionsMtx.RLock()
ts := dc.suspensions[marketName]
dc.suspensionsMtx.RUnlock()

dc.resumptionsMtx.RLock()
tr := dc.resumptions[marketName]
dc.resumptionsMtx.RUnlock()

// Proceed with the order if there is no trade suspension
// scheduled for the market.
if ts != nil {
now := encode.UnixMilliU(time.Now())

var resumptionAt uint64
if tr != nil {
resumptionAt = tr.StartEpoch * tr.EpochLen
}

if (now > ts.SuspendTime && resumptionAt == 0) ||
(now > ts.SuspendTime && now < resumptionAt) {
return nil, fmt.Errorf("order placement suspended due to"+
" trade suspension started at %v for dex %s",
encode.UnixTimeMilli(int64(ts.SuspendTime)), dc.acct.url)
}
}

rate, qty := form.Rate, form.Qty
if form.IsLimit && rate == 0 {
return nil, fmt.Errorf("zero-rate order not allowed")
Expand Down Expand Up @@ -1980,15 +2017,17 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {

// Create the dexConnection and listen for incoming messages.
dc := &dexConnection{
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
suspensions: make(map[string]*msgjson.TradeSuspension),
resumptions: make(map[string]*msgjson.TradeResumption),
}

dc.refreshMarkets()
Expand Down Expand Up @@ -2068,6 +2107,50 @@ func handleRevokeMatchMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
return tracker.db.UpdateOrder(metaOrder)
}

// handleTradeSuspensionMsg is called when a trade suspension notification is received.
func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error {
var suspension msgjson.TradeSuspension
err := msg.Unmarshal(&suspension)
if err != nil {
return fmt.Errorf("trade suspension unmarshal error: %v", err)
}

dc.suspensionsMtx.Lock()
dc.suspensions[suspension.MarketID] = &suspension
dc.suspensionsMtx.Unlock()

go func(ctx context.Context, suspension *msgjson.TradeSuspension) {
nowMilli := encode.UnixMilliU(time.Now())

var duration time.Duration
if nowMilli < suspension.SuspendTime {
milliDiff := suspension.SuspendTime - nowMilli
duration = time.Duration(milliDiff * 1e6)
}

timer := time.NewTimer(duration)
for {
select {
case <-ctx.Done():
return

case <-timer.C:
switch suspension.Persist {
case true:
// TODO: persist outstanding client orders.
return

case false:
// TODO: purge outstanding client orders.
return
}
}
}
}(c.ctx, &suspension)

return nil
}

// routeHandler is a handler for a message from the DEX.
type routeHandler func(*Core, *dexConnection, *msgjson.Message) error

Expand All @@ -2077,14 +2160,14 @@ var reqHandlers = map[string]routeHandler{
msgjson.AuditRoute: handleAuditRoute,
msgjson.RedemptionRoute: handleRedemptionRoute,
msgjson.RevokeMatchRoute: handleRevokeMatchMsg,
msgjson.SuspensionRoute: nil,
}

var noteHandlers = map[string]routeHandler{
msgjson.MatchProofRoute: handleMatchProofMsg,
msgjson.BookOrderRoute: handleBookOrderMsg,
msgjson.EpochOrderRoute: handleEpochOrderMsg,
msgjson.UnbookOrderRoute: handleUnbookOrderMsg,
msgjson.SuspensionRoute: handleTradeSuspensionMsg,
}

// listen monitors the DEX websocket connection for server requests and
Expand Down
40 changes: 37 additions & 3 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,11 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
},
Fee: tFee,
},
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
suspensions: make(map[string]*msgjson.TradeSuspension),
resumptions: make(map[string]*msgjson.TradeResumption),
}, conn, acct
}

Expand Down Expand Up @@ -2649,3 +2651,35 @@ func TestHandleMatchProofMsg(t *testing.T) {
t.Fatalf("[handleMatchProofMsg] unexpected error: %v", err)
}
}

func TestHandleTradeSuspensionMsg(t *testing.T) {
rig := newTestRig()
payload := &msgjson.TradeSuspension{
MarketID: "dcr_btc",
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Hour * 24)),
Persist: false,
}

req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
err := handleTradeSuspensionMsg(rig.core, rig.dc, req)
if err != nil {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

form := &TradeForm{
DEX: tDexUrl,
IsLimit: true,
Sell: true,
Base: tDCR.ID,
Quote: tBTC.ID,
Qty: tDCR.LotSize * 10,
Rate: tBTC.RateStep * 1000,
TifNow: false,
}

_, err = rig.core.Trade(tPW, form)
if err == nil {
t.Fatalf("expected a trade suspension set error")
}
}

0 comments on commit fe0b31d

Please sign in to comment.