Skip to content

Commit

Permalink
blocksync: wait for poolRoutine to stop in (*Reactor).OnStop (tendermint#1879)
Browse files Browse the repository at this point in the history

* blocksync: wait for poolRoutine to stop in (*Reactor).OnStop

The lifetime of the blocksync.(*Reactor).poolRoutine goroutine was not
supervised, so a return from OnStop did not guarantee that the goroutine
had exited.  As a result, the other services could be stopped while
poolRoutine was still running, and a subsequent call to
bcR.blockExec.ApplyBlock would cause a panic, typically a leveldb
closed error.

This change uses a sync.WaitGroup to ensure that OnStop returns after
poolRoutine has returned.

This also makes poolRoutine return on a signal from bcR.pool.Quit() in
addition to bcR.Quit(). This matters because (*Reactor).OnStop itself can
only stop the BlockPool (bcR.pool), while the BaseReactor.BaseService's
quit channel is only closed _after_ (*Reactor).OnStop has returned.

* rename wg field, comment on strayish select before retry

* break poolRoutine on either Quit before quick retry

* update changelog

* break the loop with an IsRunning check

* do the waitgroup Add and Defer at the same site for clarity

* check both services IsRunning in poolRoutine

* fix oopsie

---------

Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
  • Loading branch information
jchappelow and melekes committed Jan 7, 2024
1 parent b2c948e commit 7ea352e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[blocksync]` wait for `poolRoutine` to stop in `(*Reactor).OnStop`
([\#1879](https://github.com/cometbft/cometbft/pull/1879))
32 changes: 26 additions & 6 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blocksync
import (
"fmt"
"reflect"
"sync"
"time"

bcproto "github.com/cometbft/cometbft/api/cometbft/blocksync/v1"
Expand Down Expand Up @@ -56,10 +57,11 @@ type Reactor struct {
// immutable
initialState sm.State

blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
blockSync bool
blockExec *sm.BlockExecutor
store sm.BlockStore
pool *BlockPool
blockSync bool
poolRoutineWg sync.WaitGroup

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
Expand Down Expand Up @@ -125,7 +127,11 @@ func (bcR *Reactor) OnStart() error {
if err != nil {
return err
}
go bcR.poolRoutine(false)
bcR.poolRoutineWg.Add(1)
go func() {
defer bcR.poolRoutineWg.Done()
bcR.poolRoutine(false)
}()
}
return nil
}
Expand All @@ -140,7 +146,11 @@ func (bcR *Reactor) SwitchToBlockSync(state sm.State) error {
if err != nil {
return err
}
go bcR.poolRoutine(true)
bcR.poolRoutineWg.Add(1)
go func() {
defer bcR.poolRoutineWg.Done()
bcR.poolRoutine(true)
}()
return nil
}

Expand All @@ -150,6 +160,7 @@ func (bcR *Reactor) OnStop() {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
bcR.poolRoutineWg.Wait()
}
}

Expand Down Expand Up @@ -442,6 +453,13 @@ FOR_LOOP:
panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
}

// Before priming didProcessCh for another check on the next
// iteration, break the loop if the BlockPool or the Reactor itself
// has quit. This avoids case ambiguity of the outer select when two
// channels are ready.
if !bcR.IsRunning() || !bcR.pool.IsRunning() {
break FOR_LOOP
}
// Try again quickly next loop.
didProcessCh <- struct{}{}

Expand Down Expand Up @@ -527,6 +545,8 @@ FOR_LOOP:

case <-bcR.Quit():
break FOR_LOOP
case <-bcR.pool.Quit():
break FOR_LOOP
}
}
}
Expand Down

0 comments on commit 7ea352e

Please sign in to comment.