Skip to content

Commit

Permalink
market: simplify epochPump and eliminate possible Insert deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed May 14, 2020
1 parent a47e2cd commit cb02a99
Showing 1 changed file with 25 additions and 47 deletions.
72 changes: 25 additions & 47 deletions server/market/epump.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ type readyEpoch struct {
type epochPump struct {
ready chan *readyEpoch // consumer receives from this

mtx sync.RWMutex
q []*readyEpoch
halt bool
halted bool
head chan *readyEpoch // internal, closed when ready to halt
mtx sync.RWMutex
q []*readyEpoch
halt bool

newQ chan struct{}
}

func newEpochPump() *epochPump {
return &epochPump{
ready: make(chan *readyEpoch, 1),
head: make(chan *readyEpoch, 1),
newQ: make(chan struct{}, 1),
}
}

Expand All @@ -38,19 +38,11 @@ func (ep *epochPump) Run(ctx context.Context) {
go func() {
<-ctx.Done()

// next will close it after it drains the queue.
ep.mtx.Lock()
defer ep.mtx.Unlock()

// gracefully shut down the epoch pump, allowing the queue to be fully
// drained and all epochs passed on to the consumer.
if len(ep.q) == 0 {
// Ready to shutdown.
close(ep.head) // cause next() to return a closed channel and Run to return.
ep.halted = true
} else {
// next will close it after it drains the queue.
ep.halt = true
}
ep.halt = true
ep.mtx.Unlock()
ep.newQ <- struct{}{} // final popFront returns nil and next returns closed channel
}()

defer close(ep.ready)
Expand All @@ -74,18 +66,15 @@ func (ep *epochPump) Insert(epoch *EpochQueue) *readyEpoch {

ep.mtx.Lock()
defer ep.mtx.Unlock()

if ep.halted || ep.halt {
// head is closed or about to be.
if ep.halt {
return nil
}

select {
case ep.head <- rq: // buffered, so non-blocking when empty and no receiver
default:
// push: append a new readyEpoch to the closed epoch queue.
ep.q = append(ep.q, rq)
}
// push: append a new readyEpoch to the closed epoch queue.
ep.q = append(ep.q, rq)

// Signal to next in goroutine so slow or missing receiver doesn't block us.
go func() { ep.newQ <- struct{}{} }()

return rq
}
Expand All @@ -105,35 +94,24 @@ func (ep *epochPump) popFront() *readyEpoch {
// preimage collection. next blocks until there is an epoch to send.
func (ep *epochPump) next() <-chan *readyEpoch {
ready := make(chan *readyEpoch) // next sent on this channel when ready
next := <-ep.head

// A closed head channel signals a halted and drained pump.
if next == nil {
close(ready)
return ready
}
// Wait for new epoch in queue or halt.
<-ep.newQ

ep.mtx.Lock()
defer ep.mtx.Unlock()
head := ep.popFront()
ep.mtx.Unlock()

// If the queue is not empty, set new head.
x := ep.popFront()
if x != nil {
ep.head <- x // non-blocking, received in select above
} else if ep.halt {
// Only halt the pump once the queue is emptied. The final head is still
// forwarded to the consumer.
close(ep.head)
ep.halted = true
// continue to serve next, but a closed channel will be returned on
// subsequent calls.
if head == nil { // pump halted
close(ready)
return ready
}

// Send next on the returned channel when it becomes ready. If the process
// dies before goroutine completion, the Market is down anyway.
go func() {
<-next.ready // block until preimage collection is complete (close this channel)
ready <- next
<-head.ready // block until preimage collection is complete (close this channel)
ready <- head
}()
return ready
}

0 comments on commit cb02a99

Please sign in to comment.