Skip to content

Commit

Permalink
[FAB-18208] Do not sign gossip message if membership is empty
Browse files Browse the repository at this point in the history
This change set makes gossip discovery and channel modules only sign (and gossip)
alive messages and state info messages if the membership is non empty.

Change-Id: Id4de507cdccaa5860ba4b67cf059defc98545bd9
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm authored and C0rWin committed Sep 10, 2020
1 parent 65d0b62 commit cd6ce4a
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 60 deletions.
31 changes: 21 additions & 10 deletions gossip/discovery/discovery_impl.go
Expand Up @@ -230,16 +230,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
return
}
var peers2SendTo []*NetworkMember
m, err := d.createMembershipRequest(true)
if err != nil {
d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
return
}
memReq, err := protoext.NoopSign(m)
if err != nil {
d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
return
}

d.lock.RLock()

n := d.aliveMembership.Size()
Expand All @@ -266,6 +257,22 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {

d.lock.RUnlock()

if len(peers2SendTo) == 0 {
d.logger.Debugf("No peers to send to, aborting membership sync")
return
}

m, err := d.createMembershipRequest(true)
if err != nil {
d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
return
}
memReq, err := protoext.NoopSign(m)
if err != nil {
d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
return
}

for _, netMember := range peers2SendTo {
d.comm.SendToPeer(netMember, memReq)
}
Expand Down Expand Up @@ -755,6 +762,10 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
for !d.toDie() {
d.logger.Debug("Sleeping", d.aliveTimeInterval)
time.Sleep(d.aliveTimeInterval)
if d.aliveMembership.Size() == 0 {
d.logger.Debugf("Empty membership, no one to send a heartbeat to")
continue
}
msg, err := d.createSignedAliveMessage(true)
if err != nil {
d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
Expand Down
37 changes: 28 additions & 9 deletions gossip/discovery/discovery_test.go
Expand Up @@ -104,6 +104,7 @@ type dummyCommModule struct {
shouldGossip bool
disableComm bool
mock *mock.Mock
signCount uint32
}

type gossipInstance struct {
Expand Down Expand Up @@ -140,6 +141,7 @@ func (comm *dummyCommModule) recordValidation(validatedMessages chan *protoext.S
}

func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoint string) *proto.Envelope {
atomic.AddUint32(&comm.signCount, 1)
protoext.NoopSign(am)

secret := &proto.Secret{
Expand Down Expand Up @@ -586,6 +588,18 @@ func TestConnect(t *testing.T) {
}
}

func TestNoSigningIfNoMembership(t *testing.T) {
t.Parallel()

inst := createDiscoveryInstance(8931, "foreveralone", nil)
defer inst.Stop()
time.Sleep(defaultTestConfig.AliveTimeInterval * 10)
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))

inst.InitiateSync(10000)
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))
}

func TestValidation(t *testing.T) {
// Scenarios: This test contains the following sub-tests:
// 1) alive message validation: a message is validated <==> it entered the message store
Expand Down Expand Up @@ -1588,23 +1602,28 @@ func TestMemRespDisclosurePol(t *testing.T) {
pol := func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) {
assert.Equal(t, remotePeer.InternalEndpoint, remotePeer.Endpoint)
return func(_ *protoext.SignedGossipMessage) bool {
return remotePeer.Endpoint == "localhost:7880"
return remotePeer.Endpoint != "localhost:7879"
}, func(m *protoext.SignedGossipMessage) *proto.Envelope {
return m.Envelope
}
}

wasMembershipResponseReceived := func(msg *protoext.SignedGossipMessage) {
assert.Nil(t, msg.GetMemRes())
}

d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol, defaultTestConfig)
defer d1.Stop()
d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig)
d2 := createDiscoveryInstanceThatGossipsWithInterceptors(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, wasMembershipResponseReceived, defaultTestConfig)
defer d2.Stop()
d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig)
d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, pol, defaultTestConfig)
defer d3.Stop()
// Both d1 and d3 know each other, and also about d2
assertMembership(t, []*gossipInstance{d1, d3}, 2)
// d2 doesn't know about any one because the bootstrap peer is ignoring it due to custom policy
assertMembership(t, []*gossipInstance{d2}, 0)
require.Zero(t, d2.receivedMsgCount())
require.NotZero(t, d2.sentMsgCount())

// all peers know each other
assertMembership(t, []*gossipInstance{d1, d2, d3}, 2)
// d2 received some messages, but we asserted that none of them are membership responses.
assert.NotZero(t, d2.receivedMsgCount())
assert.NotZero(t, d2.sentMsgCount())
}

