Skip to content

Commit

Permalink
market,dex,admin,db,book,msgjson: Suspend support, clean Market stop
Browse files Browse the repository at this point in the history
Refactor epochPump (into epump.go) for suspend support.

Add Market.Suspend and Market.Running.

When persistBook is false, in-memory and persistent books are purged.
Add the PurgeBook method to Market to do this. PurgeBook calls the new
OrderArchiver.FlushBook method, clears the in-memory book, and unlocks
all coins. TODO: unlock just the book orders, not possible epoch orders.

Run shutdown sequence is now meticulous.

Use internal notifyChan with lifetime of Run, and async subscriber
broadcasts via the closure goroutine inside Run.

db: add OrderArchiver.FlushBook

Add the FlushBook method to the pg.Archiver type.  It changes all orders
for a market that have status booked to status revoked, and it creates
and stores the server-generated pseudocancel orders. It does this all in
a single db transaction.  This is similar to calling RevokeOrder
individually on all booked orders.

The new PurgeBook query template is added to do the bulk book-revoked
status change for a market.

coinlock: Add UnlockAll to CoinLocker interface.

book: add Clear method to Book

market: rework bookUpdateSignal -> updateAction+sigData

Remove bookUpdateSignal, which combined data used for many different
signals, and create updateSignal with action and data fields. Receivers
will type cast data into one of several sigData types that contain only
the data that is relevant to the action at hand.

msgjson, market: add TradeSuspension message

(*BookRouter).runBook now sends msgjson.TradeSuspension for
msgjson.SuspensionRoute when the market sends a sigDataSuspend
on the order feed.  (*Market).Run sends this signal when cycleEpoch hits
the target suspend epoch index.

market,dex: SuspendMarket methods for OrderRouter and DEX mgr

Add (*OrderRouter).SuspendMarket, called from the DEX manager in the new
server/dex.(*DEX) SuspendMarket method.

server/admin: suspend and market info routes

msgjson: suspend data in config

server/dex: on suspend update config resp and send TradeSuspension

market: Add Status type and (*Market).Status

dex: add MarketStatus and MarketStatuses methods for admin API

admin: add /markets

The markets route summarizes configured markets.  e.g.

$   dexadm markets
{
    "btc_ltc": {
        "running": true,
        "epochlen": 6000,
        "activeepoch": 264872312,
        "startepoch": 264872312
    },
    "dcr_btc": {
        "running": true,
        "epochlen": 7000,
        "activeepoch": 227033410,
        "startepoch": 227033410
    }
}

$   dexadm market/dcr_btc/suspend
{
    "market": "dcr_btc",
    "finalepoch": 227033424,
    "supendtime": "2020-05-11T21:52:55Z"
}

$   dexadm markets{
    "btc_ltc": {
        "running": true,
        "epochlen": 6000,
        "activeepoch": 264872328,
        "startepoch": 264872328
    },
    "dcr_btc": {
        "running": true,
        "epochlen": 7000,
        "activeepoch": 227033424,
        "startepoch": 227033424,
        "finalepoch": 227033424,
        "persistbook": true
    }
}

$   dexadm markets
{
    "btc_ltc": {
        "running": true,
        "epochlen": 6000,
        "activeepoch": 264872329,
        "startepoch": 264872329
    },
    "dcr_btc": {
        "running": false,
        "epochlen": 7000,
        "activeepoch": 0,
        "startepoch": 0,
        "finalepoch": 227033424,
        "persistbook": true
    }
}
  • Loading branch information
chappjc committed May 14, 2020
1 parent 7d77619 commit a47e2cd
Show file tree
Hide file tree
Showing 26 changed files with 2,454 additions and 474 deletions.
3 changes: 1 addition & 2 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1945,8 +1945,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
}

