Skip to content

Commit

Permalink
[FAB-12034] Fix data races in pull_test.go
Browse files Browse the repository at this point in the history
The pull test has tests which have instances of puller, that send each
other messages via a common membership map.

This map is populated at the creation of each instance via having
each instance register itself into the map.

The instance (which is test code) - spawns a goroutine at its creation,
which schedules the production code and the latter accesses the said map.

This makes it an unsafe memory access, and hence a data race occurs.

I simply extractd the goroutine to its own start method, and made all
tests first create the instances (which involves the write to the map)
and only afterwards - start the goroutine, and thus - there is a happens
before relation and the golang memory model is happy.

FAB-12034 #done

Change-Id: I976e9161873d1a7022b0263189af4cc2d3151001
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Sep 17, 2018
1 parent 211e63e commit 1af2dad
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions gossip/gossip/pull/pullstore_test.go
Expand Up @@ -71,6 +71,8 @@ type pullInstance struct {
msgChan chan *pullMsg
peer2PullInst map[string]*pullInstance
stopChan chan struct{}
pullAdapter *PullAdapter
config Config
}

func (p *pullInstance) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
Expand All @@ -91,6 +93,20 @@ func (p *pullInstance) GetMembership() []discovery.NetworkMember {
return members
}

func (p *pullInstance) start() {
p.mediator = NewPullMediator(p.config, p.pullAdapter)
go func() {
for {
select {
case <-p.stopChan:
return
case msg := <-p.msgChan:
p.mediator.HandleMessage(msg)
}
}
}()
}

func (p *pullInstance) stop() {
p.mediator.Stop()
p.stopChan <- struct{}{}
Expand Down Expand Up @@ -139,31 +155,23 @@ func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pu
blockConsumer := func(msg *proto.SignedGossipMessage) {
inst.items.Add(msg.GetDataMsg().Payload.SeqNum)
}
adapter := &PullAdapter{
inst.pullAdapter = &PullAdapter{
Sndr: inst,
MemSvc: inst,
IdExtractor: seqNumFromMsg,
MsgCons: blockConsumer,
EgressDigFilter: df,
IngressDigFilter: digestsFilter,
}
inst.mediator = NewPullMediator(conf, adapter)
go func() {
for {
select {
case <-inst.stopChan:
return
case msg := <-inst.msgChan:
inst.mediator.HandleMessage(msg)
}
}
}()
inst.config = conf

return inst
}

func TestCreateAndStop(t *testing.T) {
t.Parallel()
pullInst := createPullInstance("localhost:2000", make(map[string]*pullInstance))
pullInst.start()
pullInst.stop()
}

Expand All @@ -172,6 +180,8 @@ func TestRegisterMsgHook(t *testing.T) {
peer2pullInst := make(map[string]*pullInstance)
inst1 := createPullInstance("localhost:5611", peer2pullInst)
inst2 := createPullInstance("localhost:5612", peer2pullInst)
inst1.start()
inst2.start()
defer inst1.stop()
defer inst2.stop()

Expand Down Expand Up @@ -215,6 +225,8 @@ func TestFilter(t *testing.T) {
inst2 := createPullInstance("localhost:5612", peer2pullInst)
defer inst1.stop()
defer inst2.stop()
inst1.start()
inst2.start()

inst1.mediator.Add(dataMsg(0))
inst1.mediator.Add(dataMsg(1))
Expand All @@ -232,6 +244,8 @@ func TestAddAndRemove(t *testing.T) {
peer2pullInst := make(map[string]*pullInstance)
inst1 := createPullInstance("localhost:5611", peer2pullInst)
inst2 := createPullInstance("localhost:5612", peer2pullInst)
inst1.start()
inst2.start()
defer inst1.stop()
defer inst2.stop()

Expand Down Expand Up @@ -268,6 +282,8 @@ func TestDigestsFilters(t *testing.T) {
inst1 := createPullInstanceWithFilters("localhost:5611", make(map[string]*pullInstance), nil, df1)
inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance))
inst1ReceivedDigest := int32(0)
inst1.start()
inst2.start()

defer inst1.stop()
defer inst2.stop()
Expand Down Expand Up @@ -303,6 +319,8 @@ func TestHandleMessage(t *testing.T) {
t.Parallel()
inst1 := createPullInstance("localhost:5611", make(map[string]*pullInstance))
inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance))
inst1.start()
inst2.start()
defer inst1.stop()
defer inst2.stop()

Expand Down

0 comments on commit 1af2dad

Please sign in to comment.