func TestMembersByID(t *testing.T) {
Expand Down
65 changes: 46 additions & 19 deletions gossip/gossip/channel/channel.go
Expand Up @@ -145,7 +145,8 @@ type gossipChannel struct {
pkiID common.PKIidType
selfOrg api.OrgIdentityType
stopChan chan struct{}
stateInfoMsg *protoext.SignedGossipMessage
selfStateInfoMsg *proto.GossipMessage
selfStateInfoSignedMsg *protoext.SignedGossipMessage
orgs []api.OrgIdentityType
joinMsg api.JoinChannelMessage
blockMsgStore msgstore.MessageStore
Expand Down Expand Up @@ -272,6 +273,11 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M
}
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg)

// Setup a plain state info message at startup, just to have all required fields populated
// when this gossip channel is created
gc.updateProperties(1, nil, false)
gc.setupSignedStateInfoMessage()

ttl := adapter.GetConf().MsgExpirationTimeout
pol := protoext.NewGossipMessageComparator(0)

Expand Down Expand Up @@ -331,7 +337,7 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
func (gc *gossipChannel) Self() *protoext.SignedGossipMessage {
gc.RLock()
defer gc.RUnlock()
return gc.stateInfoMsg
return gc.selfStateInfoSignedMsg
}

// LeaveChannel makes the peer leave the channel
Expand All @@ -343,11 +349,12 @@ func (gc *gossipChannel) LeaveChannel() {

var chaincodes []*proto.Chaincode
var height uint64
if prevMsg := gc.stateInfoMsg; prevMsg != nil {
if prevMsg := gc.selfStateInfoMsg; prevMsg != nil {
chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes
height = prevMsg.GetStateInfo().Properties.LedgerHeight
}
gc.updateProperties(height, chaincodes, true)
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

func (gc *gossipChannel) hasLeftChannel() bool {
Expand Down Expand Up @@ -401,13 +408,38 @@ func (gc *gossipChannel) publishStateInfo() {
if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
return
}

if len(gc.GetMembership()) == 0 {
gc.logger.Debugf("Empty membership, no one to publish state info to")
return
}

stateInfoMsg, err := gc.setupSignedStateInfoMessage()
if err != nil {
gc.logger.Errorf("Failed creating signed state info message: %v", err)
return
}
gc.stateInfoMsgStore.Add(stateInfoMsg)
gc.Gossip(stateInfoMsg)
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
}

func (gc *gossipChannel) setupSignedStateInfoMessage() (*protoext.SignedGossipMessage, error) {
gc.RLock()
stateInfoMsg := gc.stateInfoMsg
msg := gc.selfStateInfoMsg
gc.RUnlock()
gc.Gossip(stateInfoMsg)
if len(gc.GetMembership()) > 0 {
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))

stateInfoMsg, err := gc.Sign(msg)
if err != nil {
gc.logger.Error("Failed signing message:", err)
return nil, err
}

gc.Lock()
gc.selfStateInfoSignedMsg = stateInfoMsg
gc.Unlock()

return stateInfoMsg, nil
}

func (gc *gossipChannel) createBlockPuller() pull.Mediator {
Expand Down Expand Up @@ -885,11 +917,12 @@ func (gc *gossipChannel) UpdateLedgerHeight(height uint64) {

var chaincodes []*proto.Chaincode
var leftChannel bool
if prevMsg := gc.stateInfoMsg; prevMsg != nil {
if prevMsg := gc.selfStateInfoMsg; prevMsg != nil {
leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel
chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes
}
gc.updateProperties(height, chaincodes, leftChannel)
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

// UpdateChaincodes updates the chaincodes the peer publishes
Expand All @@ -900,20 +933,19 @@ func (gc *gossipChannel) UpdateChaincodes(chaincodes []*proto.Chaincode) {

var ledgerHeight uint64 = 1
var leftChannel bool
if prevMsg := gc.stateInfoMsg; prevMsg != nil {
if prevMsg := gc.selfStateInfoMsg; prevMsg != nil {
ledgerHeight = prevMsg.GetStateInfo().Properties.LedgerHeight
leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel
}
gc.updateProperties(ledgerHeight, chaincodes, leftChannel)
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

// UpdateStateInfo updates this channel's StateInfo message
// that is periodically published
func (gc *gossipChannel) updateStateInfo(msg *protoext.SignedGossipMessage) {
gc.stateInfoMsgStore.Add(msg)
func (gc *gossipChannel) updateStateInfo(msg *proto.GossipMessage) {
gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
gc.stateInfoMsg = msg
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
gc.selfStateInfoMsg = msg
}

func (gc *gossipChannel) updateProperties(ledgerHeight uint64, chaincodes []*proto.Chaincode, leftChannel bool) {
Expand All @@ -938,12 +970,7 @@ func (gc *gossipChannel) updateProperties(ledgerHeight uint64, chaincodes []*pro
},
}

msg, err := gc.Sign(m)
if err != nil {
gc.logger.Error("Failed signing message:", err)
return
}
gc.updateStateInfo(msg)
gc.updateStateInfo(m)
}

func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool, verifyFunc membershipPredicate) *stateInfoCache {
Expand Down

0 comments on commit cd6ce4a

Please sign in to comment.