From e2d0fa744cfaa095ef35a9829eb1de3be90bcb93 Mon Sep 17 00:00:00 2001 From: Gennady Laventman Date: Thu, 19 Jan 2017 18:11:11 +0200 Subject: [PATCH] FAB-1849 LeaderElectionAdapter implementation Implements adapter that connect between gossip discovery and communication and leader election algorithm implementation Change-Id: I60dc6a0a5d7d36f7e689a2b896371d11e588f46d Signed-off-by: Gennady Laventman --- gossip/election/adapter.go | 223 ++++++++++++++++++++++ gossip/election/adapter_test.go | 317 ++++++++++++++++++++++++++++++++ gossip/proto/extensions.go | 5 + 3 files changed, 545 insertions(+) create mode 100644 gossip/election/adapter.go create mode 100644 gossip/election/adapter_test.go diff --git a/gossip/election/adapter.go b/gossip/election/adapter.go new file mode 100644 index 00000000000..e0b4ad678e7 --- /dev/null +++ b/gossip/election/adapter.go @@ -0,0 +1,223 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "bytes" + "strconv" + "sync" + "time" + + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/op/go-logging" +) + +type msgImpl struct { + msg *proto.GossipMessage +} + +func (mi *msgImpl) SenderID() string { + return string(mi.msg.GetLeadershipMsg().GetMembership().PkiID) +} + +func (mi *msgImpl) IsProposal() bool { + return !mi.IsDeclaration() +} + +func (mi *msgImpl) IsDeclaration() bool { + isDeclaration, _ := strconv.ParseBool(string(mi.msg.GetLeadershipMsg().GetMembership().Metadata)) + return isDeclaration +} + +type peerImpl struct { + member *discovery.NetworkMember +} + +func (pi *peerImpl) ID() string { + return string(pi.member.PKIid) +} + +type gossip interface { + // Peers returns the NetworkMembers considered alive + Peers() []discovery.NetworkMember + + // Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate. + // If passThrough is false, the messages are processed by the gossip layer beforehand. + // If passThrough is true, the gossip layer doesn't intervene and the messages + // can be used to send a reply back to the sender + Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) + + // Gossip sends a message to other peers to the network + Gossip(msg *proto.GossipMessage) +} + +// MsgCrypto used to sign messages and verify received messages signatures +type MsgCrypto interface { + // Sign signs a message, returns a signed message on success + // or an error on failure + Sign(msg []byte) ([]byte, error) + + // Verify verifies a signed message + Verify(vkID, signature, message []byte) error + + // Get returns the identity of a given pkiID, or error if such an identity + // isn't found + Get(pkiID common.PKIidType) (api.PeerIdentityType, error) +} + +type adapterImpl struct { + gossip gossip + self *discovery.NetworkMember + + incTime uint64 + seqNum uint64 + + mcs MsgCrypto + + channel common.ChainID + + logger *logging.Logger + + doneCh chan struct{} + stopOnce *sync.Once +} + +// NewAdapter creates new leader election adapter +func NewAdapter(gossip gossip, self *discovery.NetworkMember, mcs MsgCrypto, channel common.ChainID) LeaderElectionAdapter { + return &adapterImpl{ + gossip: gossip, + self: self, + + incTime: uint64(time.Now().UnixNano()), + seqNum: uint64(0), + + mcs: mcs, + + channel: channel, + + logger: logging.MustGetLogger("LeaderElectionAdapter"), + + doneCh: make(chan struct{}), + stopOnce: &sync.Once{}, + } +} + +func (ai *adapterImpl) Gossip(msg Msg) { + ai.gossip.Gossip(msg.(*msgImpl).msg) +} + +func (ai *adapterImpl) Accept() <-chan Msg { + adapterCh, _ := ai.gossip.Accept(func(message interface{}) bool { + // Get only leadership org and channel messages + validMsg := message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG && + message.(*proto.GossipMessage).IsLeadershipMsg() && + bytes.Equal(message.(*proto.GossipMessage).Channel, ai.channel) + if validMsg { + leadershipMsg := message.(*proto.GossipMessage).GetLeadershipMsg() + + verifier := func(identity []byte, signature, message []byte) error { + return ai.mcs.Verify(identity, signature, message) + } + identity, err := ai.mcs.Get(leadershipMsg.GetMembership().PkiID) + if err != nil { + ai.logger.Error("Failed verify, can't get identity", leadershipMsg, ":", err) + return false + } + + if err := message.(*proto.GossipMessage).Verify(identity, verifier); err != nil { + ai.logger.Error("Failed verify", leadershipMsg, ":", err) + return false + } + return true + } + return false + }, false) + + msgCh := make(chan Msg) + + go func(inCh <-chan *proto.GossipMessage, outCh chan Msg, stopCh chan struct{}) { + for { + select { + case <-stopCh: + return + case gossipMsg, ok := <-inCh: + if ok { + outCh <- &msgImpl{gossipMsg} + } else { + return + } + } + } + }(adapterCh, msgCh, ai.doneCh) + return msgCh +} + +func (ai *adapterImpl) CreateMessage(isDeclaration bool) Msg { + ai.seqNum++ + seqNum := ai.seqNum + + metadata := []byte{} + metadata = strconv.AppendBool(metadata, isDeclaration) + + leadershipMsg := &proto.LeadershipMessage{ + Membership: &proto.Member{ + PkiID: ai.self.PKIid, + Endpoint: ai.self.Endpoint, + Metadata: metadata, + }, + Timestamp: &proto.PeerTime{ + IncNumber: ai.incTime, + SeqNum: seqNum, + }, + } + + msg := &proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_CHAN_AND_ORG, + Content: &proto.GossipMessage_LeadershipMsg{LeadershipMsg: leadershipMsg}, + Channel: ai.channel, + } + + signer := func(msg []byte) ([]byte, error) { + return ai.mcs.Sign(msg) + } + + msg.Sign(signer) + return &msgImpl{msg} +} + +func (ai *adapterImpl) Peers() []Peer { + peers := ai.gossip.Peers() + + var res []Peer + for _, peer := range peers { + res = append(res, &peerImpl{&peer}) + } + + return res +} + +func (ai *adapterImpl) Stop() { + stopFunc := func() { + close(ai.doneCh) + } + ai.stopOnce.Do(stopFunc) +} diff --git a/gossip/election/adapter_test.go b/gossip/election/adapter_test.go new file mode 100644 index 00000000000..c89d2e0e7de --- /dev/null +++ b/gossip/election/adapter_test.go @@ -0,0 +1,317 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/proto" +) + +func TestNewAdapter(t *testing.T) { + selfNetworkMember := &discovery.NetworkMember{ + Endpoint: "p0", + Metadata: []byte{}, + PKIid: []byte{byte(0)}, + } + mockGossip := newGossip("peer0", selfNetworkMember) + + peersCluster := newClusterOfPeers("0") + peersCluster.addPeer("peer0", mockGossip) + + NewAdapter(mockGossip, selfNetworkMember, &mockMsgCrypto{}, []byte("channel0")) +} + +func TestAdapterImpl_CreateMessage(t *testing.T) { + selfNetworkMember := &discovery.NetworkMember{ + Endpoint: "p0", + Metadata: []byte{}, + PKIid: []byte{byte(0)}, + } + mockGossip := newGossip("peer0", selfNetworkMember) + + adapter := NewAdapter(mockGossip, selfNetworkMember, &mockMsgCrypto{}, []byte("channel0")) + msg := adapter.CreateMessage(true) + + if !msg.(*msgImpl).msg.IsLeadershipMsg() { + t.Error("Newly created message should be LeadershipMsg") + } + + if !msg.IsDeclaration() { + t.Error("Newly created msg should be Declaration msg") + } + + msg = adapter.CreateMessage(false) + + if !msg.(*msgImpl).msg.IsLeadershipMsg() { + t.Error("Newly created message should be LeadershipMsg") + } + + if !msg.IsProposal() || msg.IsDeclaration() { + t.Error("Newly created msg should be Proposal msg") + } +} + +func TestAdapterImpl_Peers(t *testing.T) { + _, adapters := createCluster(0, 1, 2, 3, 4, 5) + + peersPKIDs := make(map[string]string) + peersPKIDs[string([]byte{0})] = string([]byte{0}) + peersPKIDs[string([]byte{1})] = string([]byte{1}) + peersPKIDs[string([]byte{2})] = string([]byte{2}) + peersPKIDs[string([]byte{3})] = string([]byte{2}) + peersPKIDs[string([]byte{4})] = string([]byte{4}) + peersPKIDs[string([]byte{5})] = string([]byte{5}) + + for _, adapter := range adapters { + peers := adapter.Peers() + if len(peers) != 6 { + t.Errorf("Should return 6 peers, not %d", len(peers)) + } + + for _, peer := range peers { + if _, exist := peersPKIDs[peer.ID()]; !exist { + t.Errorf("Peer %s PKID not found", peer.(*peerImpl).member.Endpoint) + } + } + } + +} + +func TestAdapterImpl_Stop(t *testing.T) { + _, adapters := createCluster(0, 1, 2, 3, 4, 5) + var ch []<-chan Msg + + for _, adapter := range adapters { + ch = append(ch, adapter.Accept()) + } + + for _, adapter := range adapters { + adapter.Stop() + } +} + +func TestAdapterImpl_Gossip(t *testing.T) { + _, adapters := createCluster(0, 1, 2) + + channels := make(map[string]<-chan Msg) + + for peerId, adapter := range adapters { + channels[peerId] = adapter.Accept() + } + + sender := adapters[fmt.Sprintf("Peer%d", 0)] + + sender.Gossip(sender.CreateMessage(true)) + + totalMsg := 0 + + timer := time.After(time.Duration(1) * time.Second) + + for { + select { + case <-timer: + if totalMsg != 2 { + t.Error("Not all messages accepted") + t.FailNow() + } else { + return + } + case msg := <-channels[fmt.Sprintf("Peer%d", 1)]: + if !msg.IsDeclaration() { + t.Error("Msg should be declaration") + } else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 { + t.Error("Msg Sender is wrong") + } else { + totalMsg++ + } + case msg := <-channels[fmt.Sprintf("Peer%d", 2)]: + if !msg.IsDeclaration() { + t.Error("Msg should be declaration") + } else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 { + t.Error("Msg Sender is wrong") + } else { + totalMsg++ + } + } + + } + +} + +type mockMsgCrypto struct { +} + +// Sign signs a message, returns a signed message on success +// or an error on failure +func (is *mockMsgCrypto) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +// Verify verifies a signed message +func (is *mockMsgCrypto) Verify(vkID, signature, message []byte) error { + return nil +} + +// Get returns the identity of a given pkiID, or error if such an identity +// isn't found +func (is *mockMsgCrypto) Get(pkiID common.PKIidType) (api.PeerIdentityType, error) { + return nil, nil +} + +type mockAcceptor struct { + ch chan *proto.GossipMessage + acceptor common.MessageAcceptor +} + +type peerMockGossip struct { + cluster *clusterOfPeers + member *discovery.NetworkMember + acceptors []*mockAcceptor + acceptorLock *sync.RWMutex + clusterLock *sync.RWMutex + id string +} + +func (g *peerMockGossip) Peers() []discovery.NetworkMember { + + g.clusterLock.RLock() + if g.cluster == nil { + return []discovery.NetworkMember{*g.member} + } + peerLock := g.cluster.peersLock + g.clusterLock.RUnlock() + + peerLock.RLock() + res := make([]discovery.NetworkMember, 0) + g.clusterLock.RLock() + for _, val := range g.cluster.peersGossip { + res = append(res, *val.member) + + } + g.clusterLock.RUnlock() + peerLock.RUnlock() + return res +} + +func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) { + ch := make(chan *proto.GossipMessage, 100) + g.acceptorLock.Lock() + g.acceptors = append(g.acceptors, &mockAcceptor{ + ch: ch, + acceptor: acceptor, + }) + g.acceptorLock.Unlock() + return ch, nil +} + +func (g *peerMockGossip) Gossip(msg *proto.GossipMessage) { + g.clusterLock.RLock() + if g.cluster == nil { + return + } + peersLock := g.cluster.peersLock + g.clusterLock.RUnlock() + + peersLock.RLock() + g.clusterLock.RLock() + for _, val := range g.cluster.peersGossip { + if strings.Compare(val.id, g.id) != 0 { + val.putToAcceptors(msg) + } + } + g.clusterLock.RUnlock() + peersLock.RUnlock() + +} + +func (g *peerMockGossip) putToAcceptors(msg *proto.GossipMessage) { + g.acceptorLock.RLock() + for _, acceptor := range g.acceptors { + if acceptor.acceptor(msg) { + if len(acceptor.ch) < 10 { + acceptor.ch <- msg + } + } + } + g.acceptorLock.RUnlock() + +} + +func newGossip(peerID string, member *discovery.NetworkMember) *peerMockGossip { + return &peerMockGossip{ + id: peerID, + member: member, + acceptorLock: &sync.RWMutex{}, + clusterLock: &sync.RWMutex{}, + acceptors: make([]*mockAcceptor, 0), + } +} + +type clusterOfPeers struct { + peersGossip map[string]*peerMockGossip + peersLock *sync.RWMutex + id string +} + +func (cop *clusterOfPeers) addPeer(peerID string, gossip *peerMockGossip) { + cop.peersLock.Lock() + cop.peersGossip[peerID] = gossip + gossip.clusterLock.Lock() + gossip.cluster = cop + gossip.clusterLock.Unlock() + cop.peersLock.Unlock() + +} + +func newClusterOfPeers(id string) *clusterOfPeers { + return &clusterOfPeers{ + id: id, + peersGossip: make(map[string]*peerMockGossip), + peersLock: &sync.RWMutex{}, + } + +} + +func createCluster(peers ...int) (*clusterOfPeers, map[string]*adapterImpl) { + adapters := make(map[string]*adapterImpl) + cluster := newClusterOfPeers("0") + for _, peer := range peers { + peerEndpoint := fmt.Sprintf("Peer%d", peer) + peerPKID := []byte{byte(peer)} + peerMember := &discovery.NetworkMember{ + Metadata: []byte{}, + Endpoint: peerEndpoint, + PKIid: peerPKID, + } + + mockGossip := newGossip(peerEndpoint, peerMember) + adapter := NewAdapter(mockGossip, peerMember, &mockMsgCrypto{}, []byte("channel0")) + adapters[peerEndpoint] = adapter.(*adapterImpl) + cluster.addPeer(peerEndpoint, mockGossip) + } + + return cluster, adapters +} diff --git a/gossip/proto/extensions.go b/gossip/proto/extensions.go index 02e131bc9cf..1721b66ff3d 100644 --- a/gossip/proto/extensions.go +++ b/gossip/proto/extensions.go @@ -217,6 +217,11 @@ func (m *GossipMessage) IsDigestMsg() bool { return m.GetDataDig() != nil } +// IsLeadershipMsg returns whether this GossipMessage is a leadership (leader election) message +func (m *GossipMessage) IsLeadershipMsg() bool { + return m.GetLeadershipMsg() != nil +} + // MsgConsumer invokes code given a GossipMessage type MsgConsumer func(*GossipMessage)