diff --git a/client/core/core.go b/client/core/core.go index 23dc7aa30e..242a6c173f 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -1945,8 +1945,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { } assets := make(map[uint32]*dex.Asset, len(dexCfg.Assets)) - for i := range dexCfg.Assets { - asset := &dexCfg.Assets[i] + for _, asset := range dexCfg.Assets { assets[asset.ID] = convertAssetInfo(asset) } // Validate the markets so we don't have to check every time later. diff --git a/client/core/core_test.go b/client/core/core_test.go index c1b51460b4..8fffb07be3 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -163,11 +163,11 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { cfg: &msgjson.ConfigResult{ CancelMax: 0.8, BroadcastTimeout: 5 * 60 * 1000, - Assets: []msgjson.Asset{ - *uncovertAssetInfo(tDCR), - *uncovertAssetInfo(tBTC), + Assets: []*msgjson.Asset{ + uncovertAssetInfo(tDCR), + uncovertAssetInfo(tBTC), }, - Markets: []msgjson.Market{ + Markets: []*msgjson.Market{ { Name: tDcrBtcMktName, Base: tDCR.ID, @@ -633,7 +633,7 @@ func TestMarkets(t *testing.T) { base, quote := randomMsgMarket() marketIDs[marketName(base.ID, quote.ID)] = struct{}{} cfg := rig.dc.cfg - cfg.Markets = append(cfg.Markets, msgjson.Market{ + cfg.Markets = append(cfg.Markets, &msgjson.Market{ Name: base.Symbol + quote.Symbol, Base: base.ID, Quote: quote.ID, diff --git a/dex/msgjson/types.go b/dex/msgjson/types.go index c52e97c023..bb38754b62 100644 --- a/dex/msgjson/types.go +++ b/dex/msgjson/types.go @@ -55,6 +55,7 @@ const ( RPCGetFeeError // 39 RPCRegisterError // 40 RPCArgumentsError // 41 + MarketNotRunningError // 42 ) // Routes are destinations for a "payload" of data. The type of data being @@ -127,8 +128,13 @@ const ( // preimages for the client's epoch orders. PreimageRoute = "preimage" // SuspensionRoute is the DEX-originating request-type message informing the - // client of an upcoming trade suspension. + // client of an upcoming trade suspension. This is part of the + // subscription-based orderbook notification feed. SuspensionRoute = "suspension" + // ResumptionRoute is the DEX-originating request-type message informing the + // client of an upcoming trade resumption. This is part of the + // subscription-based orderbook notification feed. + ResumptionRoute = "resumption" ) type Bytes = dex.Bytes @@ -212,6 +218,21 @@ const ( Notification // 3 ) +// String satisfies the Stringer interface for translating the MessageType code +// into a description, primarily for logging. +func (mt MessageType) String() string { + switch mt { + case Request: + return "request" + case Response: + return "response" + case Notification: + return "notification" + default: + return "unknown MessageType" + } +} + // Message is the primary messaging type for websocket communications. type Message struct { // Type is the message type. @@ -297,7 +318,7 @@ func (msg *Message) Response() (*ResponsePayload, error) { // NewNotification encodes the payload and creates a Notification-type *Message. func NewNotification(route string, payload interface{}) (*Message, error) { if route == "" { - return nil, fmt.Errorf("empty string not allowed for route of request-type message") + return nil, fmt.Errorf("empty string not allowed for route of notification-type message") } encPayload, err := json.Marshal(payload) if err != nil { @@ -669,6 +690,7 @@ type OrderBook struct { MarketID string `json:"marketid"` Seq uint64 `json:"seq"` Epoch uint64 `json:"epoch"` + // MarketStatus `json:"status"`// maybe // DRAFT NOTE: We might want to use a different structure for bulk updates. // Sending a struct of arrays rather than an array of structs could // potentially cut the encoding effort and encoded size substantially. @@ -685,6 +707,24 @@ type MatchProofNote struct { Seed Bytes `json:"seed"` } +// TradeSuspension is the SuspensionRoute notification payload. It is part of +// the orderbook subscription. +type TradeSuspension struct { + MarketID string `json:"marketid"` + FinalEpoch uint64 `json:"finalepoch"` + SuspendTime uint64 `json:"suspendtime"` + Persist bool `json:"persistbook"` +} + +// TradeResumption is the ResumptionRoute notification payload. It is part of +// the orderbook subscription. EpochLen is specified if the market configuration +// change, and the client should also hit the 'config' route for full details. +type TradeResumption struct { + MarketID string `json:"marketid"` + StartEpoch uint64 `json:"startepoch"` + EpochLen uint64 `json:"epochlen,omitempty"` // maybe just ConfigChange bool `json:"configchange"` +} + // PreimageRequest is the server-originating preimage request payload. type PreimageRequest struct { OrderID Bytes `json:"orderid"` @@ -789,25 +829,25 @@ type NotifyFeeResult struct { signable } -// ConfigResult is the successful result from the 'config' route. -type ConfigResult struct { - CancelMax float64 `json:"cancelmax"` - BroadcastTimeout uint64 `json:"btimeout"` - RegFeeConfirms uint16 `json:"regfeeconfirms"` - Assets []Asset `json:"assets"` - Markets []Market `json:"markets"` - Fee uint64 `json:"fee"` +// MarketStatus describes the status of the market, where StartEpoch is when the +// market started or will start. FinalEpoch is a when the market will suspend +// if it is running, or when the market suspended if it is presently stopped. +type MarketStatus struct { + StartEpoch uint64 `json:"startepoch"` + FinalEpoch uint64 `json:"finalepoch,omitempty"` + Persist *bool `json:"persistbook,omitempty"` // nil and omitted when finalepoch is omitted } // Market describes a market and its variables, and is returned as part of a -// ConfigResult. +// ConfigResult. The market's status (running, start epoch, and any planned +// final epoch before suspend) are also provided. type Market struct { Name string `json:"name"` Base uint32 `json:"base"` Quote uint32 `json:"quote"` EpochLen uint64 `json:"epochlen"` - StartEpoch uint64 `json:"startepoch"` MarketBuyBuffer float64 `json:"buybuffer"` + MarketStatus `json:"status"` } // Asset describes an asset and its variables, and is returned as part of a @@ -823,6 +863,16 @@ type Asset struct { FundConf uint16 `json:"fundconf"` } +// ConfigResult is the successful result for the ConfigRoute. +type ConfigResult struct { + CancelMax float64 `json:"cancelmax"` + BroadcastTimeout uint64 `json:"btimeout"` + RegFeeConfirms uint16 `json:"regfeeconfirms"` + Assets []*Asset `json:"assets"` + Markets []*Market `json:"markets"` + Fee uint64 `json:"fee"` +} + // Convert uint64 to 8 bytes. func uint64Bytes(i uint64) []byte { b := make([]byte, 8) diff --git a/server/admin/api.go b/server/admin/api.go index 7dda03c596..d35b71bcae 100644 --- a/server/admin/api.go +++ b/server/admin/api.go @@ -5,7 +5,14 @@ package admin import ( "encoding/json" + "fmt" "net/http" + "strconv" + "strings" + "time" + + "decred.org/dcrdex/dex/encode" + "github.com/go-chi/chi" ) // writeJSON marshals the provided interface and writes the bytes to the @@ -35,3 +42,107 @@ func (_ *Server) apiPing(w http.ResponseWriter, _ *http.Request) { func (s *Server) apiConfig(w http.ResponseWriter, _ *http.Request) { writeJSON(w, s.core.ConfigMsg()) } + +func (s *Server) apiMarkets(w http.ResponseWriter, r *http.Request) { + statuses := s.core.MarketStatuses() + mktStatuses := make(map[string]*MarketStatus) + for name, status := range statuses { + mktStatus := &MarketStatus{ + // Name is empty since the key is the name. + Running: status.Running, + EpochDuration: status.EpochDuration, + ActiveEpoch: status.ActiveEpoch, + StartEpoch: status.StartEpoch, + SuspendEpoch: status.SuspendEpoch, + } + if status.SuspendEpoch != 0 { + persist := status.PersistBook + mktStatus.PersistBook = &persist + } + mktStatuses[name] = mktStatus + } + + writeJSON(w, mktStatuses) +} + +// apiMarketInfo is the handler for the '/market/{marketName}' API request. +func (s *Server) apiMarketInfo(w http.ResponseWriter, r *http.Request) { + mkt := strings.ToLower(chi.URLParam(r, marketNameKey)) + status := s.core.MarketStatus(mkt) + if status == nil { + http.Error(w, fmt.Sprintf("unknown market %q", mkt), http.StatusBadRequest) + return + } + + mktStatus := &MarketStatus{ + Name: mkt, + Running: status.Running, + EpochDuration: status.EpochDuration, + ActiveEpoch: status.ActiveEpoch, + StartEpoch: status.ActiveEpoch, + SuspendEpoch: status.SuspendEpoch, + } + if status.SuspendEpoch != 0 { + persist := status.PersistBook + mktStatus.PersistBook = &persist + } + writeJSON(w, mktStatus) +} + +// hander for route '/market/{marketName}/suspend?t=EPOCH-MS&persist=BOOL' +func (s *Server) apiSuspend(w http.ResponseWriter, r *http.Request) { + // Ensure the market exists and is running. + mkt := strings.ToLower(chi.URLParam(r, marketNameKey)) + found, running := s.core.MarketRunning(mkt) + if !found { + http.Error(w, fmt.Sprintf("unknown market %q", mkt), http.StatusBadRequest) + return + } + if !running { + http.Error(w, fmt.Sprintf("market %q not running", mkt), http.StatusBadRequest) + return + } + + // Validate the suspend time provided in the "t" query. If not specified, + // the zero time.Time is used to indicate ASAP. + var suspTime time.Time + if tSuspendStr := r.URL.Query().Get("t"); tSuspendStr != "" { + suspTimeMs, err := strconv.ParseInt(tSuspendStr, 10, 64) + if err != nil { + http.Error(w, fmt.Sprintf("invalid suspend time %q: %v", tSuspendStr, err), http.StatusBadRequest) + return + } + + suspTime = encode.UnixTimeMilli(suspTimeMs) + if time.Until(suspTime) < 0 { + http.Error(w, fmt.Sprintf("specified market suspend time is in the past: %v", suspTime), + http.StatusBadRequest) + return + } + } + + // Validate the persist book flag provided in the "persist" query. If not + // specified, persist the books, do not purge. + persistBook := true + if persistBookStr := r.URL.Query().Get("persist"); persistBookStr != "" { + var err error + persistBook, err = strconv.ParseBool(persistBookStr) + if err != nil { + http.Error(w, fmt.Sprintf("invalid persist book boolean %q: %v", persistBookStr, err), http.StatusBadRequest) + return + } + } + + suspEpoch := s.core.SuspendMarket(mkt, suspTime, persistBook) + if suspEpoch == nil { + // Should not happen. + http.Error(w, "failed to suspend market "+mkt, http.StatusInternalServerError) + return + } + + writeJSON(w, &SuspendResult{ + Market: mkt, + FinalEpoch: suspEpoch.Idx, + SuspendTime: APITime{suspEpoch.End}, + }) +} diff --git a/server/admin/server.go b/server/admin/server.go index a6d4591daa..d4b47c3a0e 100644 --- a/server/admin/server.go +++ b/server/admin/server.go @@ -17,6 +17,7 @@ import ( "sync" "time" + "decred.org/dcrdex/server/market" "github.com/decred/slog" "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" @@ -27,15 +28,21 @@ const ( // server is allowed to stay open without authenticating before it // is closed. rpcTimeoutSeconds = 10 + + marketNameKey = "market" ) var ( log slog.Logger ) -// SvrCore is satisfied by core.Core. +// SvrCore is satisfied by server/dex.DEX. type SvrCore interface { ConfigMsg() json.RawMessage + MarketRunning(mktName string) (found, running bool) + MarketStatus(mktName string) *market.Status + MarketStatuses() map[string]*market.Status + SuspendMarket(name string, tSusp time.Time, persistBooks bool) *market.SuspendEpoch } // Server is a multi-client https server. @@ -111,6 +118,12 @@ func NewServer(cfg *SrvConfig) (*Server, error) { r.Use(middleware.AllowContentType("application/json")) r.Get("/ping", s.apiPing) r.Get("/config", s.apiConfig) + + r.Get("/markets", s.apiMarkets) + r.Route("/market/{"+marketNameKey+"}", func(rm chi.Router) { + rm.Get("/", s.apiMarketInfo) + rm.Get("/suspend", s.apiSuspend) + }) }) return s, nil diff --git a/server/admin/server_test.go b/server/admin/server_test.go index 33353aaf3e..2485ac6507 100644 --- a/server/admin/server_test.go +++ b/server/admin/server_test.go @@ -14,13 +14,17 @@ import ( "net/http/httptest" "os" "path/filepath" + "reflect" + "strings" "sync" "testing" "time" - dexsrv "decred.org/dcrdex/server/dex" + "decred.org/dcrdex/dex/encode" + "decred.org/dcrdex/server/market" "github.com/decred/dcrd/certgen" "github.com/decred/slog" + "github.com/go-chi/chi" ) func init() { @@ -28,15 +32,88 @@ func init() { log.SetLevel(slog.LevelTrace) } -var ( - // Check that *dexsrv.DEX satisfies SvrCore. - _ SvrCore = (*dexsrv.DEX)(nil) -) +type TMarket struct { + running bool + ep0, ep int64 + dur uint64 + suspend *market.SuspendEpoch + persist bool +} -type TCore struct{} +type TCore struct { + markets map[string]*TMarket +} func (c *TCore) ConfigMsg() json.RawMessage { return nil } +func (c *TCore) Suspend(tSusp time.Time, persistBooks bool) map[string]*market.SuspendEpoch { + return nil +} + +func (c *TCore) SuspendMarket(name string, tSusp time.Time, persistBooks bool) *market.SuspendEpoch { + tMkt := c.markets[name] + if tMkt == nil { + return nil + } + tMkt.persist = persistBooks + tMkt.suspend.Idx = encode.UnixMilli(tSusp) + tMkt.suspend.End = tSusp.Add(time.Millisecond) + return tMkt.suspend +} + +func (c *TCore) market(name string) *TMarket { + if c.markets == nil { + return nil + } + return c.markets[name] +} + +func (c *TCore) MarketStatus(mktName string) *market.Status { + mkt := c.market(mktName) + if mkt == nil { + return nil + } + var suspendEpoch int64 + if mkt.suspend != nil { + suspendEpoch = mkt.suspend.Idx + } + return &market.Status{ + Running: mkt.running, + EpochDuration: mkt.dur, + ActiveEpoch: mkt.ep, + StartEpoch: mkt.ep0, + SuspendEpoch: suspendEpoch, + PersistBook: mkt.persist, + } +} + +func (c *TCore) MarketStatuses() map[string]*market.Status { + mktStatuses := make(map[string]*market.Status, len(c.markets)) + for name, mkt := range c.markets { + var suspendEpoch int64 + if mkt.suspend != nil { + suspendEpoch = mkt.suspend.Idx + } + mktStatuses[name] = &market.Status{ + Running: mkt.running, + EpochDuration: mkt.dur, + ActiveEpoch: mkt.ep, + StartEpoch: mkt.ep0, + SuspendEpoch: suspendEpoch, + PersistBook: mkt.persist, + } + } + return mktStatuses +} + +func (c *TCore) MarketRunning(mktName string) (found, running bool) { + mkt := c.market(mktName) + if mkt == nil { + return + } + return true, mkt.running +} + type tResponseWriter struct { b []byte code int @@ -149,6 +226,419 @@ func TestPing(t *testing.T) { } } +func TestMarkets(t *testing.T) { + core := &TCore{ + markets: make(map[string]*TMarket), + } + srv := &Server{ + core: core, + } + + mux := chi.NewRouter() + mux.Get("/markets", srv.apiMarkets) + + // No markets. + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "https://localhost/markets", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiMarkets returned code %d, expected %d", w.Code, http.StatusOK) + } + respBody := w.Body.String() + if respBody != fmt.Sprintf("{}\n") { + t.Errorf("incorrect response body: %q", respBody) + } + + // A market. + dur := uint64(1234) + idx := int64(12345) + tMkt := &TMarket{ + running: true, + dur: dur, + ep0: 12340, + ep: 12343, + } + core.markets["dcr_btc"] = tMkt + + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/markets", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiMarkets returned code %d, expected %d", w.Code, http.StatusOK) + } + + exp := `{ + "dcr_btc": { + "running": true, + "epochlen": 1234, + "activeepoch": 12343, + "startepoch": 12340 + } +} +` + if exp != w.Body.String() { + t.Errorf("unexpected response %q, wanted %q", w.Body.String(), exp) + } + + var mktStatuses map[string]*MarketStatus + err := json.Unmarshal(w.Body.Bytes(), &mktStatuses) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + + wantMktStatuses := map[string]*MarketStatus{ + "dcr_btc": { + Running: true, + EpochDuration: 1234, + ActiveEpoch: 12343, + StartEpoch: 12340, + }, + } + if len(wantMktStatuses) != len(mktStatuses) { + t.Fatalf("got %d market statuses, wanted %d", len(mktStatuses), len(wantMktStatuses)) + } + for name, stat := range mktStatuses { + wantStat := wantMktStatuses[name] + if wantStat == nil { + t.Fatalf("market %s not expected", name) + } + if !reflect.DeepEqual(wantStat, stat) { + log.Errorf("incorrect market status. got %v, expected %v", stat, wantStat) + } + } + + // Set suspend data. + tMkt.suspend = &market.SuspendEpoch{Idx: 12345, End: encode.UnixTimeMilli(int64(dur) * idx)} + tMkt.persist = true + + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/markets", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiMarkets returned code %d, expected %d", w.Code, http.StatusOK) + } + + exp = `{ + "dcr_btc": { + "running": true, + "epochlen": 1234, + "activeepoch": 12343, + "startepoch": 12340, + "finalepoch": 12345, + "persistbook": true + } +} +` + if exp != w.Body.String() { + t.Errorf("unexpected response %q, wanted %q", w.Body.String(), exp) + } + + mktStatuses = nil + err = json.Unmarshal(w.Body.Bytes(), &mktStatuses) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + + persist := true + wantMktStatuses = map[string]*MarketStatus{ + "dcr_btc": { + Running: true, + EpochDuration: 1234, + ActiveEpoch: 12343, + StartEpoch: 12340, + SuspendEpoch: 12345, + PersistBook: &persist, + }, + } + if len(wantMktStatuses) != len(mktStatuses) { + t.Fatalf("got %d market statuses, wanted %d", len(mktStatuses), len(wantMktStatuses)) + } + for name, stat := range mktStatuses { + wantStat := wantMktStatuses[name] + if wantStat == nil { + t.Fatalf("market %s not expected", name) + } + if !reflect.DeepEqual(wantStat, stat) { + log.Errorf("incorrect market status. got %v, expected %v", stat, wantStat) + } + } +} + +func TestMarketInfo(t *testing.T) { + + core := &TCore{ + markets: make(map[string]*TMarket), + } + srv := &Server{ + core: core, + } + + mux := chi.NewRouter() + mux.Get("/market/{"+marketNameKey+"}", srv.apiMarketInfo) + + // Request a non-existent market. + w := httptest.NewRecorder() + name := "dcr_btc" + r, _ := http.NewRequest("GET", "https://localhost/market/"+name, nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiMarketInfo returned code %d, expected %d", w.Code, http.StatusBadRequest) + } + respBody := w.Body.String() + if respBody != fmt.Sprintf("unknown market %q\n", name) { + t.Errorf("incorrect response body: %q", respBody) + } + + tMkt := &TMarket{} + core.markets[name] = tMkt + + // Not running market. + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name, nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiMarketInfo returned code %d, expected %d", w.Code, http.StatusOK) + } + mktStatus := new(MarketStatus) + err := json.Unmarshal(w.Body.Bytes(), &mktStatus) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + if mktStatus.Name != name { + t.Errorf("incorrect market name %q, expected %q", mktStatus.Name, name) + } + if mktStatus.Running { + t.Errorf("market should not have been reported as running") + } + + // Flip the market on. + core.markets[name].running = true + core.markets[name].suspend = &market.SuspendEpoch{Idx: 1324, End: time.Now()} + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name, nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiMarketInfo returned code %d, expected %d", w.Code, http.StatusOK) + } + mktStatus = new(MarketStatus) + err = json.Unmarshal(w.Body.Bytes(), &mktStatus) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + if mktStatus.Name != name { + t.Errorf("incorrect market name %q, expected %q", mktStatus.Name, name) + } + if !mktStatus.Running { + t.Errorf("market should have been reported as running") + } +} + +func TestSuspend(t *testing.T) { + + core := &TCore{ + markets: make(map[string]*TMarket), + } + srv := &Server{ + core: core, + } + + mux := chi.NewRouter() + mux.Get("/market/{"+marketNameKey+"}/suspend", srv.apiSuspend) + + // Non-existent market + name := "dcr_btc" + w := httptest.NewRecorder() + r, _ := http.NewRequest("GET", "https://localhost/market/"+name+"/suspend", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusBadRequest) + } + + // With the market, but not running + tMkt := &TMarket{ + suspend: &market.SuspendEpoch{}, + } + core.markets[name] = tMkt + + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + wantMsg := "market \"dcr_btc\" not running\n" + if w.Body.String() != wantMsg { + t.Errorf("expected body %q, got %q", wantMsg, w.Body) + } + + // Now running. + tMkt.running = true + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + suspRes := new(SuspendResult) + err := json.Unmarshal(w.Body.Bytes(), &suspRes) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + if suspRes.Market != name { + t.Errorf("incorrect market name %q, expected %q", suspRes.Market, name) + } + + var zeroTime time.Time + wantIdx := encode.UnixMilli(zeroTime) + if suspRes.FinalEpoch != wantIdx { + t.Errorf("incorrect final epoch index. got %d, expected %d", + suspRes.FinalEpoch, tMkt.suspend.Idx) + } + + wantFinal := zeroTime.Add(time.Millisecond) + if suspRes.SuspendTime.Equal(wantFinal) { + t.Errorf("incorrect suspend time. got %v, expected %v", + suspRes.SuspendTime, tMkt.suspend.End) + } + + // Specify a time in the past. + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend?t=12", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + resp := w.Body.String() + wantPrefix := "specified market suspend time is in the past" + if !strings.HasPrefix(resp, wantPrefix) { + t.Errorf("Expected error message starting with %q, got %q", wantPrefix, resp) + } + + // Bad suspend time (not a time) + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend?t=QWERT", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + resp = w.Body.String() + wantPrefix = "invalid suspend time" + if !strings.HasPrefix(resp, wantPrefix) { + t.Errorf("Expected error message starting with %q, got %q", wantPrefix, resp) + } + + // Good suspend time, one minute in the future + w = httptest.NewRecorder() + tMsFuture := encode.UnixMilli(time.Now().Add(time.Minute)) + r, _ = http.NewRequest("GET", fmt.Sprintf("https://localhost/market/%v/suspend?t=%d", name, tMsFuture), nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + suspRes = new(SuspendResult) + err = json.Unmarshal(w.Body.Bytes(), &suspRes) + if err != nil { + t.Fatalf("Failed to unmarshal result: %v", err) + } + + if suspRes.FinalEpoch != tMsFuture { + t.Errorf("incorrect final epoch index. got %d, expected %d", + suspRes.FinalEpoch, tMsFuture) + } + + wantFinal = encode.UnixTimeMilli(tMsFuture + 1) + if suspRes.SuspendTime.Equal(wantFinal) { + t.Errorf("incorrect suspend time. got %v, expected %v", + suspRes.SuspendTime, wantFinal) + } + + if !tMkt.persist { + t.Errorf("market persist was false") + } + + // persist=true (OK) + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend?persist=true", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + + if !tMkt.persist { + t.Errorf("market persist was false") + } + + // persist=0 (OK) + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend?persist=0", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusOK { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + + if tMkt.persist { + t.Errorf("market persist was true") + } + + // invalid persist + w = httptest.NewRecorder() + r, _ = http.NewRequest("GET", "https://localhost/market/"+name+"/suspend?persist=blahblahblah", nil) + r.RemoteAddr = "localhost" + + mux.ServeHTTP(w, r) + + if w.Code != http.StatusBadRequest { + t.Fatalf("apiSuspend returned code %d, expected %d", w.Code, http.StatusOK) + } + resp = w.Body.String() + wantPrefix = "invalid persist book boolean" + if !strings.HasPrefix(resp, wantPrefix) { + t.Errorf("Expected error message starting with %q, got %q", wantPrefix, resp) + } +} + func TestAuthMiddleware(t *testing.T) { pass := "password123" authSHA := sha256.Sum256([]byte(pass)) diff --git a/server/admin/types.go b/server/admin/types.go new file mode 100644 index 0000000000..5e1ae7e1f7 --- /dev/null +++ b/server/admin/types.go @@ -0,0 +1,51 @@ +package admin + +import ( + "time" +) + +// MarketStatus summarizes the operational status of a market. +type MarketStatus struct { + Name string `json:"market,omitempty"` + Running bool `json:"running"` + EpochDuration uint64 `json:"epochlen"` + ActiveEpoch int64 `json:"activeepoch"` + StartEpoch int64 `json:"startepoch"` + SuspendEpoch int64 `json:"finalepoch,omitempty"` + PersistBook *bool `json:"persistbook,omitempty"` +} + +// APITime marshals and unmarshals a time value in time.RFC3339Nano format. +type APITime struct { + time.Time +} + +// SuspendResult describes the result of a market suspend request. FinalEpoch is +// the last epoch before shutdown, and it the market will run for it's entire +// duration. As such, SuspendTime is the time at which the market is closed, +// immediately after close of FinalEpoch. +type SuspendResult struct { + Market string `json:"market"` + FinalEpoch int64 `json:"finalepoch"` + SuspendTime APITime `json:"supendtime"` +} + +// RFC3339Milli is the RFC3339 time formatting with millisecond precision. +const RFC3339Milli = "2006-01-02T15:04:05.999Z07:00" + +// MarshalJSON marshals APITime to a JSON string in RFC3339 format except with +// millisecond precision. +func (at *APITime) MarshalJSON() ([]byte, error) { + return []byte(`"` + at.Time.Format(RFC3339Milli) + `"`), nil +} + +// UnmarshalJSON unmarshals JSON string containing a time in RFC3339 format with +// millisecond precision into an APITime. +func (at *APITime) UnmarshalJSON(b []byte) error { + t, err := time.Parse(RFC3339Milli, string(b)) + if err != nil { + return nil + } + at.Time = t + return nil +} diff --git a/server/auth/auth_test.go b/server/auth/auth_test.go index 2a05de6a5d..27ed6ef8e6 100644 --- a/server/auth/auth_test.go +++ b/server/auth/auth_test.go @@ -106,6 +106,8 @@ func (c *TRPCClient) Send(msg *msgjson.Message) error { c.sends = append(c.sends, msg) return c.sendErr } +func (c *TRPCClient) SendError(id uint64, msg *msgjson.Error) { +} func (c *TRPCClient) Request(msg *msgjson.Message, f func(comms.Link, *msgjson.Message), _ time.Duration, _ func()) error { c.reqs = append(c.reqs, &tReq{ msg: msg, diff --git a/server/book/book.go b/server/book/book.go index 329f539f1f..8f002d2846 100644 --- a/server/book/book.go +++ b/server/book/book.go @@ -25,6 +25,7 @@ const ( type Book struct { mtx sync.RWMutex lotSize uint64 + halfCap uint32 buys *OrderPQ sells *OrderPQ } @@ -39,11 +40,20 @@ func New(lotSize uint64, halfCapacity ...uint32) *Book { } return &Book{ lotSize: lotSize, + halfCap: halfCap, buys: NewMaxOrderPQ(halfCap), sells: NewMinOrderPQ(halfCap), } } +// Clear reset the order book with configured capacity. +func (b *Book) Clear() { + b.mtx.Lock() + b.buys = NewMaxOrderPQ(b.halfCap) + b.sells = NewMaxOrderPQ(b.halfCap) + b.mtx.Unlock() +} + // Realloc changes the capacity of the order book given the specified capacity // of both buy and sell sides of the book. func (b *Book) Realloc(newHalfCap uint32) { diff --git a/server/book/book_test.go b/server/book/book_test.go index d181605837..40380b854d 100644 --- a/server/book/book_test.go +++ b/server/book/book_test.go @@ -248,4 +248,20 @@ func TestBook(t *testing.T) { t.Errorf("Failed to remove best sell order. Got %v, wanted %v", removed.ID(), bestSellOrder.ID()) } + + if b.SellCount() == 0 { + t.Errorf("sell side was empty") + } + if b.BuyCount() == 0 { + t.Errorf("buy side was empty") + } + + b.Clear() + + if b.SellCount() != 0 { + t.Errorf("sell side was not empty after Clear") + } + if b.BuyCount() != 0 { + t.Errorf("buy side was not empty after Clear") + } } diff --git a/server/coinlock/coinlocker.go b/server/coinlock/coinlocker.go index d0f81ceb2a..8075054887 100644 --- a/server/coinlock/coinlocker.go +++ b/server/coinlock/coinlocker.go @@ -22,6 +22,8 @@ type CoinLockChecker interface { // coins. type CoinLocker interface { CoinLockChecker + // UnlockAll releases all locked coins. + UnlockAll() // UnlockOrderCoins unlocks all locked coins associated with an order. UnlockOrderCoins(oid order.OrderID) // LockOrdersCoins locks all of the coins associated with multiple orders. @@ -96,6 +98,11 @@ func (bl *bookLocker) LockCoins(orderCoins map[order.OrderID][]CoinID) { bl.bookLock.LockCoins(orderCoins) } +// UnlockAll releases all locked coins. +func (bl *bookLocker) UnlockAll() { + bl.bookLock.UnlockAll() +} + // UnlockOrderCoins unlocks all locked coins associated with an order. func (bl *bookLocker) UnlockOrderCoins(oid order.OrderID) { bl.bookLock.UnlockOrderCoins(oid) @@ -108,18 +115,23 @@ type swapLocker struct { } // LockOrdersCoins locks all coins for the given orders. -func (bl *swapLocker) LockOrdersCoins(orders []order.Order) { - bl.swapLock.LockOrdersCoins(orders) +func (sl *swapLocker) LockOrdersCoins(orders []order.Order) { + sl.swapLock.LockOrdersCoins(orders) } // LockOrdersCoins locks coins associated with certain orders. -func (bl *swapLocker) LockCoins(orderCoins map[order.OrderID][]CoinID) { - bl.swapLock.LockCoins(orderCoins) +func (sl *swapLocker) LockCoins(orderCoins map[order.OrderID][]CoinID) { + sl.swapLock.LockCoins(orderCoins) +} + +// UnlockAll releases all locked coins. +func (sl *swapLocker) UnlockAll() { + sl.swapLock.UnlockAll() } // UnlockOrderCoins unlocks all locked coins associated with an order. -func (bl *swapLocker) UnlockOrderCoins(oid order.OrderID) { - bl.swapLock.UnlockOrderCoins(oid) +func (sl *swapLocker) UnlockOrderCoins(oid order.OrderID) { + sl.swapLock.UnlockOrderCoins(oid) } var _ (CoinLocker) = (*swapLocker)(nil) @@ -142,6 +154,14 @@ func NewAssetCoinLocker() *AssetCoinLocker { } } +// UnlockAll releases all locked coins. +func (ac *AssetCoinLocker) UnlockAll() { + ac.coinMtx.Lock() + ac.lockedCoins = make(map[coinIDKey]struct{}) + ac.lockedCoinsByOrder = make(map[order.OrderID][]CoinID) + ac.coinMtx.Unlock() +} + // CoinLocked indicates if a coin identifier (e.g. UTXO) is locked. func (ac *AssetCoinLocker) CoinLocked(coin CoinID) bool { ac.coinMtx.RLock() diff --git a/server/coinlock/coinlocker_test.go b/server/coinlock/coinlocker_test.go index 9c279e01a6..dcd2c65930 100644 --- a/server/coinlock/coinlocker_test.go +++ b/server/coinlock/coinlocker_test.go @@ -159,7 +159,7 @@ func Test_bookLocker_LockCoins(t *testing.T) { bookLock.LockCoins(coinMap) - verifyLocked := func(cl CoinLockChecker, coins []CoinID, wantLocked bool) bool { + verifyLocked := func(cl CoinLockChecker, coins []CoinID, wantLocked bool) (ok bool) { for _, coin := range coins { locked := cl.CoinLocked(coin) if locked != wantLocked { @@ -202,4 +202,18 @@ func Test_bookLocker_LockCoins(t *testing.T) { if !verifyLocked(swapLock, orderCoins, false) { t.Errorf("swapLock indicated coins were locked that should have been unlocked") } + + // Relock the coins. + bookLock.LockCoins(coinMap) + + // Make sure the BOOK locker say they are locked. + if !verifyLocked(bookLock, allCoins, true) { + t.Errorf("bookLock indicated coins were unlocked that should have been locked") + } + + bookLock.UnlockAll() + + if !verifyLocked(bookLock, orderCoins, false) { + t.Errorf("bookLock indicated coins were locked that should have been unlocked") + } } diff --git a/server/comms/link.go b/server/comms/link.go index 5acf80b455..23744fd51e 100644 --- a/server/comms/link.go +++ b/server/comms/link.go @@ -11,10 +11,6 @@ import ( "decred.org/dcrdex/dex/ws" ) -// outBufferSize is the size of the client's buffered channel for outgoing -// messages. -const outBufferSize = 128 - // Link is an interface for a communication channel with an API client. The // reference implementation of a Link-satisfying type is the wsLink, which // passes messages over a websocket connection. @@ -23,8 +19,11 @@ type Link interface { ID() uint64 // IP returns the IP address of the peer. IP() string - // Send sends the msgjson.Message to the client. + // Send sends the msgjson.Message to the peer. Send(msg *msgjson.Message) error + // SendError sends the msgjson.Error to the peer, with reference to a + // request message ID. + SendError(id uint64, rpcErr *msgjson.Error) // Request sends the Request-type msgjson.Message to the client and registers // a handler for the response. Request(msg *msgjson.Message, f func(Link, *msgjson.Message), expireTime time.Duration, expire func()) error diff --git a/server/db/driver/pg/internal/orders.go b/server/db/driver/pg/internal/orders.go index 113971b97d..f4b0d8df1e 100644 --- a/server/db/driver/pg/internal/orders.go +++ b/server/db/driver/pg/internal/orders.go @@ -121,8 +121,8 @@ const ( // quantity, // rate, // force, - // 2, -- new status ($d) - // 123456789, -- new filled ($d) + // 2, -- new status (%d) + // 123456789, -- new filled (%d) // epoch_idx, epoch_dur, preimage, complete_time // ) // INSERT INTO dcrdex.dcr_btc.orders_archived -- destination table (%s) @@ -139,6 +139,18 @@ const ( SELECT * FROM moved;` // TODO: consider a MoveOrderSameFilled query + PurgeBook = `WITH moved AS ( + DELETE FROM %s -- active orders table for market X + WHERE status = $1 -- booked status code + RETURNING oid, type, sell, account_id, address, + client_time, server_time, commit, coins, quantity, + rate, force, %d, filled, -- revoked status code + epoch_idx, epoch_dur, preimage, complete_time + ) + INSERT INTO %s -- archived orders table for market X + SELECT * FROM moved + RETURNING oid, sell, account_id;` + // CreateCancelOrdersTable creates a table specified via the %s printf // specifier for cancel orders. CreateCancelOrdersTable = `CREATE TABLE IF NOT EXISTS %s ( diff --git a/server/db/driver/pg/orders.go b/server/db/driver/pg/orders.go index ea886b0877..e7cd777c80 100644 --- a/server/db/driver/pg/orders.go +++ b/server/db/driver/pg/orders.go @@ -188,6 +188,106 @@ func (a *Archiver) NewEpochOrder(ord order.Order, epochIdx, epochDur int64) erro return a.storeOrder(ord, epochIdx, epochDur, orderStatusEpoch) } +func makePseudoCancel(target order.OrderID, user account.AccountID, base, quote uint32, timeStamp time.Time) *order.CancelOrder { + // Create a server-generated cancel order to record the server's revoke + // order action. + var commit order.Commitment + rand.Read(commit[:]) + return &order.CancelOrder{ + P: order.Prefix{ + AccountID: user, + BaseAsset: base, + QuoteAsset: quote, + OrderType: order.CancelOrderType, + ClientTime: timeStamp, + ServerTime: timeStamp, + Commit: commit, + }, + TargetOrderID: target, + } +} + +// FlushBook revokes all booked orders for a market. +func (a *Archiver) FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) { + var marketSchema string + marketSchema, err = a.marketSchema(base, quote) + if err != nil { + return + } + + // Booked orders (active) are made revoked (archived). + srcTableName := fullOrderTableName(a.dbName, marketSchema, orderStatusBooked.active()) + dstTableName := fullOrderTableName(a.dbName, marketSchema, orderStatusRevoked.active()) + + timeStamp := time.Now().Truncate(time.Millisecond).UTC() + + var dbTx *sql.Tx + dbTx, err = a.db.Begin() + if err != nil { + err = fmt.Errorf("failed to begin database transaction: %v", err) + return + } + + fail := func() { + sellsRemoved, buysRemoved = nil, nil + a.fatalBackendErr(err) + _ = dbTx.Rollback() + } + + // Changed all booked orders to revoked. + stmt := fmt.Sprintf(internal.PurgeBook, srcTableName, orderStatusRevoked, dstTableName) + var rows *sql.Rows + rows, err = dbTx.Query(stmt, orderStatusBooked) + if err != nil { + fail() + return + } + + var cos []*order.CancelOrder + for rows.Next() { + var oid order.OrderID + var sell bool + var aid account.AccountID + if err = rows.Scan(&oid, &sell, &aid); err != nil { + rows.Close() + fail() + return + } + cos = append(cos, makePseudoCancel(oid, aid, base, quote, timeStamp)) + if sell { + sellsRemoved = append(sellsRemoved, oid) + } else { + buysRemoved = append(buysRemoved, oid) + } + } + + if err = rows.Err(); err != nil { + fail() + return + } + + // Insert the pseudo-cancel orders. + cancelTable := fullCancelOrderTableName(a.dbName, marketSchema, orderStatusRevoked.active()) + stmt = fmt.Sprintf(internal.InsertCancelOrder, cancelTable) + for _, co := range cos { + _, err = dbTx.Exec(stmt, co.ID(), co.AccountID, co.ClientTime, + co.ServerTime, co.Commit, co.TargetOrderID, orderStatusRevoked, 0, 0) + if err != nil { + fail() + err = fmt.Errorf("failed to store pseudo-cancel order: %v", err) + return + } + } + + if err = dbTx.Commit(); err != nil { + fail() + err = fmt.Errorf("failed to commit transaction: %v", err) + return + } + + return +} + // BookOrders retrieves all booked orders (with order status booked) for the // specified market. This will be used to repopulate a market's book on // construction of the market. @@ -300,26 +400,11 @@ func (a *Archiver) RevokeOrder(ord order.Order) (cancelID order.OrderID, timeSta return } - // Create a server-generated cancel order to record this action. - timeStamp = time.Now().Truncate(time.Millisecond).UTC() - var commit order.Commitment - rand.Read(commit[:]) - co := &order.CancelOrder{ - P: order.Prefix{ - AccountID: ord.User(), - BaseAsset: ord.Base(), - QuoteAsset: ord.Quote(), - OrderType: order.CancelOrderType, - ClientTime: timeStamp, - ServerTime: timeStamp, - Commit: commit, - }, - TargetOrderID: ord.ID(), - } - cancelID = co.ID() - // Store the pseudo-cancel order with 0 epoch idx and duration and status // orderStatusRevoked as indicators that this is a revocation. + timeStamp = time.Now().Truncate(time.Millisecond).UTC() + co := makePseudoCancel(ord.ID(), ord.User(), ord.Base(), ord.Quote(), timeStamp) + cancelID = co.ID() err = a.storeOrder(co, 0, 0, orderStatusRevoked) return } diff --git a/server/db/driver/pg/orders_online_test.go b/server/db/driver/pg/orders_online_test.go index 8fcf89bde1..286950656c 100644 --- a/server/db/driver/pg/orders_online_test.go +++ b/server/db/driver/pg/orders_online_test.go @@ -367,6 +367,99 @@ func TestRevokeOrder(t *testing.T) { } } +func TestFlushBook(t *testing.T) { + if err := cleanTables(archie.db); err != nil { + t.Fatalf("cleanTables: %v", err) + } + + // Standing limit == OK as booked + var epochIdx, epochDur int64 = 13245678, 6000 + lo := newLimitOrder(false, 4800000, 1, order.StandingTiF, 0) + err := archie.StoreOrder(lo, epochIdx, epochDur, order.OrderStatusBooked) + if err != nil { + t.Fatalf("StoreOrder failed: %v", err) + } + + // A not booked order. + mo := newMarketSellOrder(1, 0) + mo.AccountID = lo.AccountID + err = archie.StoreOrder(mo, epochIdx, epochDur, order.OrderStatusExecuted) + if err != nil { + t.Fatalf("StoreOrder failed: %v", err) + } + + sellsRemoved, buysRemoved, err := archie.FlushBook(lo.BaseAsset, lo.QuoteAsset) + if err != nil { + t.Fatalf("FlushBook failed: %v", err) + } + if len(sellsRemoved) != 0 { + t.Fatalf("flushed %d book sell orders, expected 0", len(sellsRemoved)) + } + if len(buysRemoved) != 1 { + t.Fatalf("flushed %d book buy orders, expected 1", len(buysRemoved)) + } + if buysRemoved[0] != lo.ID() { + t.Errorf("flushed sell order has ID %v, expected %v", buysRemoved[0], lo.ID()) + } + + // Check for new status of the order. + loNow, loStatus, err := archie.Order(lo.ID(), lo.BaseAsset, lo.QuoteAsset) + if err != nil { + t.Fatalf("Failed to locate order: %v", err) + } + if loNow.ID() != lo.ID() { + t.Errorf("incorrect order ID retrieved") + } + _, ok := loNow.(*order.LimitOrder) + if !ok { + t.Fatalf("not a limit order") + } + if loStatus != order.OrderStatusRevoked { + t.Errorf("got order status %v, expected %v", loStatus, order.OrderStatusRevoked) + } + + ordersOut, _, err := archie.UserOrders(context.Background(), lo.User(), lo.BaseAsset, lo.QuoteAsset) + if err != nil { + t.Fatalf("UserOrders failed: %v", err) + } + + wantNumOrders := 2 // market and limit + if len(ordersOut) != wantNumOrders { + t.Fatalf("got %d user orders, expected %d", len(ordersOut), wantNumOrders) + } + + coids, targets, _, err := archie.ExecutedCancelsForUser(lo.User(), 25) + if err != nil { + t.Errorf("ExecutedCancelsForUser failed: %v", err) + } + if len(coids) != 1 { + t.Fatalf("got %d cancels, expected 1", len(coids)) + } + if len(targets) != 1 { + t.Fatalf("got %d cancel targets, expected 1", len(targets)) + } + + if targets[0] != lo.ID() { + t.Fatalf("cancel order is targeting %v, expected %v", targets[0], lo.ID()) + } + + // Ensure market order is still there. + moNow, moStatus, err := archie.Order(mo.ID(), mo.BaseAsset, mo.QuoteAsset) + if err != nil { + t.Fatalf("Failed to locate order: %v", err) + } + if moNow.ID() != mo.ID() { + t.Errorf("incorrect order ID retrieved") + } + _, ok = moNow.(*order.MarketOrder) + if !ok { + t.Fatalf("not a market order") + } + if moStatus != order.OrderStatusExecuted { + t.Errorf("got order status %v, expected %v", loStatus, order.OrderStatusExecuted) + } +} + func TestLoadOrderUnknown(t *testing.T) { if err := cleanTables(archie.db); err != nil { t.Fatalf("cleanTables: %v", err) diff --git a/server/db/driver/pg/system_online_test.go b/server/db/driver/pg/system_online_test.go index 0f72566be5..6b94ed36c9 100644 --- a/server/db/driver/pg/system_online_test.go +++ b/server/db/driver/pg/system_online_test.go @@ -67,7 +67,7 @@ func openDB() (func() error, error) { User: PGTestsUser, Pass: PGTestsPass, DBName: PGTestsDBName, - ShowPGConfig: true, + ShowPGConfig: false, QueryTimeout: 0, // zero to use the default MarketCfg: []*dex.MarketInfo{mktInfo, mktInfo2}, //CheckedStores: true, diff --git a/server/db/interface.go b/server/db/interface.go index ad39b67f04..08ef84d1fa 100644 --- a/server/db/interface.go +++ b/server/db/interface.go @@ -82,6 +82,9 @@ type OrderArchiver interface { // BookOrders returns all book orders for a market. BookOrders(base, quote uint32) ([]*order.LimitOrder, error) + // FlushBook revokes all booked orders for a market. + FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) + // ActiveOrderCoins retrieves a CoinID slice for each active order. ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) diff --git a/server/dex/dex.go b/server/dex/dex.go index 962e22b83a..14257c2cbc 100644 --- a/server/dex/dex.go +++ b/server/dex/dex.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "decred.org/dcrdex/dex" @@ -118,7 +119,9 @@ type DEX struct { bookRouter *market.BookRouter stopWaiters []subsystem server *comms.Server - config *configResponse + + configRespMtx sync.RWMutex + configResp *configResponse } // configResponse is defined here to leave open the possibility for hot @@ -129,7 +132,7 @@ type configResponse struct { configEnc json.RawMessage } -func newConfigResponse(cfg *DexConf, cfgAssets []msgjson.Asset, cfgMarkets []msgjson.Market) (*configResponse, error) { +func newConfigResponse(cfg *DexConf, cfgAssets []*msgjson.Asset, cfgMarkets []*msgjson.Market) (*configResponse, error) { configMsg := &msgjson.ConfigResult{ BroadcastTimeout: uint64(cfg.BroadcastTimeout.Milliseconds()), CancelMax: cfg.CancelThreshold, @@ -139,6 +142,12 @@ func newConfigResponse(cfg *DexConf, cfgAssets []msgjson.Asset, cfgMarkets []msg Fee: cfg.RegFeeAmount, } + // NOTE/TODO: To include active epoch in the market status objects, we need + // a channel from Market to push status changes back to DEX manager. + // Presently just include start epoch that we set when launching the + // Markets, and suspend info that DEX obtained when calling the Market's + // Suspend method. + encResult, err := json.Marshal(configMsg) if err != nil { return nil, err @@ -157,6 +166,35 @@ func newConfigResponse(cfg *DexConf, cfgAssets []msgjson.Asset, cfgMarkets []msg }, nil } +func (cr *configResponse) setMktSuspend(name string, finalEpoch uint64, persist bool) { + for _, mkt := range cr.configMsg.Markets { + if mkt.Name == name { + mkt.MarketStatus.FinalEpoch = finalEpoch + mkt.MarketStatus.Persist = &persist + cr.remarshall() + return + } + } + log.Errorf("Failed to set MarketStatus for market %q", name) +} + +func (cr *configResponse) remarshall() { + encResult, err := json.Marshal(cr.configMsg) + if err != nil { + log.Errorf("failed to marshal config message: %v", err) + return + } + payload := &msgjson.ResponsePayload{ + Result: encResult, + } + encPayload, err := json.Marshal(payload) + if err != nil { + log.Errorf("failed to marshal config message payload: %v", err) + return + } + cr.configEnc = encPayload +} + // Stop shuts down the DEX. Stop returns only after all components have // completed their shutdown. func (dm *DEX) Stop() { @@ -172,10 +210,13 @@ func (dm *DEX) Stop() { } func (dm *DEX) handleDEXConfig(conn comms.Link, msg *msgjson.Message) *msgjson.Error { + dm.configRespMtx.RLock() + defer dm.configRespMtx.RUnlock() + ack := &msgjson.Message{ Type: msgjson.Response, ID: msg.ID, - Payload: dm.config.configEnc, + Payload: dm.configResp.configEnc, } if err := conn.Send(ack); err != nil { @@ -252,7 +293,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) { var dcrBackend *dcrasset.Backend lockableAssets := make(map[uint32]*swap.LockableAsset, len(cfg.Assets)) backedAssets := make(map[uint32]*asset.BackedAsset, len(cfg.Assets)) - cfgAssets := make([]msgjson.Asset, 0, len(cfg.Assets)) + cfgAssets := make([]*msgjson.Asset, 0, len(cfg.Assets)) for i, assetConf := range cfg.Assets { symbol := strings.ToLower(assetConf.Symbol) ID := assetIDs[i] @@ -297,7 +338,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) { CoinLocker: dexCoinLocker.AssetLocker(ID).Swap(), } - cfgAssets = append(cfgAssets, msgjson.Asset{ + cfgAssets = append(cfgAssets, &msgjson.Asset{ Symbol: assetConf.Symbol, ID: ID, LotSize: assetConf.LotSize, @@ -309,6 +350,10 @@ func NewDEX(cfg *DexConf) (*DEX, error) { }) } + for _, mkt := range cfg.Markets { + mkt.Name = strings.ToLower(mkt.Name) + } + // Create DEXArchivist with the pg DB driver. pgCfg := &pg.Config{ Host: cfg.DBConf.Host, @@ -371,20 +416,22 @@ func NewDEX(cfg *DexConf) (*DEX, error) { now := encode.UnixMilli(time.Now()) bookSources := make(map[string]market.BookSource, len(cfg.Markets)) marketTunnels := make(map[string]market.MarketTunnel, len(cfg.Markets)) - cfgMarkets := make([]msgjson.Market, 0, len(cfg.Markets)) + cfgMarkets := make([]*msgjson.Market, 0, len(cfg.Markets)) for name, mkt := range markets { startEpochIdx := 1 + now/int64(mkt.EpochDuration()) mkt.SetStartEpochIdx(startEpochIdx) startSubSys(fmt.Sprintf("Market[%s]", name), mkt) bookSources[name] = mkt marketTunnels[name] = mkt - cfgMarkets = append(cfgMarkets, msgjson.Market{ + cfgMarkets = append(cfgMarkets, &msgjson.Market{ Name: name, Base: mkt.Base(), Quote: mkt.Quote(), EpochLen: mkt.EpochDuration(), - StartEpoch: uint64(startEpochIdx), MarketBuyBuffer: mkt.MarketBuyBuffer(), + MarketStatus: msgjson.MarketStatus{ + StartEpoch: uint64(startEpochIdx), + }, }) } @@ -422,7 +469,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) { bookRouter: bookRouter, stopWaiters: stopWaiters, server: server, - config: cfgResp, + configResp: cfgResp, } comms.Route(msgjson.ConfigRoute, dexMgr.handleDEXConfig) @@ -432,5 +479,71 @@ func NewDEX(cfg *DexConf) (*DEX, error) { // Config returns the current dex configuration. func (dm *DEX) ConfigMsg() json.RawMessage { - return dm.config.configEnc + dm.configRespMtx.RLock() + defer dm.configRespMtx.RUnlock() + return dm.configResp.configEnc +} + +// TODO: for just market running status, the DEX manager should use it's +// knowledge of Market subsystem state. +func (dm *DEX) MarketRunning(mktName string) (found, running bool) { + mkt := dm.markets[mktName] + if mkt == nil { + return + } + return true, mkt.Running() } + +// MarketStatus returns the market.Status for the named market. If the market is +// unknown to the DEX, nil is returned. +func (dm *DEX) MarketStatus(mktName string) *market.Status { + mkt := dm.markets[mktName] + if mkt == nil { + return nil + } + return mkt.Status() +} + +// MarketStatuses returns a map of market names to market.Status for all known +// markets. +func (dm *DEX) MarketStatuses() map[string]*market.Status { + statuses := make(map[string]*market.Status, len(dm.markets)) + for name, mkt := range dm.markets { + statuses[name] = mkt.Status() + } + return statuses +} + +// SuspendMarket schedules a suspension of a given market, with the option to +// persist the orders on the book (or purge the book automatically on market +// shutdown). The scheduled final epoch and suspend time are returned. This is a +// passthrough to the OrderRouter. A TradeSuspension notification is broadcasted +// to all connected clients. +func (dm *DEX) SuspendMarket(name string, tSusp time.Time, persistBooks bool) *market.SuspendEpoch { + name = strings.ToLower(name) + // Go through the order router since OrderRouter is likely to have market + // status tracking built into it to facilitate resume. + suspEpoch := dm.orderRouter.SuspendMarket(name, tSusp, persistBooks) + + // Update config message with suspend schedule. + dm.configRespMtx.Lock() + dm.configResp.setMktSuspend(name, uint64(suspEpoch.Idx), persistBooks) + dm.configRespMtx.Unlock() + + // Broadcast a TradeSuspension notification to all connected clients. + note, err := msgjson.NewNotification(msgjson.SuspensionRoute, msgjson.TradeSuspension{ + MarketID: name, + FinalEpoch: uint64(suspEpoch.Idx), + SuspendTime: encode.UnixMilliU(suspEpoch.End), + Persist: persistBooks, + }) + if err != nil { + log.Errorf("Failed to create suspend notification: %v", err) + } else { + dm.server.Broadcast(note) + } + return suspEpoch +} + +// TODO: resume by relaunching the market subsystems (Run) +// Resume / ResumeMarket diff --git a/server/market/bookrouter.go b/server/market/bookrouter.go index b24b81fd8a..8222f6c088 100644 --- a/server/market/bookrouter.go +++ b/server/market/bookrouter.go @@ -16,14 +16,14 @@ import ( "decred.org/dcrdex/server/comms" ) -// A bookUpdateAction classifies updates into how they affect the book or epoch +// A updateAction classifies updates into how they affect the book or epoch // queue. -type bookUpdateAction uint8 +type updateAction uint8 const ( // invalidAction is the zero value action and should be considered programmer // error if received. - invalidAction bookUpdateAction = iota + invalidAction updateAction = iota // epochAction means an order is being added to the epoch queue and will // result in a msgjson.EpochOrderNote being sent to subscribers. epochAction @@ -39,11 +39,13 @@ const ( // matchProofAction means the matching has been performed and will result in // a msgjson.MatchProofNote being sent to subscribers. matchProofAction + // suspendAction means the market has suspended. + suspendAction ) -// String provides a string representation of a bookUpdateAction. This is -// primarily for logging and debugging purposes. -func (bua bookUpdateAction) String() string { +// String provides a string representation of a updateAction. This is primarily +// for logging and debugging purposes. +func (bua updateAction) String() string { switch bua { case invalidAction: return "invalid" @@ -57,25 +59,53 @@ func (bua bookUpdateAction) String() string { return "newEpoch" case matchProofAction: return "matchProof" + case suspendAction: + return "suspend" default: return "" } } -// bookUpdateSignal combines a bookUpdateAction with data for which the action +// updateSignal combines an updateAction with data for which the action // applies. -type bookUpdateSignal struct { - action bookUpdateAction - order order.Order +type updateSignal struct { + action updateAction + data interface{} // sigData* type +} + +func (us updateSignal) String() string { + return us.action.String() +} + +// nolint:structcheck,unused +type sigDataOrder struct { + order order.Order + epochIdx int64 +} + +type sigDataBookedOrder sigDataOrder +type sigDataUnbookedOrder sigDataOrder +type sigDataEpochOrder sigDataOrder + +type sigDataNewEpoch struct { + idx int64 +} + +type sigDataSuspend struct { + finalEpoch int64 + stopTime int64 + persistBook bool +} + +type sigDataMatchProof struct { matchProof *order.MatchProof - epochIdx int64 } // BookSource is a source of a market's order book and a feed of updates to the // order book and epoch queue. type BookSource interface { Book() (epoch int64, buys []*order.LimitOrder, sells []*order.LimitOrder) - OrderFeed() <-chan *bookUpdateSignal + OrderFeed() <-chan *updateSignal } // subscribers is a manager for a map of subscribers and a sequence counter. @@ -124,6 +154,7 @@ type msgBook struct { name string // mtx guards orders and epochIdx mtx sync.RWMutex + running bool orders map[order.OrderID]*msgjson.BookOrderNote epochIdx int64 subs *subscribers @@ -136,6 +167,12 @@ func (book *msgBook) setEpoch(idx int64) { book.mtx.Unlock() } +func (book *msgBook) epoch() int64 { + book.mtx.RLock() + defer book.mtx.RUnlock() + return book.epochIdx +} + // Update updates the order book with the new order information. If an order // with the same ID already exists in the book, it is overwritten without // warning. Such a case would be typical when an order's filled amount changes. @@ -219,30 +256,42 @@ func (r *BookRouter) runBook(ctx context.Context, book *msgBook) { book.addBulkOrders(book.source.Book()) subs := book.subs + defer func() { + book.mtx.Lock() + book.running = false + book.orders = make(map[order.OrderID]*msgjson.BookOrderNote) + book.mtx.Unlock() + log.Infof("Book router terminating for market %q", book.name) + }() + out: for { + book.mtx.Lock() + book.running = true + book.mtx.Unlock() select { case u, ok := <-feed: if !ok { - log.Errorf("order feed closed before shutting down BookRouter") + log.Errorf("Book order feed closed for market %q at epoch %d without a suspend signal", + book.name, book.epoch()) break out } // Prepare the book/unbook/epoch note. var note interface{} var route string - switch u.action { - case newEpochAction: + switch sigData := u.data.(type) { + case sigDataNewEpoch: // New epoch index should be sent here by the market following // order matching and booking, but before new orders are added // to this new epoch. This is needed for msgjson.OrderBook in // sendBook, which must include the current epoch index. - book.setEpoch(u.epochIdx) + book.setEpoch(sigData.idx) continue // no notification to send - case bookAction: + case sigDataBookedOrder: route = msgjson.BookOrderRoute - lo, ok := u.order.(*order.LimitOrder) + lo, ok := sigData.order.(*order.LimitOrder) if !ok { panic("non-limit order received with bookAction") } @@ -250,24 +299,24 @@ out: n.Seq = subs.nextSeq() note = n - case unbookAction: + case sigDataUnbookedOrder: route = msgjson.UnbookOrderRoute - lo, ok := u.order.(*order.LimitOrder) + lo, ok := sigData.order.(*order.LimitOrder) if !ok { panic("non-limit order received with unbookAction") } book.remove(lo) - oid := u.order.ID() + oid := sigData.order.ID() note = &msgjson.UnbookOrderNote{ Seq: subs.nextSeq(), MarketID: book.name, OrderID: oid[:], } - case epochAction: + case sigDataEpochOrder: route = msgjson.EpochOrderRoute epochNote := new(msgjson.EpochOrderNote) - switch o := u.order.(type) { + switch o := sigData.order.(type) { case *order.LimitOrder: epochNote.BookOrderNote = *limitOrderToMsgOrder(o, book.name) epochNote.OrderType = msgjson.LimitOrderNum @@ -281,32 +330,49 @@ out: epochNote.Seq = subs.nextSeq() epochNote.MarketID = book.name - epochNote.Epoch = uint64(u.epochIdx) - c := u.order.Commitment() + epochNote.Epoch = uint64(sigData.epochIdx) + c := sigData.order.Commitment() epochNote.Commit = c[:] note = epochNote - case matchProofAction: + case sigDataMatchProof: route = msgjson.MatchProofRoute - misses := make([]msgjson.Bytes, 0, len(u.matchProof.Misses)) - for _, o := range u.matchProof.Misses { + mp := sigData.matchProof + misses := make([]msgjson.Bytes, 0, len(mp.Misses)) + for _, o := range mp.Misses { oid := o.ID() misses = append(misses, oid[:]) } - preimages := make([]msgjson.Bytes, 0, len(u.matchProof.Preimages)) - for i := range u.matchProof.Preimages { - preimages = append(preimages, u.matchProof.Preimages[i][:]) + preimages := make([]msgjson.Bytes, 0, len(mp.Preimages)) + for i := range mp.Preimages { + preimages = append(preimages, mp.Preimages[i][:]) } note = &msgjson.MatchProofNote{ MarketID: book.name, - Epoch: u.matchProof.Epoch.Idx, // not u.epochIdx + Epoch: mp.Epoch.Idx, // not u.epochIdx Preimages: preimages, Misses: misses, - CSum: u.matchProof.CSum, - Seed: u.matchProof.Seed, + CSum: mp.CSum, + Seed: mp.Seed, } + case sigDataSuspend: + // Consider sending a TradeSuspension here too: + // note = &msgjson.TradeSuspension{ + // MarketID: book.name, + // FinalEpoch: uint64(sigData.finalEpoch), + // SuspendTime: uint64(sigData.stopTime), + // Persist: sigData.persistBook, + // } + // r.sendNote(msgjson.SuspensionRoute, subs, note) + + // Depending on resume handling, maybe kill the book router. + // Presently the Market closes the order feed channels, so quit. + log.Infof("Book order feed closed for market %q after epoch %d, persist book = %v.", + book.name, sigData.finalEpoch, sigData.persistBook) + break out + default: panic(fmt.Sprintf("unknown orderbook update action %d", u.action)) } @@ -320,19 +386,22 @@ out: // sendBook encodes and sends the the entire order book to the specified client. func (r *BookRouter) sendBook(conn comms.Link, book *msgBook, msgID uint64) { - seq := book.subs.lastSeq() - book.mtx.RLock() + book.mtx.RLock() // book.orders and book.running + if !book.running { + book.mtx.RUnlock() + conn.SendError(msgID, msgjson.NewError(msgjson.MarketNotRunningError, "market not running")) + return + } msgBook := make([]*msgjson.BookOrderNote, 0, len(book.orders)) for _, o := range book.orders { msgBook = append(msgBook, o) } - epoch := book.epochIdx book.mtx.RUnlock() msg, err := msgjson.NewResponse(msgID, &msgjson.OrderBook{ - Seq: seq, + Seq: book.subs.lastSeq(), MarketID: book.name, - Epoch: uint64(epoch), + Epoch: uint64(book.epochIdx), Orders: msgBook, }, nil) if err != nil { diff --git a/server/market/epump.go b/server/market/epump.go new file mode 100644 index 0000000000..c8fc1041bb --- /dev/null +++ b/server/market/epump.go @@ -0,0 +1,139 @@ +package market + +import ( + "context" + "sync" + + "decred.org/dcrdex/dex/order" + "decred.org/dcrdex/server/matcher" +) + +type readyEpoch struct { + *EpochQueue + ready chan struct{} // close this when the struct is ready + cSum []byte + ordersRevealed []*matcher.OrderRevealed + misses []order.Order +} + +type epochPump struct { + ready chan *readyEpoch // consumer receives from this + + mtx sync.RWMutex + q []*readyEpoch + halt bool + halted bool + head chan *readyEpoch // internal, closed when ready to halt +} + +func newEpochPump() *epochPump { + return &epochPump{ + ready: make(chan *readyEpoch, 1), + head: make(chan *readyEpoch, 1), + } +} + +func (ep *epochPump) Run(ctx context.Context) { + // Context cancellation must cause a graceful shutdown. + go func() { + <-ctx.Done() + + ep.mtx.Lock() + defer ep.mtx.Unlock() + + // gracefully shut down the epoch pump, allowing the queue to be fully + // drained and all epochs passed on to the consumer. + if len(ep.q) == 0 { + // Ready to shutdown. + close(ep.head) // cause next() to return a closed channel and Run to return. + ep.halted = true + } else { + // next will close it after it drains the queue. + ep.halt = true + } + }() + + defer close(ep.ready) + for { + rq, ok := <-ep.next() + if !ok { + return + } + ep.ready <- rq // consumer should receive this + } +} + +// Insert enqueues an EpochQueue and starts preimage collection immediately. +// Access epoch queues in order and when they have completed preimage collection +// by receiving from the epochPump.ready channel. +func (ep *epochPump) Insert(epoch *EpochQueue) *readyEpoch { + rq := &readyEpoch{ + EpochQueue: epoch, + ready: make(chan struct{}), + } + + ep.mtx.Lock() + defer ep.mtx.Unlock() + + if ep.halted || ep.halt { + // head is closed or about to be. + return nil + } + + select { + case ep.head <- rq: // buffered, so non-blocking when empty and no receiver + default: + // push: append a new readyEpoch to the closed epoch queue. + ep.q = append(ep.q, rq) + } + + return rq +} + +// popFront removes the next readyEpoch from the closed epoch queue, q. It is +// not thread-safe. pop is only used in next to advance the head of the pump. +func (ep *epochPump) popFront() *readyEpoch { + if len(ep.q) == 0 { + return nil + } + x := ep.q[0] + ep.q = ep.q[1:] + return x +} + +// next provides a channel for receiving the next readyEpoch when it completes +// preimage collection. next blocks until there is an epoch to send. +func (ep *epochPump) next() <-chan *readyEpoch { + ready := make(chan *readyEpoch) // next sent on this channel when ready + next := <-ep.head + + // A closed head channel signals a halted and drained pump. + if next == nil { + close(ready) + return ready + } + + ep.mtx.Lock() + defer ep.mtx.Unlock() + + // If the queue is not empty, set new head. + x := ep.popFront() + if x != nil { + ep.head <- x // non-blocking, received in select above + } else if ep.halt { + // Only halt the pump once the queue is emptied. The final head is still + // forwarded to the consumer. + close(ep.head) + ep.halted = true + // continue to serve next, but a closed channel will be returned on + // subsequent calls. + } + + // Send next on the returned channel when it becomes ready. If the process + // dies before goroutine completion, the Market is down anyway. + go func() { + <-next.ready // block until preimage collection is complete (close this channel) + ready <- next + }() + return ready +} diff --git a/server/market/epump_test.go b/server/market/epump_test.go new file mode 100644 index 0000000000..a0e6813613 --- /dev/null +++ b/server/market/epump_test.go @@ -0,0 +1,159 @@ +package market + +import ( + "context" + "math/rand" + "runtime" + "sync" + "testing" + "time" +) + +func TestMarket_epochPumpHalt(t *testing.T) { + // This tests stopping the epochPump. + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ePump := newEpochPump() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ePump.Run(ctx) + }() + + eq0 := NewEpoch(123413513, 1000) + rq0 := ePump.Insert(eq0) + + eq1 := NewEpoch(123413514, 1000) + rq1 := ePump.Insert(eq1) // testing ep.q append + + close(rq1.ready) + + var rq0Out *readyEpoch + select { + case rq0Out = <-ePump.ready: + t.Fatalf("readyQueue provided out of order, got epoch %d", rq0Out.Epoch) + default: + // good, nothing was supposed to come out yet + } + + close(rq0.ready) + + rq0Out = <-ePump.ready + if rq0Out.EpochQueue.Epoch != eq0.Epoch { + t.Errorf("expected epoch %d, got %d", eq0.Epoch, rq0Out.EpochQueue.Epoch) + } + + cancel() // testing len(ep.q) == 0 (Ready to shutdown), with rq1 in head + wg.Wait() + + // pump should be done, but the final readyEpoch should have been sent on + // the buffered ready chan. + rq1Out := <-ePump.ready + if rq1Out.EpochQueue.Epoch != eq1.Epoch { + t.Errorf("expected epoch %d, got %d", eq1.Epoch, rq1Out.EpochQueue.Epoch) + } + + // Test shutdown with multiple queued epochs. + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + ePump = newEpochPump() + wg.Add(1) + go func() { + defer wg.Done() + ePump.Run(ctx) + }() + + eq0 = NewEpoch(123413513, 1000) + rq0 = ePump.Insert(eq0) + eq1 = NewEpoch(123413514, 1000) + rq1 = ePump.Insert(eq1) + eq2 := NewEpoch(123413515, 1000) + rq2 := ePump.Insert(eq2) + + cancel() // testing len(ep.q) != 0 (stop after it drains the queue). + runtime.Gosched() // let Run start shutting things down + time.Sleep(50 * time.Millisecond) + + // Make sure epochs come out in order. + close(rq0.ready) + rq0Out = <-ePump.ready + if rq0Out.EpochQueue.Epoch != eq0.Epoch { + t.Errorf("expected epoch %d, got %d", eq0.Epoch, rq0Out.EpochQueue.Epoch) + } + + close(rq1.ready) + rq1Out = <-ePump.ready + if rq1Out.EpochQueue.Epoch != eq1.Epoch { + t.Errorf("expected epoch %d, got %d", eq1.Epoch, rq1Out.EpochQueue.Epoch) + } + + close(rq2.ready) + rq2Out := <-ePump.ready + if rq2Out.EpochQueue.Epoch != eq2.Epoch { + t.Errorf("expected epoch %d, got %d", eq2.Epoch, rq2Out.EpochQueue.Epoch) + } + + select { + case _, ok := <-ePump.ready: + if ok { + t.Errorf("ready channel should be closed now (Run returned), but something came through") + } + case <-time.After(time.Second): + t.Errorf("ready channel should be closed by now (Run returned)") + } + + rqX := ePump.Insert(eq2) + if rqX != nil { + t.Errorf("halted epoch pump allowed insertion of new epoch queue") + } + + wg.Wait() +} + +func Test_epochPump_next(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ep := newEpochPump() + var wg sync.WaitGroup + wg.Add(1) + go func() { + ep.Run(ctx) + wg.Done() + }() + + waitTime := int64(5 * time.Second) + if testing.Short() { + waitTime = int64(1 * time.Second) + } + + var epochStart, epochDur, numEpochs int64 = 123413513, 1_000, 100 + epochs := make([]*readyEpoch, numEpochs) + for i := int64(0); i < numEpochs; i++ { + rq := ep.Insert(NewEpoch(i+epochStart, epochDur)) + epochs[i] = rq + + // Simulate preimage collection, randomly making the queues ready. + go func() { + wait := time.Duration(rand.Int63n(waitTime)) + time.Sleep(wait) + close(rq.ready) + }() + } + + // Receive all the ready epochs, verifying they come in order. + for i := epochStart; i < numEpochs+epochStart; i++ { + rq := <-ep.ready + if rq.Epoch != i { + t.Errorf("Received epoch %d, expected %d", rq.Epoch, i) + } + } + // All fake preimage collection goroutines are done now. + + cancel() + wg.Wait() +} diff --git a/server/market/market.go b/server/market/market.go index 3beb6b6cfe..23f8340c5e 100644 --- a/server/market/market.go +++ b/server/market/market.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "decred.org/dcrdex/dex" @@ -77,18 +76,23 @@ type Market struct { marketInfo *dex.MarketInfo // Communications. - orderRouter chan *orderUpdateSignal // incoming orders - orderFeedMtx sync.RWMutex // guards orderFeeds and running - orderFeeds []chan *bookUpdateSignal // outgoing notifications - running chan struct{} + orderRouter chan *orderUpdateSignal // incoming orders, via SubmitOrderAsync - startEpochIdx int64 // atomic access only + orderFeedMtx sync.RWMutex // guards orderFeeds and running + orderFeeds []chan *updateSignal // all outgoing notification consumers - bookMtx sync.Mutex // guards book and epochIdx - book *book.Book - epochIdx int64 // next epoch from the point of view of the book + runMtx sync.RWMutex + running chan struct{} // closed when running + + bookMtx sync.Mutex // guards book and bookEpochIdx + book *book.Book + bookEpochIdx int64 // next epoch from the point of view of the book epochMtx sync.RWMutex + startEpochIdx int64 + activeEpochIdx int64 + suspendEpochIdx int64 + persistBook bool epochCommitments map[order.Commitment]order.OrderID epochOrders map[order.OrderID]order.Order @@ -149,9 +153,8 @@ func NewMarket(mktInfo *dex.MarketInfo, storage db.DEXArchivist, swapper Swapper } return &Market{ - running: make(chan struct{}), + running: make(chan struct{}), // closed on market start marketInfo: mktInfo, - orderRouter: make(chan *orderUpdateSignal, 16), book: Book, matcher: matcher.New(), epochCommitments: make(map[order.Commitment]order.OrderID), @@ -164,15 +167,69 @@ func NewMarket(mktInfo *dex.MarketInfo, storage db.DEXArchivist, swapper Swapper }, nil } +// SuspendASAP suspends requests the market to gracefully suspend epoch cycling +// as soon as possible, always allowing an active epoch to close. See also +// Suspend. +func (m *Market) SuspendASAP(persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) { + return m.Suspend(time.Time{}, persistBook) +} + +// Suspend requests the market to gracefully suspend epoch cycling as soon as +// the given time, always allowing the epoch including that time to complete. If +// the time is before the current epoch, the current epoch will be the last. +func (m *Market) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) { + // epochMtx guards activeEpochIdx, startEpochIdx, suspendEpochIdx, and + // persistBook. + m.epochMtx.Lock() + defer m.epochMtx.Unlock() + + dur := int64(m.EpochDuration()) + + epochEnd := func(idx int64) time.Time { + start := encode.UnixTimeMilli(idx * dur) + return start.Add(time.Duration(dur) * time.Millisecond) + } + + // Determine which epoch includes asSoonAs, and compute its end time. If + // asSoonAs is in a past epoch, suspend at the end of the active epoch. + + soonestFinalIdx := m.activeEpochIdx + if soonestFinalIdx == 0 { + // Cannot schedule a suspend if Run isn't running. + if m.startEpochIdx == 0 { + return -1, time.Time{} + } + // Not yet started. Soonest suspend idx is the start epoch idx - 1. + soonestFinalIdx = m.startEpochIdx - 1 + } + + if soonestEnd := epochEnd(soonestFinalIdx); asSoonAs.Before(soonestEnd) { + // Suspend at the end of the active epoch or the one prior to start. + finalEpochIdx = soonestFinalIdx + finalEpochEnd = soonestEnd + } else { + // Suspend at the end of the epoch that includes the target time. + ms := encode.UnixMilli(asSoonAs) + finalEpochIdx = ms / dur + // Allow stopping at boundary, prior to the epoch starting at this time. + if ms%dur == 0 { + finalEpochIdx-- + } + finalEpochEnd = epochEnd(finalEpochIdx) + } + + m.suspendEpochIdx = finalEpochIdx + m.persistBook = persistBook + + return +} + // SetStartEpochIdx sets the starting epoch index. This should generally be // called before Run, or Start used to specify the index at the same time. func (m *Market) SetStartEpochIdx(startEpochIdx int64) { - atomic.StoreInt64(&m.startEpochIdx, startEpochIdx) -} - -// StartEpochIdx gets the starting epoch index. -func (m *Market) StartEpochIdx() int64 { - return atomic.LoadInt64(&m.startEpochIdx) + m.epochMtx.Lock() + m.startEpochIdx = startEpochIdx + m.epochMtx.Unlock() } // Start begins order processing with a starting epoch index. See also @@ -184,12 +241,56 @@ func (m *Market) Start(ctx context.Context, startEpochIdx int64) { // waitForEpochOpen waits until the start of epoch processing. func (m *Market) waitForEpochOpen() { - m.orderFeedMtx.RLock() - c := m.running - m.orderFeedMtx.RUnlock() + m.runMtx.RLock() + c := m.running // the field may be rewritten, but only after close + m.runMtx.RUnlock() <-c } +// Status describes the operation state of the Market. +type Status struct { + Running bool + EpochDuration uint64 // to compute times from epoch inds + ActiveEpoch int64 + StartEpoch int64 + SuspendEpoch int64 + PersistBook bool +} + +// Status returns the current operating state of the Market. +func (m *Market) Status() *Status { + m.epochMtx.Lock() + defer m.epochMtx.Unlock() + return &Status{ + Running: m.Running(), + EpochDuration: m.marketInfo.EpochDuration, + ActiveEpoch: m.activeEpochIdx, + StartEpoch: m.startEpochIdx, + SuspendEpoch: m.suspendEpochIdx, + PersistBook: m.persistBook, + } +} + +// Running indicates is the market is accepting new orders. This will return +// false when suspended, but false does not necessarily mean Run has stopped +// since a start epoch may be set. Note that this method is of limited use and +// communicating subsystems shouldn't rely on the result for correct operation +// since a market could start or stop. Rather, they should infer or be informed +// of market status rather than rely on this. +// +// TODO: Instead of using Running in OrderRouter and DEX, these types should +// track statuses (known suspend times). +func (m *Market) Running() bool { + m.runMtx.RLock() + defer m.runMtx.RUnlock() + select { + case <-m.running: + return true + default: + return false + } +} + // EpochDuration returns the Market's epoch duration in milliseconds. func (m *Market) EpochDuration() uint64 { return m.marketInfo.EpochDuration @@ -214,8 +315,8 @@ func (m *Market) Quote() uint32 { // the market starts and while a market is running are both valid. When the // market stops, channels are closed (invalidated), and new channels should be // requested if the market starts again. -func (m *Market) OrderFeed() <-chan *bookUpdateSignal { - bookUpdates := make(chan *bookUpdateSignal, 1) +func (m *Market) OrderFeed() <-chan *updateSignal { + bookUpdates := make(chan *updateSignal, 1) m.orderFeedMtx.Lock() m.orderFeeds = append(m.orderFeeds, bookUpdates) m.orderFeedMtx.Unlock() @@ -247,6 +348,12 @@ func (m *Market) TxMonitored(user account.AccountID, asset uint32, txid string) // When submission is completed, an error value will be sent on the channel. // This is the asynchronous version of SubmitOrder. func (m *Market) SubmitOrderAsync(rec *orderRecord) <-chan error { + sendErr := func(err error) <-chan error { + errChan := make(chan error, 1) + errChan <- err // i.e. ErrInvalidOrder, ErrInvalidCommitment + return errChan + } + // Validate the order. The order router must do it's own validation, but do // a second validation for (1) this Market and (2) epoch status, before // putting it on the queue. @@ -254,12 +361,25 @@ func (m *Market) SubmitOrderAsync(rec *orderRecord) <-chan error { // Order ID cannot be computed since ServerTime has not been set. log.Debugf("SubmitOrderAsync: Invalid order received from user %v with commitment %v: %v", rec.order.User(), rec.order.Commitment(), err) - errChan := make(chan error, 1) - errChan <- err // i.e. ErrInvalidOrder, ErrInvalidCommitment - return errChan + return sendErr(err) + } + + // Only submit orders while market is running. + m.runMtx.RLock() + defer m.runMtx.RUnlock() + + select { + case <-m.running: + default: + // m.orderRouter is closed + log.Infof("SubmitOrderAsync: Market stopped with an order in submission (commitment %v).", + rec.order.Commitment()) // The order is not time stamped, so no OrderID. + return sendErr(ErrMarketNotRunning) } sig := newOrderUpdateSignal(rec) + // The lock is still held, so there is a receiver: either Run's main loop or + // the drain in Run's defer that runs until m.running starts blocking. m.orderRouter <- sig return sig.errChan } @@ -337,23 +457,6 @@ func (m *Market) CancelableBy(oid order.OrderID, aid account.AccountID) bool { return false } -func (m *Market) notify(ctx context.Context, sig *bookUpdateSignal) { - // send to each receiver - m.orderFeedMtx.RLock() - defer m.orderFeedMtx.RUnlock() - // The market may have shut down while waiting for the lock. - if ctx.Err() != nil { - return - } - for _, s := range m.orderFeeds { - select { - case <-ctx.Done(): - return - case s <- sig: - } - } -} - // Book retrieves the market's current order book and the current epoch index. // If the Market is not yet running or the start epoch has not yet begun, the // epoch index will be zero. @@ -362,109 +465,231 @@ func (m *Market) Book() (epoch int64, buys, sells []*order.LimitOrder) { m.bookMtx.Lock() buys = m.book.BuyOrders() sells = m.book.SellOrders() - epoch = m.epochIdx + epoch = m.bookEpochIdx m.bookMtx.Unlock() return } +// PurgeBook flushes all booked orders from the in-memory book and persistent +// storage. In terms of storage, this means changing orders with status booked +// to status revoked. +func (m *Market) PurgeBook() { + m.bookMtx.Lock() + defer m.bookMtx.Unlock() + + // Revoke all booked orders in the DB. + sellsRemoved, buysRemoved, err := m.storage.FlushBook(m.marketInfo.Base, m.marketInfo.Quote) + if err != nil { + log.Errorf("Failed to flush book for market %s: %v", m.marketInfo.Name, err) + } else { + log.Infof("Flushed %d sell orders and %d buy orders from market %q book", + len(sellsRemoved), len(buysRemoved), m.marketInfo.Name) + // Clear the in-memory order book to match the DB. + m.book.Clear() + // Unlock coins for removed orders. + + // TODO: only unlock previously booked order coins, do not include coins + // that might belong to orders still in epoch status. This won't matter + // if the market is suspended, but it does if PurgeBook is used while + // the market is still accepting new orders and processing epochs. + + // Unlock base asset coins locked by sell orders. + for i := range sellsRemoved { + m.coinLockerBase.UnlockOrderCoins(sellsRemoved[i]) + } + // Unlock quote asset coins locked by buy orders. + for i := range buysRemoved { + m.coinLockerQuote.UnlockOrderCoins(buysRemoved[i]) + } + } +} + // Run is the main order processing loop, which takes new orders, notifies book // subscribers, and cycles the epochs. The caller should cancel the provided // Context to stop the market. When Run returns, all book order feeds obtained // via OrderFeed are closed and invalidated. Clients must request a new feed to // receive updates when and if the Market restarts. func (m *Market) Run(ctx context.Context) { + + var running bool ctxRun, cancel := context.WithCancel(ctx) - defer cancel() + var wgFeeds, wgEpochs sync.WaitGroup + notifyChan := make(chan *updateSignal, 32) + + // For clarity, define the shutdown sequence in a single closure rather than + // the defer stack. + defer func() { + // Drain the order router of incoming orders that made it in after the + // main loop broke and before flagging the market stopped. Do this in a + // goroutine because the market is flagged as stopped under runMtx lock + // in this defer and there is a risk of deadlock in SubmitOrderAsync + // that sends under runMtx lock as well. + wgFeeds.Add(1) + go func() { + defer wgFeeds.Done() + for sig := range m.orderRouter { + sig.errChan <- ErrMarketNotRunning + } + }() + + // Under lock, flag as not running. + m.runMtx.Lock() // block while SubmitOrderAsync is sending to the drain + if !running { + // In case the market is stopped before the first epoch, close the + // running channel so that waitForEpochOpen does not hang. + close(m.running) + } + m.running = make(chan struct{}) + running = false + close(m.orderRouter) // stop the order router drain + m.runMtx.Unlock() + + // Stop and wait for epoch pump and processing pipeline goroutines. + cancel() // may already be done by suspend + wgEpochs.Wait() + + // Stop and wait for the order feed goroutine. + close(notifyChan) + wgFeeds.Wait() + + // Close and delete the outgoing order feed channels as a signal to + // subscribers that the Market has terminated. + m.orderFeedMtx.Lock() + for _, s := range m.orderFeeds { + close(s) + } + m.orderFeeds = nil + m.orderFeedMtx.Unlock() + + // persistBook is set under epochMtx lock. + m.epochMtx.RLock() + if !m.persistBook { + m.PurgeBook() + } + m.epochMtx.RUnlock() + + log.Infof("Market %q stopped.", m.marketInfo.Name) + }() + + // Start outgoing order feed notification goroutine. + wgFeeds.Add(1) + go func() { + defer wgFeeds.Done() + for sig := range notifyChan { + // send to each receiver + m.orderFeedMtx.RLock() + feeds := m.orderFeeds + m.orderFeedMtx.RUnlock() + for _, s := range feeds { + s <- sig + } + } + }() // Start the closed epoch pump, which drives preimage collection and orderly // epoch processing. eq := newEpochPump() - var wg sync.WaitGroup - wg.Add(1) + wgEpochs.Add(1) go func() { - defer wg.Done() + defer wgEpochs.Done() eq.Run(ctxRun) }() // Start the closed epoch processing pipeline. - wg.Add(1) + wgEpochs.Add(1) go func() { - defer wg.Done() - for { - select { - case ep, ok := <-eq.ready: - if !ok { - return - } - - // epochStart has completed preimage collection. - m.processReadyEpoch(ctxRun, ep) - case <-ctxRun.Done(): - return - } + defer wgEpochs.Done() + for ep := range eq.ready { + // epochStart has completed preimage collection. + m.processReadyEpoch(ep, notifyChan) } + log.Debugf("epoch pump drained for market %s", m.marketInfo.Name) + // There must be no more notify calls. }() - nextEpochIdx := atomic.LoadInt64(&m.startEpochIdx) + m.epochMtx.Lock() + nextEpochIdx := m.startEpochIdx if nextEpochIdx == 0 { log.Warnf("Run: startEpochIdx not set. Starting at the next epoch.") now := encode.UnixMilli(time.Now()) nextEpochIdx = 1 + now/int64(m.EpochDuration()) + m.startEpochIdx = nextEpochIdx } + m.epochMtx.Unlock() + epochDuration := int64(m.marketInfo.EpochDuration) nextEpoch := NewEpoch(nextEpochIdx, epochDuration) epochCycle := time.After(time.Until(nextEpoch.Start)) - var running bool var currentEpoch *EpochQueue cycleEpoch := func() { if currentEpoch != nil { // Process the epoch asynchronously since there is a delay while the // preimages are requested and clients respond with their preimages. - m.enqueueEpoch(eq, currentEpoch) + if !m.enqueueEpoch(eq, currentEpoch) { + return + } // The epoch is closed, long live the epoch. - sig := &bookUpdateSignal{ - action: newEpochAction, - epochIdx: nextEpoch.Epoch, + sig := &updateSignal{ + action: newEpochAction, + data: sigDataNewEpoch{idx: nextEpoch.Epoch}, } - m.notify(ctxRun, sig) + notifyChan <- sig } - currentEpoch = nextEpoch - nextEpoch = NewEpoch(currentEpoch.Epoch+1, epochDuration) - epochCycle = time.After(time.Until(nextEpoch.Start)) + // Guard activeEpochIdx and suspendEpochIdx. + m.epochMtx.Lock() + defer m.epochMtx.Unlock() + + // Check suspendEpochIdx and suspend if the just-closed epoch idx is the + // suspend epoch. + if m.suspendEpochIdx == nextEpoch.Epoch-1 { + epochCloseTime := encode.UnixMilli(currentEpoch.End) + + // Reject incoming orders. + currentEpoch = nil + m.activeEpochIdx = 0 + + // Signal to the book router of the suspend: + notifyChan <- &updateSignal{ + action: suspendAction, + data: sigDataSuspend{ + finalEpoch: m.suspendEpochIdx, + stopTime: epochCloseTime, + persistBook: m.persistBook, + }, + } - if !running { - close(m.running) // no lock, this field is not set in another goroutine - running = true + cancel() // graceful market shutdown + return } - } - defer func() { - m.orderFeedMtx.Lock() - for _, s := range m.orderFeeds { - close(s) - } - m.orderFeeds = nil - // In case the market is stopped before the first epoch, close the - // running channel so that waitForEpochOpen does not hang. + currentEpoch = nextEpoch + nextEpochIdx = currentEpoch.Epoch + 1 + m.activeEpochIdx = currentEpoch.Epoch + if !running { + // Open up SubmitOrderAsync. close(m.running) + running = true } - // Make a new running channel for any future Run. - m.running = make(chan struct{}) // also guarded in OrderFeed and waitForEpochOpen - m.orderFeedMtx.Unlock() - wg.Wait() + // Replace the next epoch and set the cycle Timer. + nextEpoch = NewEpoch(nextEpochIdx, epochDuration) + epochCycle = time.After(time.Until(nextEpoch.Start)) + } - log.Debugf("Market %q stopped.", m.marketInfo.Name) - }() + // Set the orderRouter field now since the main loop below receives on it, + // even though SubmitOrderAsync disallows sends on orderRouter when the + // market is not running. + m.orderRouter = make(chan *orderUpdateSignal, 32) // implicitly guarded by m.runMtx since Market is not running yet for { if ctxRun.Err() != nil { return } + if err := m.storage.LastErr(); err != nil { log.Criticalf("Archivist failing. Last unexpected error: %v", err) return @@ -477,6 +702,11 @@ func (m *Market) Run(ctx context.Context) { default: } + // cycleEpoch can cancel ctxRun if suspend initiated. + if ctxRun.Err() != nil { + return + } + // Wait for the next signal (cancel, new order, or epoch cycle). select { case <-ctxRun.Done(): @@ -514,14 +744,10 @@ func (m *Market) Run(ctx context.Context) { } // Stamp and process the order in the target epoch queue. - err := m.processOrder(ctxRun, s.rec, orderEpoch, s.errChan) + err := m.processOrder(s.rec, orderEpoch, notifyChan, s.errChan) if err != nil { - if ctxRun.Err() == nil { - // This was not context cancellation. - log.Errorf("Failed to process order %v: %v", s.rec.order, err) - // Signal to the other Run goroutines to return. - cancel() - } + log.Errorf("Failed to process order %v: %v", s.rec.order, err) + // Signal to the other Run goroutines to return. return } @@ -588,7 +814,7 @@ func (m *Market) unlockOrderCoins(o order.Order) { // 4. Insert the order into the EpochQueue. // 5. Respond to the client that placed the order. // 6. Notify epoch queue event subscribers. -func (m *Market) processOrder(ctx context.Context, rec *orderRecord, epoch *EpochQueue, errChan chan<- error) error { +func (m *Market) processOrder(rec *orderRecord, epoch *EpochQueue, notifyChan chan<- *updateSignal, errChan chan<- error) error { // Verify that an order with the same commitment is not already in the epoch // queue. Since commitment is part of the order serialization and thus order // ID, this also prevents orders with the same ID. @@ -648,8 +874,8 @@ func (m *Market) processOrder(ctx context.Context, rec *orderRecord, epoch *Epoc } // For market and limit orders, lock the backing coins NOW so orders using - // locked coins cannot get into the epoch queue. Later, in processEpoch or - // the Swapper, release these coins when the swap is completed. + // locked coins cannot get into the epoch queue. Later, in processReadyEpoch + // or the Swapper, release these coins when the swap is completed. m.lockOrderCoins(ord) // Check for known orders in the DB with the same Commitment. @@ -697,12 +923,13 @@ func (m *Market) processOrder(ctx context.Context, rec *orderRecord, epoch *Epoc m.auth.Send(ord.User(), respMsg) // Send epoch update to epoch queue subscribers. - sig := &bookUpdateSignal{ - action: epochAction, - order: ord, - epochIdx: epoch.Epoch, + notifyChan <- &updateSignal{ + action: epochAction, + data: sigDataEpochOrder{ + order: ord, + epochIdx: epoch.Epoch, + }, } - go m.notify(ctx, sig) return nil } @@ -865,113 +1092,15 @@ func (m *Market) collectPreimages(orders []order.Order) (cSum []byte, ordersReve return } -type readyEpoch struct { - *EpochQueue - ready chan struct{} - cSum []byte - ordersRevealed []*matcher.OrderRevealed - misses []order.Order -} - -type epochPump struct { - ready chan *readyEpoch // consumer receives from this - - mtx sync.RWMutex - q []*readyEpoch - head chan *readyEpoch // internal -} - -func newEpochPump() *epochPump { - return &epochPump{ - ready: make(chan *readyEpoch, 1), - head: make(chan *readyEpoch, 1), - } -} - -func (ep *epochPump) Run(ctx context.Context) { - defer close(ep.ready) - for { - rq, ok := <-ep.next(ctx) - if !ok { - return - } - select { - case ep.ready <- rq: // consumer should receive this - case <-ctx.Done(): - return - } - } -} - -// Insert enqueues an EpochQueue and start's preimage collection immediately. -// Access epoch queues in order and when they have completed preimage collection -// by receiving from the epochPump.ready channel. -func (ep *epochPump) Insert(epoch *EpochQueue) *readyEpoch { - rq := &readyEpoch{ - EpochQueue: epoch, - ready: make(chan struct{}), - } - ep.mtx.Lock() - select { - case ep.head <- rq: // buffered, so non-blocking when empty and no receiver - default: - ep.push(rq) - } - ep.mtx.Unlock() - return rq -} - -// push appends a new readyEpoch to the closed epoch queue, q. It is not -// thread-safe. push is only used in Insert. -func (ep *epochPump) push(rq *readyEpoch) { - ep.q = append(ep.q, rq) -} - -// popFront removes the next readyEpoch from the closed epoch queue, q. It is -// not thread-safe. pop is only used in next to advance the head of the pump. -func (ep *epochPump) popFront() *readyEpoch { - if len(ep.q) == 0 { - return nil - } - x := ep.q[0] - ep.q = ep.q[1:] - return x -} - -// next provides a channel for receiving the next readyEpoch when it completes -// preimage collection. next blocks until there is an epoch to send. -func (ep *epochPump) next(ctx context.Context) <-chan *readyEpoch { - var next *readyEpoch - ready := make(chan *readyEpoch) - select { - case <-ctx.Done(): - close(ready) - return ready - case next = <-ep.head: // block if their is no next yet - } - - ep.mtx.Lock() - defer ep.mtx.Unlock() - - // If the queue is not empty, set new head. - x := ep.popFront() - if x != nil { - ep.head <- x // non-blocking - } - - // Send next on the returned channel when it becomes ready. If the process - // dies before goroutine completion, the Market is down anyway. - go func() { - <-next.ready // block until preimage collection is complete - ready <- next - }() - return ready -} - -func (m *Market) enqueueEpoch(eq *epochPump, epoch *EpochQueue) { +func (m *Market) enqueueEpoch(eq *epochPump, epoch *EpochQueue) bool { // Enqueue the epoch for matching when preimage collection is completed and // it is this epoch's turn. rq := eq.Insert(epoch) + if rq == nil { + // should not happen if cycleEpoch considers when the halt began. + log.Errorf("failed to enqueue an epoch into a halted epoch pump") + return false + } // With this epoch closed, these orders are no longer cancelable, if and // until they are booked in processReadyEpoch (after preimage collection). @@ -988,6 +1117,8 @@ func (m *Market) enqueueEpoch(eq *epochPump, epoch *EpochQueue) { rq.cSum, rq.ordersRevealed, rq.misses = m.epochStart(orders) close(rq.ready) }() + + return true } // epochStart collects order preimages, and penalizes users who fail to respond. @@ -1016,7 +1147,7 @@ func (m *Market) epochStart(orders []order.Order) (cSum []byte, ordersRevealed [ // 4. Lock coins with the swap lock. // 5. Initiate the swap negotiation via the Market's Swapper. // The EpochQueue's Orders map must not be modified by another goroutine. -func (m *Market) processReadyEpoch(ctx context.Context, epoch *readyEpoch) { +func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateSignal) { // Ensure the epoch has actually completed preimage collection. This can // only fail if the epochPump malfunctioned. Remove this check eventually. select { @@ -1042,7 +1173,7 @@ func (m *Market) processReadyEpoch(ctx context.Context, epoch *readyEpoch) { m.bookMtx.Lock() // allow a coherent view of book orders with (*Market).Book matchTime := time.Now() // considered as the time at which matched cancel orders are executed seed, matches, _, failed, doneOK, partial, booked, unbooked, updates := m.matcher.Match(m.book, ordersRevealed) - m.epochIdx = epoch.Epoch + 1 + m.bookEpochIdx = epoch.Epoch + 1 m.bookMtx.Unlock() if len(ordersRevealed) > 0 { log.Infof("Matching complete for market %v epoch %d:"+ @@ -1177,21 +1308,22 @@ func (m *Market) processReadyEpoch(ctx context.Context, epoch *readyEpoch) { for i := range ordersRevealed { preimages[i] = ordersRevealed[i].Preimage } - sig := &bookUpdateSignal{ + sig := &updateSignal{ action: matchProofAction, - matchProof: &order.MatchProof{ - Epoch: order.EpochID{ - Idx: uint64(epoch.Epoch), - Dur: m.EpochDuration(), + data: sigDataMatchProof{ + matchProof: &order.MatchProof{ + Epoch: order.EpochID{ + Idx: uint64(epoch.Epoch), + Dur: m.EpochDuration(), + }, + Preimages: preimages, + Misses: misses, + CSum: cSum, + Seed: seed, }, - Preimages: preimages, - Misses: misses, - CSum: cSum, - Seed: seed, }, - epochIdx: epoch.Epoch, } - m.notify(ctx, sig) + notifyChan <- sig // Unlock passed but not booked order (e.g. matched market and immediate // orders) coins were locked upon order receipt in processOrder and must be @@ -1217,22 +1349,26 @@ func (m *Market) processReadyEpoch(ctx context.Context, epoch *readyEpoch) { // Send "book" notifications to order book subscribers. for _, ord := range booked { - sig := &bookUpdateSignal{ - action: bookAction, - order: ord.Order, - epochIdx: epoch.Epoch, + sig := &updateSignal{ + action: bookAction, + data: sigDataBookedOrder{ + order: ord.Order, + epochIdx: epoch.Epoch, + }, } - m.notify(ctx, sig) + notifyChan <- sig } // Send "unbook" notifications to order book subscribers. for _, ord := range unbooked { - sig := &bookUpdateSignal{ - action: unbookAction, - order: ord, - epochIdx: epoch.Epoch, + sig := &updateSignal{ + action: unbookAction, + data: sigDataUnbookedOrder{ + order: ord, + epochIdx: epoch.Epoch, + }, } - m.notify(ctx, sig) + notifyChan <- sig } // Initiate the swaps. diff --git a/server/market/market_test.go b/server/market/market_test.go index 97387a4aee..59ad6cb24c 100644 --- a/server/market/market_test.go +++ b/server/market/market_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/rand" + "runtime" "strings" "sync" "testing" @@ -45,6 +46,19 @@ func (ta *TArchivist) BookOrders(base, quote uint32) ([]*order.LimitOrder, error defer ta.mtx.Unlock() return ta.bookedOrders, nil } +func (ta *TArchivist) FlushBook(base, quote uint32) (sells, buys []order.OrderID, err error) { + ta.mtx.Lock() + defer ta.mtx.Unlock() + for _, lo := range ta.bookedOrders { + if lo.Sell { + sells = append(sells, lo.ID()) + } else { + buys = append(buys, lo.ID()) + } + } + ta.bookedOrders = nil + return +} func (ta *TArchivist) ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) { return make(map[order.OrderID][]order.CoinID), make(map[order.OrderID][]order.CoinID), nil } @@ -241,7 +255,7 @@ func TestMarket_NewMarket_BookOrders(t *testing.T) { _ = storage.BookOrder(loBuy) // the stub does not error _ = storage.BookOrder(loSell) // the stub does not error - mkt, _, _, cleanup, err = newTestMarket(storage) + mkt, storage, _, cleanup, err = newTestMarket(storage) if err != nil { t.Fatalf("newTestMarket failure: %v", err) } @@ -260,6 +274,20 @@ func TestMarket_NewMarket_BookOrders(t *testing.T) { t.Errorf("booked sell order has incorrect ID. Expected %v, got %v", loSell.ID(), sells[0].ID()) } + + // PurgeBook should clear the in memory book and those in storage. + mkt.PurgeBook() + _, buys, sells = mkt.Book() + if len(buys) > 0 || len(sells) > 0 { + t.Fatalf("purged market had %d buys and %d sells, expected none.", + len(buys), len(sells)) + } + + los, _ := storage.BookOrders(mkt.marketInfo.Base, mkt.marketInfo.Quote) + if len(los) != 0 { + t.Errorf("stored book orders were not flushed") + } + } func TestMarket_Book(t *testing.T) { @@ -307,6 +335,296 @@ func TestMarket_Book(t *testing.T) { } } +func TestMarket_Suspend(t *testing.T) { + // Create the market. + mkt, _, _, cleanup, err := newTestMarket() + if err != nil { + t.Fatalf("newTestMarket failure: %v", err) + cleanup() + return + } + defer cleanup() + epochDurationMSec := int64(mkt.EpochDuration()) + + // Suspend before market start. + finalIdx, _ := mkt.Suspend(time.Now(), false) + if finalIdx != -1 { + t.Fatalf("not running market should not allow suspend") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + startEpochIdx := 2 + encode.UnixMilli(time.Now())/epochDurationMSec + startEpochTime := encode.UnixTimeMilli(startEpochIdx * epochDurationMSec) + midPrevEpochTime := startEpochTime.Add(time.Duration(-epochDurationMSec/2) * time.Millisecond) + + // ~----|-------|-------|-------| + // ^now ^prev ^start ^next + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + mkt.Start(ctx, startEpochIdx) + }() + + var wantClosedFeed bool + feed := mkt.OrderFeed() + wg.Add(1) + go func() { + defer wg.Done() + for range feed { + } + if !wantClosedFeed { + t.Errorf("order feed should not be closed") + } + }() + + // Wait until half way through the epoch prior to start, when we know Run is + // running but the market hasn't started yet. + <-time.After(time.Until(midPrevEpochTime)) + + // This tests the case where m.activeEpochIdx == 0 but start is scheduled. + // The suspend (final) epoch should be the one just prior to startEpochIdx. + persist := true + finalIdx, finalTime := mkt.Suspend(time.Now(), persist) + if finalIdx != startEpochIdx-1 { + t.Fatalf("finalIdx = %d, wanted %d", finalIdx, startEpochIdx-1) + } + if !startEpochTime.Equal(finalTime) { + t.Errorf("got finalTime = %v, wanted %v", finalTime, startEpochTime) + } + + if mkt.suspendEpochIdx != finalIdx { + t.Errorf("got suspendEpochIdx = %d, wanted = %d", mkt.suspendEpochIdx, finalIdx) + } + + // Set a new suspend time, in the future this time. + nextEpochIdx := startEpochIdx + 1 + nextEpochTime := encode.UnixTimeMilli(nextEpochIdx * epochDurationMSec) + + // Just before second epoch start. + finalIdx, finalTime = mkt.Suspend(nextEpochTime.Add(-1*time.Millisecond), persist) + if finalIdx != nextEpochIdx-1 { + t.Fatalf("finalIdx = %d, wanted %d", finalIdx, nextEpochIdx-1) + } + if !nextEpochTime.Equal(finalTime) { + t.Errorf("got finalTime = %v, wanted %v", finalTime, nextEpochTime) + } + + if mkt.suspendEpochIdx != finalIdx { + t.Errorf("got suspendEpochIdx = %d, wanted = %d", mkt.suspendEpochIdx, finalIdx) + } + + // Exactly at second epoch start, with same result. + wantClosedFeed = true // we intend to have this suspend happen + finalIdx, finalTime = mkt.Suspend(nextEpochTime, persist) + if finalIdx != nextEpochIdx-1 { + t.Fatalf("finalIdx = %d, wanted %d", finalIdx, nextEpochIdx-1) + } + if !nextEpochTime.Equal(finalTime) { + t.Errorf("got finalTime = %v, wanted %v", finalTime, nextEpochTime) + } + + if mkt.suspendEpochIdx != finalIdx { + t.Errorf("got suspendEpochIdx = %d, wanted = %d", mkt.suspendEpochIdx, finalIdx) + } + + mkt.waitForEpochOpen() + + // should be running + if !mkt.Running() { + t.Fatal("the market should have be running") + } + + // Wait until after suspend time. + <-time.After(time.Until(finalTime.Add(20 * time.Millisecond))) + + // should be stopped + if mkt.Running() { + t.Fatal("the market should have been suspended") + } + + wg.Wait() + + // Start up again (consumer resumes the Market manually) + startEpochIdx = 1 + encode.UnixMilli(time.Now())/epochDurationMSec + startEpochTime = encode.UnixTimeMilli(startEpochIdx * epochDurationMSec) + + wg.Add(1) + go func() { + defer wg.Done() + mkt.Start(ctx, startEpochIdx) + }() + + feed = mkt.OrderFeed() + wg.Add(1) + go func() { + defer wg.Done() + for range feed { + } + if !wantClosedFeed { + t.Errorf("order feed should not be closed") + } + }() + + mkt.waitForEpochOpen() + + // should be running + if !mkt.Running() { + t.Fatal("the market should have be running") + } + + // Suspend asap. + wantClosedFeed = true // allow the feed receiver goroutine to return w/o error + _, finalTime = mkt.SuspendASAP(persist) + <-time.After(time.Until(finalTime.Add(40 * time.Millisecond))) + + // Should be stopped + if mkt.Running() { + t.Fatal("the market should have been suspended") + } + + // Ensure the feed is closed (Run returned). + select { + case <-feed: + default: + t.Errorf("order feed should be closed") + } + + cancel() + wg.Wait() +} + +func TestMarket_Suspend_Persist(t *testing.T) { + // Create the market. + mkt, storage, _, cleanup, err := newTestMarket() + if err != nil { + t.Fatalf("newTestMarket failure: %v", err) + cleanup() + return + } + defer cleanup() + epochDurationMSec := int64(mkt.EpochDuration()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + startEpochIdx := 2 + encode.UnixMilli(time.Now())/epochDurationMSec + startEpochTime := encode.UnixTimeMilli(startEpochIdx * epochDurationMSec) + + // ~----|-------|-------|-------| + // ^now ^prev ^start ^next + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + mkt.Start(ctx, startEpochIdx) + }() + + var wantClosedFeed bool + feedReceiver := func() { + feed := mkt.OrderFeed() + for range feed { + } + if !wantClosedFeed { + t.Errorf("order feed should not be closed") + } + } + go feedReceiver() + + // Wait until after original start time. + <-time.After(time.Until(startEpochTime.Add(20 * time.Millisecond))) + + if !mkt.Running() { + t.Fatal("the market should be running") + } + + lo := makeLO(seller3, mkRate3(0.8, 1.0), randLots(10), order.StandingTiF) + ok := mkt.book.Insert(lo) + if !ok { + t.Fatalf("Failed to insert an order into Market's Book") + } + _ = storage.BookOrder(lo) + + // Suspend asap with no resume. The epoch with the limit order will be + // processed and then the market will suspend. + wantClosedFeed = true // allow the feed receiver goroutine to return w/o error + persist := true + _, finalTime := mkt.SuspendASAP(persist) + <-time.After(time.Until(finalTime.Add(40 * time.Millisecond))) + + // Wait for Run to return. + wg.Wait() + + // Should be stopped + if mkt.Running() { + t.Fatal("the market should have been suspended") + } + + // Verify the order is still there. + los, _ := storage.BookOrders(mkt.marketInfo.Base, mkt.marketInfo.Quote) + if len(los) == 0 { + t.Errorf("stored book orders were flushed") + } + + _, buys, sells := mkt.Book() + if len(buys) != 0 { + t.Errorf("buy side of book not empty") + } + if len(sells) != 1 { + t.Errorf("sell side of book not equal to 1") + } + + // Start it up again. + startEpochIdx = 1 + encode.UnixMilli(time.Now())/epochDurationMSec + startEpochTime = encode.UnixTimeMilli(startEpochIdx * epochDurationMSec) + wg.Add(1) + go func() { + defer wg.Done() + mkt.Start(ctx, startEpochIdx) + }() + + go feedReceiver() + + <-time.After(time.Until(startEpochTime.Add(20 * time.Millisecond))) + + if !mkt.Running() { + t.Fatal("the market should be running") + } + + persist = false + _, finalTime = mkt.SuspendASAP(persist) + <-time.After(time.Until(finalTime.Add(40 * time.Millisecond))) + + // Wait for Run to return. + wg.Wait() + + // Should be stopped + if mkt.Running() { + t.Fatal("the market should have been suspended") + } + + // Verify the order is gone. + los, _ = storage.BookOrders(mkt.marketInfo.Base, mkt.marketInfo.Quote) + if len(los) != 0 { + t.Errorf("stored book orders were not flushed") + } + + _, buys, sells = mkt.Book() + if len(buys) != 0 { + t.Errorf("buy side of book not empty") + } + if len(sells) != 0 { + t.Errorf("sell side of book not empty") + } + + cancel() + wg.Wait() +} + func TestMarket_Run(t *testing.T) { // This test exercises the Market's main loop, which cycles the epochs and // queues (or not) incoming orders. @@ -410,8 +728,18 @@ func TestMarket_Run(t *testing.T) { t.Fatalf(`expected ErrMarketNotRunning ("%v"), got "%v"`, ErrMarketNotRunning, err) } + mktStatus := mkt.Status() + if mktStatus.Running { + t.Errorf("Market should not be running yet") + } + mkt.waitForEpochOpen() + mktStatus = mkt.Status() + if !mktStatus.Running { + t.Errorf("Market should be running now") + } + // Submit again oRecord = newOR() storMsgPI(oRecord.msgID, pi) @@ -542,6 +870,7 @@ func TestMarket_Run(t *testing.T) { auth.handlePreimageDone = make(chan struct{}, 1) ctx, cancel = context.WithCancel(context.Background()) + defer cancel() wg.Add(1) go func() { defer wg.Done() @@ -625,9 +954,8 @@ func TestMarket_Run(t *testing.T) { t.Errorf(`expected ErrInternalServer ("%v"), got "%v"`, ErrInternalServer, err) } - // NOTE: The Market is now stopping! + // NOTE: The Market is now stopping on its own because of the storage failure. - cancel() wg.Wait() cleanup() } @@ -707,11 +1035,13 @@ func TestMarket_enqueueEpoch(t *testing.T) { auth.preimagesByOrdID[coMiss.UID()] = coMissPI auth.piMtx.Unlock() - var bookSignals []*bookUpdateSignal + var bookSignals []*updateSignal var mtx sync.Mutex - bookChan := mkt.OrderFeed() + // intercept what would go to an OrderFeed() chan of Run were running. + notifyChan := make(chan *updateSignal, 32) + defer close(notifyChan) // quit bookSignals receiver, but not necessary go func() { - for up := range bookChan { + for up := range notifyChan { //fmt.Println("received signal", up.action) mtx.Lock() bookSignals = append(bookSignals, up) @@ -719,14 +1049,16 @@ func TestMarket_enqueueEpoch(t *testing.T) { } }() + var wg sync.WaitGroup + defer wg.Wait() // wait for the following epoch pipeline goroutines + ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + defer cancel() // stop the following epoch pipeline goroutines // This test does not start the entire market, so manually start the epoch // queue pump, and a goroutine to receive ready (preimage collection // completed) epochs and start matching, etc. ePump := newEpochPump() - var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() @@ -739,21 +1071,12 @@ func TestMarket_enqueueEpoch(t *testing.T) { go func() { defer close(goForIt) defer wg.Done() - for { - select { - case ep, ok := <-ePump.ready: - if !ok { - return - } - - t.Logf("processReadyEpoch: %d orders revealed\n", len(ep.ordersRevealed)) + for ep := range ePump.ready { + t.Logf("processReadyEpoch: %d orders revealed\n", len(ep.ordersRevealed)) - // epochStart has completed preimage collection. - mkt.processReadyEpoch(ctx, ep) - goForIt <- struct{}{} - case <-ctx.Done(): - return - } + // epochStart has completed preimage collection. + mkt.processReadyEpoch(ep, notifyChan) // notify is async! + goForIt <- struct{}{} } }() @@ -766,27 +1089,27 @@ func TestMarket_enqueueEpoch(t *testing.T) { tests := []struct { name string epoch *EpochQueue - expectedBookSignals []*bookUpdateSignal + expectedBookSignals []*updateSignal }{ { "ok book unbook", eq, - []*bookUpdateSignal{ - {matchProofAction, nil, mp, epochIdx}, - {bookAction, lo, nil, epochIdx}, - {unbookAction, bestBuy, nil, epochIdx}, - {unbookAction, bestSell, nil, epochIdx}, + []*updateSignal{ + {matchProofAction, sigDataMatchProof{mp}}, + {bookAction, sigDataBookedOrder{lo, epochIdx}}, + {unbookAction, sigDataUnbookedOrder{bestBuy, epochIdx}}, + {unbookAction, sigDataUnbookedOrder{bestSell, epochIdx}}, }, }, { "ok no matches, on book updates", eq2, - []*bookUpdateSignal{{matchProofAction, nil, mp2, epochIdx}}, + []*updateSignal{{matchProofAction, sigDataMatchProof{mp2}}}, }, { "ok empty queue", NewEpoch(epochIdx, epochDur), - []*bookUpdateSignal{{matchProofAction, nil, mp0, epochIdx}}, + []*updateSignal{{matchProofAction, sigDataMatchProof{mp0}}}, }, } for _, tt := range tests { @@ -795,10 +1118,13 @@ func TestMarket_enqueueEpoch(t *testing.T) { // Wait for processReadyEpoch, which sends on buffered (async) book // order feed channels. <-goForIt - time.Sleep(50 * time.Millisecond) // let the test goroutine receive the signals, and update bookSignals + // Preimage collection has completed, but notifications are asynchronous. + runtime.Gosched() // defer to the notify goroutine in (*Market).Run, somewhat redundant with the following sleep + time.Sleep(250 * time.Millisecond) // let the test goroutine receive the signals on notifyChan, updating bookSignals + // TODO: if this sleep becomes a problem, a receive(expectedNotes int) function might be needed mtx.Lock() defer mtx.Unlock() // inside this closure - defer func() { bookSignals = []*bookUpdateSignal{} }() + defer func() { bookSignals = []*updateSignal{} }() if len(bookSignals) != len(tt.expectedBookSignals) { t.Fatalf("expected %d book update signals, got %d", len(tt.expectedBookSignals), len(bookSignals)) @@ -810,64 +1136,76 @@ func TestMarket_enqueueEpoch(t *testing.T) { i, s.action, exp.action) } - switch s.action { - case matchProofAction: - mp := exp.matchProof - if !bytes.Equal(mp.CSum, s.matchProof.CSum) { + switch sigData := s.data.(type) { + case sigDataMatchProof: + mp := sigData.matchProof + wantMp := exp.data.(sigDataMatchProof).matchProof + if !bytes.Equal(wantMp.CSum, mp.CSum) { t.Errorf("Book signal #%d (action %v), has CSum %x, expected %x", - i, s.action, s.matchProof.CSum, mp.CSum) + i, s.action, mp.CSum, wantMp.CSum) } - if !bytes.Equal(mp.Seed, s.matchProof.Seed) { + if !bytes.Equal(wantMp.Seed, mp.Seed) { t.Errorf("Book signal #%d (action %v), has Seed %x, expected %x", - i, s.action, s.matchProof.Seed, mp.Seed) + i, s.action, mp.Seed, wantMp.Seed) } - if mp.Epoch.Idx != s.matchProof.Epoch.Idx { + if wantMp.Epoch.Idx != mp.Epoch.Idx { t.Errorf("Book signal #%d (action %v), has Epoch Idx %d, expected %d", - i, s.action, s.matchProof.Epoch.Idx, mp.Epoch.Idx) + i, s.action, mp.Epoch.Idx, wantMp.Epoch.Idx) } - if mp.Epoch.Dur != s.matchProof.Epoch.Dur { + if wantMp.Epoch.Dur != mp.Epoch.Dur { t.Errorf("Book signal #%d (action %v), has Epoch Dur %d, expected %d", - i, s.action, s.matchProof.Epoch.Dur, mp.Epoch.Dur) + i, s.action, mp.Epoch.Dur, wantMp.Epoch.Dur) } - if len(mp.Preimages) != len(s.matchProof.Preimages) { + if len(wantMp.Preimages) != len(mp.Preimages) { t.Errorf("Book signal #%d (action %v), has %d Preimages, expected %d", - i, s.action, len(s.matchProof.Preimages), len(mp.Preimages)) + i, s.action, len(mp.Preimages), len(wantMp.Preimages)) continue } - for ii := range mp.Preimages { - if mp.Preimages[ii] != s.matchProof.Preimages[ii] { + for ii := range wantMp.Preimages { + if wantMp.Preimages[ii] != mp.Preimages[ii] { t.Errorf("Book signal #%d (action %v), has #%d Preimage %x, expected %x", - i, s.action, ii, s.matchProof.Preimages[ii], mp.Preimages[ii]) + i, s.action, ii, mp.Preimages[ii], wantMp.Preimages[ii]) } } - if len(mp.Misses) != len(s.matchProof.Misses) { + if len(wantMp.Misses) != len(mp.Misses) { t.Errorf("Book signal #%d (action %v), has %d Misses, expected %d", - i, s.action, len(s.matchProof.Misses), len(mp.Misses)) + i, s.action, len(mp.Misses), len(wantMp.Misses)) continue } - for ii := range mp.Misses { - if mp.Misses[ii].ID() != s.matchProof.Misses[ii].ID() { + for ii := range wantMp.Misses { + if wantMp.Misses[ii].ID() != mp.Misses[ii].ID() { t.Errorf("Book signal #%d (action %v), has #%d missed Order %v, expected %v", - i, s.action, ii, s.matchProof.Misses[ii].ID(), mp.Misses[ii].ID()) + i, s.action, ii, mp.Misses[ii].ID(), wantMp.Misses[ii].ID()) } } - case bookAction, unbookAction: - if exp.order.ID() != s.order.ID() { + + case sigDataBookedOrder: + wantOrd := exp.data.(sigDataBookedOrder).order + if wantOrd.ID() != sigData.order.ID() { t.Errorf("Book signal #%d (action %v) has order %v, expected %v", - i, s.action, s.order.ID(), exp.order.ID()) + i, s.action, sigData.order.ID(), wantOrd.ID()) } - } - if exp.epochIdx != s.epochIdx { - t.Errorf("Book signal #%d (action %v) has epoch index %d, expected %d", - i, s.action, s.epochIdx, exp.epochIdx) + case sigDataUnbookedOrder: + wantOrd := exp.data.(sigDataUnbookedOrder).order + if wantOrd.ID() != sigData.order.ID() { + t.Errorf("Unbook signal #%d (action %v) has order %v, expected %v", + i, s.action, sigData.order.ID(), wantOrd.ID()) + } + + case sigDataNewEpoch: + wantIdx := exp.data.(sigDataNewEpoch).idx + if wantIdx != sigData.idx { + t.Errorf("new epoch signal #%d (action %v) has epoch index %d, expected %d", + i, s.action, sigData.idx, wantIdx) + } } + } }) } cancel() - wg.Wait() } func TestMarket_Cancelable(t *testing.T) { @@ -1195,47 +1533,3 @@ func TestMarket_handlePreimageResp(t *testing.T) { msgErr.Message) } } - -func Test_epochPump_next(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ep := newEpochPump() - var wg sync.WaitGroup - wg.Add(1) - go func() { - ep.Run(ctx) - wg.Done() - }() - - waitTime := int64(5 * time.Second) - if testing.Short() { - waitTime = int64(1 * time.Second) - } - - var epochStart, epochDur, numEpochs int64 = 123413513, 1_000, 100 - epochs := make([]*readyEpoch, numEpochs) - for i := int64(0); i < numEpochs; i++ { - rq := ep.Insert(NewEpoch(i+epochStart, epochDur)) - epochs[i] = rq - - // Simulate preimage collection, randomly making the queues ready. - go func(epochIdx int64) { - wait := time.Duration(rand.Int63n(waitTime)) - time.Sleep(wait) - close(rq.ready) - }(i + epochStart) - } - - // Receive all the ready epochs, verifying they come in order. - for i := epochStart; i < numEpochs+epochStart; i++ { - rq := <-ep.ready - if rq.Epoch != i { - t.Errorf("Received epoch %d, expected %d", rq.Epoch, i) - } - } - // All fake preimage collection goroutines are done now. - - cancel() - wg.Wait() -} diff --git a/server/market/orderrouter.go b/server/market/orderrouter.go index a279804e78..1fb012480a 100644 --- a/server/market/orderrouter.go +++ b/server/market/orderrouter.go @@ -66,6 +66,15 @@ type MarketTunnel interface { // involved in a DEX-monitored trade. Change outputs from DEX-monitored trades // can be used in other orders without waiting for fundConf confirmations. TxMonitored(user account.AccountID, asset uint32, txid string) bool + + // Suspend suspends the market as soon as a given time, returning the final + // epoch index and and time at which that epoch closes. + Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) + + // Running indicates is the market is accepting new orders. This will return + // false when suspended, but false does not necessarily mean Run has stopped + // since a start epoch may be set. + Running() bool } // orderRecord contains the information necessary to respond to an order @@ -167,6 +176,12 @@ func (r *OrderRouter) handleLimit(user account.AccountID, msg *msgjson.Message) return rpcErr } + // Spare some resources if the market is closed now. Any orders that make it + // through to a closed market will receive a similar error from SubmitOrder. + if !tunnel.Running() { + return msgjson.NewError(msgjson.MarketNotRunningError, "market closed to new orders") + } + // Check that OrderType is set correctly if limit.OrderType != msgjson.LimitOrderNum { return msgjson.NewError(msgjson.OrderParameterError, "wrong order type set for limit order") @@ -272,6 +287,10 @@ func (r *OrderRouter) handleMarket(user account.AccountID, msg *msgjson.Message) return rpcErr } + if !tunnel.Running() { + return msgjson.NewError(msgjson.MarketNotRunningError, "market %s closed to new orders") + } + // Check that OrderType is set correctly if market.OrderType != msgjson.MarketOrderNum { return msgjson.NewError(msgjson.OrderParameterError, "wrong order type set for market order") @@ -295,9 +314,9 @@ func (r *OrderRouter) handleMarket(user account.AccountID, msg *msgjson.Message) lotWithBuffer := uint64(float64(assets.base.LotSize) * buyBuffer) minReq := matcher.BaseToQuote(midGap, lotWithBuffer) - // TODO: I'm pretty sure that if there are no orders on the book, the midGap - // will be zero, and so will minReq, meaning any Quanity would be accepted. - // Is this a security concern? + // TODO: I'm pretty sure that if there are no orders on the book, the + // midGap will be zero, and so will minReq, meaning any Quantity would + // be accepted. Is this a security concern? if market.Quantity < minReq { errStr := fmt.Sprintf("order quantity does not satisfy market buy buffer. %d < %d. midGap = %d", reqVal, minReq, midGap) @@ -368,6 +387,10 @@ func (r *OrderRouter) handleCancel(user account.AccountID, msg *msgjson.Message) return rpcErr } + if !tunnel.Running() { + return msgjson.NewError(msgjson.MarketNotRunningError, "market %s closed to new orders") + } + if len(cancel.TargetID) != order.OrderIDSize { return msgjson.NewError(msgjson.OrderParameterError, "invalid target ID format") } @@ -452,6 +475,53 @@ func (r *OrderRouter) extractMarket(prefix *msgjson.Prefix) (MarketTunnel, *msgj return tunnel, nil } +// SuspendEpoch holds the index and end time of final epoch marking the +// suspension of a market. +type SuspendEpoch struct { + Idx int64 + End time.Time +} + +// SuspendMarket schedules a suspension of a given market, with the option to +// persist the orders on the book (or purge the book automatically on market +// shutdown). The scheduled final epoch and suspend time are returned. Note that +// OrderRouter is a proxy for this request to the ultimate Market. This is done +// because OrderRouter is the entry point for new orders into the market. TODO: +// track running, suspended, and scheduled-suspended markets, appropriately +// blocking order submission according to the schedule rather than just checking +// Market.Running prior to submitting incoming orders to the Market. +func (r *OrderRouter) SuspendMarket(mktName string, asSoonAs time.Time, persistBooks bool) *SuspendEpoch { + mkt, found := r.tunnels[mktName] + if !found { + return nil + } + + idx, t := mkt.Suspend(asSoonAs, persistBooks) + return &SuspendEpoch{ + Idx: idx, + End: t, + } +} + +// Suspend is like SuspendMarket, but for all known markets. TODO: use this in a +// "suspend all as soon as" DEX function with rather than shutting down in the +// middle of an active epoch as SIGINT shutdown presently does. +func (r *OrderRouter) Suspend(asSoonAs time.Time, persistBooks bool) map[string]*SuspendEpoch { + + suspendTimes := make(map[string]*SuspendEpoch, len(r.tunnels)) + for name, mkt := range r.tunnels { + idx, ts := mkt.Suspend(asSoonAs, persistBooks) + suspendTimes[name] = &SuspendEpoch{Idx: idx, End: ts} + } + + // MarketTunnel.Running will return false when the market closes, and true + // when and if it opens again. Locking/blocking of the incoming order + // handlers is not necessary since any orders that sneak in to a Market will + // be rejected if there is no active epoch. + + return suspendTimes +} + // extractMarketDetails finds the MarketTunnel, an assetSet, and market side for // the provided prefix. func (r *OrderRouter) extractMarketDetails(prefix *msgjson.Prefix, trade *msgjson.Trade) (MarketTunnel, *assetSet, bool, *msgjson.Error) { diff --git a/server/market/routers_test.go b/server/market/routers_test.go index 5275e7476d..42a6c38d96 100644 --- a/server/market/routers_test.go +++ b/server/market/routers_test.go @@ -274,6 +274,15 @@ func (m *TMarketTunnel) Cancelable(order.OrderID) bool { return m.cancelable } +func (m *TMarketTunnel) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) { + // no suspension + return -1, time.Time{} +} + +func (m *TMarketTunnel) Running() bool { + return true +} + type TBackend struct { utxoErr error utxos map[string]uint64 @@ -1082,19 +1091,19 @@ func makeCORevealed(writer *ordertest.Writer, targetID order.OrderID) (*order.Ca type TBookSource struct { buys []*order.LimitOrder sells []*order.LimitOrder - feed chan *bookUpdateSignal + feed chan *updateSignal } func tNewBookSource() *TBookSource { return &TBookSource{ - feed: make(chan *bookUpdateSignal, 16), + feed: make(chan *updateSignal, 16), } } func (s *TBookSource) Book() (eidx int64, buys []*order.LimitOrder, sells []*order.LimitOrder) { return 13241324, s.buys, s.sells } -func (s *TBookSource) OrderFeed() <-chan *bookUpdateSignal { +func (s *TBookSource) OrderFeed() <-chan *updateSignal { return s.feed } @@ -1126,6 +1135,15 @@ func (conn *TLink) Send(msg *msgjson.Message) error { conn.sends = append(conn.sends, msg) return conn.sendErr } +func (conn *TLink) SendError(id uint64, msgErr *msgjson.Error) { + msg, err := msgjson.NewResponse(id, nil, msgErr) + if err != nil { + log.Errorf("SendError: failed to create message: %v", err) + } + conn.mtx.Lock() + defer conn.mtx.Unlock() + conn.sends = append(conn.sends, msg) +} func (conn *TLink) getSend() *msgjson.Message { conn.mtx.Lock() @@ -1336,10 +1354,12 @@ func TestRouter(t *testing.T) { // An epoch notification sent on market 1's channel should arrive at both // clients. lo := makeLO(buyer1, mkRate1(0.8, 1.0), randLots(10), order.ImmediateTiF) - sig := &bookUpdateSignal{ - action: epochAction, - order: lo, - epochIdx: 12345678, + sig := &updateSignal{ + action: epochAction, + data: sigDataEpochOrder{ + order: lo, + epochIdx: 12345678, + }, } src1.feed <- sig tick(responseDelay) @@ -1354,8 +1374,9 @@ func TestRouter(t *testing.T) { compareLO(&epochNote.BookOrderNote, lo, msgjson.ImmediateOrderNum, "epoch notification, link2") // just for kicks, checks the epoch is as expected. - if epochNote.Epoch != uint64(sig.epochIdx) { - t.Fatalf("wrong epoch. wanted %d, got %d", sig.epochIdx, epochNote.Epoch) + wantIdx := sig.data.(sigDataEpochOrder).epochIdx + if epochNote.Epoch != uint64(wantIdx) { + t.Fatalf("wrong epoch. wanted %d, got %d", wantIdx, epochNote.Epoch) } // Have both subscribers subscribe to market 2. @@ -1370,10 +1391,12 @@ func TestRouter(t *testing.T) { // Send an epoch update for a market order. mo := makeMO(buyer2, randLots(10)) - sig = &bookUpdateSignal{ - action: epochAction, - order: mo, - epochIdx: 12345678, + sig = &updateSignal{ + action: epochAction, + data: sigDataEpochOrder{ + order: mo, + epochIdx: 12345678, + }, } src2.feed <- sig tick(responseDelay) @@ -1392,9 +1415,12 @@ func TestRouter(t *testing.T) { } lo.FillAmt = mkt2.LotSize - sig = &bookUpdateSignal{ + sig = &updateSignal{ action: bookAction, - order: lo, + data: sigDataBookedOrder{ + order: lo, + epochIdx: 12344365, + }, } src2.feed <- sig tick(responseDelay) @@ -1413,10 +1439,12 @@ func TestRouter(t *testing.T) { } // Now unbook the order. - sig = &bookUpdateSignal{ - action: unbookAction, - order: lo, - epochIdx: 12345678, + sig = &updateSignal{ + action: unbookAction, + data: sigDataUnbookedOrder{ + order: lo, + epochIdx: 12345678, + }, } src2.feed <- sig tick(responseDelay) @@ -1459,10 +1487,12 @@ func TestRouter(t *testing.T) { } mo = makeMO(seller1, randLots(10)) - sig = &bookUpdateSignal{ - action: epochAction, - order: mo, - epochIdx: 12345678, + sig = &updateSignal{ + action: epochAction, + data: sigDataEpochOrder{ + order: mo, + epochIdx: 12345678, + }, } src1.feed <- sig tick(responseDelay) @@ -1483,10 +1513,12 @@ func TestRouter(t *testing.T) { } oid := lo.ID() co := makeCO(buyer1, oid) - sig = &bookUpdateSignal{ - action: epochAction, - order: co, - epochIdx: 12345678, + sig = &updateSignal{ + action: epochAction, + data: sigDataEpochOrder{ + order: co, + epochIdx: 12345678, + }, } src1.feed <- sig tick(responseDelay)