From cd6ce4a85838dfe109d490c2bccfbfcabe65bd85 Mon Sep 17 00:00:00 2001 From: yacovm Date: Wed, 5 Aug 2020 14:36:22 +0300 Subject: [PATCH] [FAB-18208] Do not sign gossip message if membership is empty This change set makes gossip discovery and channel modules only sign (and gossip) alive messages and state info messages if the membership is non empty. Change-Id: Id4de507cdccaa5860ba4b67cf059defc98545bd9 Signed-off-by: yacovm --- gossip/discovery/discovery_impl.go | 31 ++++--- gossip/discovery/discovery_test.go | 37 ++++++--- gossip/gossip/channel/channel.go | 65 ++++++++++----- gossip/gossip/channel/channel_test.go | 111 +++++++++++++++++++++----- 4 files changed, 184 insertions(+), 60 deletions(-) diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 375cedb9a8a..1da96254ea4 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -230,16 +230,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) { return } var peers2SendTo []*NetworkMember - m, err := d.createMembershipRequest(true) - if err != nil { - d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err)) - return - } - memReq, err := protoext.NoopSign(m) - if err != nil { - d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err)) - return - } + d.lock.RLock() n := d.aliveMembership.Size() @@ -266,6 +257,22 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) { d.lock.RUnlock() + if len(peers2SendTo) == 0 { + d.logger.Debugf("No peers to send to, aborting membership sync") + return + } + + m, err := d.createMembershipRequest(true) + if err != nil { + d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err)) + return + } + memReq, err := protoext.NoopSign(m) + if err != nil { + d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err)) + return + } + for _, netMember := range peers2SendTo { d.comm.SendToPeer(netMember, memReq) } @@ -755,6 +762,10 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() { for !d.toDie() { d.logger.Debug("Sleeping", d.aliveTimeInterval) time.Sleep(d.aliveTimeInterval) + if d.aliveMembership.Size() == 0 { + d.logger.Debugf("Empty membership, no one to send a heartbeat to") + continue + } msg, err := d.createSignedAliveMessage(true) if err != nil { d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err)) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index 2e7e72cde8e..04e9441bc6c 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -104,6 +104,7 @@ type dummyCommModule struct { shouldGossip bool disableComm bool mock *mock.Mock + signCount uint32 } type gossipInstance struct { @@ -140,6 +141,7 @@ func (comm *dummyCommModule) recordValidation(validatedMessages chan *protoext.S } func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoint string) *proto.Envelope { + atomic.AddUint32(&comm.signCount, 1) protoext.NoopSign(am) secret := &proto.Secret{ @@ -586,6 +588,18 @@ func TestConnect(t *testing.T) { } } +func TestNoSigningIfNoMembership(t *testing.T) { + t.Parallel() + + inst := createDiscoveryInstance(8931, "foreveralone", nil) + defer inst.Stop() + time.Sleep(defaultTestConfig.AliveTimeInterval * 10) + assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount)) + + inst.InitiateSync(10000) + assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount)) +} + func TestValidation(t *testing.T) { // Scenarios: This test contains the following sub-tests: // 1) alive message validation: a message is validated <==> it entered the message store @@ -1588,23 +1602,28 @@ func TestMemRespDisclosurePol(t *testing.T) { pol := func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) { assert.Equal(t, remotePeer.InternalEndpoint, remotePeer.Endpoint) return func(_ *protoext.SignedGossipMessage) bool { - return remotePeer.Endpoint == "localhost:7880" + return remotePeer.Endpoint != "localhost:7879" }, func(m *protoext.SignedGossipMessage) *proto.Envelope { return m.Envelope } } + + wasMembershipResponseReceived := func(msg *protoext.SignedGossipMessage) { + assert.Nil(t, msg.GetMemRes()) + } + d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol, defaultTestConfig) defer d1.Stop() - d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig) + d2 := createDiscoveryInstanceThatGossipsWithInterceptors(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, wasMembershipResponseReceived, defaultTestConfig) defer d2.Stop() - d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig) + d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, pol, defaultTestConfig) defer d3.Stop() - // Both d1 and d3 know each other, and also about d2 - assertMembership(t, []*gossipInstance{d1, d3}, 2) - // d2 doesn't know about any one because the bootstrap peer is ignoring it due to custom policy - assertMembership(t, []*gossipInstance{d2}, 0) - require.Zero(t, d2.receivedMsgCount()) - require.NotZero(t, d2.sentMsgCount()) + + // all peers know each other + assertMembership(t, []*gossipInstance{d1, d2, d3}, 2) + // d2 received some messages, but we asserted that none of them are membership responses. + assert.NotZero(t, d2.receivedMsgCount()) + assert.NotZero(t, d2.sentMsgCount()) } func TestMembersByID(t *testing.T) { diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index 542f60c93f8..85fe8b5039b 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -145,7 +145,8 @@ type gossipChannel struct { pkiID common.PKIidType selfOrg api.OrgIdentityType stopChan chan struct{} - stateInfoMsg *protoext.SignedGossipMessage + selfStateInfoMsg *proto.GossipMessage + selfStateInfoSignedMsg *protoext.SignedGossipMessage orgs []api.OrgIdentityType joinMsg api.JoinChannelMessage blockMsgStore msgstore.MessageStore @@ -272,6 +273,11 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M } gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg) + // Setup a plain state info message at startup, just to have all required fields populated + // when this gossip channel is created + gc.updateProperties(1, nil, false) + gc.setupSignedStateInfoMessage() + ttl := adapter.GetConf().MsgExpirationTimeout pol := protoext.NewGossipMessageComparator(0) @@ -331,7 +337,7 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) { func (gc *gossipChannel) Self() *protoext.SignedGossipMessage { gc.RLock() defer gc.RUnlock() - return gc.stateInfoMsg + return gc.selfStateInfoSignedMsg } // LeaveChannel makes the peer leave the channel @@ -343,11 +349,12 @@ func (gc *gossipChannel) LeaveChannel() { var chaincodes []*proto.Chaincode var height uint64 - if prevMsg := gc.stateInfoMsg; prevMsg != nil { + if prevMsg := gc.selfStateInfoMsg; prevMsg != nil { chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes height = prevMsg.GetStateInfo().Properties.LedgerHeight } gc.updateProperties(height, chaincodes, true) + atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) } func (gc *gossipChannel) hasLeftChannel() bool { @@ -401,13 +408,38 @@ func (gc *gossipChannel) publishStateInfo() { if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) { return } + + if len(gc.GetMembership()) == 0 { + gc.logger.Debugf("Empty membership, no one to publish state info to") + return + } + + stateInfoMsg, err := gc.setupSignedStateInfoMessage() + if err != nil { + gc.logger.Errorf("Failed creating signed state info message: %v", err) + return + } + gc.stateInfoMsgStore.Add(stateInfoMsg) + gc.Gossip(stateInfoMsg) + atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0)) +} + +func (gc *gossipChannel) setupSignedStateInfoMessage() (*protoext.SignedGossipMessage, error) { gc.RLock() - stateInfoMsg := gc.stateInfoMsg + msg := gc.selfStateInfoMsg gc.RUnlock() - gc.Gossip(stateInfoMsg) - if len(gc.GetMembership()) > 0 { - atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0)) + + stateInfoMsg, err := gc.Sign(msg) + if err != nil { + gc.logger.Error("Failed signing message:", err) + return nil, err } + + gc.Lock() + gc.selfStateInfoSignedMsg = stateInfoMsg + gc.Unlock() + + return stateInfoMsg, nil } func (gc *gossipChannel) createBlockPuller() pull.Mediator { @@ -885,11 +917,12 @@ func (gc *gossipChannel) UpdateLedgerHeight(height uint64) { var chaincodes []*proto.Chaincode var leftChannel bool - if prevMsg := gc.stateInfoMsg; prevMsg != nil { + if prevMsg := gc.selfStateInfoMsg; prevMsg != nil { leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes } gc.updateProperties(height, chaincodes, leftChannel) + atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) } // UpdateChaincodes updates the chaincodes the peer publishes @@ -900,20 +933,19 @@ func (gc *gossipChannel) UpdateChaincodes(chaincodes []*proto.Chaincode) { var ledgerHeight uint64 = 1 var leftChannel bool - if prevMsg := gc.stateInfoMsg; prevMsg != nil { + if prevMsg := gc.selfStateInfoMsg; prevMsg != nil { ledgerHeight = prevMsg.GetStateInfo().Properties.LedgerHeight leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel } gc.updateProperties(ledgerHeight, chaincodes, leftChannel) + atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) } // UpdateStateInfo updates this channel's StateInfo message // that is periodically published -func (gc *gossipChannel) updateStateInfo(msg *protoext.SignedGossipMessage) { - gc.stateInfoMsgStore.Add(msg) +func (gc *gossipChannel) updateStateInfo(msg *proto.GossipMessage) { gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight - gc.stateInfoMsg = msg - atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) + gc.selfStateInfoMsg = msg } func (gc *gossipChannel) updateProperties(ledgerHeight uint64, chaincodes []*proto.Chaincode, leftChannel bool) { @@ -938,12 +970,7 @@ func (gc *gossipChannel) updateProperties(ledgerHeight uint64, chaincodes []*pro }, } - msg, err := gc.Sign(m) - if err != nil { - gc.logger.Error("Failed signing message:", err) - return - } - gc.updateStateInfo(msg) + gc.updateStateInfo(m) } func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool, verifyFunc membershipPredicate) *stateInfoCache { diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index 45c502c517c..e5bd278fffe 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -30,6 +30,7 @@ import ( "github.com/hyperledger/fabric/gossip/metrics/mocks" "github.com/hyperledger/fabric/gossip/protoext" "github.com/hyperledger/fabric/gossip/util" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -187,6 +188,7 @@ func (m *receivedMsg) GetConnectionInfo() *protoext.ConnectionInfo { } type gossipAdapterMock struct { + signCallCount uint32 mock.Mock sync.RWMutex } @@ -198,6 +200,7 @@ func (ga *gossipAdapterMock) On(methodName string, arguments ...interface{}) *mo } func (ga *gossipAdapterMock) Sign(msg *proto.GossipMessage) (*protoext.SignedGossipMessage, error) { + atomic.AddUint32(&ga.signCallCount, 1) return protoext.NoopSign(msg) } @@ -220,12 +223,14 @@ func (ga *gossipAdapterMock) DeMultiplex(msg interface{}) { func (ga *gossipAdapterMock) GetMembership() []discovery.NetworkMember { args := ga.Called() - arg := args.Get(0) - if f, isFunc := arg.(func() []discovery.NetworkMember); isFunc { + val := args.Get(0) + if f, isFunc := val.(func() []discovery.NetworkMember); isFunc { return f() - } else { - return arg.([]discovery.NetworkMember) } + + members := val.([]discovery.NetworkMember) + + return members } // Lookup returns a network member, or nil if not found @@ -361,6 +366,7 @@ func TestMsgStoreNotExpire(t *testing.T) { // Receive StateInfo messages from other peers gc.HandleMessage(&receivedMsg{PKIID: pkiID2, msg: createStateInfoMsg(1, pkiID2, channelA)}) gc.HandleMessage(&receivedMsg{PKIID: pkiID3, msg: createStateInfoMsg(1, pkiID3, channelA)}) + time.Sleep(adapter.GetConf().PublishStateInfoInterval * 2) simulateStateInfoRequest := func(pkiID []byte, outChan chan *protoext.SignedGossipMessage) { sentMessages := make(chan *proto.GossipMessage, 1) @@ -501,8 +507,14 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) { cs := &cryptoService{} cs.On("VerifyBlock", mock.Anything).Return(nil) + peerA := discovery.NetworkMember{ + PKIid: pkiIDInOrg1, + Endpoint: "a", + InternalEndpoint: "a", + } + adapter := new(gossipAdapterMock) - configureAdapter(adapter) + configureAdapter(adapter, peerA) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("Gossip", mock.Anything).Run(func(arg mock.Arguments) { if atomic.LoadInt32(&receivedMsg) == int32(1) { @@ -1074,6 +1086,55 @@ func TestChannelBadBlocks(t *testing.T) { require.Len(t, receivedMessages, 0) } +func TestNoGossipOrSigningWhenEmptyMembership(t *testing.T) { + t.Parallel() + + var gossipedWG sync.WaitGroup + gossipedWG.Add(1) + + var emptyMembership []discovery.NetworkMember + nonEmptyMembership := []discovery.NetworkMember{{PKIid: pkiIDInOrg1}} + + var dynamicMembership atomic.Value + dynamicMembership.Store(nonEmptyMembership) + + cs := &cryptoService{} + adapter := new(gossipAdapterMock) + // Override configuration and disable outgoing state info requests + conf := conf + conf.PublishStateInfoInterval = time.Second + conf.RequestStateInfoInterval = time.Hour + conf.TimeForMembershipTracker = time.Hour + adapter.On("GetConf").Return(conf) + adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA) + adapter.On("Gossip", mock.Anything).Run(func(arg mock.Arguments) { + gossipedWG.Done() + }) + adapter.On("GetMembership").Return(func() []discovery.NetworkMember { + return dynamicMembership.Load().([]discovery.NetworkMember) + }) + + gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}, disabledMetrics, nil) + // We have signed only once at creation time + assert.Equal(t, uint32(1), atomic.LoadUint32(&adapter.signCallCount)) + defer gc.Stop() + gc.UpdateLedgerHeight(1) + + // The first time we have membership, so we should gossip and sign + gossipedWG.Wait() + // So far we have signed twice: Once at creation time, and once before we gossiped + assert.Equal(t, uint32(2), atomic.LoadUint32(&adapter.signCallCount)) + + // Membership is now empty + dynamicMembership.Store(emptyMembership) + // Set the required conditions for gossiping and signing + gc.UpdateLedgerHeight(2) + // Wait some time and ensure we do not sign because membership is now empty + time.Sleep(conf.PublishStateInfoInterval * 3) + // We haven't signed anything + assert.Equal(t, uint32(2), atomic.LoadUint32(&adapter.signCallCount)) +} + func TestChannelPulledBadBlocks(t *testing.T) { // Test a pull with a block of a bad channel cs := &cryptoService{} @@ -1698,16 +1759,20 @@ func TestChannelGetPeers(t *testing.T) { func TestOnDemandGossip(t *testing.T) { // Scenario: update the metadata and ensure only 1 dissemination // takes place when membership is not empty + peerA := discovery.NetworkMember{ + PKIid: pkiIDInOrg1, + Endpoint: "a", + InternalEndpoint: "a", + } cs := &cryptoService{} adapter := new(gossipAdapterMock) - configureAdapter(adapter) + configureAdapter(adapter, peerA) adapter.ExpectedCalls = append(adapter.ExpectedCalls[:1], adapter.ExpectedCalls[2:]...) var lock sync.RWMutex var membershipKnown bool - var signal bool adapter.On("GetMembership").Return(func() []discovery.NetworkMember { lock.RLock() defer lock.RUnlock() @@ -1718,12 +1783,13 @@ func TestOnDemandGossip(t *testing.T) { }) gossipedEvents := make(chan struct{}) + + conf := conf + conf.PublishStateInfoInterval = time.Millisecond * 200 + adapter.On("GetConf").Return(conf) adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { lock.Lock() defer lock.Unlock() - if signal { - membershipKnown = true - } gossipedEvents <- struct{}{} }) adapter.On("Forward", mock.Anything) @@ -1736,22 +1802,23 @@ func TestOnDemandGossip(t *testing.T) { require.Fail(t, "Should not have gossiped because metadata has not been updated yet") case <-time.After(time.Millisecond * 500): } - gc.UpdateLedgerHeight(0) + + gc.UpdateLedgerHeight(1) + lock.Lock() + membershipKnown = true + lock.Unlock() + select { case <-gossipedEvents: case <-time.After(time.Second): require.Fail(t, "Didn't gossip within a timely manner") } - select { - case <-gossipedEvents: - case <-time.After(time.Second): - require.Fail(t, "Should have gossiped a second time, because membership is empty") - } - - lock.Lock() - signal = true - lock.Unlock() - + gc.UpdateLedgerHeight(2) + adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { + gossipedEvents <- struct{}{} + }) + adapter.On("Forward", mock.Anything) + gc.(*gossipChannel).Adapter = adapter select { case <-gossipedEvents: case <-time.After(time.Second): @@ -1762,7 +1829,7 @@ func TestOnDemandGossip(t *testing.T) { require.Fail(t, "Should not have gossiped a fourth time, because dirty flag should have been turned off") case <-time.After(time.Millisecond * 500): } - gc.UpdateLedgerHeight(1) + gc.UpdateLedgerHeight(3) select { case <-gossipedEvents: case <-time.After(time.Second):