-
Notifications
You must be signed in to change notification settings - Fork 2
/
consensus_service.go
589 lines (520 loc) · 19.8 KB
/
consensus_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
package consensus
import (
"math/big"
"sync/atomic"
"time"
"github.com/PositionExchange/posichain/core/types"
"github.com/PositionExchange/posichain/crypto/bls"
bls_core "github.com/PositionExchange/bls/ffi/go/bls"
msg_pb "github.com/PositionExchange/posichain/api/proto/message"
consensus_engine "github.com/PositionExchange/posichain/consensus/engine"
"github.com/PositionExchange/posichain/consensus/quorum"
"github.com/PositionExchange/posichain/consensus/signature"
bls_cosi "github.com/PositionExchange/posichain/crypto/bls"
"github.com/PositionExchange/posichain/crypto/hash"
"github.com/PositionExchange/posichain/internal/chain"
"github.com/PositionExchange/posichain/internal/utils"
"github.com/PositionExchange/posichain/multibls"
"github.com/PositionExchange/posichain/shard"
"github.com/PositionExchange/posichain/shard/committee"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// WaitForNewRandomness listens to the RndChannel to receive new VDF randomness.
func (consensus *Consensus) WaitForNewRandomness() {
go func() {
for {
vdfOutput := <-consensus.RndChannel
consensus.pendingRnds = append(consensus.pendingRnds, vdfOutput)
}
}()
}
// GetNextRnd returns the oldest available randomness along with the hash of the block there randomness preimage is committed.
func (consensus *Consensus) GetNextRnd() ([vdFAndProofSize]byte, [32]byte, error) {
if len(consensus.pendingRnds) == 0 {
return [vdFAndProofSize]byte{}, [32]byte{}, errors.New("No available randomness")
}
vdfOutput := consensus.pendingRnds[0]
vdfBytes := [vdFAndProofSize]byte{}
seed := [32]byte{}
copy(vdfBytes[:], vdfOutput[:vdFAndProofSize])
copy(seed[:], vdfOutput[vdFAndProofSize:])
//pop the first vdfOutput from the list
consensus.pendingRnds = consensus.pendingRnds[1:]
return vdfBytes, seed, nil
}
var (
empty = []byte{}
)
// Signs the consensus message and returns the marshaled message.
func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) ([]byte, error) {
if err := consensus.signConsensusMessage(message, priKey); err != nil {
return empty, err
}
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return empty, err
}
return marshaledMessage, nil
}
// UpdatePublicKeys updates the PublicKeys for
// quorum on current subcommittee, protected by a mutex
func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
// TODO: use mutex for updating public keys pointer. No need to lock on all these logic.
consensus.pubKeyLock.Lock()
consensus.Decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
consensus.getLogger().Info().
Int("index", i).
Str("BLSPubKey", pubKeys[i].Bytes.Hex()).
Msg("Member")
}
allKeys := consensus.Decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("My Leader")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
}
consensus.pubKeyLock.Unlock()
// reset states after update public keys
// TODO: incorporate bitmaps in the decider, so their state can't be inconsistent.
consensus.UpdateBitmaps()
consensus.ResetState()
// do not reset view change state if it is in view changing mode
if !consensus.IsViewChangingMode() {
consensus.ResetViewChangeState()
}
return consensus.Decider.ParticipantsCount()
}
// NewFaker returns a faker consensus.
func NewFaker() *Consensus {
return &Consensus{}
}
// Sign on the hash of the message
func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
hash := hash.Keccak256(message)
signature := priKey.SignHash(hash[:])
return signature.Serialize()
}
// Sign on the consensus message signature field.
func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) error {
message.Signature = nil
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return err
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage, priKey)
message.Signature = signature
return nil
}
// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) UpdateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants()
prepareBitmap, _ := bls_cosi.NewMask(members, nil)
commitBitmap, _ := bls_cosi.NewMask(members, nil)
multiSigBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
consensus.multiSigMutex.Lock()
consensus.multiSigBitmap = multiSigBitmap
consensus.multiSigMutex.Unlock()
}
// ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.switchPhase("ResetState", FBFTAnnounce)
consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear()
}
if consensus.commitBitmap != nil {
consensus.commitBitmap.Clear()
}
consensus.aggregatedPrepareSig = nil
consensus.aggregatedCommitSig = nil
}
// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
return consensus.Decider.IndexOf(pubKey) != -1
}
// SetMode sets the mode of consensus
func (consensus *Consensus) SetMode(m Mode) {
if m == Normal && consensus.isBackup {
m = NormalBackup
}
consensus.getLogger().Debug().
Str("Mode", m.String()).
Msg("[SetMode]")
consensus.current.SetMode(m)
}
// SetIsBackup sets the mode of consensus
func (consensus *Consensus) SetIsBackup(isBackup bool) {
consensus.getLogger().Debug().
Bool("IsBackup", isBackup).
Msg("[SetIsBackup]")
consensus.isBackup = isBackup
consensus.current.SetIsBackup(isBackup)
}
// Mode returns the mode of consensus
func (consensus *Consensus) Mode() Mode {
return consensus.current.Mode()
}
// RegisterPRndChannel registers the channel for receiving randomness preimage from DRG protocol
func (consensus *Consensus) RegisterPRndChannel(pRndChannel chan []byte) {
consensus.PRndChannel = pRndChannel
}
// RegisterRndChannel registers the channel for receiving final randomness from DRG protocol
func (consensus *Consensus) RegisterRndChannel(rndChannel chan [548]byte) {
consensus.RndChannel = rndChannel
}
// Check viewID, caller's responsibility to hold lock when change ignoreViewIDCheck
func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
// just ignore consensus check for the first time when node join
if consensus.IgnoreViewIDCheck.IsSet() {
//in syncing mode, node accepts incoming messages without viewID/leaderKey checking
//so only set mode to normal when new node enters consensus and need checking viewID
consensus.SetMode(Normal)
consensus.SetViewIDs(msg.ViewID)
if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys")
}
consensus.LeaderPubKey = msg.SenderPubkeys[0]
consensus.IgnoreViewIDCheck.UnSet()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[checkViewID] Start consensus timer")
return nil
} else if msg.ViewID > consensus.GetCurBlockViewID() {
return consensus_engine.ErrViewIDNotMatch
} else if msg.ViewID < consensus.GetCurBlockViewID() {
return errors.New("view ID belongs to the past")
}
return nil
}
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) SetBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
}
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(
recvPayload []byte, offset int,
) (*bls_core.Sign, *bls_cosi.Mask, error) {
if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) {
return nil, nil, errors.New("payload not have enough length")
}
sigAndBitmapPayload := recvPayload[offset:]
// TODO(audit): keep a Mask in the Decider so it won't be reconstructed on the fly.
members := consensus.Decider.Participants()
return chain.ReadSignatureBitmapByPublicKeys(
sigAndBitmapPayload, members,
)
}
// UpdateConsensusInformation will update shard information (epoch, publicKeys, blockNum, viewID)
// based on the local blockchain. It is called in two cases for now:
// 1. consensus object initialization. because of current dependency where chainreader is only available
// after node is initialized; node is only available after consensus is initialized
// we need call this function separately after create consensus object
// 2. after state syncing is finished
// It will return the mode:
// (a) node not in committed: Listening mode
// (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode {
curHeader := consensus.Blockchain.CurrentHeader()
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
// Overwrite nextEpoch if the shard state has a epoch number
if curHeader.IsLastBlockInEpoch() {
nextShardState, err := curHeader.GetShardState()
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("[UpdateConsensusInformation] Error retrieving current shard state in the first block")
return Syncing
}
if nextShardState.Epoch != nil {
nextEpoch = nextShardState.Epoch
}
}
consensus.BlockPeriod = 5 * time.Second
// Enable 2s block time at the twoSecondsEpoch
if consensus.Blockchain.Config().IsTwoSeconds(nextEpoch) {
consensus.BlockPeriod = 2 * time.Second
}
isFirstTimeStaking := consensus.Blockchain.Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain.Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain.Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy
if isFirstTimeStaking || haventUpdatedDecider {
decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID)
decider.SetMyPublicKeyProvider(func() (multibls.PublicKeys, error) {
return consensus.GetPublicKeys(), nil
})
consensus.Decider = decider
}
var committeeToSet *shard.Committee
epochToSet := curEpoch
hasError := false
curShardState, err := committee.WithStakingEnabled.ReadFromDB(
curEpoch, consensus.Blockchain,
)
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("[UpdateConsensusInformation] Error retrieving current shard state")
return Syncing
}
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
// genesis block is a special case that will have shard state and needs to skip processing
isNotGenesisBlock := curHeader.Number().Cmp(big.NewInt(0)) > 0
if curHeader.IsLastBlockInEpoch() && isNotGenesisBlock {
nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
nextEpoch, consensus.Blockchain,
)
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving nextEpoch shard state")
return Syncing
}
subComm, err := nextShardState.FindCommitteeByID(curHeader.ShardID())
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving nextEpoch shard state")
return Syncing
}
committeeToSet = subComm
epochToSet = nextEpoch
} else {
subComm, err := curShardState.FindCommitteeByID(curHeader.ShardID())
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving current shard state")
return Syncing
}
committeeToSet = subComm
}
if len(committeeToSet.Slots) == 0 {
consensus.getLogger().Warn().
Msg("[UpdateConsensusInformation] No members in the committee to update")
hasError = true
}
// update public keys in the committee
oldLeader := consensus.LeaderPubKey
pubKeys, _ := committeeToSet.BLSPublicKeys()
consensus.getLogger().Info().
Int("numPubKeys", len(pubKeys)).
Msg("[UpdateConsensusInformation] Successfully updated public keys")
consensus.UpdatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
// Update voters in the committee
if _, err := consensus.Decider.SetVoters(
committeeToSet, epochToSet,
); err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error when updating voters")
return Syncing
}
// take care of possible leader change during the epoch
// TODO: in a very rare case, when a M1 view change happened, the block contains coinbase for last leader
// but the new leader is actually recognized by most of the nodes. At this time, if a node sync to this
// exact block and set its leader, it will set with the failed leader as in the coinbase of the block.
// This is a very rare case scenario and not likely to cause any issue in mainnet. But we need to think about
// a solution to take care of this case because the coinbase of the latest block doesn't really represent the
// the real current leader in case of M1 view change.
if !curHeader.IsLastBlockInEpoch() && curHeader.Number().Uint64() != 0 {
leaderPubKey, err := chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain, curHeader)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Error().Err(err).
Msg("[UpdateConsensusInformation] Unable to get leaderPubKey from coinbase")
consensus.IgnoreViewIDCheck.Set()
hasError = true
} else {
consensus.getLogger().Info().
Str("leaderPubKey", leaderPubKey.Bytes.Hex()).
Msg("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain")
consensus.LeaderPubKey = leaderPubKey
}
}
for _, key := range pubKeys {
// in committee
myPubKeys := consensus.GetPublicKeys()
if myPubKeys.Contains(key.Object) {
if hasError {
consensus.getLogger().Error().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] hasError")
return Syncing
}
// If the leader changed and I myself become the leader
if (oldLeader != nil && consensus.LeaderPubKey != nil &&
!consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.IsLeader() {
go func() {
consensus.getLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal <- SyncProposal
}()
}
return Normal
}
}
consensus.getLogger().Info().
Msg("[UpdateConsensusInformation] not in committee, Listening")
// not in committee
return Listening
}
// IsLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key
func (consensus *Consensus) IsLeader() bool {
for _, key := range consensus.priKey {
if key.Pub.Object.IsEqual(consensus.LeaderPubKey.Object) {
return true
}
}
return false
}
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) SetViewIDs(height uint64) {
consensus.SetCurBlockViewID(height)
consensus.SetViewChangingID(height)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) {
consensus.current.SetCurBlockViewID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// StartFinalityCount set the finality counter to current time
func (consensus *Consensus) StartFinalityCount() {
consensus.finalityCounter = time.Now().UnixNano()
}
// FinishFinalityCount calculate the current finality
func (consensus *Consensus) FinishFinalityCount() {
d := time.Now().UnixNano()
consensus.finality = (d - consensus.finalityCounter) / 1000000
consensusFinalityHistogram.Observe(float64(consensus.finality))
}
// GetFinality returns the finality time in milliseconds of previous consensus
func (consensus *Consensus) GetFinality() int64 {
return consensus.finality
}
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
consensus.getLogger().Info().
Str("from:", consensus.phase.String()).
Str("to:", desired.String()).
Str("switchPhase:", subject)
consensus.phase = desired
return
}
var (
errGetPreparedBlock = errors.New("failed to get prepared block for self commit")
errReadBitmapPayload = errors.New("failed to read signature bitmap payload")
)
// selfCommit will create a commit message and commit it locally
// it is used by the new leadder of the view change routine
// when view change is succeeded and the new leader
// received prepared payload from other validators or from local
func (consensus *Consensus) selfCommit(payload []byte) error {
var blockHash [32]byte
copy(blockHash[:], payload[:32])
// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32)
if err != nil {
return errReadBitmapPayload
}
// Have to keep the block hash so the leader can finish the commit phase of prepared block
consensus.ResetState()
copy(consensus.blockHash[:], blockHash[:])
consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Error().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] New Leader commit bitmap set failed")
continue
}
if _, err := consensus.Decider.AddNewVote(
quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] submit vote on viewchange commit failed")
}
}
return nil
}
// NumSignaturesIncludedInBlock returns the number of signatures included in the block
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
members := consensus.Decider.Participants()
// TODO(audit): do not reconstruct the Mask
mask, err := bls.NewMask(members, nil)
if err != nil {
return count
}
err = mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
for _, key := range consensus.GetPublicKeys() {
if ok, err := mask.KeyEnabled(key.Bytes); err == nil && ok {
count++
}
}
return count
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.GetCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}