diff --git a/gossip/gossip/pull/pullstore_test.go b/gossip/gossip/pull/pullstore_test.go index c994205a4c7..bd36751e03f 100644 --- a/gossip/gossip/pull/pullstore_test.go +++ b/gossip/gossip/pull/pullstore_test.go @@ -71,6 +71,8 @@ type pullInstance struct { msgChan chan *pullMsg peer2PullInst map[string]*pullInstance stopChan chan struct{} + pullAdapter *PullAdapter + config Config } func (p *pullInstance) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) { @@ -91,6 +93,20 @@ func (p *pullInstance) GetMembership() []discovery.NetworkMember { return members } +func (p *pullInstance) start() { + p.mediator = NewPullMediator(p.config, p.pullAdapter) + go func() { + for { + select { + case <-p.stopChan: + return + case msg := <-p.msgChan: + p.mediator.HandleMessage(msg) + } + } + }() +} + func (p *pullInstance) stop() { p.mediator.Stop() p.stopChan <- struct{}{} @@ -139,7 +155,7 @@ func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pu blockConsumer := func(msg *proto.SignedGossipMessage) { inst.items.Add(msg.GetDataMsg().Payload.SeqNum) } - adapter := &PullAdapter{ + inst.pullAdapter = &PullAdapter{ Sndr: inst, MemSvc: inst, IdExtractor: seqNumFromMsg, @@ -147,23 +163,15 @@ func createPullInstanceWithFilters(endpoint string, peer2PullInst map[string]*pu EgressDigFilter: df, IngressDigFilter: digestsFilter, } - inst.mediator = NewPullMediator(conf, adapter) - go func() { - for { - select { - case <-inst.stopChan: - return - case msg := <-inst.msgChan: - inst.mediator.HandleMessage(msg) - } - } - }() + inst.config = conf + return inst } func TestCreateAndStop(t *testing.T) { t.Parallel() pullInst := createPullInstance("localhost:2000", make(map[string]*pullInstance)) + pullInst.start() pullInst.stop() } @@ -172,6 +180,8 @@ func TestRegisterMsgHook(t *testing.T) { peer2pullInst := make(map[string]*pullInstance) inst1 := createPullInstance("localhost:5611", peer2pullInst) inst2 := createPullInstance("localhost:5612", peer2pullInst) + inst1.start() + inst2.start() defer inst1.stop() defer inst2.stop() @@ -215,6 +225,8 @@ func TestFilter(t *testing.T) { inst2 := createPullInstance("localhost:5612", peer2pullInst) defer inst1.stop() defer inst2.stop() + inst1.start() + inst2.start() inst1.mediator.Add(dataMsg(0)) inst1.mediator.Add(dataMsg(1)) @@ -232,6 +244,8 @@ func TestAddAndRemove(t *testing.T) { peer2pullInst := make(map[string]*pullInstance) inst1 := createPullInstance("localhost:5611", peer2pullInst) inst2 := createPullInstance("localhost:5612", peer2pullInst) + inst1.start() + inst2.start() defer inst1.stop() defer inst2.stop() @@ -268,6 +282,8 @@ func TestDigestsFilters(t *testing.T) { inst1 := createPullInstanceWithFilters("localhost:5611", make(map[string]*pullInstance), nil, df1) inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance)) inst1ReceivedDigest := int32(0) + inst1.start() + inst2.start() defer inst1.stop() defer inst2.stop() @@ -303,6 +319,8 @@ func TestHandleMessage(t *testing.T) { t.Parallel() inst1 := createPullInstance("localhost:5611", make(map[string]*pullInstance)) inst2 := createPullInstance("localhost:5612", make(map[string]*pullInstance)) + inst1.start() + inst2.start() defer inst1.stop() defer inst2.stop()