-
Notifications
You must be signed in to change notification settings - Fork 2
/
fbft_log.go
407 lines (350 loc) · 12.6 KB
/
fbft_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
package consensus
import (
"encoding/binary"
"fmt"
"sync"
bls_core "github.com/PositionExchange/bls/ffi/go/bls"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/rs/zerolog"
msg_pb "github.com/PositionExchange/posichain/api/proto/message"
"github.com/PositionExchange/posichain/core/types"
"github.com/PositionExchange/posichain/crypto/bls"
bls_cosi "github.com/PositionExchange/posichain/crypto/bls"
)
// FBFTMessage is the record of a pbft message received by a node during the FBFT process.
// ParseFBFTMessage builds it from a wire-level msg_pb.Message; FBFTLog stores it keyed by id().
type FBFTMessage struct {
// MessageType is the protobuf message type; it is part of the message id.
MessageType msg_pb.MessageType
// ViewID is the view the message belongs to; it is part of the message id.
ViewID uint64
// BlockNum is the block number the message refers to; FBFTLog filters and prunes on it.
BlockNum uint64
// BlockHash is the hash of the block the message refers to; it is part of the message id.
BlockHash common.Hash
// Block is a private copy of the consensus message's Block bytes
// (presumably the encoded block — confirm against the sender side).
Block []byte
// SenderPubkeys holds one key for a single-sender message, or the keys
// resolved from SenderPubkeyBitmap for a multi-key message.
SenderPubkeys []*bls.PublicKeyWrapper
// SenderPubkeyBitmap is a private copy of the consensus message's sender bitmap.
SenderPubkeyBitmap []byte
// LeaderPubkey may be nil; ParseFBFTMessage does not set it.
LeaderPubkey *bls.PublicKeyWrapper
// Payload is a private copy of the consensus message's Payload bytes.
Payload []byte
// The signature and bitmap fields below are not populated by any code in this file.
// NOTE(review): presumably filled in by view-change message parsing elsewhere — confirm.
ViewchangeSig *bls_core.Sign
ViewidSig *bls_core.Sign
M2AggSig *bls_core.Sign
M2Bitmap *bls_cosi.Mask
M3AggSig *bls_core.Sign
M3Bitmap *bls_cosi.Mask
// Verified is set to true by FBFTLog.AddVerifiedMessage and to false by
// FBFTLog.AddNotVerifiedMessage.
Verified bool
}
// String renders the message on a single bracketed line: type, view ID, block
// number, block hash, the Hex() form of every sender key joined by ";", and the
// Hex() form of the leader key (left empty when LeaderPubkey is nil).
func (m *FBFTMessage) String() string {
	var senders string
	for _, pk := range m.SenderPubkeys {
		// Only put a separator between entries, never in front of the first one.
		if senders != "" {
			senders += ";"
		}
		senders += pk.Bytes.Hex()
	}

	var leader string
	if lk := m.LeaderPubkey; lk != nil {
		leader = lk.Bytes.Hex()
	}

	const layout = "[Type:%s ViewID:%d Num:%d BlockHash:%s Sender:%s Leader:%s]"
	return fmt.Sprintf(
		layout,
		m.MessageType.String(),
		m.ViewID,
		m.BlockNum,
		m.BlockHash.Hex(),
		senders,
		leader,
	)
}
// HasSingleSender reports whether exactly one sender public key is attached
// to the message.
func (m *FBFTMessage) HasSingleSender() bool {
	senderCount := len(m.SenderPubkeys)
	return senderCount == 1
}
// Byte widths of the components of a fbftMsgID, in the order id() lays them out.
const (
// idTypeBytes is the width of the message type, which id() writes as a uint32.
idTypeBytes = 4
// idViewIDBytes is the width of the view ID, which id() writes as a uint64.
idViewIDBytes = 8
// idHashBytes is the width of the block hash.
idHashBytes = common.HashLength
// idSenderBytes is the width reserved for the sender component
// (the single sender's key bytes, or the sender bitmap otherwise).
idSenderBytes = bls.PublicKeySizeInBytes
// idBytes is the total width of a fbftMsgID.
idBytes = idTypeBytes + idViewIDBytes + idHashBytes + idSenderBytes
)
type (
// fbftMsgID is the id that uniquely defines a fbft message.
// It is a fixed-size array so it can be used directly as a map key in FBFTLog.
fbftMsgID [idBytes]byte
)
// id builds the key under which the message is stored in FBFTLog.
// Layout, front to back: message type (uint32, little-endian), view ID
// (uint64, little-endian), block hash, then the sender component.
func (m *FBFTMessage) id() fbftMsgID {
	// Offsets of each component inside the id.
	const (
		viewIDOff = idTypeBytes
		hashOff   = viewIDOff + idViewIDBytes
		senderOff = hashOff + idHashBytes
	)

	var out fbftMsgID
	binary.LittleEndian.PutUint32(out[0:], uint32(m.MessageType))
	binary.LittleEndian.PutUint64(out[viewIDOff:], m.ViewID)
	copy(out[hashOff:], m.BlockHash[:])

	if !m.HasSingleSender() {
		// Currently this case is not reachable as only validator will use id() func
		// and validator won't receive message with multiple senders
		copy(out[senderOff:], m.SenderPubkeyBitmap[:])
		return out
	}
	copy(out[senderOff:], m.SenderPubkeys[0].Bytes[:])
	return out
}
// FBFTLog represents the log stored by a node during FBFT process.
// It keeps two independently locked stores: blocks (plus the set of verified
// block hashes) and consensus messages. Create it with NewFBFTLog so the maps
// are initialized.
type FBFTLog struct {
blocks map[common.Hash]*types.Block // blocks received in FBFT, keyed by block hash
verifiedBlocks map[common.Hash]struct{} // hashes of blocks that have already been verified
// blockLock guards both blocks and verifiedBlocks.
blockLock sync.RWMutex
messages map[fbftMsgID]*FBFTMessage // messages received in FBFT, keyed by FBFTMessage.id()
// msgLock guards messages.
msgLock sync.RWMutex
}
// NewFBFTLog returns an empty FBFTLog with all of its maps initialized.
func NewFBFTLog() *FBFTLog {
	return &FBFTLog{
		blocks:         map[common.Hash]*types.Block{},
		verifiedBlocks: map[common.Hash]struct{}{},
		messages:       map[fbftMsgID]*FBFTMessage{},
	}
}
// AddBlock stores the block in the log under its hash, replacing any block
// previously stored under the same hash.
func (log *FBFTLog) AddBlock(block *types.Block) {
	log.blockLock.Lock()
	defer log.blockLock.Unlock()

	hash := block.Hash()
	log.blocks[hash] = block
}
// MarkBlockVerified records the block's hash in the set of verified blocks.
// The block itself does not need to have been added with AddBlock.
func (log *FBFTLog) MarkBlockVerified(block *types.Block) {
	log.blockLock.Lock()
	defer log.blockLock.Unlock()

	var present struct{}
	log.verifiedBlocks[block.Hash()] = present
}
// IsBlockVerified reports whether the given hash has been recorded by
// MarkBlockVerified (and not pruned since).
func (log *FBFTLog) IsBlockVerified(hash common.Hash) bool {
	log.blockLock.RLock()
	defer log.blockLock.RUnlock()

	if _, ok := log.verifiedBlocks[hash]; ok {
		return true
	}
	return false
}
// GetBlockByHash returns the block stored under the given hash, or nil when
// the log holds no such block.
func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
	log.blockLock.RLock()
	defer log.blockLock.RUnlock()

	block, ok := log.blocks[hash]
	if !ok {
		return nil
	}
	return block
}
// GetBlocksByNumber returns every stored block whose number equals the given
// one. The result is nil when nothing matches and its order is unspecified
// because it follows map iteration.
func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block {
	log.blockLock.RLock()
	defer log.blockLock.RUnlock()

	var matches []*types.Block
	for _, candidate := range log.blocks {
		if candidate.NumberU64() != number {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}
// DeleteBlocksLessThan removes every stored block whose number is strictly
// below the given one, together with its verified marker.
func (log *FBFTLog) DeleteBlocksLessThan(number uint64) {
	log.blockLock.Lock()
	defer log.blockLock.Unlock()

	for hash, stored := range log.blocks {
		if stored.NumberU64() >= number {
			continue
		}
		delete(log.blocks, hash)
		delete(log.verifiedBlocks, hash)
	}
}
// DeleteBlockByNumber removes every stored block with exactly the given
// number, together with its verified marker.
func (log *FBFTLog) DeleteBlockByNumber(number uint64) {
	log.blockLock.Lock()
	defer log.blockLock.Unlock()

	for hash, stored := range log.blocks {
		if stored.NumberU64() != number {
			continue
		}
		delete(log.blocks, hash)
		delete(log.verifiedBlocks, hash)
	}
}
// DeleteMessagesLessThan removes every stored message whose BlockNum is
// strictly below the given block number.
func (log *FBFTLog) DeleteMessagesLessThan(number uint64) {
	log.msgLock.Lock()
	defer log.msgLock.Unlock()

	for key, stored := range log.messages {
		if stored.BlockNum >= number {
			continue
		}
		delete(log.messages, key)
	}
}
// AddVerifiedMessage flags the message as signature-verified and stores it
// under its id, replacing any message already stored with the same id.
// Note that it mutates msg.Verified on the caller's message.
func (log *FBFTLog) AddVerifiedMessage(msg *FBFTMessage) {
	log.msgLock.Lock()
	defer log.msgLock.Unlock()

	msg.Verified = true
	key := msg.id()
	log.messages[key] = msg
}
// AddNotVerifiedMessage flags the message as not yet signature-verified and
// stores it under its id, replacing any message already stored with the same id.
// Note that it mutates msg.Verified on the caller's message.
func (log *FBFTLog) AddNotVerifiedMessage(msg *FBFTMessage) {
	log.msgLock.Lock()
	defer log.msgLock.Unlock()

	msg.Verified = false
	key := msg.id()
	log.messages[key] = msg
}
// GetNotVerifiedCommittedMessages returns the stored COMMITTED messages that
// match blockNum, viewID and blockHash and have NOT been verified yet.
// The result is nil when nothing matches.
func (log *FBFTLog) GetNotVerifiedCommittedMessages(blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
	log.msgLock.RLock()
	defer log.msgLock.RUnlock()

	var pending []*FBFTMessage
	for _, m := range log.messages {
		if m.MessageType != msg_pb.MessageType_COMMITTED {
			continue
		}
		if m.BlockNum != blockNum || m.ViewID != viewID || m.BlockHash != blockHash {
			continue
		}
		if m.Verified {
			continue
		}
		pending = append(pending, m)
	}
	return pending
}
// GetMessagesByTypeSeqViewHash returns the verified messages that match the
// given type, blockNum, viewID and blockHash. The result is nil when nothing
// matches.
func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
	log.msgLock.RLock()
	defer log.msgLock.RUnlock()

	var matches []*FBFTMessage
	for _, m := range log.messages {
		if m.MessageType != typ || m.BlockNum != blockNum {
			continue
		}
		if m.ViewID != viewID || m.BlockHash != blockHash {
			continue
		}
		if !m.Verified {
			continue
		}
		matches = append(matches, m)
	}
	return matches
}
// GetMessagesByTypeSeq returns the verified messages that match the given
// type and blockNum. The result is nil when nothing matches.
func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
	log.msgLock.RLock()
	defer log.msgLock.RUnlock()

	var matches []*FBFTMessage
	for _, m := range log.messages {
		if m.MessageType != typ || m.BlockNum != blockNum {
			continue
		}
		if !m.Verified {
			continue
		}
		matches = append(matches, m)
	}
	return matches
}
// GetMessagesByTypeSeqHash returns the verified messages that match the given
// type, blockNum and blockHash. The result is nil when nothing matches.
func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
	log.msgLock.RLock()
	defer log.msgLock.RUnlock()

	var matches []*FBFTMessage
	for _, m := range log.messages {
		if m.MessageType != typ || m.BlockNum != blockNum || m.BlockHash != blockHash {
			continue
		}
		if !m.Verified {
			continue
		}
		matches = append(matches, m)
	}
	return matches
}
// HasMatchingAnnounce reports whether the log holds at least one verified
// ANNOUNCE message for the given blockNum and blockHash.
func (log *FBFTLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
	return len(log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash)) > 0
}
// HasMatchingViewAnnounce reports whether the log holds at least one verified
// ANNOUNCE message for the given blockNum, viewID and blockHash.
func (log *FBFTLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
	return len(log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash)) > 0
}
// HasMatchingPrepared reports whether the log holds at least one verified
// PREPARED message for the given blockNum and blockHash, in any view.
func (log *FBFTLog) HasMatchingPrepared(blockNum uint64, blockHash common.Hash) bool {
	return len(log.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, blockNum, blockHash)) > 0
}
// HasMatchingViewPrepared reports whether the log holds at least one verified
// PREPARED message for the given blockNum, viewID and blockHash.
func (log *FBFTLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
	return len(log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_PREPARED, blockNum, viewID, blockHash)) > 0
}
// GetMessagesByTypeSeqView returns the verified messages that match the given
// type, blockNum and viewID. The result is nil when nothing matches.
//
// The filter previously read `... || msg.ViewID != viewID && msg.Verified`.
// Because && binds tighter than ||, that skipped only verified messages with a
// different viewID, so unverified messages (including ones from other views)
// were returned. Like the other getters on FBFTLog, only verified messages
// with an exact match are returned now.
func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage {
	log.msgLock.RLock()
	defer log.msgLock.RUnlock()

	var found []*FBFTMessage
	for _, msg := range log.messages {
		if msg.MessageType != typ || msg.BlockNum != blockNum || msg.ViewID != viewID || !msg.Verified {
			continue
		}
		found = append(found, msg)
	}
	return found
}
// FindMessageByMaxViewID returns the message with the largest ViewID among
// msgs, or nil when msgs is empty. When several messages share the largest
// ViewID, the one that appears last in msgs is returned.
func (log *FBFTLog) FindMessageByMaxViewID(msgs []*FBFTMessage) *FBFTMessage {
	if len(msgs) == 0 {
		return nil
	}

	winner := 0
	for i := range msgs {
		// ">=" (not ">") so that later entries win ties.
		if msgs[i].ViewID >= msgs[winner].ViewID {
			winner = i
		}
	}
	return msgs[winner]
}
// ParseFBFTMessage converts a wire-level message into an FBFTMessage.
//
// All byte fields (payload, block, sender bitmap) are copied so the result
// does not alias the protobuf message. Sender keys are resolved in one of two
// ways: if SenderPubkey is populated the message is treated as a single-key
// message; otherwise the sender bitmap is decoded against the consensus
// multi-signature bitmap.
//
// It returns an error when the message carries no consensus payload, when the
// single sender key cannot be deserialized, or when the bitmap cannot be
// resolved to public keys.
func (consensus *Consensus) ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
	// TODO Have this do further sanity checks on the message please
	consensusMsg := msg.GetConsensus()
	if consensusMsg == nil {
		// Without this guard a message with no consensus request (e.g. a
		// malformed or mis-typed network message) would panic on the field
		// accesses below.
		return nil, errors.New("message does not carry a consensus payload")
	}

	pbftMsg := FBFTMessage{}
	pbftMsg.MessageType = msg.GetType()
	pbftMsg.ViewID = consensusMsg.ViewId
	pbftMsg.BlockNum = consensusMsg.BlockNum
	copy(pbftMsg.BlockHash[:], consensusMsg.BlockHash[:])
	pbftMsg.Payload = make([]byte, len(consensusMsg.Payload))
	copy(pbftMsg.Payload[:], consensusMsg.Payload[:])
	pbftMsg.Block = make([]byte, len(consensusMsg.Block))
	copy(pbftMsg.Block[:], consensusMsg.Block[:])
	pbftMsg.SenderPubkeyBitmap = make([]byte, len(consensusMsg.SenderPubkeyBitmap))
	copy(pbftMsg.SenderPubkeyBitmap[:], consensusMsg.SenderPubkeyBitmap[:])

	if len(consensusMsg.SenderPubkey) != 0 {
		// If SenderPubKey is populated, treat it as a single key message
		pubKey, err := bls_cosi.BytesToBLSPublicKey(consensusMsg.SenderPubkey)
		if err != nil {
			return nil, err
		}
		pbftMsg.SenderPubkeys = []*bls.PublicKeyWrapper{{Object: pubKey}}
		copy(pbftMsg.SenderPubkeys[0].Bytes[:], consensusMsg.SenderPubkey[:])
		return &pbftMsg, nil
	}

	// else, it should be a multi-key message where the bitmap is populated
	consensus.multiSigMutex.RLock()
	pubKeys, err := consensus.multiSigBitmap.GetSignedPubKeysFromBitmap(pbftMsg.SenderPubkeyBitmap)
	consensus.multiSigMutex.RUnlock()
	if err != nil {
		return nil, err
	}
	pbftMsg.SenderPubkeys = pubKeys
	return &pbftMsg, nil
}
// errFBFTLogNotFound is returned by GetCommittedBlockAndMsgsFromNumber when the
// log has no verified COMMITTED message, with a matching stored block, for the
// requested block number.
var errFBFTLogNotFound = errors.New("FBFT log not found")
// GetCommittedBlockAndMsgsFromNumber looks up the committed block for block
// number bn together with the verified COMMITTED message that refers to it.
//
// It returns the first COMMITTED message (in unspecified order) whose block
// hash resolves to a block stored in the log. errFBFTLogNotFound is returned
// when there is no verified COMMITTED message for bn, or when none of them has
// a matching stored block.
//
// Diagnostics are written to logger. The zerolog event chains are terminated
// with Msg; without that call the events are built but never emitted, which is
// why these lines previously never appeared in the logs.
func (log *FBFTLog) GetCommittedBlockAndMsgsFromNumber(bn uint64, logger *zerolog.Logger) (*types.Block, *FBFTMessage, error) {
	msgs := log.GetMessagesByTypeSeq(
		msg_pb.MessageType_COMMITTED, bn,
	)
	if len(msgs) == 0 {
		return nil, nil, errFBFTLogNotFound
	}
	if len(msgs) > 1 {
		logger.Error().
			Int("numMsgs", len(msgs)).
			Msg("DANGER!!! multiple COMMITTED message in PBFT log observed")
	}
	for _, msg := range msgs {
		block := log.GetBlockByHash(msg.BlockHash)
		if block == nil {
			logger.Debug().
				Uint64("blockNum", msg.BlockNum).
				Uint64("viewID", msg.ViewID).
				Str("blockHash", msg.BlockHash.Hex()).
				Msg("failed finding a matching block for committed message")
			continue
		}
		return block, msg, nil
	}
	return nil, nil, errFBFTLogNotFound
}
// PruneCacheBeforeBlock drops all blocks and messages whose block number is
// strictly below bn-1, i.e. it keeps entries for bn-1 and everything after it.
//
// For bn <= 1 there is nothing below bn-1 to prune, so the call is a no-op.
// The guard matters for bn == 0: bn-1 would wrap around to the maximum uint64
// and every block and message in the log would be deleted.
func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) {
	if bn <= 1 {
		return
	}
	cutoff := bn - 1
	log.DeleteBlocksLessThan(cutoff)
	log.DeleteMessagesLessThan(cutoff)
}