Skip to content

Commit

Permalink
market: Add Status type and (*Market).Status
Browse files Browse the repository at this point in the history
dex: add MarketStatus and MarketStatuses methods for admin API (TODO)
  • Loading branch information
chappjc committed May 11, 2020
1 parent 3e9ed9e commit 828fe3f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
16 changes: 16 additions & 0 deletions server/dex/dex.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,22 @@ func (dm *DEX) MarketRunning(mktName string) (found, running bool) {
return true, mkt.Running()
}

func (dm *DEX) MarketStatus(mktName string) *market.Status {
mkt := dm.markets[mktName]
if mkt == nil {
return nil
}
return mkt.Status()
}

func (dm *DEX) MarketStatuses(mktName string) map[string]*market.Status {
statuses := make(map[string]*market.Status, len(dm.markets))
for name, mkt := range dm.markets {
statuses[name] = mkt.Status()
}
return statuses
}

// SuspendMarket schedules a suspension of a given market, with the option to
// persist the orders on the book (or purge the book automatically on market
// shutdown). The scheduled final epoch and suspend time are returned. This is a
Expand Down
51 changes: 40 additions & 11 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"decred.org/dcrdex/dex"
Expand Down Expand Up @@ -83,13 +82,12 @@ type Market struct {
orderFeeds []chan *updateSignal // all outgoing notification consumers
running chan struct{}

startEpochIdx int64 // atomic access only

bookMtx sync.Mutex // guards book and bookEpochIdx
book *book.Book
bookEpochIdx int64 // next epoch from the point of view of the book

epochMtx sync.RWMutex
startEpochIdx int64
activeEpochIdx int64
suspendEpochIdx int64
persistBook bool
Expand Down Expand Up @@ -179,7 +177,8 @@ func (m *Market) SuspendASAP(persistBook bool) (finalEpochIdx int64, finalEpochE
// the given time, always allowing the epoch including that time to complete. If
// the time is before the current epoch, the current epoch will be the last.
func (m *Market) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) {
// Lock epochMtx to hold activeEpochIdx and suspendEpochIdx.
// epochMtx guards activeEpochIdx, startEpochIdx, suspendEpochIdx, and
// persistBook.
m.epochMtx.Lock()
defer m.epochMtx.Unlock()

Expand All @@ -195,13 +194,12 @@ func (m *Market) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx in

soonestFinalIdx := m.activeEpochIdx
if soonestFinalIdx == 0 {
startEpochIdx := m.StartEpochIdx()
// Cannot schedule a suspend if Run isn't running.
if startEpochIdx == 0 {
if m.startEpochIdx == 0 {
return -1, time.Time{}
}
// Not yet started. Soonest suspend idx is the start epoch idx - 1.
soonestFinalIdx = startEpochIdx - 1
soonestFinalIdx = m.startEpochIdx - 1
}

if soonestEnd := epochEnd(soonestFinalIdx); asSoonAs.Before(soonestEnd) {
Expand All @@ -228,12 +226,16 @@ func (m *Market) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx in
// SetStartEpochIdx sets the starting epoch index. This should generally be
// called before Run, or Start used to specify the index at the same time.
func (m *Market) SetStartEpochIdx(startEpochIdx int64) {
atomic.StoreInt64(&m.startEpochIdx, startEpochIdx)
m.epochMtx.Lock()
m.startEpochIdx = startEpochIdx
m.epochMtx.Unlock()
}

// StartEpochIdx gets the starting epoch index.
func (m *Market) StartEpochIdx() int64 {
return atomic.LoadInt64(&m.startEpochIdx)
m.epochMtx.Lock()
defer m.epochMtx.Unlock()
return m.startEpochIdx
}

// Start begins order processing with a starting epoch index. See also
Expand All @@ -251,6 +253,30 @@ func (m *Market) waitForEpochOpen() {
<-c
}

// Status describes the operation state of the Market.
type Status struct {
Running bool
EpochDuration uint64 // to compute times from epoch inds
ActiveEpoch int64
StartEpoch int64
SuspendEpoch int64
PersistBook bool
}

// Status returns the current operating state of the Market.
func (m *Market) Status() *Status {
m.epochMtx.Lock()
defer m.epochMtx.Unlock()
return &Status{
Running: m.Running(),
EpochDuration: m.marketInfo.EpochDuration,
ActiveEpoch: m.activeEpochIdx,
StartEpoch: m.startEpochIdx,
SuspendEpoch: m.suspendEpochIdx,
PersistBook: m.persistBook,
}
}

// Running indicates is the market is accepting new orders. This will return
// false when suspended, but false does not necessarily mean Run has stopped
// since a start epoch may be set.
Expand Down Expand Up @@ -575,13 +601,16 @@ func (m *Market) Run(ctx context.Context) {
// There must be no more notify calls.
}()

nextEpochIdx := m.StartEpochIdx()
m.epochMtx.Lock()
nextEpochIdx := m.startEpochIdx
if nextEpochIdx == 0 {
log.Warnf("Run: startEpochIdx not set. Starting at the next epoch.")
now := encode.UnixMilli(time.Now())
nextEpochIdx = 1 + now/int64(m.EpochDuration())
m.SetStartEpochIdx(nextEpochIdx)
m.startEpochIdx = nextEpochIdx
}
m.epochMtx.Unlock()

epochDuration := int64(m.marketInfo.EpochDuration)
nextEpoch := NewEpoch(nextEpochIdx, epochDuration)
epochCycle := time.After(time.Until(nextEpoch.Start))
Expand Down

0 comments on commit 828fe3f

Please sign in to comment.