Skip to content

Commit

Permalink
Merge "[FAB-3744] Gossip: only pull from peers in the same org"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed May 10, 2017
2 parents dd180b3 + 9ff8fc4 commit 9d6cec8
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
13 changes: 10 additions & 3 deletions gossip/gossip/channel/channel.go
Expand Up @@ -149,7 +149,7 @@ type membershipFilter struct {
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
var members []discovery.NetworkMember
for _, mem := range mf.adapter.GetMembership() {
if mf.EligibleForChannel(mem) {
if mf.eligibleForChannelAndSameOrg(mem) {
members = append(members, mem)
}
}
Expand Down Expand Up @@ -252,6 +252,13 @@ func (gc *gossipChannel) requestStateInfo() {
gc.Send(req, endpoints...)
}

func (gc *gossipChannel) eligibleForChannelAndSameOrg(member discovery.NetworkMember) bool {
sameOrg := func(networkMember discovery.NetworkMember) bool {
return bytes.Equal(gc.GetOrgOfPeer(networkMember.PKIid), gc.selfOrg)
}
return filter.CombineRoutingFilters(gc.EligibleForChannel, sameOrg)(member)
}

func (gc *gossipChannel) publishStateInfo() {
if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
return
Expand Down Expand Up @@ -430,8 +437,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
return
}
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for channel", string(gc.chainID))
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for pulling blocks of", string(gc.chainID))
return
}
if m.IsDataUpdate() {
Expand Down
91 changes: 91 additions & 0 deletions gossip/gossip/channel/channel_test.go
Expand Up @@ -445,6 +445,97 @@ func TestChannelPull(t *testing.T) {
}
}

func TestChannelPullAccessControl(t *testing.T) {
t.Parallel()
// Scenario: We have 2 organizations in the channel: ORG1, ORG2
// The "acting peer" is from ORG1 and peers "1", "2", "3" are from
// the following organizations:
// ORG1: "1"
// ORG2: "2", "3"
// We test 2 cases:
// 1) We don't respond for Hello messages from peers in foreign organizations
// 2) We don't select peers from foreign organizations when doing pull

cs := &cryptoService{}
adapter := new(gossipAdapterMock)
cs.Mock = mock.Mock{}
cs.On("VerifyBlock", mock.Anything).Return(nil)

pkiID1 := common.PKIidType("1")
pkiID2 := common.PKIidType("2")
pkiID3 := common.PKIidType("3")

peer1 := discovery.NetworkMember{PKIid: pkiID1, InternalEndpoint: "1", Endpoint: "1"}
peer2 := discovery.NetworkMember{PKIid: pkiID2, InternalEndpoint: "2", Endpoint: "2"}
peer3 := discovery.NetworkMember{PKIid: pkiID3, InternalEndpoint: "3", Endpoint: "3"}

adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(api.OrgIdentityType("ORG1"))
adapter.On("GetOrgOfPeer", pkiID1).Return(api.OrgIdentityType("ORG1"))
adapter.On("GetOrgOfPeer", pkiID2).Return(api.OrgIdentityType("ORG2"))
adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2"))

adapter.On("GetMembership").Return([]discovery.NetworkMember{peer1, peer2, peer3})
adapter.On("DeMultiplex", mock.Anything)
adapter.On("Gossip", mock.Anything)
adapter.On("GetConf").Return(conf)

sentHello := int32(0)
adapter.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.SignedGossipMessage)
if !msg.IsHelloMsg() {
return
}
atomic.StoreInt32(&sentHello, int32(1))
peerID := string(arg.Get(1).([]*comm.RemotePeer)[0].PKIID)
assert.Equal(t, "1", peerID)
assert.NotEqual(t, "2", peerID, "Sent hello to peer 2 but it's in a different org")
assert.NotEqual(t, "3", peerID, "Sent hello to peer 3 but it's in a different org")
})

jcm := &joinChanMsg{
members2AnchorPeers: map[string][]api.AnchorPeer{
"ORG1": {},
"ORG2": {},
},
}
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, jcm)
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiID1, msg: createStateInfoMsg(100, pkiID1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(100, pkiID2, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiID3, msg: createStateInfoMsg(100, pkiID3, channelA)})

respondedChan := make(chan *proto.GossipMessage, 1)
messageRelayer := func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.GossipMessage)
respondedChan <- msg
}

gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(5, channelA), PKIID: pkiIDInOrg1})

helloMsg := createHelloMsg(pkiID1)
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
go gc.HandleMessage(helloMsg)
select {
case <-respondedChan:
case <-time.After(time.Second):
assert.Fail(t, "Didn't reply to a hello within a timely manner")
}

helloMsg = createHelloMsg(pkiID2)
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
go gc.HandleMessage(helloMsg)
select {
case <-respondedChan:
assert.Fail(t, "Shouldn't have replied to a hello, because the peer is from a foreign org")
case <-time.After(time.Second):
}

// Sleep a bit to let the gossip channel send out its hello messages
time.Sleep(time.Second * 3)
// Make sure we sent at least 1 hello message, otherwise the test passed vacuously
assert.Equal(t, int32(1), atomic.LoadInt32(&sentHello))
}

func TestChannelPeerNotInChannel(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 9d6cec8

Please sign in to comment.