Skip to content

Commit

Permalink
market: eliminate race in SubmitOrderAsync and Run's defer
Browse files Browse the repository at this point in the history
Simplify orderRouter channel access coordination.

Solve an order submission race in Run's defer by launching a goroutine
to drain m.orderRouter prior to flagging the Market as stopped, and
end the drain goroutine by closing the orderRouter channel. The new
runMtx RWMutex simplifies this.

Add test coverage for Market.Status.
  • Loading branch information
chappjc committed May 14, 2020
1 parent 40b66ca commit 2e3c2ba
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 57 deletions.
110 changes: 53 additions & 57 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ type Market struct {

orderFeedMtx sync.RWMutex // guards orderFeeds
orderFeeds []chan *updateSignal // all outgoing notification consumers
running chan struct{}

runMtx sync.RWMutex
running chan struct{} // closed when running

bookMtx sync.Mutex // guards book and bookEpochIdx
book *book.Book
Expand Down Expand Up @@ -151,9 +153,8 @@ func NewMarket(mktInfo *dex.MarketInfo, storage db.DEXArchivist, swapper Swapper
}

return &Market{
running: make(chan struct{}),
running: make(chan struct{}), // closed on market start
marketInfo: mktInfo,
orderRouter: make(chan *orderUpdateSignal, 32),
book: Book,
matcher: matcher.New(),
epochCommitments: make(map[order.Commitment]order.OrderID),
Expand Down Expand Up @@ -231,13 +232,6 @@ func (m *Market) SetStartEpochIdx(startEpochIdx int64) {
m.epochMtx.Unlock()
}

// StartEpochIdx gets the starting epoch index. This is a read-only accessor,
// so only the read lock of epochMtx is taken, allowing concurrent readers.
func (m *Market) StartEpochIdx() int64 {
	m.epochMtx.RLock()
	defer m.epochMtx.RUnlock()
	return m.startEpochIdx
}

// Start begins order processing with a starting epoch index. See also
// SetStartEpochIdx and Run. Stop the Market by cancelling the context.
func (m *Market) Start(ctx context.Context, startEpochIdx int64) {
Expand All @@ -247,9 +241,9 @@ func (m *Market) Start(ctx context.Context, startEpochIdx int64) {

// waitForEpochOpen waits until the start of epoch processing.
func (m *Market) waitForEpochOpen() {
m.orderFeedMtx.RLock()
c := m.running
m.orderFeedMtx.RUnlock()
m.runMtx.RLock()
c := m.running // the field may be rewritten, but only after close
m.runMtx.RUnlock()
<-c
}

Expand Down Expand Up @@ -287,8 +281,8 @@ func (m *Market) Status() *Status {
// TODO: Instead of using Running in OrderRouter and DEX, these types should
// track statuses (known suspend times).
func (m *Market) Running() bool {
m.orderFeedMtx.RLock()
defer m.orderFeedMtx.RUnlock()
m.runMtx.RLock()
defer m.runMtx.RUnlock()
select {
case <-m.running:
return true
Expand Down Expand Up @@ -370,31 +364,23 @@ func (m *Market) SubmitOrderAsync(rec *orderRecord) <-chan error {
return sendErr(err)
}

trySend := func(sig *orderUpdateSignal) {
select {
case m.orderRouter <- sig:
case <-time.After(5 * time.Second):
log.Errorf("Timeout relaying incoming order %v", sig.rec.order.ID())
sig.errChan <- ErrMarketNotRunning
}
}

if !m.Running() {
return sendErr(ErrMarketNotRunning)
}
// Only submit orders while market is running.
m.runMtx.RLock()
defer m.runMtx.RUnlock()

// If orderRouter is full and blocking, there may just be an actual rush of
// orders that will clear out, or we've raced against Run's shutdown defer
// and lost.
sig := newOrderUpdateSignal(rec)
select {
case m.orderRouter <- sig:
case <-m.running:
default:
// If a synchronous send failed, spawn a goroutine with a send timeout.
log.Warnf("Incoming order queue full. Attempting time-limited send.")
go trySend(sig)
// m.orderRouter is closed
log.Infof("SubmitOrderAsync: Market stopped with an order in submission (commitment %v).",
rec.order.Commitment()) // The order is not time stamped, so no OrderID.
return sendErr(ErrMarketNotRunning)
}

sig := newOrderUpdateSignal(rec)
// The lock is still held, so there is a receiver: either Run's main loop or
// the drain in Run's defer that runs until m.running starts blocking.
m.orderRouter <- sig
return sig.errChan
}

Expand Down Expand Up @@ -518,28 +504,44 @@ func (m *Market) Run(ctx context.Context) {

var running bool
ctxRun, cancel := context.WithCancel(ctx)
var wgFeed, wgEpochs sync.WaitGroup
var wgFeeds, wgEpochs sync.WaitGroup
notifyChan := make(chan *updateSignal, 32)

// For clarity, define the shutdown sequence in a single closure rather than
// the defer stack.
defer func() {
// In case the market is stopped before the first epoch, close the
// running channel so that waitForEpochOpen does not hang.
m.orderFeedMtx.Lock()
// Drain the order router of incoming orders that made it in after the
// main loop broke and before flagging the market stopped. Do this in a
// goroutine because the market is flagged as stopped under runMtx lock
// in this defer and there is a risk of deadlock in SubmitOrderAsync
// that sends under runMtx lock as well.
wgFeeds.Add(1)
go func() {
defer wgFeeds.Done()
for sig := range m.orderRouter {
sig.errChan <- ErrMarketNotRunning
}
}()

// Under lock, flag as not running.
m.runMtx.Lock() // block while SubmitOrderAsync is sending to the drain
if !running {
// In case the market is stopped before the first epoch, close the
// running channel so that waitForEpochOpen does not hang.
close(m.running)
}
// Make a new running channel for Running, and for any future Run.
m.running = make(chan struct{})
m.orderFeedMtx.Unlock()
running = false
close(m.orderRouter) // stop the order router drain
m.runMtx.Unlock()

// Stop and wait for epoch pump and processing pipeline goroutines.
cancel() // may already be done by suspend
wgEpochs.Wait()

// Stop and wait for the order feed goroutine.
close(notifyChan)
wgFeed.Wait()
wgFeeds.Wait()

// Close and delete the outgoing order feed channels as a signal to
// subscribers that the Market has terminated.
Expand All @@ -557,25 +559,13 @@ func (m *Market) Run(ctx context.Context) {
}
m.epochMtx.RUnlock()

// Drain the order router of incoming orders that may have been sent
// before m.running began blocking.
drain:
for {
select {
case sig := <-m.orderRouter:
sig.errChan <- ErrMarketNotRunning
default:
break drain
}
}

log.Infof("Market %q stopped.", m.marketInfo.Name)
}()

// Start outgoing order feed notification goroutine.
wgFeed.Add(1)
wgFeeds.Add(1)
go func() {
defer wgFeed.Done()
defer wgFeeds.Done()
for sig := range notifyChan {
// send to each receiver
m.orderFeedMtx.RLock()
Expand Down Expand Up @@ -671,7 +661,8 @@ func (m *Market) Run(ctx context.Context) {
m.activeEpochIdx = currentEpoch.Epoch

if !running {
close(m.running) // no lock, this field is not set in another goroutine
// Open up SubmitOrderAsync.
close(m.running)
running = true
}

Expand All @@ -680,6 +671,11 @@ func (m *Market) Run(ctx context.Context) {
epochCycle = time.After(time.Until(nextEpoch.Start))
}

// Set the orderRouter field now since the main loop below receives on it,
// even though SubmitOrderAsync disallows sends on orderRouter when the
// market is not running.
m.orderRouter = make(chan *orderUpdateSignal, 32) // implicitly guarded by m.runMtx since Market is not running yet

for {
if ctxRun.Err() != nil {
return
Expand Down
10 changes: 10 additions & 0 deletions server/market/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,18 @@ func TestMarket_Run(t *testing.T) {
t.Fatalf(`expected ErrMarketNotRunning ("%v"), got "%v"`, ErrMarketNotRunning, err)
}

mktStatus := mkt.Status()
if mktStatus.Running {
t.Errorf("Market should not be running yet")
}

mkt.waitForEpochOpen()

mktStatus = mkt.Status()
if !mktStatus.Running {
t.Errorf("Market should be running now")
}

// Submit again
oRecord = newOR()
storMsgPI(oRecord.msgID, pi)
Expand Down

0 comments on commit 2e3c2ba

Please sign in to comment.