forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 11
/
msgcomparator.go
125 lines (98 loc) · 3.93 KB
/
msgcomparator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package protoext
import (
"bytes"
"github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric/gossip/common"
)
// NewGossipMessageComparator creates a MessageReplacingPolicy given a maximum number of blocks to hold
func NewGossipMessageComparator(dataBlockStorageSize int) common.MessageReplacingPolicy {
return (&msgComparator{dataBlockStorageSize: dataBlockStorageSize}).getMsgReplacingPolicy()
}
type msgComparator struct {
dataBlockStorageSize int
}
func (mc *msgComparator) getMsgReplacingPolicy() common.MessageReplacingPolicy {
return func(this interface{}, that interface{}) common.InvalidationResult {
return mc.invalidationPolicy(this, that)
}
}
func (mc *msgComparator) invalidationPolicy(this interface{}, that interface{}) common.InvalidationResult {
thisMsg := this.(*SignedGossipMessage)
thatMsg := that.(*SignedGossipMessage)
if IsAliveMsg(thisMsg.GossipMessage) && IsAliveMsg(thatMsg.GossipMessage) {
return aliveInvalidationPolicy(thisMsg.GetAliveMsg(), thatMsg.GetAliveMsg())
}
if IsDataMsg(thisMsg.GossipMessage) && IsDataMsg(thatMsg.GossipMessage) {
return mc.dataInvalidationPolicy(thisMsg.GetDataMsg(), thatMsg.GetDataMsg())
}
if IsStateInfoMsg(thisMsg.GossipMessage) && IsStateInfoMsg(thatMsg.GossipMessage) {
return mc.stateInvalidationPolicy(thisMsg.GetStateInfo(), thatMsg.GetStateInfo())
}
if IsIdentityMsg(thisMsg.GossipMessage) && IsIdentityMsg(thatMsg.GossipMessage) {
return mc.identityInvalidationPolicy(thisMsg.GetPeerIdentity(), thatMsg.GetPeerIdentity())
}
if IsLeadershipMsg(thisMsg.GossipMessage) && IsLeadershipMsg(thatMsg.GossipMessage) {
return leaderInvalidationPolicy(thisMsg.GetLeadershipMsg(), thatMsg.GetLeadershipMsg())
}
return common.MessageNoAction
}
func (mc *msgComparator) stateInvalidationPolicy(thisStateMsg *gossip.StateInfo, thatStateMsg *gossip.StateInfo) common.InvalidationResult {
if !bytes.Equal(thisStateMsg.PkiId, thatStateMsg.PkiId) {
return common.MessageNoAction
}
return compareTimestamps(thisStateMsg.Timestamp, thatStateMsg.Timestamp)
}
func (mc *msgComparator) identityInvalidationPolicy(thisIdentityMsg *gossip.PeerIdentity, thatIdentityMsg *gossip.PeerIdentity) common.InvalidationResult {
if bytes.Equal(thisIdentityMsg.PkiId, thatIdentityMsg.PkiId) {
return common.MessageInvalidated
}
return common.MessageNoAction
}
func (mc *msgComparator) dataInvalidationPolicy(thisDataMsg *gossip.DataMessage, thatDataMsg *gossip.DataMessage) common.InvalidationResult {
if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum {
return common.MessageInvalidated
}
diff := abs(thisDataMsg.Payload.SeqNum, thatDataMsg.Payload.SeqNum)
if diff <= uint64(mc.dataBlockStorageSize) {
return common.MessageNoAction
}
if thisDataMsg.Payload.SeqNum > thatDataMsg.Payload.SeqNum {
return common.MessageInvalidates
}
return common.MessageInvalidated
}
func aliveInvalidationPolicy(thisMsg *gossip.AliveMessage, thatMsg *gossip.AliveMessage) common.InvalidationResult {
if !bytes.Equal(thisMsg.Membership.PkiId, thatMsg.Membership.PkiId) {
return common.MessageNoAction
}
return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp)
}
func leaderInvalidationPolicy(thisMsg *gossip.LeadershipMessage, thatMsg *gossip.LeadershipMessage) common.InvalidationResult {
if !bytes.Equal(thisMsg.PkiId, thatMsg.PkiId) {
return common.MessageNoAction
}
return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp)
}
func compareTimestamps(thisTS *gossip.PeerTime, thatTS *gossip.PeerTime) common.InvalidationResult {
if thisTS.IncNum == thatTS.IncNum {
if thisTS.SeqNum > thatTS.SeqNum {
return common.MessageInvalidates
}
return common.MessageInvalidated
}
if thisTS.IncNum < thatTS.IncNum {
return common.MessageInvalidated
}
return common.MessageInvalidates
}
// abs returns abs(a-b)
func abs(a, b uint64) uint64 {
if a > b {
return a - b
}
return b - a
}