Skip to content

Commit

Permalink
[FAB-14205] move message type accessors to gossip
Browse files Browse the repository at this point in the history
Remove the message type extensions on gossip messages and replace them
with functions in gossip/protoext.

Change-Id: Iea67f9ed80692c89b81440f4a36035d0bbcc8b11
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Mar 1, 2019
1 parent fe49517 commit a34c3fa
Show file tree
Hide file tree
Showing 26 changed files with 734 additions and 737 deletions.
4 changes: 2 additions & 2 deletions discovery/cmd/endorsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func endpointFromEnvelope(env *gossip.Envelope) string {
if aliveMsg == nil {
return ""
}
if !aliveMsg.IsAliveMsg() {
if !protoext.IsAliveMsg(aliveMsg.GossipMessage) {
return ""
}
if aliveMsg.GetAliveMsg().Membership == nil {
Expand All @@ -237,7 +237,7 @@ func ledgerHeightFromEnvelope(env *gossip.Envelope) uint64 {
if stateInfoMsg == nil {
return 0
}
if !stateInfoMsg.IsStateInfoMsg() {
if !protoext.IsStateInfoMsg(stateInfoMsg.GossipMessage) {
return 0
}
if stateInfoMsg.GetStateInfo().Properties == nil {
Expand Down
2 changes: 1 addition & 1 deletion gossip/comm/ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (aso *ackSendOperation) send(msg *protoext.SignedGossipMessage, minAckNum i

func interceptAcks(nextHandler handler, remotePeerID common.PKIidType, pubSub *util.PubSub) func(*protoext.SignedGossipMessage) {
return func(m *protoext.SignedGossipMessage) {
if m.IsAck() {
if protoext.IsAck(m.GossipMessage) {
topic := topicForAck(m.Nonce, remotePeerID)
pubSub.Publish(topic, m.GetAck())
return
Expand Down
3 changes: 2 additions & 1 deletion gossip/comm/ack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestAck(t *testing.T) {
defer comm4.Stop()

acceptData := func(o interface{}) bool {
return o.(protoext.ReceivedMessage).GetGossipMessage().IsDataMsg()
m := o.(protoext.ReceivedMessage).GetGossipMessage()
return protoext.IsDataMsg(m.GossipMessage)
}

ack := func(c <-chan protoext.ReceivedMessage) {
Expand Down
3 changes: 2 additions & 1 deletion gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func TestMutualParallelSendWithAck(t *testing.T) {
defer comm2.Stop()

acceptData := func(o interface{}) bool {
return o.(protoext.ReceivedMessage).GetGossipMessage().IsDataMsg()
m := o.(protoext.ReceivedMessage).GetGossipMessage()
return protoext.IsDataMsg(m.GossipMessage)
}

inc1 := comm1.Accept(acceptData)
Expand Down
12 changes: 7 additions & 5 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(msg protoext.ReceivedMessage) {
return
}

if m.IsAliveMsg() {
if protoext.IsAliveMsg(m.GossipMessage) {
if !d.msgStore.CheckValid(m) || !d.crypt.ValidateAliveMsg(m) {
return
}
Expand All @@ -369,7 +369,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(msg protoext.ReceivedMessage) {
d.logger.Warningf("Membership response contains an invalid message from an online peer:%+v", errors.WithStack(err))
return
}
if !am.IsAliveMsg() {
if !protoext.IsAliveMsg(am.GossipMessage) {
d.logger.Warning("Expected alive message, got", am, "instead")
return
}
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*protoext.SignedGossipMessage)
if !msg.IsAliveMsg() {
if !protoext.IsAliveMsg(msg.GossipMessage) {
return
}
id := msg.GetAliveMsg().Membership.PkiId
Expand All @@ -1040,14 +1040,16 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
}

func (s *aliveMsgStore) Add(msg interface{}) bool {
if !msg.(*protoext.SignedGossipMessage).IsAliveMsg() {
m := msg.(*protoext.SignedGossipMessage)
if !protoext.IsAliveMsg(m.GossipMessage) {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.Add(msg)
}

func (s *aliveMsgStore) CheckValid(msg interface{}) bool {
if !msg.(*protoext.SignedGossipMessage).IsAliveMsg() {
m := msg.(*protoext.SignedGossipMessage)
if !protoext.IsAliveMsg(m.GossipMessage) {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.CheckValid(msg)
Expand Down
2 changes: 1 addition & 1 deletion gossip/election/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ai *adapterImpl) Accept() <-chan Msg {
adapterCh, _ := ai.gossip.Accept(func(message interface{}) bool {
// Get only leadership org and channel messages
return message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG &&
message.(*proto.GossipMessage).IsLeadershipMsg() &&
protoext.IsLeadershipMsg(message.(*proto.GossipMessage)) &&
bytes.Equal(message.(*proto.GossipMessage).Channel, ai.channel)
}, false)

Expand Down
4 changes: 2 additions & 2 deletions gossip/election/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestAdapterImpl_CreateMessage(t *testing.T) {
metrics.NewGossipMetrics(&disabled.Provider{}).ElectionMetrics)
msg := adapter.CreateMessage(true)

if !msg.(*msgImpl).msg.IsLeadershipMsg() {
if !protoext.IsLeadershipMsg(msg.(*msgImpl).msg) {
t.Error("Newly created message should be LeadershipMsg")
}

Expand All @@ -66,7 +66,7 @@ func TestAdapterImpl_CreateMessage(t *testing.T) {

msg = adapter.CreateMessage(false)

if !msg.(*msgImpl).msg.IsLeadershipMsg() {
if !protoext.IsLeadershipMsg(msg.(*msgImpl).msg) {
t.Error("Newly created message should be LeadershipMsg")
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (cs *certStore) handleMessage(msg protoext.ReceivedMessage) {
cs.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
return
}
if !m.IsIdentityMsg() {
if !protoext.IsIdentityMsg(m.GossipMessage) {
cs.logger.Warning("Got a non-identity message:", m, "aborting")
return
}
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestCertExpiration(t *testing.T) {
identitiesGotViaPull := make(chan struct{}, identities2Detect+100)
acceptIdentityPullMsgs := func(o interface{}) bool {
m := o.(protoext.ReceivedMessage).GetGossipMessage()
if m.IsPullMsg() && m.IsDigestMsg() {
if protoext.IsPullMsg(m.GossipMessage) && protoext.IsDigestMsg(m.GossipMessage) {
for _, dig := range m.GetDataDig().Digests {
if bytes.Equal(dig, []byte(fmt.Sprintf("127.0.0.1:%d", port0))) {
identitiesGotViaPull <- struct{}{}
Expand Down
30 changes: 15 additions & 15 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,12 @@ func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool

// AddToMsgStore adds a given GossipMessage to the message store
func (gc *gossipChannel) AddToMsgStore(msg *protoext.SignedGossipMessage) {
if msg.IsDataMsg() {
if protoext.IsDataMsg(msg.GossipMessage) {
gc.blockMsgStore.Add(msg)
gc.blocksPuller.Add(msg)
}

if msg.IsStateInfoMsg() {
if protoext.IsStateInfoMsg(msg.GossipMessage) {
gc.stateInfoMsgStore.Add(msg)
}
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
return
}
m := msg.GetGossipMessage()
if !m.IsChannelRestricted() {
if !protoext.IsChannelRestricted(m.GossipMessage) {
gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it")
return
}
Expand All @@ -573,20 +573,20 @@ func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
return
}

if m.IsStateInfoPullRequestMsg() {
if protoext.IsStateInfoPullRequestMsg(m.GossipMessage) {
msg.Respond(gc.createStateInfoSnapshot(orgID))
return
}

if m.IsStateInfoSnapshot() {
if protoext.IsStateInfoSnapshot(m.GossipMessage) {
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
return
}

if m.IsDataMsg() || m.IsStateInfoMsg() {
if protoext.IsDataMsg(m.GossipMessage) || protoext.IsStateInfoMsg(m.GossipMessage) {
added := false

if m.IsDataMsg() {
if protoext.IsDataMsg(m.GossipMessage) {
if m.GetDataMsg().Payload == nil {
gc.logger.Warning("Payload is empty, got it from", msg.GetConnectionInfo().ID)
return
Expand All @@ -611,14 +611,14 @@ func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
// DeMultiplex to local subscribers
gc.DeMultiplex(m)

if m.IsDataMsg() {
if protoext.IsDataMsg(m.GossipMessage) {
gc.blocksPuller.Add(msg.GetGossipMessage())
}
}
return
}

if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
if protoext.IsPullMsg(m.GossipMessage) && protoext.GetPullMsgType(m.GossipMessage) == proto.PullMsgType_BLOCK_MSG {
if gc.hasLeftChannel() {
gc.logger.Info("Received Pull message from", msg.GetConnectionInfo().Endpoint, "but left the channel", string(gc.chainID))
return
Expand All @@ -633,7 +633,7 @@ func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
return
}
if m.IsDataUpdate() {
if protoext.IsDataUpdate(m.GossipMessage) {
// Iterate over the envelopes, and filter out blocks
// that we already have in the blockMsgStore, or blocks that
// are too far in the past.
Expand Down Expand Up @@ -669,7 +669,7 @@ func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
gc.blocksPuller.HandleMessage(msg)
}

if m.IsLeadershipMsg() {
if protoext.IsLeadershipMsg(m.GossipMessage) {
// Handling leadership message
added := gc.leaderMsgStore.Add(m)
if added {
Expand All @@ -686,7 +686,7 @@ func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender c
gc.logger.Warningf("Channel %s : StateInfo snapshot contains an invalid message: %+v", chanName, errors.WithStack(err))
return
}
if !stateInf.IsStateInfoMsg() {
if !protoext.IsStateInfoMsg(stateInf.GossipMessage) {
gc.logger.Warning("Channel", chanName, ": Element of StateInfoSnapshot isn't a StateInfoMessage:",
stateInf, "message sent from", sender)
return
Expand Down Expand Up @@ -728,7 +728,7 @@ func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender c
}

func (gc *gossipChannel) verifyBlock(msg *proto.GossipMessage, sender common.PKIidType) bool {
if !msg.IsDataMsg() {
if !protoext.IsDataMsg(msg) {
gc.logger.Warning("Received from ", sender, "a DataUpdate message that contains a non-block GossipMessage:", msg)
return false
}
Expand Down Expand Up @@ -796,7 +796,7 @@ func (gc *gossipChannel) verifyMsg(msg protoext.ReceivedMessage) bool {
return false
}

if m.IsStateInfoMsg() {
if protoext.IsStateInfoMsg(m.GossipMessage) {
si := m.GetStateInfo()
expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
if !bytes.Equal(expectedMAC, si.Channel_MAC) {
Expand All @@ -806,7 +806,7 @@ func (gc *gossipChannel) verifyMsg(msg protoext.ReceivedMessage) bool {
return true
}

if m.IsStateInfoPullRequestMsg() {
if protoext.IsStateInfoPullRequestMsg(m.GossipMessage) {
sipr := m.GetStateInfoPullReq()
expectedMAC := GenerateMAC(msg.GetConnectionInfo().ID, gc.chainID)
if !bytes.Equal(expectedMAC, sipr.Channel_MAC) {
Expand Down
32 changes: 16 additions & 16 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestLeaveChannel(t *testing.T) {
gc := NewGossipChannel(common.PKIidType("p0"), orgInChannelA, cs, channelA, adapter, jcm, disabledMetrics)
adapter.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
msg := arguments.Get(0).(*protoext.SignedGossipMessage)
if msg.IsPullMsg() {
if protoext.IsPullMsg(msg.GossipMessage) {
helloPullWG.Done()
assert.False(t, gc.(*gossipChannel).hasLeftChannel())
}
Expand Down Expand Up @@ -560,17 +560,17 @@ func TestChannelMsgStoreEviction(t *testing.T) {
adapter.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
msg := args.Get(0).(*protoext.SignedGossipMessage)
// Ignore all other messages sent like StateInfo messages
if !msg.IsPullMsg() {
if !protoext.IsPullMsg(msg.GossipMessage) {
return
}
// Stop the pull when we reach the final phase
if atomic.LoadUint64(&phaseNum) == totalPhases && msg.IsHelloMsg() {
if atomic.LoadUint64(&phaseNum) == totalPhases && protoext.IsHelloMsg(msg.GossipMessage) {
return
}

start := atomic.LoadUint64(&phaseNum) * msgsPerPhase
end := start + msgsPerPhase
if msg.IsHelloMsg() {
if protoext.IsHelloMsg(msg.GossipMessage) {
// Advance phase
atomic.AddUint64(&phaseNum, uint64(1))
}
Expand All @@ -581,7 +581,7 @@ func TestChannelMsgStoreEviction(t *testing.T) {
pullPhase(args)

// If we finished the last phase, save the sequence to be used later for inspection
if msg.IsDataReq() && atomic.LoadUint64(&phaseNum) == totalPhases {
if protoext.IsDataReq(msg.GossipMessage) && atomic.LoadUint64(&phaseNum) == totalPhases {
for _, seq := range currSeq {
lastPullPhase <- seq
}
Expand All @@ -596,7 +596,7 @@ func TestChannelMsgStoreEviction(t *testing.T) {
helloMsg := createHelloMsg(pkiIDInOrg1)
helloMsg.On("Respond", mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.GossipMessage)
if !msg.IsDigestMsg() {
if !protoext.IsDigestMsg(msg) {
return
}
msgSentFromPullMediator <- msg
Expand All @@ -614,7 +614,7 @@ func TestChannelMsgStoreEviction(t *testing.T) {
assert.Len(t, msgSentFromPullMediator, 1)
msg := <-msgSentFromPullMediator
// It's a digest and not anything else, like an update
assert.True(t, msg.IsDigestMsg())
assert.True(t, protoext.IsDigestMsg(msg))
assert.Len(t, msg.GetDataDig().Digests, adapter.GetConf().MaxBlockCountToStore+1)
// Check that the last sequences are kept.
// Since we checked the length, it proves that the old blocks were discarded, since we had much more
Expand All @@ -635,11 +635,11 @@ func TestChannelPull(t *testing.T) {
adapter.On("Forward", mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*protoext.SignedGossipMessage)
if !msg.IsDataMsg() {
if !protoext.IsDataMsg(msg.GossipMessage) {
return
}
// The peer is supposed to de-multiplex 2 ledger blocks
assert.True(t, msg.IsDataMsg())
assert.True(t, protoext.IsDataMsg(msg.GossipMessage))
receivedBlocksChan <- msg
})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}, disabledMetrics)
Expand Down Expand Up @@ -699,7 +699,7 @@ func TestChannelPullAccessControl(t *testing.T) {
sentHello := int32(0)
adapter.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*protoext.SignedGossipMessage)
if !msg.IsHelloMsg() {
if !protoext.IsHelloMsg(msg.GossipMessage) {
return
}
atomic.StoreInt32(&sentHello, int32(1))
Expand Down Expand Up @@ -994,7 +994,7 @@ func TestChannelBlockExpiration(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("Haven't responded to hello message within a time period")
case msg := <-respondedChan:
if msg.IsDigestMsg() {
if protoext.IsDigestMsg(msg) {
assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect")
} else {
t.Fatal("Not correct pull msg type in response - expect digest")
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func TestChannelBlockExpiration(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("Haven't responded to hello message within a time period")
case msg := <-respondedChan:
if msg.IsDigestMsg() {
if protoext.IsDigestMsg(msg) {
assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect")
} else {
t.Fatal("Not correct pull msg type in response - expect digest")
Expand Down Expand Up @@ -1796,11 +1796,11 @@ func TestChannelPullWithDigestsFilter(t *testing.T) {
adapter.On("Forward", mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*protoext.SignedGossipMessage)
if !msg.IsDataMsg() {
if !protoext.IsDataMsg(msg.GossipMessage) {
return
}
// The peer is supposed to de-multiplex 1 ledger block
assert.True(t, msg.IsDataMsg())
assert.True(t, protoext.IsDataMsg(msg.GossipMessage))
receivedBlocksChan <- msg
})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}, disabledMetrics)
Expand Down Expand Up @@ -1941,7 +1941,7 @@ func simulatePullPhaseWithVariableDigest(gc GossipChannel, t *testing.T, wg *syn
msg := args.Get(0).(*protoext.SignedGossipMessage)
l.Lock()
defer l.Unlock()
if msg.IsHelloMsg() && !sentHello {
if protoext.IsHelloMsg(msg.GossipMessage) && !sentHello {
sentHello = true
// Simulate a digest message an imaginary peer responds to the hello message sent
sMsg, _ := protoext.NoopSign(&proto.GossipMessage{
Expand All @@ -1961,7 +1961,7 @@ func simulatePullPhaseWithVariableDigest(gc GossipChannel, t *testing.T, wg *syn
}
go gc.HandleMessage(digestMsg)
}
if msg.IsDataReq() && !sentReq {
if protoext.IsDataReq(msg.GossipMessage) && !sentReq {
sentReq = true
dataReq := msg.GetDataReq()
for _, expectedDigest := range util.StringsToBytes(resultDigestSeqs) {
Expand Down
Loading

0 comments on commit a34c3fa

Please sign in to comment.