@@ -112,9 +112,13 @@ type Options struct {
112
112
MaxSizePerMsg uint64
113
113
MaxInflightMsgs int
114
114
115
+ // BlockMetdata and Consenters should only be modified while under lock
116
+ // of raftMetadataLock
115
117
BlockMetadata * etcdraft.BlockMetadata
116
- Metrics * Metrics
117
- Cert []byte
118
+ Consenters map [uint64 ]* etcdraft.Consenter
119
+
120
+ Metrics * Metrics
121
+ Cert []byte
118
122
119
123
EvictionSuspicion time.Duration
120
124
LeaderCheckInterval time.Duration
@@ -313,7 +317,7 @@ func (c *Chain) MigrationStatus() migration.Status {
313
317
func (c * Chain ) Start () {
314
318
c .logger .Infof ("Starting Raft node" )
315
319
316
- c .Metrics .ClusterSize .Set (float64 (len (c .opts .BlockMetadata .Consenters )))
320
+ c .Metrics .ClusterSize .Set (float64 (len (c .opts .BlockMetadata .ConsenterIds )))
317
321
// all nodes start out as followers
318
322
c .Metrics .IsLeader .Set (float64 (0 ))
319
323
if err := c .configureComm (); err != nil {
@@ -719,7 +723,7 @@ func (c *Chain) serveRequest() {
719
723
select {
720
724
case <- c .errorC :
721
725
default :
722
- nodeCount := len (c .opts .BlockMetadata .Consenters )
726
+ nodeCount := len (c .opts .BlockMetadata .ConsenterIds )
723
727
// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
724
728
// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
725
729
if nodeCount > 2 {
@@ -945,6 +949,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
945
949
946
950
c .raftMetadataLock .Lock ()
947
951
c .opts .BlockMetadata = configMembership .NewBlockMetadata
952
+ c .opts .Consenters = configMembership .NewConsenters
948
953
c .raftMetadataLock .Unlock ()
949
954
950
955
if err := c .configureComm (); err != nil {
@@ -978,7 +983,7 @@ func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
978
983
c .logger .Infof ("Snapshot interval is updated to %d bytes (was %d)" , c .sizeLimit , old )
979
984
}
980
985
981
- changes , err := ComputeMembershipChanges (c .opts .BlockMetadata , configMetadata .Consenters )
986
+ changes , err := ComputeMembershipChanges (c .opts .BlockMetadata , c . opts . Consenters , configMetadata .Consenters )
982
987
if err != nil {
983
988
c .logger .Panicf ("illegal configuration change detected: %s" , err )
984
989
}
@@ -1054,7 +1059,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
1054
1059
c .confChangeInProgress = nil
1055
1060
c .configInflight = false
1056
1061
// report the new cluster size
1057
- c .Metrics .ClusterSize .Set (float64 (len (c .opts .BlockMetadata .Consenters )))
1062
+ c .Metrics .ClusterSize .Set (float64 (len (c .opts .BlockMetadata .ConsenterIds )))
1058
1063
}
1059
1064
1060
1065
if cc .Type == raftpb .ConfChangeRemoveNode && cc .NodeID == c .raftID {
@@ -1130,7 +1135,7 @@ func (c *Chain) configureComm() error {
1130
1135
1131
1136
func (c * Chain ) remotePeers () ([]cluster.RemoteNode , error ) {
1132
1137
var nodes []cluster.RemoteNode
1133
- for raftID , consenter := range c .opts .BlockMetadata . Consenters {
1138
+ for raftID , consenter := range c .opts .Consenters {
1134
1139
// No need to know yourself
1135
1140
if raftID == c .raftID {
1136
1141
continue
@@ -1175,7 +1180,7 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
1175
1180
}
1176
1181
1177
1182
c .raftMetadataLock .RLock ()
1178
- _ , err = ComputeMembershipChanges (c .opts .BlockMetadata , updatedMetadata .Consenters )
1183
+ _ , err = ComputeMembershipChanges (c .opts .BlockMetadata , c . opts . Consenters , updatedMetadata .Consenters )
1179
1184
c .raftMetadataLock .RUnlock ()
1180
1185
1181
1186
return err
@@ -1196,11 +1201,12 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
1196
1201
case common .HeaderType_CONFIG :
1197
1202
configMembership := c .detectConfChange (block )
1198
1203
1204
+ c .opts .BlockMetadata .RaftIndex = index
1199
1205
c .raftMetadataLock .Lock ()
1200
1206
if configMembership != nil {
1201
1207
c .opts .BlockMetadata = configMembership .NewBlockMetadata
1208
+ c .opts .Consenters = configMembership .NewConsenters
1202
1209
}
1203
- c .opts .BlockMetadata .RaftIndex = index
1204
1210
c .raftMetadataLock .Unlock ()
1205
1211
1206
1212
blockMetadataBytes := protoutil .MarshalOrPanic (c .opts .BlockMetadata )
@@ -1285,7 +1291,7 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
1285
1291
// extracting current Raft configuration state
1286
1292
confState := c .Node .ApplyConfChange (raftpb.ConfChange {})
1287
1293
1288
- if len (confState .Nodes ) == len (c .opts .BlockMetadata .Consenters ) {
1294
+ if len (confState .Nodes ) == len (c .opts .BlockMetadata .ConsenterIds ) {
1289
1295
// since configuration change could only add one node or
1290
1296
// remove one node at a time, if raft nodes state size
1291
1297
// equal to membership stored in block metadata field,
0 commit comments