assets := make(map[uint32]*dex.Asset, len(dexCfg.Assets))
for i := range dexCfg.Assets {
asset := &dexCfg.Assets[i]
for _, asset := range dexCfg.Assets {
assets[asset.ID] = convertAssetInfo(asset)
}
// Validate the markets so we don't have to check every time later.
Expand Down
10 changes: 5 additions & 5 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) {
cfg: &msgjson.ConfigResult{
CancelMax: 0.8,
BroadcastTimeout: 5 * 60 * 1000,
Assets: []msgjson.Asset{
*uncovertAssetInfo(tDCR),
*uncovertAssetInfo(tBTC),
Assets: []*msgjson.Asset{
uncovertAssetInfo(tDCR),
uncovertAssetInfo(tBTC),
},
Markets: []msgjson.Market{
Markets: []*msgjson.Market{
{
Name: tDcrBtcMktName,
Base: tDCR.ID,
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestMarkets(t *testing.T) {
base, quote := randomMsgMarket()
marketIDs[marketName(base.ID, quote.ID)] = struct{}{}
cfg := rig.dc.cfg
cfg.Markets = append(cfg.Markets, msgjson.Market{
cfg.Markets = append(cfg.Markets, &msgjson.Market{
Name: base.Symbol + quote.Symbol,
Base: base.ID,
Quote: quote.ID,
Expand Down
74 changes: 62 additions & 12 deletions dex/msgjson/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
RPCGetFeeError // 39
RPCRegisterError // 40
RPCArgumentsError // 41
MarketNotRunningError // 42
)

// Routes are destinations for a "payload" of data. The type of data being
Expand Down Expand Up @@ -127,8 +128,13 @@ const (
// preimages for the client's epoch orders.
PreimageRoute = "preimage"
// SuspensionRoute is the DEX-originating request-type message informing the
// client of an upcoming trade suspension.
// client of an upcoming trade suspension. This is part of the
// subscription-based orderbook notification feed.
SuspensionRoute = "suspension"
// ResumptionRoute is the DEX-originating request-type message informing the
// client of an upcoming trade resumption. This is part of the
// subscription-based orderbook notification feed.
ResumptionRoute = "resumption"
)

type Bytes = dex.Bytes
Expand Down Expand Up @@ -212,6 +218,21 @@ const (
Notification // 3
)

// String satisfies the Stringer interface for translating the MessageType code
// into a description, primarily for logging.
func (mt MessageType) String() string {
switch mt {
case Request:
return "request"
case Response:
return "response"
case Notification:
return "notification"
default:
return "unknown MessageType"
}
}

// Message is the primary messaging type for websocket communications.
type Message struct {
// Type is the message type.
Expand Down Expand Up @@ -297,7 +318,7 @@ func (msg *Message) Response() (*ResponsePayload, error) {
// NewNotification encodes the payload and creates a Notification-type *Message.
func NewNotification(route string, payload interface{}) (*Message, error) {
if route == "" {
return nil, fmt.Errorf("empty string not allowed for route of request-type message")
return nil, fmt.Errorf("empty string not allowed for route of notification-type message")
}
encPayload, err := json.Marshal(payload)
if err != nil {
Expand Down Expand Up @@ -669,6 +690,7 @@ type OrderBook struct {
MarketID string `json:"marketid"`
Seq uint64 `json:"seq"`
Epoch uint64 `json:"epoch"`
// MarketStatus `json:"status"`// maybe
// DRAFT NOTE: We might want to use a different structure for bulk updates.
// Sending a struct of arrays rather than an array of structs could
// potentially cut the encoding effort and encoded size substantially.
Expand All @@ -685,6 +707,24 @@ type MatchProofNote struct {
Seed Bytes `json:"seed"`
}

// TradeSuspension is the SuspensionRoute notification payload. It is part of
// the orderbook subscription.
type TradeSuspension struct {
MarketID string `json:"marketid"`
FinalEpoch uint64 `json:"finalepoch"`
SuspendTime uint64 `json:"suspendtime"`
Persist bool `json:"persistbook"`
}

// TradeResumption is the ResumptionRoute notification payload. It is part of
// the orderbook subscription. EpochLen is specified if the market configuration
// change, and the client should also hit the 'config' route for full details.
type TradeResumption struct {
MarketID string `json:"marketid"`
StartEpoch uint64 `json:"startepoch"`
EpochLen uint64 `json:"epochlen,omitempty"` // maybe just ConfigChange bool `json:"configchange"`
}

// PreimageRequest is the server-originating preimage request payload.
type PreimageRequest struct {
OrderID Bytes `json:"orderid"`
Expand Down Expand Up @@ -789,25 +829,25 @@ type NotifyFeeResult struct {
signable
}

// ConfigResult is the successful result from the 'config' route.
type ConfigResult struct {
CancelMax float64 `json:"cancelmax"`
BroadcastTimeout uint64 `json:"btimeout"`
RegFeeConfirms uint16 `json:"regfeeconfirms"`
Assets []Asset `json:"assets"`
Markets []Market `json:"markets"`
Fee uint64 `json:"fee"`
// MarketStatus describes the status of the market, where StartEpoch is when the
// market started or will start. FinalEpoch is a when the market will suspend
// if it is running, or when the market suspended if it is presently stopped.
type MarketStatus struct {
StartEpoch uint64 `json:"startepoch"`
FinalEpoch uint64 `json:"finalepoch,omitempty"`
Persist *bool `json:"persistbook,omitempty"` // nil and omitted when finalepoch is omitted
}

// Market describes a market and its variables, and is returned as part of a
// ConfigResult.
// ConfigResult. The market's status (running, start epoch, and any planned
// final epoch before suspend) are also provided.
type Market struct {
Name string `json:"name"`
Base uint32 `json:"base"`
Quote uint32 `json:"quote"`
EpochLen uint64 `json:"epochlen"`
StartEpoch uint64 `json:"startepoch"`
MarketBuyBuffer float64 `json:"buybuffer"`
MarketStatus `json:"status"`
}

// Asset describes an asset and its variables, and is returned as part of a
Expand All @@ -823,6 +863,16 @@ type Asset struct {
FundConf uint16 `json:"fundconf"`
}

// ConfigResult is the successful result for the ConfigRoute.
type ConfigResult struct {
CancelMax float64 `json:"cancelmax"`
BroadcastTimeout uint64 `json:"btimeout"`
RegFeeConfirms uint16 `json:"regfeeconfirms"`
Assets []*Asset `json:"assets"`
Markets []*Market `json:"markets"`
Fee uint64 `json:"fee"`
}

// Convert uint64 to 8 bytes.
func uint64Bytes(i uint64) []byte {
b := make([]byte, 8)
Expand Down
111 changes: 111 additions & 0 deletions server/admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ package admin

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"decred.org/dcrdex/dex/encode"
"github.com/go-chi/chi"
)

// writeJSON marshals the provided interface and writes the bytes to the
Expand Down Expand Up @@ -35,3 +42,107 @@ func (_ *Server) apiPing(w http.ResponseWriter, _ *http.Request) {
func (s *Server) apiConfig(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, s.core.ConfigMsg())
}

func (s *Server) apiMarkets(w http.ResponseWriter, r *http.Request) {
statuses := s.core.MarketStatuses()
mktStatuses := make(map[string]*MarketStatus)
for name, status := range statuses {
mktStatus := &MarketStatus{
// Name is empty since the key is the name.
Running: status.Running,
EpochDuration: status.EpochDuration,
ActiveEpoch: status.ActiveEpoch,
StartEpoch: status.StartEpoch,
SuspendEpoch: status.SuspendEpoch,
}
if status.SuspendEpoch != 0 {
persist := status.PersistBook
mktStatus.PersistBook = &persist
}
mktStatuses[name] = mktStatus
}

writeJSON(w, mktStatuses)
}

// apiMarketInfo is the handler for the '/market/{marketName}' API request.
func (s *Server) apiMarketInfo(w http.ResponseWriter, r *http.Request) {
mkt := strings.ToLower(chi.URLParam(r, marketNameKey))
status := s.core.MarketStatus(mkt)
if status == nil {
http.Error(w, fmt.Sprintf("unknown market %q", mkt), http.StatusBadRequest)
return
}

mktStatus := &MarketStatus{
Name: mkt,
Running: status.Running,
EpochDuration: status.EpochDuration,
ActiveEpoch: status.ActiveEpoch,
StartEpoch: status.ActiveEpoch,
SuspendEpoch: status.SuspendEpoch,
}
if status.SuspendEpoch != 0 {
persist := status.PersistBook
mktStatus.PersistBook = &persist
}
writeJSON(w, mktStatus)
}

// hander for route '/market/{marketName}/suspend?t=EPOCH-MS&persist=BOOL'
func (s *Server) apiSuspend(w http.ResponseWriter, r *http.Request) {
// Ensure the market exists and is running.
mkt := strings.ToLower(chi.URLParam(r, marketNameKey))
found, running := s.core.MarketRunning(mkt)
if !found {
http.Error(w, fmt.Sprintf("unknown market %q", mkt), http.StatusBadRequest)
return
}
if !running {
http.Error(w, fmt.Sprintf("market %q not running", mkt), http.StatusBadRequest)
return
}

// Validate the suspend time provided in the "t" query. If not specified,
// the zero time.Time is used to indicate ASAP.
var suspTime time.Time
if tSuspendStr := r.URL.Query().Get("t"); tSuspendStr != "" {
suspTimeMs, err := strconv.ParseInt(tSuspendStr, 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("invalid suspend time %q: %v", tSuspendStr, err), http.StatusBadRequest)
return
}

suspTime = encode.UnixTimeMilli(suspTimeMs)
if time.Until(suspTime) < 0 {
http.Error(w, fmt.Sprintf("specified market suspend time is in the past: %v", suspTime),
http.StatusBadRequest)
return
}
}

// Validate the persist book flag provided in the "persist" query. If not
// specified, persist the books, do not purge.
persistBook := true
if persistBookStr := r.URL.Query().Get("persist"); persistBookStr != "" {
var err error
persistBook, err = strconv.ParseBool(persistBookStr)
if err != nil {
http.Error(w, fmt.Sprintf("invalid persist book boolean %q: %v", persistBookStr, err), http.StatusBadRequest)
return
}
}

suspEpoch := s.core.SuspendMarket(mkt, suspTime, persistBook)
if suspEpoch == nil {
// Should not happen.
http.Error(w, "failed to suspend market "+mkt, http.StatusInternalServerError)
return
}

writeJSON(w, &SuspendResult{
Market: mkt,
FinalEpoch: suspEpoch.Idx,
SuspendTime: APITime{suspEpoch.End},
})
}
15 changes: 14 additions & 1 deletion server/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"decred.org/dcrdex/server/market"
"github.com/decred/slog"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
Expand All @@ -27,15 +28,21 @@ const (
// server is allowed to stay open without authenticating before it
// is closed.
rpcTimeoutSeconds = 10

marketNameKey = "market"
)

var (
log slog.Logger
)

// SvrCore is satisfied by core.Core.
// SvrCore is satisfied by server/dex.DEX.
type SvrCore interface {
ConfigMsg() json.RawMessage
MarketRunning(mktName string) (found, running bool)
MarketStatus(mktName string) *market.Status
MarketStatuses() map[string]*market.Status
SuspendMarket(name string, tSusp time.Time, persistBooks bool) *market.SuspendEpoch
}

// Server is a multi-client https server.
Expand Down Expand Up @@ -111,6 +118,12 @@ func NewServer(cfg *SrvConfig) (*Server, error) {
r.Use(middleware.AllowContentType("application/json"))
r.Get("/ping", s.apiPing)
r.Get("/config", s.apiConfig)

r.Get("/markets", s.apiMarkets)
r.Route("/market/{"+marketNameKey+"}", func(rm chi.Router) {
rm.Get("/", s.apiMarketInfo)
rm.Get("/suspend", s.apiSuspend)
})
})

return s, nil
Expand Down
Loading

0 comments on commit a47e2cd

Please sign in to comment.