Skip to content

Commit

Permalink
server/{dex,market}: cleaner shutdown
Browse files Browse the repository at this point in the history
server/dex

This reorders the subsystem stack so that book router shuts down after
the markets that use it during DEX manager shutdown.

server/market

When a market is stopped by context cancellation (not a market suspend 
admin command), it does not wait for the current epoch to close, so any 
orders in that truncated epoch were just being dropped without changing 
their status from epoch to executed (how a no-match/fail is normally 
handled when epochs are processed by the matcher). This updates the 
primary defer function in (*Market).Run so that if there are orders left 
in the epochOrders map after the closed epoch pipeline is drained, their 
statuses are change in the DB so that they aren't reported to clients as 
active orders for eternity.
  • Loading branch information
chappjc committed Nov 3, 2020
1 parent 867ba89 commit d463439
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
2 changes: 2 additions & 0 deletions server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,8 @@ func (auth *AuthManager) addClient(client *clientInfo) {
if oldConnID == connID {
return // reused conn, just update maps
}
log.Warnf("User %v reauthorized from %v (id %d) with an existing connection from %v (id %d). Disconnecting the old one.",
user, client.conn.IP(), connID, oldClient.conn.IP(), oldConnID)
// When replacing with a new conn, manually deregister the old conn so
// that when it disconnects it does not remove the new clientInfo.
delete(auth.conns, oldConnID)
Expand Down
13 changes: 9 additions & 4 deletions server/dex/dex.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ func (cr *configResponse) remarshal() {
// Stop shuts down the DEX. Stop returns only after all components have
// completed their shutdown.
func (dm *DEX) Stop() {
log.Infof("Stopping subsystems...")
log.Infof("Stopping all DEX subsystems.")
for _, ssw := range dm.stopWaiters {
log.Infof("Stopping %s...", ssw.name)
ssw.Stop()
ssw.WaitForShutdown()
log.Infof("%s shutdown.", ssw.name)
log.Infof("%s is now shut down.", ssw.name)
}
if err := dm.storage.Close(); err != nil {
log.Errorf("DEXArchivist.Close: %v", err)
Expand Down Expand Up @@ -250,7 +251,7 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
startSubSys := func(name string, r dex.Runner) {
ssw := dex.NewStartStopWaiter(r)
ssw.Start(ctx)
stopWaiters = append([]subsystem{{ssw, name}}, stopWaiters...)
stopWaiters = append([]subsystem{{ssw, name}}, stopWaiters...) // top of stack
}

abort := func() {
Expand Down Expand Up @@ -493,7 +494,6 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
for name, mkt := range markets {
startEpochIdx := 1 + now/int64(mkt.EpochDuration())
mkt.SetStartEpochIdx(startEpochIdx)
startSubSys(marketSubSysName(name), mkt)
bookSources[name] = mkt
marketTunnels[name] = mkt
cfgMarkets = append(cfgMarkets, &msgjson.Market{
Expand All @@ -512,6 +512,11 @@ func NewDEX(cfg *DexConf) (*DEX, error) {
bookRouter := market.NewBookRouter(bookSources)
startSubSys("BookRouter", bookRouter)

// Market, now that book router is running.
for name, mkt := range markets {
startSubSys(marketSubSysName(name), mkt)
}

// Order router
orderRouter := market.NewOrderRouter(&market.OrderRouterConfig{
Assets: backedAssets,
Expand Down
15 changes: 15 additions & 0 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,21 @@ func (m *Market) Run(ctx context.Context) {
}
m.persistBook = true // future resume default
m.activeEpochIdx = 0

// Revoke any unmatched epoch orders (if context was canceled, not a
// clean suspend stopped the market).
for oid, ord := range m.epochOrders {
log.Infof("Dropping epoch order %v", oid)
if co, ok := ord.(*order.CancelOrder); ok {
if err := m.storage.FailCancelOrder(co); err != nil {
log.Errorf("Failed to set orphaned epoch cancel order %v as executed: %v", oid, err)
}
continue
}
if err := m.storage.ExecuteOrder(ord); err != nil {
log.Errorf("Failed to set orphaned epoch trade order %v as executed: %v", oid, err)
}
}
m.epochMtx.Unlock()

// Stop and wait for the order feed goroutine.
Expand Down

0 comments on commit d463439

Please sign in to comment.