Skip to content

Commit

Permalink
[FAB-2641] Prevent loop while gossiping msgs
Browse files Browse the repository at this point in the history
Currently while gossip forward the message during distribution it
selects peers from the current membership view, while not filtering
the peer which delivered the message. As a consequence message might be
send to the source peer one again. This commit extends the peers
selection logic by adding the check which allows to avoid sending same
message to the source peer back again.

Change-Id: I76ceafba2335c27e1186f511d3cdab8a24d600af
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Nov 27, 2017
1 parent 34a1f3e commit eb437da
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 48 deletions.
9 changes: 9 additions & 0 deletions gossip/common/common.go
Expand Up @@ -6,6 +6,8 @@ SPDX-License-Identifier: Apache-2.0

package common

import "bytes"

func init() {
// This is just to satisfy the code coverage tool
// miss any methods
Expand All @@ -18,6 +20,13 @@ func init() {
// which is the security identifier of a peer
type PKIidType []byte

// IsNotSameFilter generate filter function which
// provides a predicate to identify whenever current id
// equals to another one.
func (this PKIidType) IsNotSameFilter(that PKIidType) bool {
return !bytes.Equal(this, that)
}

// MessageAcceptor is a predicate that is used to
// determine in which messages the subscriber that created the
// instance of the MessageAcceptor is interested in.
Expand Down
6 changes: 5 additions & 1 deletion gossip/discovery/discovery.go
Expand Up @@ -55,13 +55,17 @@ type CommService interface {
Ping(peer *NetworkMember) bool

// Accept returns a read-only channel for membership messages sent from remote peers
Accept() <-chan *proto.SignedGossipMessage
Accept() <-chan proto.ReceivedMessage

// PresumedDead returns a read-only channel for peers that are presumed to be dead
PresumedDead() <-chan common.PKIidType

// CloseConn orders to close the connection with a certain peer
CloseConn(peer *NetworkMember)

// Forward sends message to the next hop, excluding the hop
// from which message was initially received
Forward(msg proto.ReceivedMessage)
}

// NetworkMember is a peer's representation
Expand Down
8 changes: 4 additions & 4 deletions gossip/discovery/discovery_impl.go
Expand Up @@ -309,10 +309,11 @@ func (d *gossipDiscoveryImpl) handleMessages() {
}
}

func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
if m == nil {
func (d *gossipDiscoveryImpl) handleMsgFromComm(msg proto.ReceivedMessage) {
if msg == nil {
return
}
m := msg.GetGossipMessage()
if m.GetAliveMsg() == nil && m.GetMemRes() == nil && m.GetMemReq() == nil {
d.logger.Warning("Got message with wrong type (expected Alive or MembershipResponse or MembershipRequest message):", m.GossipMessage)
return
Expand Down Expand Up @@ -345,13 +346,12 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
}

if m.IsAliveMsg() {

if !d.msgStore.Add(m) {
return
}
d.handleAliveMessage(m)

d.comm.Gossip(m)
d.comm.Forward(msg)
return
}

Expand Down
91 changes: 81 additions & 10 deletions gossip/discovery/discovery_test.go
Expand Up @@ -43,6 +43,31 @@ func init() {
maxConnectionAttempts = 10000
}

type dummyReceivedMessage struct {
msg *proto.SignedGossipMessage
info *proto.ConnectionInfo
}

func (*dummyReceivedMessage) Respond(msg *proto.GossipMessage) {
panic("implement me")
}

func (rm *dummyReceivedMessage) GetGossipMessage() *proto.SignedGossipMessage {
return rm.msg
}

func (*dummyReceivedMessage) GetSourceEnvelope() *proto.Envelope {
panic("implement me")
}

func (rm *dummyReceivedMessage) GetConnectionInfo() *proto.ConnectionInfo {
return rm.info
}

func (*dummyReceivedMessage) Ack(err error) {
panic("implement me")
}

type dummyCommModule struct {
msgsReceived uint32
msgsSent uint32
Expand All @@ -52,7 +77,7 @@ type dummyCommModule struct {
streams map[string]proto.Gossip_GossipStreamClient
conns map[string]*grpc.ClientConn
lock *sync.RWMutex
incMsgs chan *proto.SignedGossipMessage
incMsgs chan proto.ReceivedMessage
lastSeqs map[string]uint64
shouldGossip bool
mock *mock.Mock
Expand Down Expand Up @@ -101,6 +126,17 @@ func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) {
}
}

func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) {
if !comm.shouldGossip {
return
}
comm.lock.Lock()
defer comm.lock.Unlock()
for _, conn := range comm.streams {
conn.Send(msg.GetGossipMessage().Envelope)
}
}

func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage) {
comm.lock.RLock()
_, exists := comm.streams[peer.Endpoint]
Expand Down Expand Up @@ -152,7 +188,7 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
return true
}

func (comm *dummyCommModule) Accept() <-chan *proto.SignedGossipMessage {
func (comm *dummyCommModule) Accept() <-chan proto.ReceivedMessage {
return comm.incMsgs
}

Expand Down Expand Up @@ -217,7 +253,12 @@ func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) er
}

