From a6edbff3efec4149839c89f5cbc0511146323371 Mon Sep 17 00:00:00 2001 From: Gennady Laventman Date: Sun, 12 Feb 2017 10:55:34 +0200 Subject: [PATCH] [FAB-1846] Integration between deliver and election Creates the leader election service during channel initialization in gossip and connects the election callback to the deliver service Change-Id: Ic2b8b1b5ebc770abcf4b935d39ce06087086b0c9 Signed-off-by: Gennady Laventman --- core/deliverservice/deliveryclient.go | 66 ++- core/deliverservice/deliveryclient_test.go | 13 +- core/peer/peer_test.go | 13 +- core/scc/cscc/configure_test.go | 13 +- gossip/election/adapter.go | 19 +- gossip/election/adapter_test.go | 10 +- gossip/election/election.go | 26 +- gossip/gossip/gossip_test.go | 9 +- gossip/service/gossip_service.go | 56 +- gossip/service/gossip_service_test.go | 585 +++++++++++++++++++++ peer/core.yaml | 6 +- 11 files changed, 753 insertions(+), 63 deletions(-) diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 298c6302b4c..8ca98fe3597 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "fmt" + "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" "github.com/hyperledger/fabric/protos/orderer" @@ -39,10 +41,13 @@ func init() { // DeliverService used to communicate with orderers to obtain // new block and send the to the committer service type DeliverService interface { - // JoinChain once peer joins the chain it should need to check whenever - // it has been selected as a leader and open connection to the configured - // ordering service endpoint - JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error + // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service + // to channel peers. + StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error + + // StopDeliverForChannel dynamically stops delivery of new blocks from ordering service + // to channel peers. + StopDeliverForChannel(chainID string) error // Stop terminates delivery service and closes the connection Stop() @@ -131,27 +136,29 @@ func NewFactoryDeliverService(gossip blocksprovider.GossipServiceAdapter, factor } } -// JoinChain initialize the grpc stream for given chainID, creates blocks provider instance -// to spawn in go routine to read new blocks starting from the position provided by ledger +// StartDeliverForChannel starts blocks delivery for a channel: it +// initializes the grpc stream for the given chainID and creates a blocks provider instance +// that spawns a go routine to read new blocks starting from the position provided by the ledger // info instance. 
-func (d *deliverServiceImpl) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { - isLeader := viper.GetBool("peer.gossip.orgLeader") - - if isLeader { +func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { + d.lock.Lock() + defer d.lock.Unlock() + if d.stopping { + errMsg := fmt.Sprintf("Delivery service is stopping, cannot join a new channel %s", chainID) + logger.Errorf(errMsg) + return errors.New(errMsg) + } + if _, exist := d.clients[chainID]; exist { + errMsg := fmt.Sprintf("Delivery service - block provider for %s already exists, can't start delivery", chainID) + logger.Errorf(errMsg) + return errors.New(errMsg) + } else { abc, err := d.clientsFactory.Create() if err != nil { logger.Errorf("Unable to initialize atomic broadcast, due to %s", err) return err } - - d.lock.Lock() - defer d.lock.Unlock() - - if d.stopping { - logger.Errorf("Delivery service is stopping cannot join a new channel") - return errors.New("Delivery service is stopping cannot join a new channel") - } - + logger.Debug("This peer will pass blocks from orderer service to other peers") d.clients[chainID] = blocksprovider.NewBlocksProvider(chainID, abc, d.gossip) if err := d.clients[chainID].RequestBlocks(ledgerInfo); err == nil { @@ -162,6 +169,27 @@ func (d *deliverServiceImpl) JoinChain(chainID string, ledgerInfo blocksprovider return nil } +// StopDeliverForChannel stops blocks delivery for a channel by stopping the channel's block provider +func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error { + d.lock.Lock() + defer d.lock.Unlock() + if d.stopping { + errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID) + logger.Errorf(errMsg) + return errors.New(errMsg) + } + if client, exist := d.clients[chainID]; exist { + client.Stop() + delete(d.clients, chainID) + logger.Debug("This peer will stop passing blocks from orderer service to other peers") + } else { + errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID) + logger.Errorf(errMsg) + return errors.New(errMsg) + } + return nil +} + // Stop all service and release resources func (d *deliverServiceImpl) Stop() { d.lock.Lock() diff --git a/core/deliverservice/deliveryclient_test.go b/core/deliverservice/deliveryclient_test.go index 63955cc582c..df024dbfcb6 100644 --- a/core/deliverservice/deliveryclient_test.go +++ b/core/deliverservice/deliveryclient_test.go @@ -49,9 +49,17 @@ func TestNewDeliverService(t *testing.T) { } service := NewFactoryDeliverService(gossipServiceAdapter, factory, nil) - service.JoinChain("TEST_CHAINID", &mocks.MockLedgerInfo{0}) + assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0})) + + // Let's start delivery twice + assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery") + // Let's stop delivery that was not started + assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery") // Let it try to simulate a few recv -> gossip rounds + time.Sleep(time.Duration(10) * time.Millisecond) + assert.NoError(t, service.StopDeliverForChannel("TEST_CHAINID")) + time.Sleep(time.Duration(10) * time.Millisecond) service.Stop() @@ -61,4 +69,7 @@ func TestNewDeliverService(t *testing.T) { assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt)) assert.Equal(t, 
atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.GossipCallsCnt)) + assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping") + assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping") + } diff --git a/core/peer/peer_test.go b/core/peer/peer_test.go index 6c8383f6ad8..9ecbb1e1dca 100644 --- a/core/peer/peer_test.go +++ b/core/peer/peer_test.go @@ -40,10 +40,15 @@ import ( type mockDeliveryClient struct { } -// JoinChain once peer joins the chain it should need to check whenever -// it has been selected as a leader and open connection to the configured -// ordering service endpoint -func (*mockDeliveryClient) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { +// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service +// to channel peers. +func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { + return nil +} + +// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service +// to channel peers. +func (ds *mockDeliveryClient) StopDeliverForChannel(chainID string) error { return nil } diff --git a/core/scc/cscc/configure_test.go b/core/scc/cscc/configure_test.go index 12f7971b91e..44869255999 100644 --- a/core/scc/cscc/configure_test.go +++ b/core/scc/cscc/configure_test.go @@ -46,10 +46,15 @@ import ( type mockDeliveryClient struct { } -// JoinChain once peer joins the chain it should need to check whenever -// it has been selected as a leader and open connection to the configured -// ordering service endpoint -func (*mockDeliveryClient) JoinChain(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { +// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service +// to channel peers. +func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { + return nil +} + +// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service +// to channel peers. 
+func (ds *mockDeliveryClient) StopDeliverForChannel(chainID string) error { return nil } diff --git a/gossip/election/adapter.go b/gossip/election/adapter.go index d19e0d68f00..b3a322f039d 100644 --- a/gossip/election/adapter.go +++ b/gossip/election/adapter.go @@ -23,6 +23,7 @@ import ( "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" ) @@ -44,7 +45,7 @@ func (mi *msgImpl) IsDeclaration() bool { } type peerImpl struct { - member *discovery.NetworkMember + member discovery.NetworkMember } func (pi *peerImpl) ID() peerID { @@ -66,8 +67,8 @@ type gossip interface { } type adapterImpl struct { - gossip gossip - self *discovery.NetworkMember + gossip gossip + selfPKIid common.PKIidType incTime uint64 seqNum uint64 @@ -81,17 +82,17 @@ type adapterImpl struct { } // NewAdapter creates new leader election adapter -func NewAdapter(gossip gossip, self *discovery.NetworkMember, channel common.ChainID) LeaderElectionAdapter { +func NewAdapter(gossip gossip, pkiid common.PKIidType, channel common.ChainID) LeaderElectionAdapter { return &adapterImpl{ - gossip: gossip, - self: self, + gossip: gossip, + selfPKIid: pkiid, incTime: uint64(time.Now().UnixNano()), seqNum: uint64(0), channel: channel, - logger: logging.MustGetLogger("LeaderElectionAdapter"), + logger: util.GetLogger(util.LoggingElectionModule, ""), doneCh: make(chan struct{}), stopOnce: &sync.Once{}, @@ -134,7 +135,7 @@ func (ai *adapterImpl) CreateMessage(isDeclaration bool) Msg { seqNum := ai.seqNum leadershipMsg := &proto.LeadershipMessage{ - PkiID: ai.self.PKIid, + PkiID: ai.selfPKIid, IsDeclaration: isDeclaration, Timestamp: &proto.PeerTime{ IncNumber: ai.incTime, @@ -156,7 +157,7 @@ func (ai *adapterImpl) Peers() []Peer { var res []Peer for _, peer := range peers { - res = append(res, &peerImpl{&peer}) + res = append(res, &peerImpl{peer}) } return res diff --git a/gossip/election/adapter_test.go b/gossip/election/adapter_test.go index 17ed177d33c..836fec6d5af 100644 --- a/gossip/election/adapter_test.go +++ b/gossip/election/adapter_test.go @@ -41,7 +41,7 @@ func TestNewAdapter(t *testing.T) { peersCluster := newClusterOfPeers("0") peersCluster.addPeer("peer0", mockGossip) - NewAdapter(mockGossip, selfNetworkMember, []byte("channel0")) + NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0")) } func TestAdapterImpl_CreateMessage(t *testing.T) { @@ -52,7 +52,7 @@ func TestAdapterImpl_CreateMessage(t *testing.T) { } mockGossip := newGossip("peer0", selfNetworkMember) - adapter := NewAdapter(mockGossip, selfNetworkMember, []byte("channel0")) + adapter := NewAdapter(mockGossip, selfNetworkMember.PKIid, []byte("channel0")) msg := adapter.CreateMessage(true) if !msg.(*msgImpl).msg.IsLeadershipMsg() { @@ -142,7 +142,7 @@ func TestAdapterImpl_Gossip(t *testing.T) { case msg := <-channels[fmt.Sprintf("Peer%d", 1)]: if !msg.IsDeclaration() { t.Error("Msg should be declaration") - } else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) { + } else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) { t.Error("Msg Sender is wrong") } else { totalMsg++ @@ -150,7 +150,7 @@ func TestAdapterImpl_Gossip(t *testing.T) { case msg := <-channels[fmt.Sprintf("Peer%d", 2)]: if !msg.IsDeclaration() { t.Error("Msg should be declaration") - } else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) { + } else if !bytes.Equal(msg.SenderID(), sender.selfPKIid) { t.Error("Msg Sender is 
wrong") } else { totalMsg++ @@ -308,7 +308,7 @@ func createCluster(peers ...int) (*clusterOfPeers, map[string]*adapterImpl) { } mockGossip := newGossip(peerEndpoint, peerMember) - adapter := NewAdapter(mockGossip, peerMember, []byte("channel0")) + adapter := NewAdapter(mockGossip, peerMember.PKIid, []byte("channel0")) adapters[peerEndpoint] = adapter.(*adapterImpl) cluster.addPeer(peerEndpoint, mockGossip) } diff --git a/gossip/election/election.go b/gossip/election/election.go index 67f42115f4e..6ee7e1cfcac 100644 --- a/gossip/election/election.go +++ b/gossip/election/election.go @@ -183,8 +183,8 @@ func (le *leaderElectionSvcImpl) start() { } func (le *leaderElectionSvcImpl) handleMessages() { - le.logger.Info(le.id, ": Entering") - defer le.logger.Info(le.id, ": Exiting") + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting") defer le.stopWG.Done() msgChan := le.adapter.Accept() for { @@ -270,14 +270,14 @@ func (le *leaderElectionSvcImpl) run() { } func (le *leaderElectionSvcImpl) leaderElection() { - le.logger.Info(le.id, ": Entering") - defer le.logger.Info(le.id, ": Exiting") + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting") le.propose() le.waitForInterrupt(leaderElectionDuration) // If someone declared itself as a leader, give up // on trying to become a leader too if le.isLeaderExists() { - le.logger.Info(le.id, ": Some peer is already a leader") + le.logger.Debug(le.id, ": Some peer is already a leader") return } // Leader doesn't exist, let's see if there is a better candidate than us @@ -296,8 +296,8 @@ func (le *leaderElectionSvcImpl) leaderElection() { // propose sends a leadership proposal message to remote peers func (le *leaderElectionSvcImpl) propose() { - le.logger.Info(le.id, ": Entering") - le.logger.Info(le.id, ": Exiting") + le.logger.Debug(le.id, ": Entering") + le.logger.Debug(le.id, ": Exiting") leadershipProposal := le.adapter.CreateMessage(false) le.adapter.Gossip(leadershipProposal) } @@ -324,8 +324,8 @@ func (le *leaderElectionSvcImpl) leader() { // waitForMembershipStabilization waits for membership view to stabilize // or until a time limit expires, or until a peer declares itself as a leader func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) { - le.logger.Info(le.id, ": Entering") - defer le.logger.Info(le.id, ": Exiting") + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting, peers found", len(le.adapter.Peers())) endTime := time.Now().Add(timeLimit) viewSize := len(le.adapter.Peers()) for !le.shouldStop() { @@ -368,13 +368,13 @@ func (le *leaderElectionSvcImpl) IsLeader() bool { } func (le *leaderElectionSvcImpl) beLeader() { - le.logger.Info(le.id, ": Becoming a leader") + le.logger.Debug(le.id, ": Becoming a leader") atomic.StoreInt32(&le.isLeader, int32(1)) le.callback(true) } func (le *leaderElectionSvcImpl) stopBeingLeader() { - le.logger.Info(le.id, "Stopped being a leader") + le.logger.Debug(le.id, "Stopped being a leader") atomic.StoreInt32(&le.isLeader, int32(0)) le.callback(false) } @@ -385,8 +385,8 @@ func (le *leaderElectionSvcImpl) shouldStop() bool { // Stop stops the LeaderElectionService func (le *leaderElectionSvcImpl) Stop() { - le.logger.Info(le.id, ": Entering") - defer le.logger.Info(le.id, ": Exiting") + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting") atomic.StoreInt32(&le.toDie, int32(1)) le.stopChan <- struct{}{} le.stopWG.Wait() diff --git a/gossip/gossip/gossip_test.go 
b/gossip/gossip/gossip_test.go index 77a7c763ad1..f05903dd2f6 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -531,8 +531,10 @@ func TestDissemination(t *testing.T) { leadershipChan, _ := peers[i-1].Accept(acceptLeadershp, false) go func(index int, ch <-chan *proto.GossipMessage) { defer wgLeadership.Done() - <-ch - receivedLeadershipMessages[index]++ + msg := <-ch + if bytes.Equal(msg.Channel, common.ChainID("A")) { + receivedLeadershipMessages[index]++ + } }(i-1, leadershipChan) } @@ -886,9 +888,6 @@ func createDataMsg(seqnum uint64, data []byte, hash string, channel common.Chain func createLeadershipMsg(isDeclaration bool, channel common.ChainID, incTime uint64, seqNum uint64, endpoint string, pkiid []byte) *proto.GossipMessage { - metadata := []byte{} - metadata = strconv.AppendBool(metadata, isDeclaration) - leadershipMsg := &proto.LeadershipMessage{ IsDeclaration: isDeclaration, PkiID: pkiid, diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 8cbbae2ffa4..b896e6c9288 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/gossip/api" gossipCommon "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/election" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/identity" "github.com/hyperledger/fabric/gossip/integration" @@ -74,6 +75,7 @@ func (*deliveryFactoryImpl) Service(g GossipService) (deliverclient.DeliverServi type gossipServiceImpl struct { gossipSvc chains map[string]state.GossipStateProvider + leaderElection map[string]election.LeaderElectionService deliveryService deliverclient.DeliverService deliveryFactory DeliveryServiceFactory lock sync.RWMutex @@ -123,6 +125,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string } if viper.GetBool("peer.gossip.ignoreSecurity") { + logger.Info("This peer ignoring security in gossip") sec := &secImpl{[]byte(endpoint)} mcs = sec secAdv = sec @@ -136,6 +139,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string mcs: mcs, gossipSvc: gossip, chains: make(map[string]state.GossipStateProvider), + leaderElection: make(map[string]election.LeaderElectionService), deliveryFactory: factory, idMapper: idMapper, peerIdentity: peerIdentity, @@ -170,8 +174,26 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe } if g.deliveryService != nil { - if err := g.deliveryService.JoinChain(chainID, committer); err != nil { - logger.Error("Delivery service is not able to join the chain, due to", err) + leaderElection := viper.GetBool("peer.gossip.useLeaderElection") + staticOrderConnection := viper.GetBool("peer.gossip.orgLeader") + + if leaderElection && staticOrderConnection { + msg := "Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution" + logger.Panic(msg) + } else if leaderElection { + logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID) + connector := &leaderElectionDeliverConnector{ + deliverer: g.deliveryService, + committer: committer, + chainID: chainID, + } + electionService := g.newLeaderElectionComponent(gossipCommon.ChainID(connector.chainID), connector.leadershipStatusChange) + g.leaderElection[chainID] = electionService + } else if staticOrderConnection { + logger.Debug("This peer is configured to connect to ordering 
service for blocks delivery, channel", chainID) + g.deliveryService.StartDeliverForChannel(chainID, committer) + } else { + logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID) } } else { logger.Warning("Delivery client is down won't be able to pull blocks for chain", chainID) @@ -225,12 +247,23 @@ func (g *gossipServiceImpl) Stop() { logger.Info("Stopping chain", ch) ch.Stop() } + + for chainID, electionService := range g.leaderElection { + logger.Infof("Stopping leader election for %s", chainID) + electionService.Stop() + } g.gossipSvc.Stop() if g.deliveryService != nil { g.deliveryService.Stop() } } +func (g *gossipServiceImpl) newLeaderElectionComponent(channel gossipCommon.ChainID, callback func(bool)) election.LeaderElectionService { + PKIid := g.idMapper.GetPKIidOfCert(g.peerIdentity) + adapter := election.NewAdapter(g, PKIid, channel) + return election.NewLeaderElectionService(adapter, string(PKIid), callback) +} + func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool { for _, orgName := range orgListFromConfig(config) { if orgName == myOrg { @@ -279,3 +312,22 @@ func (s *secImpl) VerifyByChannel(chainID gossipCommon.ChainID, peerIdentity api func (s *secImpl) ValidateIdentity(peerIdentity api.PeerIdentityType) error { return nil } + +type leaderElectionDeliverConnector struct { + deliverer deliverclient.DeliverService + chainID string + committer committer.Committer +} + +func (ledc *leaderElectionDeliverConnector) leadershipStatusChange(isLeader bool) { + if isLeader { + if err := ledc.deliverer.StartDeliverForChannel(ledc.chainID, ledc.committer); err != nil { + logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err) + } + } else { + if err := ledc.deliverer.StopDeliverForChannel(ledc.chainID); err != nil { + logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err) + } + + } +} diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 42aff33683d..8b9537652df 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -22,13 +22,24 @@ import ( "sync" "testing" + "bytes" "time" mockpolicies "github.com/hyperledger/fabric/common/mocks/policies" + "github.com/hyperledger/fabric/core/deliverservice" + "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" "github.com/hyperledger/fabric/gossip/api" + gossipCommon "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/election" + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/state" + "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/msp/mgmt" "github.com/hyperledger/fabric/msp/mgmt/testtools" "github.com/hyperledger/fabric/peer/gossip/mcs" + "github.com/hyperledger/fabric/protos/common" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -72,3 +83,577 @@ func TestInitGossipService(t *testing.T) { func TestJCMInterface(t *testing.T) { _ = api.JoinChannelMessage(&joinChannelMessage{}) } + +func TestLeaderElectionWithDeliverClient(t *testing.T) { + + //Test checks if leader election works with a mock deliver service instance + //Configuration set to use dynamic leader election + //10 peers are started and added to the channel; at the end we check that only for one peer + //mockDeliverService.StartDeliverForChannel was invoked + + 
viper.Set("peer.gossip.useLeaderElection", true) + viper.Set("peer.gossip.orgLeader", false) + + n := 10 + gossips := startPeers(t, n, 10000) + + channelName := "chanA" + peerIndexes := make([]int, n) + for i := 0; i < n; i++ { + peerIndexes[i] = i + } + addPeersToChannel(t, n, 10000, channelName, gossips, peerIndexes) + + waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2) + + services := make([]*electionService, n) + + for i := 0; i < n; i++ { + deliverServiceFactory := &mockDeliverServiceFactory{ + service: &mockDeliverService{ + running: make(map[string]bool), + }, + } + gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory + deliverServiceFactory.service.running[channelName] = false + + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}) + service, exist := gossips[i].(*gossipServiceImpl).leaderElection[channelName] + assert.True(t, exist, "Leader election service should be created for peer %d and channel %s", i, channelName) + services[i] = &electionService{nil, false, 0} + services[i].LeaderElectionService = service + } + + assert.True(t, waitForLeaderElection(t, services, 0, time.Second*30, time.Second*2), "One leader (peer 0) should be selected") + + assert.True(t, gossips[0].(*gossipServiceImpl).deliveryService.(*mockDeliverService).running[channelName], "Delivery service should be started in peer %d", 0) + + for i := 1; i < n; i++ { + assert.False(t, gossips[i].(*gossipServiceImpl).deliveryService.(*mockDeliverService).running[channelName], "Delivery service should not be started in peer %d", i) + } + + stopPeers(gossips) +} + +func TestWithStaticDeliverClientLeader(t *testing.T) { + + //Tests check if static leader flag works ok. + //Leader election flag set to false, and static leader flag set to true + //Two gossip service instances (peers) created. + //Each peer is added to channel and should run mock delivery client + //After that each peer added to another client and it should run deliver client for this channel as well. 
+ + viper.Set("peer.gossip.useLeaderElection", false) + viper.Set("peer.gossip.orgLeader", true) + + n := 2 + gossips := startPeers(t, n, 10000) + + channelName := "chanA" + peerIndexes := make([]int, n) + for i := 0; i < n; i++ { + peerIndexes[i] = i + } + + addPeersToChannel(t, n, 10000, channelName, gossips, peerIndexes) + + waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2) + + deliverServiceFactory := &mockDeliverServiceFactory{ + service: &mockDeliverService{ + running: make(map[string]bool), + }, + } + + for i := 0; i < n; i++ { + gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory + deliverServiceFactory.service.running[channelName] = false + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}) + } + + for i := 0; i < n; i++ { + assert.NotNil(t, gossips[i].(*gossipServiceImpl).deliveryService, "Delivery service not initiated in peer %d", i) + assert.True(t, gossips[i].(*gossipServiceImpl).deliveryService.(*mockDeliverService).running[channelName], "Block deliverer not started for peer %d", i) + } + + channelName = "chanB" + for i := 0; i < n; i++ { + deliverServiceFactory.service.running[channelName] = false + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}) + } + + for i := 0; i < n; i++ { + assert.NotNil(t, gossips[i].(*gossipServiceImpl).deliveryService, "Delivery service not initiated in peer %d", i) + assert.True(t, gossips[i].(*gossipServiceImpl).deliveryService.(*mockDeliverService).running[channelName], "Block deliverer not started for peer %d", i) + } + + stopPeers(gossips) +} + +func TestWithStaticDeliverClientNotLeader(t *testing.T) { + viper.Set("peer.gossip.useLeaderElection", false) + viper.Set("peer.gossip.orgLeader", false) + + n := 2 + gossips := startPeers(t, n, 10000) + + channelName := "chanA" + peerIndexes := make([]int, n) + for i := 0; i < n; i++ { + peerIndexes[i] = i + } + + addPeersToChannel(t, n, 10000, channelName, gossips, peerIndexes) + + waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2) + + deliverServiceFactory := &mockDeliverServiceFactory{ + service: &mockDeliverService{ + running: make(map[string]bool), + }, + } + + for i := 0; i < n; i++ { + gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory + deliverServiceFactory.service.running[channelName] = false + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}) + } + + for i := 0; i < n; i++ { + assert.NotNil(t, gossips[i].(*gossipServiceImpl).deliveryService, "Delivery service not initiated in peer %d", i) + assert.False(t, gossips[i].(*gossipServiceImpl).deliveryService.(*mockDeliverService).running[channelName], "Block deliverer should not be started for peer %d", i) + } + + stopPeers(gossips) +} + +func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) { + viper.Set("peer.gossip.useLeaderElection", true) + viper.Set("peer.gossip.orgLeader", true) + + n := 2 + gossips := startPeers(t, n, 10000) + + channelName := "chanA" + peerIndexes := make([]int, n) + for i := 0; i < n; i++ { + peerIndexes[i] = i + } + + addPeersToChannel(t, n, 10000, channelName, gossips, peerIndexes) + + waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2) + + deliverServiceFactory := &mockDeliverServiceFactory{ + service: &mockDeliverService{ + running: make(map[string]bool), + }, + } + + for i := 0; i < n; i++ { + gossips[i].(*gossipServiceImpl).deliveryFactory = deliverServiceFactory + assert.Panics(t, func() { + gossips[i].InitializeChannel(channelName, &mockLedgerInfo{0}) + }, 
"Dynamic leader lection based and static connection to ordering service can't exist simultaniosly") + } + + stopPeers(gossips) +} + +type mockDeliverServiceFactory struct { + service *mockDeliverService +} + +func (mf *mockDeliverServiceFactory) Service(g GossipService) (deliverclient.DeliverService, error) { + return mf.service, nil +} + +type mockDeliverService struct { + running map[string]bool +} + +func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error { + ds.running[chainID] = true + return nil +} + +func (ds *mockDeliverService) StopDeliverForChannel(chainID string) error { + ds.running[chainID] = false + return nil +} + +func (ds *mockDeliverService) Stop() { +} + +type mockLedgerInfo struct { + Height uint64 +} + +// LedgerHeight returns mocked value to the ledger height +func (li *mockLedgerInfo) LedgerHeight() (uint64, error) { + return li.Height, nil +} + +// Commit block to the ledger +func (li *mockLedgerInfo) Commit(block *common.Block) error { + return nil +} + +// Gets blocks with sequence numbers provided in the slice +func (li *mockLedgerInfo) GetBlocks(blockSeqs []uint64) []*common.Block { + return nil +} + +// Closes committing service +func (li *mockLedgerInfo) Close() { +} + +func TestLeaderElectionWithRealGossip(t *testing.T) { + + // Spawn 10 gossip instances with single channel and inside same organization + // Run leader election on top of each gossip instance and check that only one leader chosen + // Create another channel includes sub-set of peers over same gossip instances {1,3,5,7} + // Run additional leader election services for new channel + // Check correct leader still exist for first channel and new correct leader chosen in second channel + // Stop gossip instances of leader peers for both channels and see that new leader chosen for both + + // Creating gossip service instances for peers + n := 10 + gossips := startPeers(t, n, 10000) + + // Joining all peers to first channel + channelName := "chanA" + peerIndexes := make([]int, n) + for i := 0; i < n; i++ { + peerIndexes[i] = i + } + addPeersToChannel(t, n, 10000, channelName, gossips, peerIndexes) + + waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2) + + logger.Warning("Starting leader election services") + + //Stariting leader election services + services := make([]*electionService, n) + + for i := 0; i < n; i++ { + services[i] = &electionService{nil, false, 0} + services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(channelName), services[i].callback) + } + + logger.Warning("Waiting for leader election") + + assert.True(t, waitForLeaderElection(t, services, 0, time.Second*30, time.Second*2), "One leader (peer 0) should be selected") + assert.True(t, services[0].callbackInvokeRes, "Callback func for peer 0 should be called (chanA)") + + for i := 1; i < n; i++ { + assert.False(t, services[i].callbackInvokeRes, "Callback func for peer %d should not be called (chanA)", i) + assert.False(t, services[i].IsLeader(), "Peer %d should not be leader in chanA", i) + } + + // Adding some peers to new channel and creating leader election services for peers in new channel + // Expecting peer 1 (first in list of election services) to become leader of second channel + secondChannelPeerIndexes := []int{1, 3, 5, 7} + secondChannelName := "chanB" + secondChannelServices := make([]*electionService, len(secondChannelPeerIndexes)) + addPeersToChannel(t, n, 10000, secondChannelName, 
gossips, secondChannelPeerIndexes) + + for idx, i := range secondChannelPeerIndexes { + secondChannelServices[idx] = &electionService{nil, false, 0} + secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(secondChannelName), secondChannelServices[idx].callback) + } + + assert.True(t, waitForLeaderElection(t, secondChannelServices, 0, time.Second*30, time.Second*2), "One leader (peer 1) should be selected") + assert.True(t, waitForLeaderElection(t, services, 0, time.Second*30, time.Second*2), "One leader (peer 0) should be selected") + + assert.True(t, services[0].callbackInvokeRes, "Callback func for peer 0 should be called (chanA)") + for i := 1; i < n; i++ { + assert.False(t, services[i].callbackInvokeRes, "Callback func for peer %d should not be called (chanA)", i) + assert.False(t, services[i].IsLeader(), "Peer %d should not be leader in chanA", i) + } + + assert.True(t, secondChannelServices[0].callbackInvokeRes, "Callback func for peer 1 should be called (chanB)") + for i := 1; i < len(secondChannelServices); i++ { + assert.False(t, secondChannelServices[i].callbackInvokeRes, "Callback func for peer %d should not be called (chanB)", secondChannelPeerIndexes[i]) + assert.False(t, secondChannelServices[i].IsLeader(), "Peer %d should not be leader in chanB", i) + } + + //Stopping 2 gossip instances(peer 0 and peer 1), should init re-election + //Now peer 2 become leader for first channel and peer 3 for second channel + stopPeers(gossips[:2]) + + assert.True(t, waitForLeaderElection(t, secondChannelServices[1:], 0, time.Second*30, time.Second*2), "One leader (peer 2) should be selected") + assert.True(t, waitForLeaderElection(t, services[2:], 0, time.Second*30, time.Second*2), "One leader (peer 3) should be selected") + + assert.True(t, services[2].callbackInvokeRes, "Callback func for peer 2 should be called (chanA)") + for i := 3; i < n; i++ { + assert.False(t, services[i].callbackInvokeRes, "Callback func for peer %d should not be called (chanA)", i) + assert.False(t, services[i].IsLeader(), "Peer %d should not be leader in chanA", i) + } + + assert.True(t, secondChannelServices[1].callbackInvokeRes, "Callback func for peer 3 should be called (chanB)") + for i := 2; i < len(secondChannelServices); i++ { + assert.False(t, secondChannelServices[i].callbackInvokeRes, "Callback func for peer %d should not be called (chanB)", secondChannelPeerIndexes[i]) + assert.False(t, secondChannelServices[i].IsLeader(), "Peer %d should not be leader in chanB", i) + } + + stopServices(secondChannelServices) + stopServices(services) + stopPeers(gossips[2:]) +} + +type electionService struct { + election.LeaderElectionService + callbackInvokeRes bool + callbackInvokeCount int +} + +func (es *electionService) callback(isLeader bool) { + es.callbackInvokeRes = isLeader + es.callbackInvokeCount = es.callbackInvokeCount + 1 +} + +type joinChanMsg struct { + anchorPeers []api.AnchorPeer +} + +// SequenceNumber returns the sequence number of the block this joinChanMsg +// is derived from +func (jmc *joinChanMsg) SequenceNumber() uint64 { + return uint64(time.Now().UnixNano()) +} + +// AnchorPeers returns all the anchor peers that are in the channel +func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { + if len(jcm.anchorPeers) == 0 { + return []api.AnchorPeer{{OrgID: orgInChannelA}} + } + return jcm.anchorPeers +} + +func waitForFullMembership(t *testing.T, gossips []GossipService, peersNum int, timeout time.Duration, 
testPollInterval time.Duration) bool { + end := time.Now().Add(timeout) + var correctPeers int + for time.Now().Before(end) { + correctPeers = 0 + for _, g := range gossips { + if len(g.Peers()) == (peersNum - 1) { + correctPeers++ + } + } + if correctPeers == peersNum { + return true + } + time.Sleep(testPollInterval) + } + logger.Warningf("Only %d peers have full membership", correctPeers) + return false +} + +func waitForMultipleLeadersElection(t *testing.T, services []*electionService, leadersNum int, leaderIndexes []int, timeout time.Duration, testPollInterval time.Duration) bool { + end := time.Now().Add(timeout) + for time.Now().Before(end) { + leaders := 0 + incorrectLeaders := false + for i, s := range services { + if s.IsLeader() { + expectedLeader := false + for _, index := range leaderIndexes { + if i == index { + leaders++ + expectedLeader = true + } + } + if !expectedLeader { + logger.Warning("Incorrect peer", i, "become leader") + incorrectLeaders = true + } + } + } + if leaders == leadersNum && !incorrectLeaders { + return true + } + time.Sleep(testPollInterval) + } + return false +} + +func waitForLeaderElection(t *testing.T, services []*electionService, leaderIndex int, timeout time.Duration, testPollInterval time.Duration) bool { + return waitForMultipleLeadersElection(t, services, 1, []int{leaderIndex}, timeout, testPollInterval) +} + +func waitUntilOrFailBlocking(t *testing.T, f func(), timeout time.Duration) { + successChan := make(chan struct{}, 1) + go func() { + f() + successChan <- struct{}{} + }() + select { + case <-time.NewTimer(timeout).C: + break + case <-successChan: + return + } + util.PrintStackTrace() + assert.Fail(t, "Timeout expired!") +} + +func stopServices(services []*electionService) { + stoppingWg := sync.WaitGroup{} + stoppingWg.Add(len(services)) + for i, sI := range services { + go func(i int, s_i election.LeaderElectionService) { + defer stoppingWg.Done() + s_i.Stop() + }(i, sI) + } + stoppingWg.Wait() + time.Sleep(time.Second * time.Duration(2)) +} + +func stopPeers(peers []GossipService) { + stoppingWg := sync.WaitGroup{} + stoppingWg.Add(len(peers)) + for i, pI := range peers { + go func(i int, p_i GossipService) { + defer stoppingWg.Done() + p_i.Stop() + }(i, pI) + } + stoppingWg.Wait() + time.Sleep(time.Second * time.Duration(2)) +} + +func addPeersToChannel(t *testing.T, n int, portPrefix int, channel string, peers []GossipService, peerIndexes []int) { + jcm := &joinChanMsg{} + + wg := sync.WaitGroup{} + for _, i := range peerIndexes { + wg.Add(1) + go func(i int) { + peers[i].JoinChan(jcm, gossipCommon.ChainID(channel)) + peers[i].UpdateChannelMetadata([]byte("bla bla"), gossipCommon.ChainID(channel)) + wg.Done() + }(i) + } + waitUntilOrFailBlocking(t, wg.Wait, time.Second*10) +} + +func startPeers(t *testing.T, n int, portPrefix int) []GossipService { + + peers := make([]GossipService, n) + wg := sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + + peers[i] = newGossipInstance(portPrefix, i, 100, 0, 1, 2, 3, 4, 5) + wg.Done() + }(i) + } + waitUntilOrFailBlocking(t, wg.Wait, time.Second*10) + + return peers +} + +func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) GossipService { + port := id + portPrefix + conf := &gossip.Config{ + BindPort: port, + BootstrapPeers: bootPeers(portPrefix, boot...), + ID: fmt.Sprintf("p%d", id), + MaxBlockCountToStore: maxMsgCount, + MaxPropagationBurstLatency: time.Duration(500) * time.Millisecond, + MaxPropagationBurstSize: 20, + PropagateIterations: 1, 
+ PropagatePeerNum: 3, + PullInterval: time.Duration(2) * time.Second, + PullPeerNum: 5, + InternalEndpoint: fmt.Sprintf("localhost:%d", port), + ExternalEndpoint: fmt.Sprintf("1.2.3.4:%d", port), + PublishCertPeriod: time.Duration(4) * time.Second, + PublishStateInfoInterval: time.Duration(1) * time.Second, + RequestStateInfoInterval: time.Duration(1) * time.Second, + } + cryptoService := &naiveCryptoService{} + idMapper := identity.NewIdentityMapper(cryptoService) + + gossip := gossip.NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper, api.PeerIdentityType(conf.InternalEndpoint)) + + gossipService := &gossipServiceImpl{ + gossipSvc: gossip, + chains: make(map[string]state.GossipStateProvider), + leaderElection: make(map[string]election.LeaderElectionService), + deliveryFactory: &deliveryFactoryImpl{}, + idMapper: idMapper, + peerIdentity: api.PeerIdentityType(conf.InternalEndpoint), + } + + return gossipService +} + +func bootPeers(portPrefix int, ids ...int) []string { + peers := []string{} + for _, id := range ids { + peers = append(peers, fmt.Sprintf("localhost:%d", (id+portPrefix))) + } + return peers +} + +type naiveCryptoService struct { +} + +type orgCryptoService struct { +} + +// OrgByPeerIdentity returns the OrgIdentityType +// of a given peer identity +func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + return orgInChannelA +} + +// Verify verifies a JoinChanMessage, returns nil on success, +// and an error on failure +func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { + return nil +} + +// VerifyByChannel verifies a peer's signature on a message in the context +// of a specific channel +func (*naiveCryptoService) VerifyByChannel(_ gossipCommon.ChainID, _ api.PeerIdentityType, _, _ []byte) error { + return nil +} + +func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + return nil +} + +// GetPKIidOfCert returns the PKI-ID of a peer's identity +func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) gossipCommon.PKIidType { + return gossipCommon.PKIidType(peerIdentity) +} + +// VerifyBlock returns nil if the block is properly signed, +// else returns error +func (*naiveCryptoService) VerifyBlock(chainID gossipCommon.ChainID, signedBlock api.SignedBlock) error { + return nil +} + +// Sign signs msg with this peer's signing key and outputs +// the signature if no error occurred. +func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +// Verify checks that signature is a valid signature of message under a peer's verification key. +// If the verification succeeded, Verify returns nil meaning no error occurred. +// If peerCert is nil, then the signature is verified against this peer's verification key. 
+func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + equal := bytes.Equal(signature, message) + if !equal { + return fmt.Errorf("Wrong signature:%v, %v", signature, message) + } + return nil +} + +var orgInChannelA = api.OrgIdentityType("ORG1") diff --git a/peer/core.yaml b/peer/core.yaml index c4121c948e0..0d2426c418a 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -70,7 +70,11 @@ peer: # Gossip related configuration gossip: bootstrap: 127.0.0.1:7051 - # Is peer is its org leader and should pass blocks from orderer to other peers in org + # Use an automatically elected peer (for high availability) to distribute blocks in the channel, instead of a static one + # Setting both this and orgLeader to true causes a panic during channel initialization + useLeaderElection: false + # For debug - whether this peer is its org leader and should pass blocks from the orderer to other peers in the org + # Takes effect only if useLeaderElection is set to false orgLeader: true # ID of this instance endpoint:
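Note (illustrative, not part of the patch): a minimal sketch of the peer.gossip flag combinations introduced above, as handled by InitializeChannel in gossip_service.go; only the two flags are shown, all other gossip settings are omitted.

    # dynamic leader election: one peer per channel is elected automatically to pull
    # blocks from the ordering service, and a new leader is elected if it goes down
    useLeaderElection: true
    orgLeader: false

    # static org leader: this peer always pulls blocks from the ordering service
    useLeaderElection: false
    orgLeader: true

    # neither: this peer never connects to the ordering service for block delivery
    useLeaderElection: false
    orgLeader: false

    # both set to true is rejected: InitializeChannel panics with "Setting both orgLeader
    # and useLeaderElection to true isn't supported, aborting execution"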