Skip to content

Commit

Permalink
[FAB-14691] Add to msgStore and puller atomically
Browse files Browse the repository at this point in the history
When adding a block to the message store, if it is added and
not rejected - it is then added to the block puller.

When the block is removed from the message store, a callback
is triggered to remove the block from the block puller.

However, these 2 operations are not atomic.

Since we can add a block to the message store from both AddToMsgStore
(which is invoked by Gossip() ) and from HandleMessage,
we can have the following schedule:

  1) A block with sequence of 100 is gossiped by the upper
    layer of the peer, and AddToMsgStore is called, which
    adds the block to the message store, and the CPU is preempted.
  2) A block message 210 is received via HandleMessage and it causes
     the block 100 to be evicted from the message store, and the
    callback to remove the message from the block puller is called,
    but it is not removed because it is not there yet.
  3) The block 210 is added to the block puller, since it was added
     to the message store.
  4) The CPU is back to perform AddToMsgStore, and adds block 100
     to the block puller.

Now the block puller has block 100, and the message store doesn't
have block 100 anymore - which means it will never be evicted from
the block puller.

To prevent this we need to make these 2 operations be atomic.

Change-Id: I3b7d0d013ce8da5d9a0e40f8b0cdbc3edaed22c9
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Mar 18, 2019
1 parent 7192430 commit 6be63e8
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions gossip/gossip/channel/channel.go
Expand Up @@ -456,6 +456,8 @@ func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool
// AddToMsgStore adds a given GossipMessage to the message store
func (gc *gossipChannel) AddToMsgStore(msg *proto.SignedGossipMessage) {
if msg.IsDataMsg() {
gc.Lock()
defer gc.Unlock()
added := gc.blockMsgStore.Add(msg)
if added {
gc.logger.Debugf("Adding %v to the block puller", msg)
Expand Down Expand Up @@ -541,7 +543,13 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
return
}
gc.Lock()
added = gc.blockMsgStore.Add(msg.GetGossipMessage())
if added {
gc.logger.Debugf("Adding %v to the block puller", msg.GetGossipMessage())
gc.blocksPuller.Add(msg.GetGossipMessage())
}
gc.Unlock()
} else { // StateInfoMsg verification should be handled in a layer above
// since we don't have access to the id mapper here
added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
Expand All @@ -552,11 +560,6 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
gc.Forward(msg)
// DeMultiplex to local subscribers
gc.DeMultiplex(m)

if m.IsDataMsg() {
gc.logger.Debugf("Adding %v to the block puller", msg.GetGossipMessage())
gc.blocksPuller.Add(msg.GetGossipMessage())
}
}
return
}
Expand All @@ -580,6 +583,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
// Iterate over the envelopes, and filter out blocks
// that we already have in the blockMsgStore, or blocks that
// are too far in the past.
var msgs []*proto.SignedGossipMessage
var items []*proto.Envelope
filteredEnvelopes := []*proto.Envelope{}
for _, item := range m.GetDataUpdate().Data {
gMsg, err := item.ToGossipMessage()
Expand All @@ -598,6 +603,15 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
return
}
msgs = append(msgs, gMsg)
items = append(items, item)
}

gc.Lock()
defer gc.Unlock()

for i, gMsg := range msgs {
item := items[i]
added := gc.blockMsgStore.Add(gMsg)
if !added {
// If this block doesn't need to be added, it means it either already
Expand All @@ -606,6 +620,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
}
filteredEnvelopes = append(filteredEnvelopes, item)
}

// Replace the update message with just the blocks that should be processed
m.GetDataUpdate().Data = filteredEnvelopes
}
Expand Down

0 comments on commit 6be63e8

Please sign in to comment.