lgr.Debug(g.Discovery.Self().Endpoint, "Got message:", gMsg)
g.comm.incMsgs <- gMsg
g.comm.incMsgs <- &dummyReceivedMessage{
msg: gMsg,
info: &proto.ConnectionInfo{
ID: common.PKIidType("testID"),
},
}
atomic.AddUint32(&g.comm.msgsReceived, 1)

if aliveMsg := gMsg.GetAliveMsg(); aliveMsg != nil {
Expand Down Expand Up @@ -294,7 +335,7 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
comm := &dummyCommModule{
conns: make(map[string]*grpc.ClientConn),
streams: make(map[string]proto.Gossip_GossipStreamClient),
incMsgs: make(chan *proto.SignedGossipMessage, 1000),
incMsgs: make(chan proto.ReceivedMessage, 1000),
presumeDead: make(chan common.PKIidType, 10000),
id: id,
detectedDead: make(chan string, 10000),
Expand Down Expand Up @@ -364,7 +405,12 @@ func TestBadInput(t *testing.T) {
DataMsg: &proto.DataMessage{},
},
}).NoopSign()
inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(s)
inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(&dummyReceivedMessage{
msg: s,
info: &proto.ConnectionInfo{
ID: common.PKIidType("testID"),
},
})
}

func TestConnect(t *testing.T) {
Expand Down Expand Up @@ -944,7 +990,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
// Handling Alive
for i := 0; i < peersNum; i++ {
for k := 0; k < peersNum; k++ {
instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k])
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
msg: aliveMsgs[k],
info: &proto.ConnectionInfo{
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
},
})
}
}

Expand Down Expand Up @@ -1006,7 +1057,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
// Handling new Alive set
for i := 0; i < peersNum; i++ {
for k := 0; k < peersNum; k++ {
instances[i].discoveryImpl().handleMsgFromComm(newAliveMsgs[k])
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
msg: newAliveMsgs[k],
info: &proto.ConnectionInfo{
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
},
})
}
}

Expand Down Expand Up @@ -1041,7 +1097,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
return k == i
},
func(k int) {
instances[i].discoveryImpl().handleMsgFromComm(memReqMsgs[k])
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
msg: memReqMsgs[k],
info: &proto.ConnectionInfo{
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
},
})
})
}

Expand All @@ -1053,7 +1114,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
// Processing old (later) Alive messages
for i := 0; i < peersNum; i++ {
for k := 0; k < peersNum; k++ {
instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k])
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
msg: aliveMsgs[k],
info: &proto.ConnectionInfo{
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
},
})
}
}

Expand All @@ -1074,7 +1140,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
MemRes: msg,
},
}).NoopSign()
instances[i].discoveryImpl().handleMsgFromComm(sMsg)
instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{
msg: sMsg,
info: &proto.ConnectionInfo{
ID: common.PKIidType(fmt.Sprintf("d%d", i)),
},
})
}
}

Expand Down
5 changes: 4 additions & 1 deletion gossip/gossip/channel/channel.go
Expand Up @@ -92,6 +92,9 @@ type Adapter interface {
// Gossip gossips a message in the channel
Gossip(message *proto.SignedGossipMessage)

// Forward sends a message to the next hops
Forward(message proto.ReceivedMessage)

// DeMultiplex de-multiplexes an item to subscribers
DeMultiplex(interface{})

Expand Down Expand Up @@ -541,7 +544,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {

if added {
// Forward the message
gc.Gossip(msg.GetGossipMessage())
gc.Forward(msg)
// DeMultiplex to local subscribers
gc.DeMultiplex(m)

Expand Down

0 comments on commit eb437da

Please sign in to comment.