Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make handshake cancelable #857

Merged
merged 9 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[consensus]` `Handshaker.Handshake` now requires `context.Context` ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
- `[node]` `NewNode` now requires `context.Context` as the first parameter ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[node]` Make handshake cancelable ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857))
18 changes: 13 additions & 5 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ func (h *Handshaker) NBlocks() int {
}

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {

// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(context.TODO(), proxy.RequestInfo)
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}
Expand All @@ -266,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
}

// Replay blocks up to the latest in the blockstore.
appHash, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
appHash, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
Expand All @@ -283,6 +283,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
ctx context.Context,
state sm.State,
appHash []byte,
appBlockHeight int64,
Expand Down Expand Up @@ -391,7 +392,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
Expand All @@ -406,7 +407,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)

case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
Expand Down Expand Up @@ -443,6 +444,7 @@ func (h *Handshaker) ReplayBlocks(
}

func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
Expand All @@ -469,6 +471,12 @@ func (h *Handshaker) replayBlocks(
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo

handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
err = handshaker.Handshake(context.Background(), proxyApp)
if err != nil {
cmtos.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
})

// perform the replay protocol to sync Tendermint and the application
err = handshaker.Handshake(proxyApp)
err = handshaker.Handshake(context.Background(), proxyApp)
if expectError {
require.Error(t, err)
// finish the test early
Expand Down Expand Up @@ -953,7 +953,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand All @@ -977,7 +977,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
t.Error(err)
}
})
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(context.Background(), proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
var err error
Expand Down
5 changes: 3 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func StateProvider(stateProvider statesync.StateProvider) Option {
//------------------------------------------------------------------------------

// NewNode returns a new, ready to go, CometBFT Node.
func NewNode(config *cfg.Config,
func NewNode(ctx context.Context,
config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
Expand Down Expand Up @@ -206,7 +207,7 @@ func NewNode(config *cfg.Config,
// and replays any blocks as necessary to sync CometBFT with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)

n, err := NewNode(config,
n, err := NewNode(context.Background(),
config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
Expand Down
5 changes: 3 additions & 2 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
}

return NewNode(config,
return NewNode(context.Background(), config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
Expand Down Expand Up @@ -167,6 +167,7 @@ func createAndStartIndexerService(
}

func doHandshake(
ctx context.Context,
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
Expand All @@ -178,7 +179,7 @@ func doHandshake(
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(ctx, proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
if err != nil {
panic(err)
}
node, err := nm.NewNode(config, pv, nodeKey, papp,
node, err := nm.NewNode(context.Background(), config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
cfg.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func startNode(cfg *Config) error {
nodeLogger.Info("Using default (synchronized) local client creator")
}

n, err := node.NewNode(cmtcfg,
n, err := node.NewNode(context.Background(), cmtcfg,
privval.LoadOrGenFilePV(cmtcfg.PrivValidatorKeyFile(), cmtcfg.PrivValidatorStateFile()),
nodeKey,
clientCreator,
Expand Down
Loading