forked from iotexproject/iotex-core
/
rolldpos.go
369 lines (329 loc) · 10.5 KB
/
rolldpos.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
// Copyright (c) 2019 IoTeX
// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no
// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent
// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache
// License 2.0 that can be found in the LICENSE file.
package rolldpos
import (
"context"
"github.com/facebookgo/clock"
"github.com/iotexproject/go-fsm"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/iotexproject/iotex-core/action/protocol/rolldpos"
"github.com/iotexproject/iotex-core/actpool"
"github.com/iotexproject/iotex-core/blockchain"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/config"
"github.com/iotexproject/iotex-core/consensus/consensusfsm"
"github.com/iotexproject/iotex-core/consensus/scheme"
"github.com/iotexproject/iotex-core/endorsement"
"github.com/iotexproject/iotex-core/explorer/idl/explorer"
"github.com/iotexproject/iotex-core/pkg/keypair"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/protogen/iotextypes"
)
var (
timeSlotMtc = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "iotex_consensus_time_slot",
Help: "Consensus time slot",
},
[]string{},
)
blockIntervalMtc = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "iotex_consensus_block_interval",
Help: "Consensus block interval",
},
[]string{},
)
)
func init() {
prometheus.MustRegister(timeSlotMtc)
prometheus.MustRegister(blockIntervalMtc)
}
var (
// ErrNewRollDPoS indicates the error of constructing RollDPoS
ErrNewRollDPoS = errors.New("error when constructing RollDPoS")
// ErrZeroDelegate indicates seeing 0 delegates in the network
ErrZeroDelegate = errors.New("zero delegates in the network")
// ErrNotEnoughCandidates indicates there are not enough candidates from the candidate pool
ErrNotEnoughCandidates = errors.New("Candidate pool does not have enough candidates")
)
type blockWrapper struct {
*block.Block
round uint32
}
func (bw *blockWrapper) Hash() []byte {
hash := bw.HashBlock()
return hash[:]
}
func (bw *blockWrapper) Endorser() string {
return bw.ProducerAddress()
}
func (bw *blockWrapper) Round() uint32 {
return bw.round
}
// RollDPoS is Roll-DPoS consensus main entrance
type RollDPoS struct {
cfsm *consensusfsm.ConsensusFSM
ctx *rollDPoSCtx
ready chan interface{}
}
// Start starts RollDPoS consensus
func (r *RollDPoS) Start(ctx context.Context) error {
if err := r.cfsm.Start(ctx); err != nil {
return errors.Wrap(err, "error when starting the consensus FSM")
}
if _, err := r.cfsm.BackToPrepare(r.ctx.cfg.Delay); err != nil {
return err
}
close(r.ready)
return nil
}
// Stop stops RollDPoS consensus
func (r *RollDPoS) Stop(ctx context.Context) error {
return errors.Wrap(r.cfsm.Stop(ctx), "error when stopping the consensus FSM")
}
// HandleConsensusMsg handles incoming consensus message
func (r *RollDPoS) HandleConsensusMsg(msg *iotextypes.ConsensusMessage) error {
<-r.ready
consensusHeight := r.ctx.Height()
switch {
case consensusHeight == 0:
log.Logger("consensus").Debug("consensus component is not ready yet")
return nil
case msg.Height < consensusHeight:
log.Logger("consensus").Debug(
"old consensus message",
zap.Uint64("consensusHeight", consensusHeight),
zap.Uint64("msgHeight", msg.Height),
)
return nil
case msg.Height > consensusHeight+1:
log.Logger("consensus").Debug(
"future consensus message",
zap.Uint64("consensusHeight", consensusHeight),
zap.Uint64("msgHeight", msg.Height),
)
return nil
}
endorsedMessage := &EndorsedConsensusMessage{}
if err := endorsedMessage.LoadProto(msg); err != nil {
return errors.Wrapf(err, "failed to decode endorsed consensus message")
}
if !endorsement.VerifyEndorsedDocument(endorsedMessage) {
return errors.New("failed to verify signature in endorsement")
}
en := endorsedMessage.Endorsement()
switch consensusMessage := endorsedMessage.Document().(type) {
case *blockProposal:
if err := r.ctx.CheckBlockProposer(endorsedMessage.Height(), consensusMessage, en); err != nil {
return errors.Wrap(err, "failed to verify block proposal")
}
r.cfsm.ProduceReceiveBlockEvent(endorsedMessage)
return nil
case *ConsensusVote:
if err := r.ctx.CheckVoteEndorser(endorsedMessage.Height(), consensusMessage, en); err != nil {
return errors.Wrapf(err, "failed to verify vote")
}
switch consensusMessage.Topic() {
case PROPOSAL:
r.cfsm.ProduceReceiveProposalEndorsementEvent(endorsedMessage)
case LOCK:
r.cfsm.ProduceReceiveLockEndorsementEvent(endorsedMessage)
case COMMIT:
r.cfsm.ProduceReceivePreCommitEndorsementEvent(endorsedMessage)
}
return nil
// TODO: response block by hash, requestBlock.BlockHash
default:
return errors.Errorf("Invalid consensus message type %+v", msg)
}
}
// Calibrate called on receive a new block not via consensus
func (r *RollDPoS) Calibrate(height uint64) {
r.cfsm.Calibrate(height)
}
// ValidateBlockFooter validates the signatures in the block footer
func (r *RollDPoS) ValidateBlockFooter(blk *block.Block) error {
round, err := r.ctx.RoundCalc().NewRound(blk.Height(), blk.Timestamp())
if err != nil {
return err
}
if round.Proposer() != blk.ProducerAddress() {
return errors.Errorf(
"block proposer %s is invalid, %s expected",
blk.ProducerAddress(),
round.proposer,
)
}
if err := round.AddBlock(blk); err != nil {
return err
}
blkHash := blk.HashBlock()
for _, en := range blk.Endorsements() {
if err := round.AddVoteEndorsement(
NewConsensusVote(blkHash[:], COMMIT),
en,
); err != nil {
return err
}
}
if !round.EndorsedByMajority(blkHash[:], []ConsensusVoteTopic{COMMIT}) {
return ErrInsufficientEndorsements
}
return nil
}
// Metrics returns RollDPoS consensus metrics
func (r *RollDPoS) Metrics() (scheme.ConsensusMetrics, error) {
var metrics scheme.ConsensusMetrics
height := r.ctx.chain.TipHeight()
round, err := r.ctx.RoundCalc().NewRound(height+1, r.ctx.clock.Now())
if err != nil {
return metrics, errors.Wrap(err, "error when calculating round")
}
// Get all candidates
candidates, err := r.ctx.chain.CandidatesByHeight(height)
if err != nil {
return metrics, errors.Wrap(err, "error when getting all candidates")
}
candidateAddresses := make([]string, len(candidates))
for i, c := range candidates {
candidateAddresses[i] = c.Address
}
return scheme.ConsensusMetrics{
LatestEpoch: round.EpochNum(),
LatestHeight: height,
LatestDelegates: round.Delegates(),
LatestBlockProducer: r.ctx.round.proposer,
Candidates: candidateAddresses,
}, nil
}
// NumPendingEvts returns the number of pending events
func (r *RollDPoS) NumPendingEvts() int {
return r.cfsm.NumPendingEvents()
}
// CurrentState returns the current state
func (r *RollDPoS) CurrentState() fsm.State {
return r.cfsm.CurrentState()
}
// Activate activates or pauses the roll-DPoS consensus. When it is deactivated, the node will finish the current
// consensus round if it is doing the work and then return the the initial state
func (r *RollDPoS) Activate(active bool) { r.ctx.Activate(active) }
// Active is true if the roll-DPoS consensus is active, or false if it is stand-by
func (r *RollDPoS) Active() bool {
return r.ctx.Active() || r.cfsm.CurrentState() != consensusfsm.InitState
}
// Builder is the builder for RollDPoS
type Builder struct {
cfg config.Config
// TODO: we should use keystore in the future
encodedAddr string
priKey keypair.PrivateKey
chain blockchain.Blockchain
actPool actpool.ActPool
broadcastHandler scheme.Broadcast
clock clock.Clock
rootChainAPI explorer.Explorer
rp *rolldpos.Protocol
candidatesByHeightFunc CandidatesByHeightFunc
}
// NewRollDPoSBuilder instantiates a Builder instance
func NewRollDPoSBuilder() *Builder {
return &Builder{}
}
// SetConfig sets config
func (b *Builder) SetConfig(cfg config.Config) *Builder {
b.cfg = cfg
return b
}
// SetAddr sets the address and key pair for signature
func (b *Builder) SetAddr(encodedAddr string) *Builder {
b.encodedAddr = encodedAddr
return b
}
// SetPriKey sets the private key
func (b *Builder) SetPriKey(priKey keypair.PrivateKey) *Builder {
b.priKey = priKey
return b
}
// SetBlockchain sets the blockchain APIs
func (b *Builder) SetBlockchain(chain blockchain.Blockchain) *Builder {
b.chain = chain
return b
}
// SetActPool sets the action pool APIs
func (b *Builder) SetActPool(actPool actpool.ActPool) *Builder {
b.actPool = actPool
return b
}
// SetBroadcast sets the broadcast callback
func (b *Builder) SetBroadcast(broadcastHandler scheme.Broadcast) *Builder {
b.broadcastHandler = broadcastHandler
return b
}
// SetClock sets the clock
func (b *Builder) SetClock(clock clock.Clock) *Builder {
b.clock = clock
return b
}
// SetRootChainAPI sets root chain API
func (b *Builder) SetRootChainAPI(api explorer.Explorer) *Builder {
b.rootChainAPI = api
return b
}
// SetCandidatesByHeightFunc sets candidatesByHeightFunc
func (b *Builder) SetCandidatesByHeightFunc(
candidatesByHeightFunc CandidatesByHeightFunc,
) *Builder {
b.candidatesByHeightFunc = candidatesByHeightFunc
return b
}
// RegisterProtocol sets the rolldpos protocol
func (b *Builder) RegisterProtocol(rp *rolldpos.Protocol) *Builder {
b.rp = rp
return b
}
// Build builds a RollDPoS consensus module
func (b *Builder) Build() (*RollDPoS, error) {
if b.chain == nil {
return nil, errors.Wrap(ErrNewRollDPoS, "blockchain APIs is nil")
}
if b.actPool == nil {
return nil, errors.Wrap(ErrNewRollDPoS, "action pool APIs is nil")
}
if b.broadcastHandler == nil {
return nil, errors.Wrap(ErrNewRollDPoS, "broadcast callback is nil")
}
if b.clock == nil {
b.clock = clock.New()
}
ctx := newRollDPoSCtx(
b.cfg.Consensus.RollDPoS,
b.cfg.System.Active,
b.cfg.Genesis.Blockchain.BlockInterval,
b.cfg.Consensus.RollDPoS.ToleratedOvertime,
b.cfg.Genesis.TimeBasedRotation,
b.rootChainAPI,
b.chain,
b.actPool,
b.rp,
b.broadcastHandler,
b.candidatesByHeightFunc,
b.encodedAddr,
b.priKey,
b.clock,
)
cfsm, err := consensusfsm.NewConsensusFSM(b.cfg.Consensus.RollDPoS.FSM, ctx, b.clock)
if err != nil {
return nil, errors.Wrap(err, "error when constructing the consensus FSM")
}
return &RollDPoS{
cfsm: cfsm,
ctx: ctx,
ready: make(chan interface{}),
}, nil
}