Skip to content

Commit

Permalink
[FAB-6043] migrate gossip stateInfo metadata to proto
Browse files Browse the repository at this point in the history
This commit adds a Properties sub-message to the StateInfo message
and makes sure the gossip logic can handle peers with and without
this field set.

Change-Id: Ibe164db1e4dc4c5a1d861e62eb61da01d31778e6
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Sep 12, 2017
1 parent 3a8d54c commit b48cea6
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 145 deletions.
2 changes: 1 addition & 1 deletion gossip/common/metastate.go
Expand Up @@ -21,7 +21,7 @@ type NodeMetastate struct {
LedgerHeight uint64
}

// NewNodeMetastate creates new meta data with given ledger height148.69
// NewNodeMetastate creates new meta data with given ledger height
func NewNodeMetastate(height uint64) *NodeMetastate {
return &NodeMetastate{height}
}
Expand Down
1 change: 1 addition & 0 deletions gossip/discovery/discovery.go
Expand Up @@ -70,6 +70,7 @@ type NetworkMember struct {
Metadata []byte
PKIid common.PKIidType
InternalEndpoint string
Properties *proto.Properties
}

// String returns a string representation of the NetworkMember
Expand Down
8 changes: 2 additions & 6 deletions gossip/gossip/channel/channel.go
Expand Up @@ -282,6 +282,7 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
continue
}
member.Metadata = stateInf.GetStateInfo().Metadata
member.Properties = stateInf.GetStateInfo().Properties
members = append(members, member)
}
return members
Expand Down Expand Up @@ -751,13 +752,8 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
gc.Lock()
defer gc.Unlock()

nodeMeta, err := common.FromBytes(msg.GetStateInfo().Metadata)
if err != nil {
gc.logger.Warningf("Can't extract ledger height from metadata %+v", errors.WithStack(err))
return
}
gc.stateInfoMsgStore.Add(msg)
gc.ledgerHeight = nodeMeta.LedgerHeight
gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
gc.stateInfoMsg = msg
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}
Expand Down
4 changes: 4 additions & 0 deletions gossip/gossip/channel/channel_test.go
Expand Up @@ -412,6 +412,7 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
nodeMeta, err := common.FromBytes(msg.GetStateInfo().Metadata)
assert.NoError(t, err, "ReceivedMetadata is invalid")
assert.Equal(t, ledgerHeight, int(nodeMeta.LedgerHeight), "Received different ledger height than expected")
assert.Equal(t, ledgerHeight, int(msg.GetStateInfo().Properties.LedgerHeight))
}

func TestChannelMsgStoreEviction(t *testing.T) {
Expand Down Expand Up @@ -1751,6 +1752,9 @@ func createStateInfoMsg(ledgerHeight int, pkiID common.PKIidType, channel common
Timestamp: &proto.PeerTime{IncNum: uint64(time.Now().UnixNano()), SeqNum: 1},
Metadata: metaBytes,
PkiId: []byte(pkiID),
Properties: &proto.Properties{
LedgerHeight: uint64(ledgerHeight),
},
},
},
}).NoopSign()
Expand Down
9 changes: 8 additions & 1 deletion gossip/gossip/gossip_impl.go
Expand Up @@ -1089,6 +1089,10 @@ func (g *gossipServiceImpl) connect2BootstrapPeers() {
}

func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
metaState, err := common.FromBytes(metadata)
if err != nil {
return nil, err
}
pkiID := g.comm.GetPKIid()
stateInfMsg := &proto.StateInfo{
Channel_MAC: channel.GenerateMAC(pkiID, chainID),
Expand All @@ -1098,6 +1102,9 @@ func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.C
IncNum: uint64(g.incTime.UnixNano()),
SeqNum: uint64(time.Now().UnixNano()),
},
Properties: &proto.Properties{
LedgerHeight: metaState.LedgerHeight,
},
}
m := &proto.GossipMessage{
Nonce: 0,
Expand All @@ -1112,7 +1119,7 @@ func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.C
signer := func(msg []byte) ([]byte, error) {
return g.mcs.Sign(msg)
}
_, err := sMsg.Sign(signer)
_, err = sMsg.Sign(signer)
return sMsg, errors.WithStack(err)
}

