diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index d715d33ee98..a1b6e3482ec 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -17,13 +17,13 @@ limitations under the License. package noopssinglechain import ( + "fmt" "time" "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/chaincode" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/ledger/kvledger" - "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/events/producer" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/orderer" @@ -33,14 +33,10 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - "fmt" - "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/peer" - "github.com/hyperledger/fabric/gossip/gossip" - "github.com/hyperledger/fabric/gossip/integration" gossip_proto "github.com/hyperledger/fabric/gossip/proto" - "github.com/hyperledger/fabric/gossip/state" + "github.com/hyperledger/fabric/gossip/service" pb "github.com/hyperledger/fabric/protos/peer" ) @@ -57,11 +53,9 @@ type DeliverService struct { client orderer.AtomicBroadcast_DeliverClient windowSize uint64 unAcknowledged uint64 - committer *committer.LedgerCommitter - stateProvider state.GossipStateProvider - gossip gossip.Gossip - conn *grpc.ClientConn + chainID string + conn *grpc.ClientConn } // StopDeliveryService sends stop to the delivery service reference @@ -73,25 +67,22 @@ func StopDeliveryService(service *DeliverService) { // NewDeliverService construction function to create and initilize // delivery service instance -func NewDeliverService(chainID string, address string, grpcServer *grpc.Server) *DeliverService { +func NewDeliverService(chainID string) *DeliverService { if viper.GetBool("peer.committer.enabled") { logger.Infof("Creating committer for single noops endorser") - deliverService := &DeliverService{ // Instance of RawLedger - committer: committer.NewLedgerCommitter(kvledger.GetLedger(chainID)), + chainID: chainID, windowSize: 10, } - deliverService.initStateProvider(address, grpcServer) - return deliverService } logger.Infof("Committer disabled") return nil } -func (d *DeliverService) startDeliver() error { +func (d *DeliverService) startDeliver(committer committer.Committer) error { logger.Info("Starting deliver service client") err := d.initDeliver() @@ -100,7 +91,7 @@ func (d *DeliverService) startDeliver() error { return err } - height, err := d.committer.LedgerHeight() + height, err := committer.LedgerHeight() if err != nil { logger.Errorf("Can't get legder height from committer [%s]", err) return err @@ -153,36 +144,24 @@ func (d *DeliverService) stopDeliver() { } } -func (d *DeliverService) initStateProvider(address string, grpcServer *grpc.Server) error { - bootstrap := viper.GetStringSlice("peer.gossip.bootstrap") - logger.Debug("Initializing state provideer, endpoint = ", address, " bootstrap set = ", bootstrap) - - gossip, gossipComm := integration.NewGossipComponent(address, grpcServer, bootstrap...) - - d.gossip = gossip - d.stateProvider = state.NewGossipStateProvider(gossip, gossipComm, d.committer) - return nil -} - -// Start the delivery service to read the block via delivery -// protocol from the orderers -func (d *DeliverService) Start() { - go d.checkLeaderAndRunDeliver() -} - // Stop all service and release resources func (d *DeliverService) Stop() { d.stopDeliver() - d.stateProvider.Stop() - d.gossip.Stop() } -func (d *DeliverService) checkLeaderAndRunDeliver() { +func (d *DeliverService) JoinChannel(committer committer.Committer, configBlock *common.Block) { + if err := service.GetGossipService().JoinChannel(committer, configBlock); err != nil { + panic("Cannot join channel, exiting") + } + + go d.checkLeaderAndRunDeliver(committer) +} +func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer) { isLeader := viper.GetBool("peer.gossip.orgLeader") if isLeader { - d.startDeliver() + d.startDeliver(committer) } } @@ -192,7 +171,7 @@ func (d *DeliverService) seekOldest() error { Seek: &orderer.SeekInfo{ Start: orderer.SeekInfo_OLDEST, WindowSize: d.windowSize, - ChainID: util.GetTestChainID(), + ChainID: d.chainID, }, }, }) @@ -205,7 +184,7 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error { Start: orderer.SeekInfo_SPECIFIED, WindowSize: d.windowSize, SpecifiedNumber: height, - ChainID: util.GetTestChainID(), + ChainID: d.chainID, }, }, }) @@ -317,17 +296,17 @@ func (d *DeliverService) readUntilClose() { } } - numberOfPeers := len(d.gossip.GetPeers()) + numberOfPeers := len(service.GetGossipService().GetPeers()) // Create payload with a block received payload := createPayload(seqNum, block) // Use payload to create gossip message gossipMsg := createGossipMsg(payload) logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers) // Add payload to local state payloads buffer - d.stateProvider.AddPayload(payload) + service.GetGossipService().AddPayload(d.chainID, payload) // Gossip messages with other nodes logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers) - d.gossip.Gossip(gossipMsg) + service.GetGossipService().Gossip(gossipMsg) if err = producer.SendProducerBlockEvent(block); err != nil { logger.Errorf("Error sending block event %s", err) } diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go new file mode 100644 index 00000000000..27010e20f88 --- /dev/null +++ b/gossip/service/gossip_service.go @@ -0,0 +1,145 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "sync" + "fmt" + + pb "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/gossip/comm" + gossipCommon "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/integration" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/state" + "github.com/hyperledger/fabric/protos/common" + "google.golang.org/grpc" +) + +var ( + gossipServiceInstance *gossipServiceImpl + once sync.Once +) + +// GossipService encapsulates gossip and state capabilities into single interface +type GossipService interface { + gossip.Gossip + + // JoinChannel joins new chain given the configuration block and initialized committer service + JoinChannel(committer committer.Committer, block *common.Block) error + // GetBlock returns block for given chain + GetBlock(chainID string, index uint64) *common.Block + // AddPayload appends message payload to for given chain + AddPayload(chainID string, payload *proto.Payload) error +} + +type gossipServiceImpl struct { + comm comm.Comm + gossip gossip.Gossip + chains map[string]state.GossipStateProvider + lock sync.RWMutex +} + +// InitGossipService initialize gossip service +func InitGossipService(endpoint string, s *grpc.Server, bootPeers ...string) { + once.Do(func() { + gossip, communication := integration.NewGossipComponent(endpoint, s, bootPeers...) + gossipServiceInstance = &gossipServiceImpl{ + gossip: gossip, + comm: communication, + chains: make(map[string]state.GossipStateProvider), + } + }) +} + +// GetGossipService returns an instance of gossip service +func GetGossipService() GossipService { + return gossipServiceInstance +} + +// JoinChannel joins the channel and initialize gossip state with given committer +func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *common.Block) error { + g.lock.Lock() + defer g.lock.Unlock() + + if block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 { + return fmt.Errorf("Cannot join channel, configuration block is empty") + } + + envelope := &common.Envelope{} + if err := pb.Unmarshal(block.Data.Data[0], envelope); err != nil { + return err + } + + payload := &common.Payload{} + if err := pb.Unmarshal(envelope.Payload, payload); err != nil { + return err + } + + chainID := payload.Header.ChainHeader.ChainID + if len(chainID) == 0 { + return fmt.Errorf("Cannot join channel, with empty chainID") + } + // Initialize new state provider for given committer + g.chains[chainID] = state.NewGossipStateProvider(g.gossip, g.comm, commiter) + return nil +} + +// GetPeers returns a mapping of endpoint --> []discovery.NetworkMember +func (g *gossipServiceImpl) GetPeers() []discovery.NetworkMember { + return g.gossip.GetPeers() +} + +// UpdateMetadata updates the self metadata of the discovery layer +func (g *gossipServiceImpl) UpdateMetadata(data []byte) { + g.gossip.UpdateMetadata(data) +} + +// Gossip sends a message to other peers to the network +func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { + g.gossip.Gossip(msg) +} + +// Accept returns a channel that outputs messages from other peers +func (g *gossipServiceImpl) Accept(acceptor gossipCommon.MessageAcceptor) <-chan *proto.GossipMessage { + return g.gossip.Accept(acceptor) +} + +// GetBlock returns block for given chain +func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block { + g.lock.RLock() + defer g.lock.RUnlock() + return g.chains[chainID].GetBlock(index) +} + +// AddPayload appends message payload to for given chain +func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) error { + g.lock.RLock() + defer g.lock.RUnlock() + return g.chains[chainID].AddPayload(payload) +} + +// Stop stops the gossip component +func (g *gossipServiceImpl) Stop() { + for _, ch := range g.chains { + ch.Stop() + } + g.gossip.Stop() +} diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go new file mode 100644 index 00000000000..b40b08fb7f0 --- /dev/null +++ b/gossip/service/gossip_service_test.go @@ -0,0 +1,56 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "fmt" + "net" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestInitGossipService(t *testing.T) { + // Test whenever gossip service is indeed singleton + grpcServer := grpc.NewServer() + socket, error := net.Listen("tcp", fmt.Sprintf("%s:%d", "", 5611)) + assert.NoError(t, error) + + go grpcServer.Serve(socket) + defer grpcServer.Stop() + + wg := sync.WaitGroup{} + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + InitGossipService("localhost:5611", grpcServer) + wg.Done() + }() + } + wg.Wait() + + defer GetGossipService().Stop() + gossip := GetGossipService() + + for i := 0; i < 10; i++ { + go func(gossipInstance GossipService) { + assert.Equal(t, gossip, GetGossipService()) + }(gossip) + } +} diff --git a/peer/common/ordererclient.go b/peer/common/ordererclient.go index fb941e3491c..829e041f4b8 100644 --- a/peer/common/ordererclient.go +++ b/peer/common/ordererclient.go @@ -56,12 +56,12 @@ func GetBroadcastClient() (BroadcastClient, error) { conn, err := grpc.Dial(orderer, opts...) if err != nil { - return nil, fmt.Errorf("Error connecting: %s", err) + return nil, fmt.Errorf("Error connecting to %s due to %s", orderer, err) } client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO()) if err != nil { conn.Close() - return nil, fmt.Errorf("Error connecting: %s", err) + return nil, fmt.Errorf("Error connecting to %s due to %s", orderer, err) } return &broadcastClient{conn: conn, client: client}, nil diff --git a/peer/node/start.go b/peer/node/start.go index 6dd5dbc2e5f..42b97d3b766 100755 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -31,15 +31,17 @@ import ( "github.com/hyperledger/fabric/core" "github.com/hyperledger/fabric/core/chaincode" "github.com/hyperledger/fabric/core/comm" + "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/committer/noopssinglechain" "github.com/hyperledger/fabric/core/endorser" "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/events/producer" + "github.com/hyperledger/fabric/gossip/service" + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/peer/common" pb "github.com/hyperledger/fabric/protos/peer" - "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -163,14 +165,24 @@ func serve(args []string) error { //create the default chain (pending join) createChain(chainID) + // Initialize gossip component + bootstrap := viper.GetStringSlice("peer.gossip.bootstrap") + service.InitGossipService(peerEndpoint.Address, grpcServer, bootstrap...) + defer service.GetGossipService().Stop() + //this shoul not need the chainID. Delivery should be //split up into network part and chain part. This should //only init the network part...TBD, part of Join work - deliverService := noopssinglechain.NewDeliverService(chainID, peerEndpoint.Address, grpcServer) + deliverService := noopssinglechain.NewDeliverService(chainID) + commit := committer.NewLedgerCommitter(kvledger.GetLedger(chainID)) + + // TODO: Should real configuration block + block, err := static.New().GenesisBlock() - if deliverService != nil { - deliverService.Start() + if nil != err { + panic(fmt.Sprintf("Unable to create genesis block for [%s] due to [%s]", chainID, err)) } + deliverService.JoinChannel(commit, block) defer noopssinglechain.StopDeliveryService(deliverService)