Skip to content

Commit

Permalink
allow aborting DEX mgr construction (and DB setup)
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Mar 27, 2021
1 parent d000f19 commit 26ae0c8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 26 deletions.
3 changes: 1 addition & 2 deletions server/apidata/apidata.go
Expand Up @@ -4,7 +4,6 @@
package apidata

import (
"context"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -74,7 +73,7 @@ type DataAPI struct {
}

// NewDataAPI is the constructor for a new DataAPI.
func NewDataAPI(ctx context.Context, dbSrc DBSource) *DataAPI {
func NewDataAPI(dbSrc DBSource) *DataAPI {
s := &DataAPI{
db: dbSrc,
epochDurations: make(map[string]uint64),
Expand Down
15 changes: 4 additions & 11 deletions server/apidata/apidata_test.go
Expand Up @@ -4,7 +4,6 @@
package apidata

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand Down Expand Up @@ -43,24 +42,20 @@ func (bs *TBookSource) Book(mktName string) (*msgjson.OrderBook, error) {
}

type testRig struct {
db *TDBSource
api *DataAPI
cancel context.CancelFunc
db *TDBSource
api *DataAPI
}

func newTestRig() *testRig {
ctx, cancel := context.WithCancel(context.Background())
db := new(TDBSource)
return &testRig{
db: db,
api: NewDataAPI(ctx, db),
cancel: cancel,
db: db,
api: NewDataAPI(db),
}
}

func TestAddMarketSource(t *testing.T) {
rig := newTestRig()
defer rig.cancel()
// initial success
err := rig.api.AddMarketSource(&TMarketSource{42, 0})
if err != nil {
Expand All @@ -87,7 +82,6 @@ func TestAddMarketSource(t *testing.T) {

func TestReportEpoch(t *testing.T) {
rig := newTestRig()
defer rig.cancel()
mktSrc := &TMarketSource{42, 0}
err := rig.api.AddMarketSource(mktSrc)
if err != nil {
Expand Down Expand Up @@ -176,7 +170,6 @@ func TestReportEpoch(t *testing.T) {

func TestOrderBook(t *testing.T) {
rig := newTestRig()
defer rig.cancel()
book := new(msgjson.OrderBook)
rig.api.SetBookSource(&TBookSource{book})
bookI, err := rig.api.handleOrderBook(&msgjson.OrderBookSubscription{
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/dcrdex/main.go
Expand Up @@ -146,7 +146,7 @@ func mainCore(ctx context.Context) error {
},
NoResumeSwaps: cfg.NoResumeSwaps,
}
dexMan, err := dexsrv.NewDEX(dexConf)
dexMan, err := dexsrv.NewDEX(ctx, dexConf) // ctx cancel just aborts setup; Stop does normal shutdown
if err != nil {
return err
}
Expand Down
18 changes: 15 additions & 3 deletions server/db/driver/pg/upgrades.go
Expand Up @@ -186,7 +186,8 @@ func v2Upgrade(tx *sql.Tx) error {
}
log.Infof(" - Processing epochs [%d, %d)...", idx, to)
}
rates, quantities, _, err := matchStatsForMarketEpoch(matchStatsStmtPrep, idx, dur)
var rates, quantities []uint64 // don't shadow err from outer scope
rates, quantities, _, err = matchStatsForMarketEpoch(matchStatsStmtPrep, idx, dur)
if err != nil {
return err
}
Expand Down Expand Up @@ -313,8 +314,16 @@ func upgradeDB(ctx context.Context, db *sql.DB) error {
return err
}
defer func() {
if err != nil {
_ = tx.Rollback() // redundant if ctx canceled (sql.ErrTxDone)
// On error, rollback the transaction unless ctx was canceled
// (sql.ErrTxDone) because then rollback is automatic. See the
// (*sql.DB).BeginTx docs.
if err == nil || errors.Is(err, sql.ErrTxDone) {
return
}
log.Warnf("Rolling back upgrade to version %d", targetVer-1)
errRollback := tx.Rollback()
if errRollback != nil {
log.Errorf("Rollback failed: %v", errRollback)
}
}()

Expand All @@ -335,6 +344,9 @@ func upgradeDB(ctx context.Context, db *sql.DB) error {
targetVer := current + uint32(i) + 1
log.Debugf("Upgrading DB scheme to %d...", targetVer)
if err = runUpgradeTx(targetVer, up); err != nil {
if errors.Is(err, sql.ErrTxDone) {
return fmt.Errorf("upgrade cancelled (rolled back to version %d)", current+uint32(i))
}
return err
}
}
Expand Down
49 changes: 40 additions & 9 deletions server/dex/dex.go
Expand Up @@ -201,6 +201,7 @@ func (dm *DEX) Stop() {
ss.stop()
log.Infof("%s is now shut down.", ss.name)
}
log.Infof("Stopping storage...")
if err := dm.storage.Close(); err != nil {
log.Errorf("DEXArchivist.Close: %v", err)
}
Expand All @@ -217,7 +218,7 @@ func (dm *DEX) handleDEXConfig(interface{}) (interface{}, error) {
}

// NewDEX creates the dex manager and starts all subsystems. Use Stop to
// shutdown cleanly.
// shutdown cleanly. The Context is used to abort setup.
// 1. Validate each specified asset.
// 2. Create CoinLockers for each asset.
// 3. Create and start asset backends.
Expand All @@ -227,24 +228,22 @@ func (dm *DEX) handleDEXConfig(interface{}) (interface{}, error) {
// 7. Create and start the markets.
// 8. Create and start the book router, and create the order router.
// 9. Create and start the comms server.
func NewDEX(cfg *DexConf) (*DEX, error) {
func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) {
// Disallow running without user penalization in a mainnet config.
if cfg.Anarchy && cfg.Network == dex.Mainnet {
return nil, fmt.Errorf("user penalties may not be disabled on mainnet")
}

ctx, cancel := context.WithCancel(context.Background())

var subsystems []subsystem
startSubSys := func(name string, rc interface{}) (err error) {
subsys := subsystem{name: name}
switch st := rc.(type) {
case dex.Runner:
subsys.ssw = dex.NewStartStopWaiter(st)
subsys.ssw.Start(ctx)
subsys.ssw.Start(context.Background()) // stopped with Stop
case dex.Connector:
subsys.cm = dex.NewConnectionMaster(st)
err = subsys.cm.Connect(ctx)
err = subsys.cm.Connect(context.Background()) // stopped with Disconnect
if err != nil {
return
}
Expand All @@ -256,12 +255,15 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
return
}

// Do not wrap the caller's context for the DB since we must coordinate it's
// shutdown in sequence with the other subsystems.
ctxDB, cancelDB := context.WithCancel(context.Background())
abort := func() {
for _, ss := range subsystems {
ss.stop()
}
// If the DB is running, kill it too.
cancel()
cancelDB()
}

// Check each configured asset.
Expand Down Expand Up @@ -383,6 +385,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
mkt.Name = strings.ToLower(mkt.Name)
}

if err := ctx.Err(); err != nil {
abort()
return nil, err
}

// Create DEXArchivist with the pg DB driver.
pgCfg := &pg.Config{
Host: cfg.DBConf.Host,
Expand All @@ -397,13 +404,26 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
Net: cfg.Network,
FeeKey: cfg.RegFeeXPub,
}
storage, err := db.Open(ctx, "pg", pgCfg)
// After DEX construction, the storage subsystem should be stopped
// gracefully with its Close method, and in coordination with other
// subsystems via Stop. To abort its setup, rig a temporary link to the
// caller's Context.
running := make(chan struct{})
defer close(running) // break the link
go func() {
select {
case <-ctx.Done(): // cancelled construction
cancelDB()
case <-running: // DB shutdown now only via dex.Stop=>db.Close
}
}()
storage, err := db.Open(ctxDB, "pg", pgCfg)
if err != nil {
abort()
return nil, fmt.Errorf("db.Open: %w", err)
}

dataAPI := apidata.NewDataAPI(ctx, storage)
dataAPI := apidata.NewDataAPI(storage)

// Create the user order unbook dispatcher for the AuthManager.
markets := make(map[string]*market.Market, len(cfg.Markets))
Expand Down Expand Up @@ -469,6 +489,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
return nil, fmt.Errorf("NewSwapper: %w", err)
}

if err := ctx.Err(); err != nil {
abort()
return nil, err
}

// Markets
usersWithOrders := make(map[account.AccountID]struct{})
for _, mktInf := range cfg.Markets {
Expand All @@ -480,6 +505,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
return nil, fmt.Errorf("NewMarket failed: %w", err)
}
markets[mktInf.Name] = mkt
log.Infof("Preparing historical market data API for market %v...", mktInf.Name)
err = dataAPI.AddMarketSource(mkt)
if err != nil {
abort()
Expand Down Expand Up @@ -549,6 +575,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
})
startSubSys("OrderRouter", orderRouter)

if err := ctx.Err(); err != nil {
abort()
return nil, err
}

// Client comms RPC server.
server, err := comms.NewServer(cfg.CommsCfg)
if err != nil {
Expand Down

0 comments on commit 26ae0c8

Please sign in to comment.