Skip to content

Commit

Permalink
core: resolve review issues (6 of x).
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd committed Jun 11, 2020
1 parent fc088db commit 785d7dc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 50 deletions.
63 changes: 25 additions & 38 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,13 @@ type dexConnection struct {

regConfMtx sync.RWMutex
regConfirms uint32

pendingSuspendsMtx sync.Mutex
pendingSuspends map[string]*time.Timer
}

// suspended returns the suspended status of the provided market.
func (dc *dexConnection) suspended(mkt string) bool {
dc.marketMtx.Lock()
defer dc.marketMtx.Unlock()

market, ok := dc.marketMap[mkt]
if !ok {
return false
Expand Down Expand Up @@ -2359,18 +2357,17 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {

// Create the dexConnection and listen for incoming messages.
dc := &dexConnection{
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
connected: true,
pendingSuspends: make(map[string]*time.Timer),
WsConn: conn,
connMaster: connMaster,
assets: assets,
cfg: dexCfg,
books: make(map[string]*bookie),
acct: newDEXAccount(acctInfo),
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
connected: true,
}

dc.refreshMarkets()
Expand Down Expand Up @@ -2480,36 +2477,36 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
return fmt.Errorf("trade suspension unmarshal error: %v", err)
}

dc.pendingSuspendsMtx.Lock()
defer dc.pendingSuspendsMtx.Unlock()

// Ensure the provided market exists for the dex.
dc.marketMtx.RLock()
dc.marketMtx.Lock()
mkt, ok := dc.marketMap[sp.MarketID]
dc.marketMtx.RUnlock()
dc.marketMtx.Unlock()
if !ok {
return fmt.Errorf("no market found with ID %s", sp.MarketID)
}

// Ensure the market is not already suspended.
mkt.mtx.Lock()
defer mkt.mtx.Unlock()

if mkt.Suspended {
return fmt.Errorf("market %s for dex %s is already suspended",
sp.MarketID, dc.acct.host)
}

// Remove pending suspends for the market.
if sched := dc.pendingSuspends[sp.MarketID]; sched != nil {
if !sched.Stop() {
// Stop the current pending suspend.
if mkt.pendingSuspend != nil {
if !mkt.pendingSuspend.Stop() {
// TODO: too late, timer already fired. Need to request the
// current configuration for the market at this point.
return fmt.Errorf("unable to stop previously scheduled "+
"suspend for market %s on dex %s", sp.MarketID, dc.acct.host)
}
}

// Set the new scheduled suspend.
// Set a new scheduled suspend.
duration := time.Until(encode.UnixTimeMilli(int64(sp.SuspendTime)))
dc.pendingSuspends[sp.MarketID] = time.AfterFunc(duration, func() {
mkt.pendingSuspend = time.AfterFunc(duration, func() {
// Update the market as suspended.
err := dc.suspend(sp.MarketID)
if err != nil {
Expand All @@ -2533,26 +2530,16 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
tracker.metaData.Host == dc.acct.host &&
(tracker.metaData.Status == order.OrderStatusEpoch ||
tracker.metaData.Status == order.OrderStatusBooked) {
md := tracker.metaData
md.Status = order.OrderStatusRevoked
metaOrder := &db.MetaOrder{
MetaData: md,
Order: tracker.Order,
}

err = tracker.db.UpdateOrder(metaOrder)
metaOrder := tracker.metaOrder()
metaOrder.MetaData.Status = order.OrderStatusRevoked
err := tracker.db.UpdateOrder(metaOrder)
if err != nil {
log.Errorf("unable to update order: %v", err)
}
}
}
dc.tradeMtx.RUnlock()
}

// Remove suspend after execution.
dc.pendingSuspendsMtx.Lock()
delete(dc.pendingSuspends, sp.MarketID)
dc.pendingSuspendsMtx.Unlock()
})

return nil
Expand Down
48 changes: 36 additions & 12 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,10 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
},
Fee: tFee,
},
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
epoch: map[string]uint64{tDcrBtcMktName: 0},
pendingSuspends: make(map[string]*time.Timer),
notify: func(Notification) {},
marketMap: map[string]*Market{tDcrBtcMktName: mkt},
trades: make(map[order.OrderID]*trackedTrade),
epoch: map[string]uint64{tDcrBtcMktName: 0},
}, conn, acct
}

Expand Down Expand Up @@ -2948,12 +2947,34 @@ func TestAssetCounter(t *testing.T) {
func TestHandleTradeSuspensionMsg(t *testing.T) {
rig := newTestRig()

tCore := rig.core
dcrWallet, _ := newTWallet(tDCR.ID)
tCore.wallets[tDCR.ID] = dcrWallet
dcrWallet.address = "DsVmA7aqqWeKWy461hXjytbZbgCqbB8g2dq"
dcrWallet.Unlock(wPW, time.Hour)

btcWallet, _ := newTWallet(tBTC.ID)
tCore.wallets[tBTC.ID] = btcWallet
btcWallet.address = "12DXGkvxFjuq5btXYkwWfBZaz1rVwFgini"
btcWallet.Unlock(wPW, time.Hour)

handleLimit := func(msg *msgjson.Message, f msgFunc) error {
// Need to stamp and sign the message with the server's key.
msgOrder := new(msgjson.LimitOrder)
err := msg.Unmarshal(msgOrder)
if err != nil {
t.Fatalf("unmarshal error: %v", err)
}
lo := convertMsgLimitOrder(msgOrder)
f(orderResponse(msg.ID, msgOrder, lo, false, false, false))
return nil
}

rig.ws.queueResponse(msgjson.LimitRoute, handleLimit)

// Ensure a non-existent market cannot be suspended.
payload := &msgjson.TradeSuspension{
MarketID: "dcr_dcr",
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)),
Persist: true,
MarketID: "dcr_dcr",
}

req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.SuspensionRoute, payload)
Expand All @@ -2963,7 +2984,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) {
"ID not found error: %v")
}

mkt := "dcr_btc"
mkt := tDcrBtcMktName

// Ensure an already suspended market cannot be suspended again.
err = rig.dc.suspend(mkt)
Expand All @@ -2974,7 +2995,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) {
payload = &msgjson.TradeSuspension{
MarketID: mkt,
FinalEpoch: 100,
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Second * 2)),
SuspendTime: encode.UnixMilliU(time.Now().Add(time.Millisecond * 20)),
Persist: true,
}

Expand All @@ -2997,6 +3018,9 @@ func TestHandleTradeSuspensionMsg(t *testing.T) {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

// Wait for the suspend to execute.
time.Sleep(time.Millisecond * 40)

// Ensure trades for a suspended market generate an error.
form := &TradeForm{
Host: tDexHost,
Expand All @@ -3011,6 +3035,6 @@ func TestHandleTradeSuspensionMsg(t *testing.T) {

_, err = rig.core.Trade(tPW, form)
if err == nil {
t.Fatalf("expected a trade suspension set error")
t.Fatalf("expected a suspension market error")
}
}
3 changes: 3 additions & 0 deletions client/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/client/db"
Expand Down Expand Up @@ -159,6 +160,8 @@ type Market struct {
MarketBuyBuffer float64 `json:"buybuffer"`
Orders []*Order `json:"orders"`
Suspended bool `json:"suspended"`
pendingSuspend *time.Timer
mtx sync.Mutex
}

// Display returns an ID string suitable for displaying in a UI.
Expand Down

0 comments on commit 785d7dc

Please sign in to comment.