Expand Down
2 changes: 2 additions & 0 deletions gossip/gossip/gossip_test.go
Expand Up @@ -897,8 +897,10 @@ func TestDataLeakage(t *testing.T) {
assert.Len(t, peers[instanceIndex].PeersOfChannel(channel), 2)
if i == 0 {
assert.Equal(t, channelAmetadata, peers[instanceIndex].PeersOfChannel(channel)[0].Metadata)
assert.Equal(t, uint64(1), peers[instanceIndex].PeersOfChannel(channel)[0].Properties.LedgerHeight)
} else {
assert.Equal(t, channelBmetadata, peers[instanceIndex].PeersOfChannel(channel)[0].Metadata)
assert.Equal(t, uint64(2), peers[instanceIndex].PeersOfChannel(channel)[0].Properties.LedgerHeight)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions gossip/state/state.go
Expand Up @@ -547,10 +547,15 @@ func (s *GossipStateProviderImpl) antiEntropy() {
func (s *GossipStateProviderImpl) maxAvailableLedgerHeight() uint64 {
max := uint64(0)
for _, p := range s.mediator.PeersOfChannel(common2.ChainID(s.chainID)) {
if nodeMetastate, err := common2.FromBytes(p.Metadata); err == nil {
if max < nodeMetastate.LedgerHeight {
max = nodeMetastate.LedgerHeight
}
var peerHeight uint64
if p.Properties != nil {
peerHeight = p.Properties.LedgerHeight
} else if nodeMetastate, err := common2.FromBytes(p.Metadata); err == nil {
peerHeight = nodeMetastate.LedgerHeight
}

if max < peerHeight {
max = peerHeight
}
}
return max
Expand Down Expand Up @@ -660,6 +665,9 @@ func (s *GossipStateProviderImpl) filterPeers(predicate func(peer discovery.Netw
// by provided input parameter
func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer discovery.NetworkMember) bool {
return func(peer discovery.NetworkMember) bool {
if peer.Properties != nil {
return peer.Properties.LedgerHeight >= height
}
if nodeMetadata, err := common2.FromBytes(peer.Metadata); err != nil {
logger.Errorf("Unable to de-serialize node meta state, error = %+v", errors.WithStack(err))
} else if nodeMetadata.LedgerHeight >= height {
Expand Down
102 changes: 102 additions & 0 deletions gossip/state/state_test.go
Expand Up @@ -13,6 +13,7 @@ import (
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -553,6 +554,107 @@ func TestGossipReception(t *testing.T) {
}
}

func TestMetadataCompatibility(t *testing.T) {
// Scenario: For each test, spawn a peer and supply it
// with a specific mock of PeersOfChannel from peers that
// either set both metadata properly, or only the properties, or none, or both.
// Ensure the logic handles all of the 4 possible cases as needed

// Returns whether the given networkMember was selected or not
wasNetworkMemberSelected := func(t *testing.T, networkMember discovery.NetworkMember, wg *sync.WaitGroup) bool {
var wasGivenNetworkMemberSelected int32
finChan := make(chan struct{})
g := &mocks.GossipMock{}
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
defer wg.Done()
msg := arguments.Get(0).(*proto.GossipMessage)
assert.NotNil(t, msg.GetStateRequest())
peer := arguments.Get(1).([]*comm.RemotePeer)[0]
if bytes.Equal(networkMember.PKIid, peer.PKIID) {
atomic.StoreInt32(&wasGivenNetworkMemberSelected, 1)
}
finChan <- struct{}{}
})
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
metaState := common.NewNodeMetastate(5)
b, _ := metaState.Bytes()
defaultPeer := discovery.NetworkMember{
InternalEndpoint: "b",
PKIid: common.PKIidType("b"),
Metadata: b,
Properties: &proto.Properties{
LedgerHeight: 5,
},
}
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{
defaultPeer,
networkMember,
})
mc := &mockCommitter{}
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
select {
case <-time.After(time.Second * 20):
t.Fatal("Didn't send a request within a timely manner")
case <-finChan:
}
return atomic.LoadInt32(&wasGivenNetworkMemberSelected) == 1
}

peerWithoutMetadata := discovery.NetworkMember{
PKIid: common.PKIidType("peerWithoutMetadata"),
Properties: &proto.Properties{
LedgerHeight: 10,
},
InternalEndpoint: "peerWithoutMetadata",
}

ms := common.NodeMetastate{
LedgerHeight: 10,
}
b, _ := ms.Bytes()
peerWithoutProperties := discovery.NetworkMember{
PKIid: common.PKIidType("peerWithoutProperties"),
InternalEndpoint: "peerWithoutProperties",
Metadata: b,
}

peerWithoutEverything := discovery.NetworkMember{
PKIid: common.PKIidType("peerWithoutProperties"),
InternalEndpoint: "peerWithoutProperties",
}

peerWithEverything := discovery.NetworkMember{
PKIid: common.PKIidType("peerWitEverything"),
InternalEndpoint: "peerWitEverything",
Metadata: b,
Properties: &proto.Properties{
LedgerHeight: 10,
},
}

tests := []struct {
shouldGivenBeSelected bool
member discovery.NetworkMember
}{
{member: peerWithoutMetadata, shouldGivenBeSelected: true},
{member: peerWithoutProperties, shouldGivenBeSelected: true},
{member: peerWithoutEverything, shouldGivenBeSelected: false},
{member: peerWithEverything, shouldGivenBeSelected: true},
}

var wg sync.WaitGroup
wg.Add(len(tests))
for _, tst := range tests {
go func(shouldGivenBeSelected bool, member discovery.NetworkMember) {
assert.Equal(t, shouldGivenBeSelected, wasNetworkMemberSelected(t, member, &wg))
}(tst.shouldGivenBeSelected, tst.member)
}
wg.Wait()
}

func TestAccessControl(t *testing.T) {
viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node")
ledgermgmt.InitializeTestEnv()
Expand Down
10 changes: 10 additions & 0 deletions protos/gossip/extensions.go
Expand Up @@ -580,6 +580,16 @@ func digestsToHex(digests []string) []string {
return a
}

// LedgerHeight returns the ledger height that is specified
// in the StateInfo message
func (msg *StateInfo) LedgerHeight() (uint64, error) {
if msg.Properties != nil {
return msg.Properties.LedgerHeight, nil
}
metaState, err := common.FromBytes(msg.Metadata)
return metaState.LedgerHeight, err
}

// Abs returns abs(a-b)
func abs(a, b uint64) uint64 {
if a > b {
Expand Down

0 comments on commit b48cea6

Please sign in to comment.