Skip to content

Commit e67eeb9

Browse files
waelsC0rWin
authored andcommitted
[FAB-11639]: Fix data races in gossip/discovery
Change-Id: I6d5c6554ffe77079ee23c735f1ea89e3654f695f Signed-off-by: waels <wael.shama@ibm.com>
1 parent 2fd63a4 commit e67eeb9

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

gossip/discovery/discovery_test.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
protoG "github.com/golang/protobuf/proto"
2425
"github.com/hyperledger/fabric/core/config/configtest"
2526
"github.com/hyperledger/fabric/gossip/common"
2627
"github.com/hyperledger/fabric/gossip/util"
@@ -35,7 +36,7 @@ var timeout = time.Second * time.Duration(15)
3536

3637
func init() {
3738
util.SetupTestLogging()
38-
aliveTimeInterval := time.Duration(time.Millisecond * 100)
39+
aliveTimeInterval := time.Duration(time.Millisecond * 300)
3940
SetAliveTimeInterval(aliveTimeInterval)
4041
SetAliveExpirationTimeout(10 * aliveTimeInterval)
4142
SetAliveExpirationCheckInterval(aliveTimeInterval)
@@ -298,9 +299,11 @@ func (g *gossipInstance) Stop() {
298299
}
299300
g.gRGCserv.Stop()
300301
g.lsnr.Close()
302+
g.comm.lock.Lock()
301303
for _, stream := range g.comm.streams {
302304
stream.CloseSend()
303305
}
306+
g.comm.lock.Unlock()
304307
for _, conn := range g.comm.conns {
305308
conn.Close()
306309
}
@@ -361,8 +364,9 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
361364

362365
discSvc := NewDiscoveryService(self, comm, comm, pol)
363366
for _, bootPeer := range bootstrapPeers {
364-
discSvc.Connect(NetworkMember{Endpoint: bootPeer, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
365-
return &PeerIdentification{SelfOrg: true, ID: common.PKIidType(bootPeer)}, nil
367+
bp := bootPeer
368+
discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
369+
return &PeerIdentification{SelfOrg: true, ID: common.PKIidType(bp)}, nil
366370
})
367371
}
368372

@@ -463,7 +467,6 @@ func TestConnect(t *testing.T) {
463467
})
464468
inst.comm.mock.On("Ping", mock.Anything)
465469
inst.comm.lock.Unlock()
466-
467470
instances = append(instances, inst)
468471
j := (i + 1) % 10
469472
endpoint := fmt.Sprintf("localhost:%d", 7611+j)
@@ -474,12 +477,6 @@ func TestConnect(t *testing.T) {
474477
}
475478

476479
time.Sleep(time.Second * 3)
477-
assert.Len(t, firstSentMemReqMsgs, 10)
478-
close(firstSentMemReqMsgs)
479-
for firstSentSelfMsg := range firstSentMemReqMsgs {
480-
assert.Nil(t, firstSentSelfMsg.Envelope.SecretEnvelope)
481-
}
482-
483480
fullMembership := func() bool {
484481
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
485482
}
@@ -493,6 +490,11 @@ func TestConnect(t *testing.T) {
493490
am, _ = mr2.GetMemReq().SelfInformation.ToGossipMessage()
494491
assert.Nil(t, am.SecretEnvelope)
495492
stopInstances(t, instances)
493+
assert.Len(t, firstSentMemReqMsgs, 10)
494+
close(firstSentMemReqMsgs)
495+
for firstSentSelfMsg := range firstSentMemReqMsgs {
496+
assert.Nil(t, firstSentSelfMsg.Envelope.SecretEnvelope)
497+
}
496498
}
497499

498500
func TestUpdate(t *testing.T) {
@@ -804,7 +806,7 @@ func TestDisclosurePolicyWithPull(t *testing.T) {
804806
// Now, we shutdown instance 0 and ensure that peers that shouldn't know it,
805807
// do not know it via membership requests
806808
stopInstances(t, []*gossipInstance{instances1[0]})
807-
time.Sleep(time.Second * 3)
809+
time.Sleep(time.Second * 6)
808810
for _, inst := range append(instances1[1:], instances2...) {
809811
if peersThatShouldBeKnownToPeers[inst.port][0] == 8610 {
810812
assert.Equal(t, 1, inst.Discovery.(*gossipDiscoveryImpl).deadMembership.Size())
@@ -856,15 +858,16 @@ func discPolForPeer(selfPort int) DisclosurePolicy {
856858
// Else, expose peers with even ids to other peers with even ids
857859
return portOfAliveMsg%2 == 0 && targetPort%2 == 0
858860
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
861+
envelope := protoG.Clone(msg.Envelope).(*proto.Envelope)
859862
if selfPort < 8615 && targetPort >= 8615 {
860-
msg.Envelope.SecretEnvelope = nil
863+
envelope.SecretEnvelope = nil
861864
}
862865

863866
if selfPort >= 8615 && targetPort < 8615 {
864-
msg.Envelope.SecretEnvelope = nil
867+
envelope.SecretEnvelope = nil
865868
}
866869

867-
return msg.Envelope
870+
return envelope
868871
}
869872
}
870873
}

gossip/gossip/gossip_impl.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,10 +1270,11 @@ func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember
12701270
// or the message has an external endpoint, and the remote peer also has one
12711271
return bytes.Equal(org, remotePeerOrg) || msg.GetAliveMsg().Membership.Endpoint != "" && remotePeer.Endpoint != ""
12721272
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
1273+
envelope := protoG.Clone(msg.Envelope).(*proto.Envelope)
12731274
if !bytes.Equal(g.selfOrg, remotePeerOrg) {
1274-
msg.SecretEnvelope = nil
1275+
envelope.SecretEnvelope = nil
12751276
}
1276-
return msg.Envelope
1277+
return envelope
12771278
}
12781279
}
12791280

0 commit comments

Comments
 (0)