Skip to content

Commit

Permalink
feat: make handshake cancelable (#857)
Browse files Browse the repository at this point in the history
This makes the handshake work with graceful shutdown (see: cosmos/cosmos-sdk#16202).

The handshake can be a long-running process if there are many local blocks to replay; for example, we use it to do profiling.

Hope we can backport this to 0.34.x.



---

#### PR checklist

- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments
  • Loading branch information
yihuang committed May 19, 2023
1 parent 3003ef7 commit c1fdc1b
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[consensus]` `Handshaker.Handshake` now requires `context.Context` ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
- `[node]` `NewNode` now requires `context.Context` as the first parameter ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[node]` Make handshake cancelable ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
18 changes: 13 additions & 5 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ func (h *Handshaker) NBlocks() int {
}

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {

// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(context.TODO(), proxy.RequestInfo)
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}
Expand All @@ -266,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
}

// Replay blocks up to the latest in the blockstore.
appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
appHash, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
Expand All @@ -283,6 +283,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
ctx context.Context,
state sm.State,
appHash []byte,
appBlockHeight int64,
Expand Down Expand Up @@ -391,7 +392,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
Expand All @@ -406,7 +407,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)

case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
Expand Down Expand Up @@ -443,6 +444,7 @@ func (h *Handshaker) ReplayBlocks(
}

func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
Expand All @@ -469,6 +471,12 @@ func (h *Handshaker) replayBlocks(
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo

handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
err = handshaker.Handshake(context.Background(), proxyApp)
if err != nil {
cmtos.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
})

// perform the replay protocol to sync Tendermint and the application
err = handshaker.Handshake(proxyApp)
err = handshaker.Handshake(context.Background(), proxyApp)
if expectError {
require.Error(t, err)
// finish the test early
Expand Down Expand Up @@ -953,7 +953,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand All @@ -977,7 +977,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
t.Error(err)
}
})
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(context.Background(), proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
var err error
Expand Down
5 changes: 3 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func StateProvider(stateProvider statesync.StateProvider) Option {
//------------------------------------------------------------------------------

// NewNode returns a new, ready to go, CometBFT Node.
func NewNode(config *cfg.Config,
func NewNode(ctx context.Context,
config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
Expand Down Expand Up @@ -206,7 +207,7 @@ func NewNode(config *cfg.Config,
// and replays any blocks as necessary to sync CometBFT with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)

n, err := NewNode(config,
n, err := NewNode(context.Background(),
config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
Expand Down
5 changes: 3 additions & 2 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
}

return NewNode(config,
return NewNode(context.Background(), config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
Expand Down Expand Up @@ -167,6 +167,7 @@ func createAndStartIndexerService(
}

func doHandshake(
ctx context.Context,
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
Expand All @@ -178,7 +179,7 @@ func doHandshake(
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(ctx, proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
if err != nil {
panic(err)
}
node, err := nm.NewNode(config, pv, nodeKey, papp,
node, err := nm.NewNode(context.Background(), config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
cfg.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func startNode(cfg *Config) error {
nodeLogger.Info("Using default (synchronized) local client creator")
}

n, err := node.NewNode(cmtcfg,
n, err := node.NewNode(context.Background(), cmtcfg,
privval.LoadOrGenFilePV(cmtcfg.PrivValidatorKeyFile(), cmtcfg.PrivValidatorStateFile()),
nodeKey,
clientCreator,
Expand Down

0 comments on commit c1fdc1b

Please sign in to comment.