package consensus
import (
"bytes"
"fmt"
"hash/crc32"
"io"
"reflect"
//"strconv"
//"strings"
"time"
abci "github.com/tendermint/tendermint/abci/types"
//auto "github.com/tendermint/tendermint/libs/autofile"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios:
//
// 1. failure during consensus
// 2. failure while applying the block
//
// The former is handled by the WAL, the latter by the proxyApp Handshake on
// restart, which ultimately hands off the work to the WAL.
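//
// Concretely: scenario 1 is handled by ConsensusState.catchupReplay below,
// which re-feeds the WAL messages of the unfinished height into the state
// machine; scenario 2 is handled by Handshaker.Handshake/ReplayBlocks further
// down, which replays whole blocks from the block store against the app.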
//-----------------------------------------
// 1. Recover from failure during consensus
// (by replaying messages from the WAL)
//-----------------------------------------
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
select {
case mi := <-newStepCh:
m2 := mi.(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case <-ticker:
return fmt.Errorf("Failed to read off newStepCh")
}
}
case msgInfo:
peerID := m.PeerID
if peerID == "" {
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockID.PartsHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
}
cs.handleMsg(m)
case timeoutInfo:
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
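
// A minimal sketch of driving readReplayMessage with the playback checks
// enabled (assumption: the caller has subscribed to NewRoundStep events and
// feeds them into newStepCh; dec is a WALDecoder over the relevant WAL
// segment, as in catchupReplay below):
//
//	for {
//		msg, err := dec.Decode()
//		if err == io.EOF {
//			break
//		} else if err != nil {
//			return err
//		}
//		if err := cs.readReplayMessage(msg, newStepCh); err != nil {
//			return err
//		}
//	}
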
// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *ConsensusState) catchupReplay(csHeight int64) error {
// Set replayMode to true so we don't log signing errors.
cs.replayMode = true
defer func() { cs.replayMode = false }()
// Ensure that #ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine
// without it, and Handshake could reuse ConsensusState if it weren't for
// this check (since we can crash after writing #ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
}
if gr != nil {
if err := gr.Close(); err != nil {
return err
}
}
if found {
return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight)
}
// Search for last height marker.
//
// Ignore data corruption errors in previous heights because we only care about the last height
gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
} else if err != nil {
return err
}
if !found {
return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, csHeight-1)
}
defer gr.Close() // nolint: errcheck
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
dec := WALDecoder{gr}
for {
msg, err = dec.Decode()
if err == io.EOF {
break
} else if IsDataCorruptionError(err) {
cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
panic(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
} else if err != nil {
return err
}
// NOTE: since the priv key was set when the msgs were received,
// replaying them will attempt to e.g. double sign, but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msg, nil); err != nil {
return err
}
}
cs.Logger.Info("Replay: Done")
return nil
}
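
// For orientation, the WAL shape catchupReplay expects when recovering at
// height H (illustrative only; H stands in for csHeight):
//
//	...
//	#ENDHEIGHT: H-1             <- found by the second search above
//	<msgInfo / timeoutInfo ...> <- messages for height H, replayed here
//	(crash: no #ENDHEIGHT: H)   <- the first search above must NOT find one
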
//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #ENDHEIGHT: 12345
/*
func makeHeightSearchFunc(height int64) auto.SearchFunc {
return func(line string) (int, error) {
line = strings.TrimRight(line, "\n")
parts := strings.Split(line, " ")
if len(parts) != 2 {
return -1, errors.New("Line did not have 2 parts")
}
i, err := strconv.Atoi(parts[1])
if err != nil {
return -1, errors.New("Failed to parse INFO: " + err.Error())
}
if height < i {
return 1, nil
} else if height == i {
return 0, nil
} else {
return -1, nil
}
}
}*/
//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------
type Handshaker struct {
stateDB dbm.DB
initialState sm.State
store sm.BlockStore
genDoc *types.GenesisDoc
logger log.Logger
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(stateDB dbm.DB, state sm.State,
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
return &Handshaker{
stateDB: stateDB,
initialState: state,
store: store,
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
}
}
func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}
func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
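
// A minimal usage sketch for the Handshaker (hypothetical wiring; the
// variable names are assumptions, not part of this package):
//
//	h := NewHandshaker(stateDB, state, blockStore, genDoc)
//	h.SetLogger(logger)
//	if err := h.Handshake(proxyApp); err != nil {
//		// Tendermint and the app could not be brought into sync
//	}
//	replayed := h.NBlocks() // blocks applied to the state during the handshake
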
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
if err != nil {
return fmt.Errorf("Error calling Info: %v", err)
}
blockHeight := int64(res.LastBlockHeight)
if blockHeight < 0 {
return fmt.Errorf("Got a negative last block height (%d) from the app", blockHeight)
}
appHash := res.LastBlockAppHash
h.logger.Info("ABCI Handshake App Info",
"height", blockHeight,
"hash", fmt.Sprintf("%X", appHash),
"software-version", res.Version,
"protocol-version", res.AppVersion,
)
// Set AppVersion on the state.
h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion)
sm.SaveState(h.stateDB, h.initialState)
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("Error on replay: %v", err)
}
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
// TODO: (on restart) replay mempool
return nil
}
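
// For reference, the only fields of the ABCI ResponseInfo that Handshake
// relies on (all visible in the code above):
//
//	res.LastBlockHeight  - how far the app has committed; drives ReplayBlocks
//	res.LastBlockAppHash - checked against state.AppHash after replay (see checkAppHash)
//	res.AppVersion       - copied into state.Version.Consensus.App and saved
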
// Replay all blocks since appBlockHeight and ensure the result matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
storeBlockHeight := h.store.Height()
stateBlockHeight := state.LastBlockHeight
h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
if appBlockHeight == 0 {
validators := make([]*types.Validator, len(h.genDoc.Validators))
for i, val := range h.genDoc.Validators {
validators[i] = types.NewValidator(val.PubKey, val.Power)
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams)
req := abci.RequestInitChain{
Time: h.genDoc.GenesisTime,
ChainId: h.genDoc.ChainID,
ConsensusParams: csParams,
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChainSync(req)
if err != nil {
return nil, err
}
if stateBlockHeight == 0 { // we only update state when we are in the initial state
// If the app returned validators or consensus params, update the state.
if len(res.Validators) > 0 {
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
return nil, err
}
state.Validators = types.NewValidatorSet(vals)
state.NextValidators = types.NewValidatorSet(vals)
} else {
// If validator set is not set in genesis and still empty after InitChain, exit.
if len(h.genDoc.Validators) == 0 {
return nil, fmt.Errorf("Validator set is nil in genesis and still empty after InitChain")
}
}
if res.ConsensusParams != nil {
state.ConsensusParams = types.PB2TM.ConsensusParams(res.ConsensusParams)
}
sm.SaveState(h.stateDB, state)
}
}
// First handle edge cases and constraints on the storeBlockHeight.
if storeBlockHeight == 0 {
return appHash, checkAppHash(state, appHash)
} else if storeBlockHeight < appBlockHeight {
// the app should never be ahead of the store (but this is under app's control)
return appHash, sm.ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
} else if storeBlockHeight < stateBlockHeight {
// the state should never be ahead of the store (this is under tendermint's control)
cmn.PanicSanity(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
} else if storeBlockHeight > stateBlockHeight+1 {
// store should be at most one ahead of the state (this is under tendermint's control)
cmn.PanicSanity(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
}
var err error
// Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight {
// Tendermint ran Commit and saved the state.
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight {
// We're good!
return appHash, checkAppHash(state, appHash)
}
} else if storeBlockHeight == stateBlockHeight+1 {
// We saved the block in the store but haven't updated the state,
// so we'll need to replay a block using the WAL.
if appBlockHeight < stateBlockHeight {
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
} else if appBlockHeight == stateBlockHeight {
// We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote its #ENDHEIGHT
h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
} else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
if err != nil {
return nil, err
}
mockApp := newMockProxyApp(appHash, abciResponses)
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err
}
}
cmn.PanicSanity("Should never happen")
return nil, nil
}
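
// Summary of the case analysis in ReplayBlocks (app = appBlockHeight,
// state = stateBlockHeight, store = storeBlockHeight; the checks above
// guarantee app <= store and state <= store <= state+1):
//
//	store == state:   app < store  -> replayBlocks, no WAL involvement
//	                  app == store -> already synced; just check the app hash
//	store == state+1: app < state  -> replayBlocks, last block left to the WAL
//	                  app == state -> replayBlock with the real app (Commit not yet run)
//	                  app == store -> replayBlock with the mock app (Commit ran, state unsaved)
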
func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
//
// Note that we don't have an old version of the state,
// so we bypass state validation/mutation using sm.ExecCommitBlock.
// This also means we won't be saving validator sets if they change during this period.
// TODO: Load the historical information to fix this and just use state.ApplyBlock
//
// If mutateState == true, the final block is replayed with h.replayBlock()
var appHash []byte
var err error
finalBlock := storeBlockHeight
if mutateState {
finalBlock--
}
for i := appBlockHeight + 1; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB)
if err != nil {
return nil, err
}
h.nBlocks++
}
if mutateState {
// sync the final block
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
}
return appHash, checkAppHash(state, appHash)
}
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
}
h.nBlocks++
return state, nil
}
func checkAppHash(state sm.State, appHash []byte) error {
if !bytes.Equal(state.AppHash, appHash) {
panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error())
}
return nil
}
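
// Note on checkAppHash above: although it has an error return, a hash
// mismatch currently panics instead of being returned, so callers only ever
// see a nil error from it.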
//--------------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results
// Useful because we don't want to call Commit() twice for the same block on the real app.
func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator.NewABCIClient()
err := cli.Start()
if err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli)
}
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
txCount int
abciResponses *sm.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTx[mock.txCount]
mock.txCount++
return *r
}
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
mock.txCount = 0
return *mock.abciResponses.EndBlock
}
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{Data: mock.appHash}
}
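
// Note on the mock above: DeliverTx hands back the recorded responses in
// order, EndBlock returns the recorded EndBlock response and resets the tx
// counter, and Commit simply echoes the app hash obtained during the
// handshake. Replaying the final block through replayBlock with this mock
// therefore reproduces the stored ABCIResponses without executing the
// transactions against the real application again.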