diff --git a/server/apidata/apidata.go b/server/apidata/apidata.go index a4aea18a63..7637604fe4 100644 --- a/server/apidata/apidata.go +++ b/server/apidata/apidata.go @@ -4,7 +4,6 @@ package apidata import ( - "context" "encoding/json" "fmt" "sync" @@ -74,7 +73,7 @@ type DataAPI struct { } // NewDataAPI is the constructor for a new DataAPI. -func NewDataAPI(ctx context.Context, dbSrc DBSource) *DataAPI { +func NewDataAPI(dbSrc DBSource) *DataAPI { s := &DataAPI{ db: dbSrc, epochDurations: make(map[string]uint64), diff --git a/server/apidata/apidata_test.go b/server/apidata/apidata_test.go index 64b48f3d2f..a629fcd3cb 100644 --- a/server/apidata/apidata_test.go +++ b/server/apidata/apidata_test.go @@ -4,7 +4,6 @@ package apidata import ( - "context" "encoding/json" "fmt" "testing" @@ -43,24 +42,20 @@ func (bs *TBookSource) Book(mktName string) (*msgjson.OrderBook, error) { } type testRig struct { - db *TDBSource - api *DataAPI - cancel context.CancelFunc + db *TDBSource + api *DataAPI } func newTestRig() *testRig { - ctx, cancel := context.WithCancel(context.Background()) db := new(TDBSource) return &testRig{ - db: db, - api: NewDataAPI(ctx, db), - cancel: cancel, + db: db, + api: NewDataAPI(db), } } func TestAddMarketSource(t *testing.T) { rig := newTestRig() - defer rig.cancel() // initial success err := rig.api.AddMarketSource(&TMarketSource{42, 0}) if err != nil { @@ -87,7 +82,6 @@ func TestAddMarketSource(t *testing.T) { func TestReportEpoch(t *testing.T) { rig := newTestRig() - defer rig.cancel() mktSrc := &TMarketSource{42, 0} err := rig.api.AddMarketSource(mktSrc) if err != nil { @@ -176,7 +170,6 @@ func TestReportEpoch(t *testing.T) { func TestOrderBook(t *testing.T) { rig := newTestRig() - defer rig.cancel() book := new(msgjson.OrderBook) rig.api.SetBookSource(&TBookSource{book}) bookI, err := rig.api.handleOrderBook(&msgjson.OrderBookSubscription{ diff --git a/server/cmd/dcrdex/main.go b/server/cmd/dcrdex/main.go index 0f1c77f6bb..86d0c9b26d 100644 --- a/server/cmd/dcrdex/main.go +++ b/server/cmd/dcrdex/main.go @@ -146,7 +146,7 @@ func mainCore(ctx context.Context) error { }, NoResumeSwaps: cfg.NoResumeSwaps, } - dexMan, err := dexsrv.NewDEX(dexConf) + dexMan, err := dexsrv.NewDEX(ctx, dexConf) // ctx cancel just aborts setup; Stop does normal shutdown if err != nil { return err } diff --git a/server/db/driver/pg/upgrades.go b/server/db/driver/pg/upgrades.go index d8b0279efc..134c12b782 100644 --- a/server/db/driver/pg/upgrades.go +++ b/server/db/driver/pg/upgrades.go @@ -186,7 +186,8 @@ func v2Upgrade(tx *sql.Tx) error { } log.Infof(" - Processing epochs [%d, %d)...", idx, to) } - rates, quantities, _, err := matchStatsForMarketEpoch(matchStatsStmtPrep, idx, dur) + var rates, quantities []uint64 // don't shadow err from outer scope + rates, quantities, _, err = matchStatsForMarketEpoch(matchStatsStmtPrep, idx, dur) if err != nil { return err } @@ -313,8 +314,16 @@ func upgradeDB(ctx context.Context, db *sql.DB) error { return err } defer func() { - if err != nil { - _ = tx.Rollback() // redundant if ctx canceled (sql.ErrTxDone) + // On error, rollback the transaction unless ctx was canceled + // (sql.ErrTxDone) because then rollback is automatic. See the + // (*sql.DB).BeginTx docs. + if err == nil || errors.Is(err, sql.ErrTxDone) { + return + } + log.Warnf("Rolling back upgrade to version %d", targetVer-1) + errRollback := tx.Rollback() + if errRollback != nil { + log.Errorf("Rollback failed: %v", errRollback) } }() @@ -335,6 +344,9 @@ func upgradeDB(ctx context.Context, db *sql.DB) error { targetVer := current + uint32(i) + 1 log.Debugf("Upgrading DB scheme to %d...", targetVer) if err = runUpgradeTx(targetVer, up); err != nil { + if errors.Is(err, sql.ErrTxDone) { + return fmt.Errorf("upgrade cancelled (rolled back to version %d)", current+uint32(i)) + } return err } } diff --git a/server/dex/dex.go b/server/dex/dex.go index cc78670b1a..0dd28d3d27 100644 --- a/server/dex/dex.go +++ b/server/dex/dex.go @@ -201,6 +201,7 @@ func (dm *DEX) Stop() { ss.stop() log.Infof("%s is now shut down.", ss.name) } + log.Infof("Stopping storage...") if err := dm.storage.Close(); err != nil { log.Errorf("DEXArchivist.Close: %v", err) } @@ -217,7 +218,7 @@ func (dm *DEX) handleDEXConfig(interface{}) (interface{}, error) { } // NewDEX creates the dex manager and starts all subsystems. Use Stop to -// shutdown cleanly. +// shutdown cleanly. The Context is used to abort setup. // 1. Validate each specified asset. // 2. Create CoinLockers for each asset. // 3. Create and start asset backends. @@ -227,24 +228,22 @@ func (dm *DEX) handleDEXConfig(interface{}) (interface{}, error) { // 7. Create and start the markets. // 8. Create and start the book router, and create the order router. // 9. Create and start the comms server. -func NewDEX(cfg *DexConf) (*DEX, error) { +func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { // Disallow running without user penalization in a mainnet config. if cfg.Anarchy && cfg.Network == dex.Mainnet { return nil, fmt.Errorf("user penalties may not be disabled on mainnet") } - ctx, cancel := context.WithCancel(context.Background()) - var subsystems []subsystem startSubSys := func(name string, rc interface{}) (err error) { subsys := subsystem{name: name} switch st := rc.(type) { case dex.Runner: subsys.ssw = dex.NewStartStopWaiter(st) - subsys.ssw.Start(ctx) + subsys.ssw.Start(context.Background()) // stopped with Stop case dex.Connector: subsys.cm = dex.NewConnectionMaster(st) - err = subsys.cm.Connect(ctx) + err = subsys.cm.Connect(context.Background()) // stopped with Disconnect if err != nil { return } @@ -256,12 +255,15 @@ func NewDEX(cfg *DexConf) (*DEX, error) { return } + // Do not wrap the caller's context for the DB since we must coordinate it's + // shutdown in sequence with the other subsystems. + ctxDB, cancelDB := context.WithCancel(context.Background()) abort := func() { for _, ss := range subsystems { ss.stop() } // If the DB is running, kill it too. - cancel() + cancelDB() } // Check each configured asset. @@ -383,6 +385,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) { mkt.Name = strings.ToLower(mkt.Name) } + if err := ctx.Err(); err != nil { + abort() + return nil, err + } + // Create DEXArchivist with the pg DB driver. pgCfg := &pg.Config{ Host: cfg.DBConf.Host, @@ -397,13 +404,26 @@ func NewDEX(cfg *DexConf) (*DEX, error) { Net: cfg.Network, FeeKey: cfg.RegFeeXPub, } - storage, err := db.Open(ctx, "pg", pgCfg) + // After DEX construction, the storage subsystem should be stopped + // gracefully with its Close method, and in coordination with other + // subsystems via Stop. To abort its setup, rig a temporary link to the + // caller's Context. + running := make(chan struct{}) + defer close(running) // break the link + go func() { + select { + case <-ctx.Done(): // cancelled construction + cancelDB() + case <-running: // DB shutdown now only via dex.Stop=>db.Close + } + }() + storage, err := db.Open(ctxDB, "pg", pgCfg) if err != nil { abort() return nil, fmt.Errorf("db.Open: %w", err) } - dataAPI := apidata.NewDataAPI(ctx, storage) + dataAPI := apidata.NewDataAPI(storage) // Create the user order unbook dispatcher for the AuthManager. markets := make(map[string]*market.Market, len(cfg.Markets)) @@ -469,6 +489,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) { return nil, fmt.Errorf("NewSwapper: %w", err) } + if err := ctx.Err(); err != nil { + abort() + return nil, err + } + // Markets usersWithOrders := make(map[account.AccountID]struct{}) for _, mktInf := range cfg.Markets { @@ -480,6 +505,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) { return nil, fmt.Errorf("NewMarket failed: %w", err) } markets[mktInf.Name] = mkt + log.Infof("Preparing historical market data API for market %v...", mktInf.Name) err = dataAPI.AddMarketSource(mkt) if err != nil { abort() @@ -549,6 +575,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) { }) startSubSys("OrderRouter", orderRouter) + if err := ctx.Err(); err != nil { + abort() + return nil, err + } + // Client comms RPC server. server, err := comms.NewServer(cfg.CommsCfg) if err != nil {