Skip to content

Commit

Permalink
market: remove resume option from Suspend
Browse files Browse the repository at this point in the history
Resuming a market is now done by starting Run again.

Modify SubmitOrderAsync to prevent hangs on orders sent to a suspended
market.  First it denies orders to a stopped market, then attempts to
on the buffered orderRouter channel, and if it is blocking it fires off
a goroutine with a 5 second timeout.

The Market's Run cleanup defer now drains the orderRouter channel
before returning in case orders were send on the orderRouter channel
before the running channel began blocking them in SubmitOrderAsync.
  • Loading branch information
chappjc committed May 6, 2020
1 parent e362fb3 commit a37b0cb
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 158 deletions.
153 changes: 74 additions & 79 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Market struct {
marketInfo *dex.MarketInfo

// Communications.
orderRouter chan *orderUpdateSignal // incoming orders
orderRouter chan *orderUpdateSignal // incoming orders, via SubmitOrderAsync

orderFeedMtx sync.RWMutex // guards orderFeeds and running
orderFeeds []chan *bookUpdateSignal // all outgoing notification consumers
Expand Down Expand Up @@ -171,34 +171,20 @@ func NewMarket(mktInfo *dex.MarketInfo, storage db.DEXArchivist, swapper Swapper
// SuspendASAP suspends requests the market to gracefully suspend epoch cycling
// as soon as possible, always allowing an active epoch to close. See also
// Suspend.
func (m *Market) SuspendASAP(persistBook bool, resume ...time.Time) (finalEpochIdx int64, finalEpochEnd time.Time, resumeEpochIdx int64) {
return m.Suspend(time.Time{}, persistBook, resume...)
func (m *Market) SuspendASAP(persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) {
return m.Suspend(time.Time{}, persistBook)
}

// Suspend requests the market to gracefully suspend epoch cycling as soon as
// the given time, always allowing the epoch including that time to complete. If
// the time is before the current epoch, the current epoch will be the last. An
// optional resume time may be provided, after which the market will resume at
// the start of the next full epoch that follows that time. If no resume time is
// given, the market quits after draining the completed epoch processing
// pipeline. If the computed resume epoch is not after the final epoch prior to
// suspension, the suspend is aborted and -1 is returned for finalEpochIdx.
func (m *Market) Suspend(asSoonAs time.Time, persistBook bool, resume ...time.Time) (finalEpochIdx int64, finalEpochEnd time.Time, resumeEpochIdx int64) {
// the time is before the current epoch, the current epoch will be the last.
func (m *Market) Suspend(asSoonAs time.Time, persistBook bool) (finalEpochIdx int64, finalEpochEnd time.Time) {
// Lock epochMtx to hold activeEpochIdx and suspendEpochIdx.
m.epochMtx.Lock()
defer m.epochMtx.Unlock()

dur := int64(m.EpochDuration())

if len(resume) > 0 {
// Resume idx is the first full epoch from the given time on.
ms := encode.UnixMilli(resume[0])
resumeEpochIdx = ms/dur + 1
if ms%dur == 0 {
resumeEpochIdx--
}
}

epochEnd := func(idx int64) time.Time {
start := encode.UnixTimeMilli(idx * dur)
return start.Add(time.Duration(dur) * time.Millisecond)
Expand All @@ -212,7 +198,7 @@ func (m *Market) Suspend(asSoonAs time.Time, persistBook bool, resume ...time.Ti
startEpochIdx := m.StartEpochIdx()
// Cannot schedule a suspend if Run isn't running.
if startEpochIdx == 0 {
return -1, time.Time{}, -1
return -1, time.Time{}
}
// Not yet started. Soonest suspend idx is the start epoch idx - 1.
soonestFinalIdx = startEpochIdx - 1
Expand All @@ -233,21 +219,6 @@ func (m *Market) Suspend(asSoonAs time.Time, persistBook bool, resume ...time.Ti
finalEpochEnd = epochEnd(finalEpochIdx)
}

// Set the final epoch index if the times result in a suspend period. A zero
// for resumeEpochIdx indicates no resume.
if resumeEpochIdx > 0 {
// Do not set a suspend index if the resume epoch idx results in no
// suspend (resume idx <= final idx + 1).
if resumeEpochIdx <= finalEpochIdx+1 {
log.Warnf("No suspend period for suspend epoch index %d with resume index %d.",
finalEpochIdx, resumeEpochIdx)
return -1, time.Time{}, -1
}
// Valid resume: Set the start epoch index for cycleEpoch to set its
// nextEpoch and epochCycle Timer.
m.SetStartEpochIdx(resumeEpochIdx)
}

m.suspendEpochIdx = finalEpochIdx
m.persistBook = persistBook

Expand Down Expand Up @@ -351,19 +322,46 @@ func (m *Market) TxMonitored(user account.AccountID, asset uint32, txid string)
// When submission is completed, an error value will be sent on the channel.
// This is the asynchronous version of SubmitOrder.
func (m *Market) SubmitOrderAsync(rec *orderRecord) <-chan error {
sendErr := func(err error) <-chan error {
errChan := make(chan error, 1)
errChan <- err // i.e. ErrInvalidOrder, ErrInvalidCommitment
return errChan
}

// Validate the order. The order router must do it's own validation, but do
// a second validation for (1) this Market and (2) epoch status, before
// putting it on the queue.
if err := m.validateOrder(rec.order); err != nil {
// Order ID may not be computed since ServerTime has not been set.
log.Debugf("SubmitOrderAsync: Invalid order received: %x: %v", rec.order.Serialize(), err)
errChan := make(chan error, 1)
errChan <- err // i.e. ErrInvalidOrder, ErrInvalidCommitment
return errChan
return sendErr(err)
}

trySend := func(sig *orderUpdateSignal) {
select {
case m.orderRouter <- sig:
case <-time.After(5 * time.Second):
log.Errorf("Timeout relaying incoming order %v", sig.rec.order.ID())
sig.errChan <- ErrMarketNotRunning
}
}

if !m.Running() {
return sendErr(ErrMarketNotRunning)
}

// If orderRouter is full and blocking, there may just be an actual rush of
// orders that will clear out, or we've raced against Run's shutdown defer
// and lost.
sig := newOrderUpdateSignal(rec)
m.orderRouter <- sig
select {
case m.orderRouter <- sig:
default:
// If a synchronous send failed, spawn a goroutine with a send timeout.
log.Warnf("Incoming order queue full. Attempting time-limited send.")
go trySend(sig)
}

return sig.errChan
}

Expand Down Expand Up @@ -492,6 +490,16 @@ func (m *Market) Run(ctx context.Context) {
// For clarity, define the shutdown sequence in a single closure rather than
// the defer stack.
defer func() {
// In case the market is stopped before the first epoch, close the
// running channel so that waitForEpochOpen does not hang.
m.orderFeedMtx.Lock()
if !running {
close(m.running)
}
// Make a new running channel for Running, and for any future Run.
m.running = make(chan struct{})
m.orderFeedMtx.Unlock()

// Stop and wait for epoch pump and processing pipeline goroutines.
cancel() // may already be done by suspend
wgEpochs.Wait()
Expand All @@ -507,22 +515,26 @@ func (m *Market) Run(ctx context.Context) {
close(s)
}
m.orderFeeds = nil

// In case the market is stopped before the first epoch, close the
// running channel so that waitForEpochOpen does not hang.
if !running {
close(m.running)
}
// Make a new running channel for Running, and for any future Run.
m.running = make(chan struct{})
m.orderFeedMtx.Unlock()

// persistBook is set under epochMtx lock.
m.epochMtx.RLock()
defer m.epochMtx.RUnlock()
if !m.persistBook {
m.PurgeBook()
}
m.epochMtx.RUnlock()

// Drain the order router of incoming orders that may have been sent
// before m.running began blocking.
drain:
for {
select {
case sig := <-m.orderRouter:
sig.errChan <- ErrMarketNotRunning
default:
break drain
}
}

log.Debugf("Market %q stopped.", m.marketInfo.Name)
}()
Expand Down Expand Up @@ -596,39 +608,22 @@ func (m *Market) Run(ctx context.Context) {
defer m.epochMtx.Unlock()

// Check suspendEpochIdx and suspend if the just-closed epoch idx is the
// suspend epoch, unless the resume epoch is also the next epoch.
if resumeEpochIdx := m.StartEpochIdx(); m.suspendEpochIdx == nextEpoch.Epoch-1 &&
resumeEpochIdx != nextEpoch.Epoch {
// Block incoming orders.
currentEpoch = nil // also, next epoch cycle just starts a new one
// suspend epoch.
if m.suspendEpochIdx == nextEpoch.Epoch-1 {
// Reject incoming orders.
currentEpoch = nil
m.activeEpochIdx = 0
cancel() // graceful market shutdown
return
}

if running {
m.orderFeedMtx.Lock()
m.running = make(chan struct{}) // the market is now suspended
m.orderFeedMtx.Unlock()
running = false
}
// else keep the same running channel

// End Run if resumeEpochIdx is not after suspendEpochIdx.
if resumeEpochIdx <= m.suspendEpochIdx {
cancel() // graceful epoch pump shutdown
return
}

// Resume automatically with resumeEpochIdx.
nextEpochIdx = resumeEpochIdx
currentEpoch = nextEpoch
nextEpochIdx = currentEpoch.Epoch + 1
m.activeEpochIdx = currentEpoch.Epoch

} else {
currentEpoch = nextEpoch
nextEpochIdx = currentEpoch.Epoch + 1
m.activeEpochIdx = currentEpoch.Epoch

if !running {
close(m.running) // no lock, this field is not set in another goroutine
running = true
}
if !running {
close(m.running) // no lock, this field is not set in another goroutine
running = true
}

// Replace the next epoch and set the cycle Timer.
Expand All @@ -653,7 +648,7 @@ func (m *Market) Run(ctx context.Context) {
default:
}

// cycleEpoch can cancel ctxRun if suspend without resume initiated.
// cycleEpoch can cancel ctxRun if suspend initiated.
if ctxRun.Err() != nil {
return
}
Expand Down
Loading

0 comments on commit a37b0cb

Please sign in to comment.