From b3b4e54dd9ada5905de1d6959056498957ae82d5 Mon Sep 17 00:00:00 2001 From: Artem Barger Date: Tue, 15 Nov 2016 11:51:15 -0500 Subject: [PATCH] [FAB-773] gossip state transfer, block re-ordering Change-Id: Ie14804b54e645f25f0e3d95aabdca670679a53dd Signed-off-by: Artem Barger --- gossip/state/state.go | 400 +++++++++++++++++++++++++++++++++++++ gossip/state/state_test.go | 349 ++++++++++++++++++++++++++++++++ 2 files changed, 749 insertions(+) create mode 100644 gossip/state/state.go create mode 100644 gossip/state/state_test.go diff --git a/gossip/state/state.go b/gossip/state/state.go new file mode 100644 index 00000000000..a11eb195804 --- /dev/null +++ b/gossip/state/state.go @@ -0,0 +1,400 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/proto" + pb "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/op/go-logging" + "sync" + "sync/atomic" + "time" + "math/rand" + "github.com/hyperledger/fabric/protos/peer" + "github.com/hyperledger/fabric/core/committer" +) + +// GossipStateProvider is the interface to acquire sequences of the ledger blocks +// capable to full fill missing blocks by running state replication and +// sending request to get missing block to other nodes +type GossipStateProvider interface { + + // Retrieve block with sequence number equal to index + GetBlock(index uint64) *peer.Block2 + + AddPayload(payload *proto.Payload) error + + // Stop terminates state transfer object + Stop() +} + +var logFormat = logging.MustStringFormatter( + `%{color}%{level} %{longfunc}():%{color:reset}(%{module})%{message}`, +) + +const ( + defPollingPeriod = 200 * time.Millisecond + defAntiEntropyInterval = 10 * time.Second +) + +// GossipStateProviderImpl the implementation of the GossipStateProvider interface +// the struct to handle in memory sliding window of +// new ledger block to be acquired by hyper ledger +type GossipStateProviderImpl struct { + // The gossiping service + gossip gossip.Gossip; + + // Channel to read gossip messages from + gossipChan <- chan *proto.GossipMessage; + + commChan <- chan comm.ReceivedMessage; + + // Flag which signals for termination + stopFlag int32; + + mutex sync.RWMutex; + + // Queue of payloads which wasn't acquired yet + payloads PayloadsBuffer; + + comm comm.Comm; + + committer committer.Committer; + + logger *logging.Logger; + + done sync.WaitGroup; +} + +// NewGossipStateProvider creates initialized instance of gossip state provider +func NewGossipStateProvider(g gossip.Gossip, c comm.Comm, committer committer.Committer) GossipStateProvider { + logger, _ := logging.GetLogger("GossipStateProvider") + + gossipChan := g.Accept(func(message interface{}) bool { + // Get only data messages + return message.(*proto.GossipMessage).GetDataMsg() != nil + }) + + // Filter message which are only relevant for state transfer + commChan := c.Accept(func(message interface{}) bool { + return message.(comm.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil || + message.(comm.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil + }) + + height, err := committer.LedgerHeight() + + if err != nil { + logger.Error("Could not read ledger info to obtain current ledger height due to: ", err) + // Exiting as without ledger it will be impossible + // to deliver new blocks + return nil + } + + s := &GossipStateProviderImpl{ + // Instance of the gossip + gossip : g, + + // Channel to read new messages from + gossipChan: gossipChan, + + // Channel to read direct messages from other peers + commChan: commChan, + + stopFlag: 0, + // Create a queue for payload received + payloads: NewPayloadsBuffer(height + 1), + + comm : c, + + committer: committer, + + logger: logger, + } + + logging.SetFormatter(logFormat) + + state := NewNodeMetastate(height) + + s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next()) + bytes, err := state.Bytes() + if err == nil { + g.UpdateMetadata(bytes) + } else { + s.logger.Errorf("Unable to serialize node meta state, error = %s", err) + } + + s.done.Add(3) + + // Listen for incoming communication + go s.listen() + // Deliver in order messages into the incoming channel + go s.deliverPayloads() + // Execute anti entropy to fill missing gaps + go s.antiEntropy() + + return s +} + +func (s *GossipStateProviderImpl) listen() { + for !s.isDone() { + // Do not block on waiting message from channel + // check each 500ms whenever is done indicates to + // finish + next: + select { + case msg := <-s.gossipChan: + { + s.logger.Debug("Received new message via gossip channel") + go s.queueNewMessage(msg) + } + case msg := <-s.commChan: + { + s.logger.Debug("Direct message ", msg) + go s.directMessage(msg) + } + case <-time.After(defPollingPeriod): + break next + } + } + s.logger.Debug("[XXX]: Stop listening for new messages") + s.done.Done() +} + +func (s *GossipStateProviderImpl) directMessage(msg comm.ReceivedMessage) { + s.logger.Debugf("[ENTER] -> directMessage, ", msg) + defer s.logger.Debug("[EXIT] -> directMessage") + + if msg == nil { + s.logger.Error("Got nil message via end-to-end channel, should not happen!") + return + } + + incoming := msg.GetGossipMessage() + + if incoming.GetStateRequest() != nil { + s.handleStateRequest(msg) + } else if incoming.GetStateResponse() != nil { + s.handleStateResponse(msg) + } +} + +func (s *GossipStateProviderImpl) handleStateRequest(msg comm.ReceivedMessage) { + request := msg.GetGossipMessage().GetStateRequest() + response := &proto.RemoteStateResponse{Payloads:make([]*proto.Payload, 0)} + for _, seqNum := range request.SeqNums { + s.logger.Debug("Reading block ", seqNum, " from the committer service") + blocks := s.committer.GetBlocks([]uint64{seqNum}) + + if blocks == nil || len(blocks) < 1 { + s.logger.Errorf("Wasn't able to read block with sequence number %d from ledger, skipping....", seqNum) + continue + } + + blockBytes, err := pb.Marshal(blocks[0]) + if err != nil { + s.logger.Errorf("Could not marshal block: %s", err) + } + + if err != nil { + s.logger.Errorf("Could not calculate hash of block: %s", err) + } + + response.Payloads = append(response.Payloads, &proto.Payload{ + SeqNum: seqNum, + Data: blockBytes, + // TODO: Check hash generation for given block from the ledger + Hash: "", + }) + } + // Sending back response with missing blocks + msg.Respond(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateResponse{response}, + }) +} + +func (s *GossipStateProviderImpl) handleStateResponse(msg comm.ReceivedMessage) { + response := msg.GetGossipMessage().GetStateResponse() + for _, payload := range response.GetPayloads() { + s.logger.Debugf("Received payload with sequence number %d.", payload.SeqNum) + err := s.payloads.Push(payload) + if err != nil { + s.logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum) + } + } +} + +// Internal function to check whenever we need to finish listening +// for new messages to arrive +func (s *GossipStateProviderImpl) isDone() bool { + return atomic.LoadInt32(&s.stopFlag) == 1 +} + +// Stop function send halting signal to all go routines +func (s *GossipStateProviderImpl) Stop() { + atomic.StoreInt32(&s.stopFlag, 1) + s.done.Wait() +} + +// New message notification/handler +func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { + dataMsg := msg.GetDataMsg() + if (dataMsg != nil) { + // Add new payload to ordered set + s.logger.Debugf("Received new payload with sequence number = [%d]", dataMsg.Payload.SeqNum) + s.payloads.Push(dataMsg.GetPayload()) + } else { + s.logger.Debug("Gossip message received is not of data message type, usually this should not happen.") + } +} + +func (s *GossipStateProviderImpl) deliverPayloads() { + for !s.isDone() { + next: + select { + // Wait for notification that next seq has arrived + case <-s.payloads.Ready(): + { + s.logger.Debugf("Ready to transfer payloads to the ledger, next sequence number is = [%d]", s.payloads.Next()) + // Collect all subsequent payloads + for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() { + rawblock := &peer.Block2{} + if err := pb.Unmarshal(payload.Data, rawblock); err != nil { + s.logger.Errorf("Error getting block with seqNum = %d due to (%s)...dropping block\n", payload.SeqNum, err) + continue + } + s.logger.Debug("New block with sequence number ", payload.SeqNum, " is ", rawblock) + + s.commitBlock(rawblock, payload.SeqNum) + } + } + case <-time.After(defPollingPeriod): + { + break next + } + } + } + s.logger.Debug("State provider has been stoped, finishing to push new blocks.") + s.done.Done() +} + +func (s *GossipStateProviderImpl) antiEntropy() { + checkPoint := time.Now() + for (!s.isDone()) { + time.Sleep(defPollingPeriod) + if time.Since(checkPoint).Nanoseconds() <= defAntiEntropyInterval.Nanoseconds() { + continue + } + checkPoint = time.Now() + + current, _ := s.committer.LedgerHeight() + max, _ := s.committer.LedgerHeight() + + for _, p := range s.gossip.GetPeers() { + if state, err:= FromBytes(p.Metadata); err == nil { + if max < state.LedgerHeight { + max = state.LedgerHeight + } + } + } + + if current == max { + // No messages in the buffer or there are no gaps + s.logger.Debugf("Current ledger height is the same as ledger height on other peers.") + continue + } + + s.logger.Debugf("Requesting new blocks in range [%d...%d].", current + 1, max) + s.requestBlocksInRange(uint64(current + 1), uint64(max)) + } + s.logger.Debug("[XXX]: Stateprovider stopped, stoping anti entropy procedure.") + s.done.Done() +} + +// GetBlocksInRange capable to acquire blocks with sequence +// numbers in the range [start...end]. +func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) { + var peers []*comm.RemotePeer + // Filtering peers which might have relevant blocks + for _, value := range s.gossip.GetPeers() { + nodeMetadata, err := FromBytes(value.Metadata) + if err == nil { + if nodeMetadata.LedgerHeight >= end { + peers = append(peers, &comm.RemotePeer{Endpoint: value.Endpoint, PKIID:value.PKIid}) + } + } else { + s.logger.Errorf("Unable to de-serialize node meta state, error = %s", err) + } + } + + n := len(peers) + if (n == 0) { + s.logger.Warningf("There is not peer nodes to ask for missing blocks in range [%d, %d)", start, end) + return + } + // Select peers to ask for blocks + peer := peers[rand.Intn(n)] + s.logger.Infof("State transfer, with peer %s, the min available sequence number %d next block %d", peer.Endpoint, start, end) + + request := &proto.RemoteStateRequest{ + SeqNums: make([]uint64, 0), + } + + for i := start; i <= end; i++ { + request.SeqNums = append(request.SeqNums, uint64(i)); + } + + s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request) + s.comm.Send(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateRequest{request}, + }, peer) +} + +func (s *GossipStateProviderImpl) GetBlock(index uint64) *peer.Block2 { + // Try to read missing block from the ledger, should return no nil with + // content including at least one block + if blocks := s.committer.GetBlocks([]uint64{index}); blocks != nil && len(blocks) > 0 { + return blocks[0] + } + + return nil +} + +func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error { + return s.payloads.Push(payload) +} + +func (s *GossipStateProviderImpl) commitBlock(block *peer.Block2, seqNum uint64) error { + if err := s.committer.CommitBlock(block); err != nil { + s.logger.Errorf("Got error while committing(%s)\n", err) + return err + } + + // Update ledger level within node metadata + state := NewNodeMetastate(seqNum) + // Decode state to byte array + bytes, err := state.Bytes() + if err == nil { + s.gossip.UpdateMetadata(bytes) + } else { + s.logger.Errorf("Unable to serialize node meta state, error = %s", err) + } + + s.logger.Debug("[XXX]: Commit success, created a block!") + return nil +} \ No newline at end of file diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go new file mode 100644 index 00000000000..d778b181ede --- /dev/null +++ b/gossip/state/state_test.go @@ -0,0 +1,349 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "testing" + "fmt" + "time" + "github.com/hyperledger/fabric/gossip/gossip" + "github.com/hyperledger/fabric/gossip/comm" + "bytes" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/core/ledger/kvledger" + "os" + "strconv" + "github.com/stretchr/testify/assert" + "github.com/hyperledger/fabric/protos/peer" + pb "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +var ( + portPrefix = 5610 + logger, _ = logging.GetLogger("GossipStateProviderTest") +) + +type naiveCryptoService struct { +} + +func (*naiveCryptoService) ValidateAliveMsg(am *proto.AliveMessage) bool { + return true +} + +func (*naiveCryptoService) SignMessage(am *proto.AliveMessage) *proto.AliveMessage { + return am +} + +func (*naiveCryptoService) IsEnabled() bool { + return true +} + +func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { + return msg, nil +} + +func (*naiveCryptoService) Verify(vkID, signature, message []byte) error { + if bytes.Equal(signature, message) { + return nil + } + return fmt.Errorf("Failed verifying") +} + +func bootPeers(ids ...int) []string { + peers := []string{} + for _, id := range ids { + peers = append(peers, fmt.Sprintf("localhost:%d", (id + portPrefix))) + } + return peers +} + +// Simple presentation of peer which includes only +// communication module, gossip and state transfer +type peerNode struct { + c comm.Comm + g gossip.Gossip + s GossipStateProvider + + commit committer.Committer +} + +// Shutting down all modules used +func (node *peerNode) shutdown() { + node.s.Stop() + node.c.Stop() + node.g.Stop() +} + +// Default configuration to be used for gossip and communication modules +func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { + port := id + portPrefix + return &gossip.Config{ + BindPort: port, + BootstrapPeers: bootPeers(boot...), + ID: fmt.Sprintf("p%d", id), + MaxMessageCountToStore: maxMsgCount, + MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, + MaxPropagationBurstSize: 10, + PropagateIterations: 1, + PropagatePeerNum: 3, + PullInterval: time.Duration(4) * time.Second, + PullPeerNum: 5, + SelfEndpoint: fmt.Sprintf("localhost:%d", port), + } +} + +// Create gossip instance +func newGossipInstance(config *gossip.Config, comm comm.Comm) gossip.Gossip { + return gossip.NewGossipService(config, comm, &naiveCryptoService{}) +} + +// Setup and create basic communication module +// need to be used for peer-to-peer communication +// between peers and state transfer +func newCommInstance(config *gossip.Config) comm.Comm { + comm, err := comm.NewCommInstanceWithServer(config.BindPort, &naiveCryptoService{}, []byte(config.SelfEndpoint)) + if err != nil { + panic(err) + } + + return comm +} + +// Create new instance of KVLedger to be used for testing +func newCommitter(id int, basePath string) committer.Committer { + conf := kvledger.NewConf(basePath + strconv.Itoa(id), 0) + ledger, _ := kvledger.NewKVLedger(conf) + return committer.NewLedgerCommitter(ledger) +} + +// Constructing pseudo peer node, simulating only gossip and state transfer part +func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode { + + // Create communication module instance + comm := newCommInstance(config) + // Gossip component based on configuration provided and communication module + gossip := newGossipInstance(config, comm) + + // Initialize pseudo peer simulator, which has only three + // basic parts + return &peerNode{ + c: comm, + g: gossip, + s: NewGossipStateProvider(gossip, comm, committer), + + commit: committer, + } +} + +func createDataMsg(seqnum uint64, data []byte, hash string) *proto.GossipMessage { + return &proto.GossipMessage{ + Nonce: 0, + Content: &proto.GossipMessage_DataMsg{ + DataMsg: &proto.DataMessage{ + Payload: &proto.Payload{ + Data: data, + Hash: hash, + SeqNum: seqnum, + }, + }, + }, + } +} + +// Simple scenario to start first booting node, gossip a message +// then start second node and verify second node also receives it +func TestNewGossipStateProvider_GossipingOneMessage(t *testing.T) { + bootId := 0 + ledgerPath := "/tmp/tests/ledger/" + defer os.RemoveAll(ledgerPath) + + bootNodeCommitter := newCommitter(bootId, ledgerPath + "node/") + defer bootNodeCommitter.Close() + + bootNode := newPeerNode(newGossipConfig(bootId, 100), bootNodeCommitter) + defer bootNode.shutdown() + + rawblock := &peer.Block2{} + if err := pb.Unmarshal([]byte{}, rawblock); err != nil { + t.Fail() + } + + if bytes, err := pb.Marshal(rawblock); err == nil { + payload := &proto.Payload{1, "", bytes} + bootNode.s.AddPayload(payload) + } else { + t.Fail() + } + + waitUntilTrueOrTimeout(t, func() bool { + if block := bootNode.s.GetBlock(uint64(1)); block != nil { + return true + } + return false + }, 5 * time.Second) + + bootNode.g.Gossip(createDataMsg(uint64(1), []byte{}, "")) + + peerCommitter := newCommitter(1, ledgerPath + "node/") + defer peerCommitter.Close() + + peer := newPeerNode(newGossipConfig(1, 100, bootId), peerCommitter) + defer peer.shutdown() + + ready := make(chan interface{}) + + go func(p *peerNode) { + for len(p.g.GetPeers()) != 1 { + time.Sleep(100 * time.Millisecond) + } + ready <- struct{}{} + }(peer) + + select { + case <-ready: + { + break + } + case <-time.After(1 * time.Second): + { + t.Fail() + } + } + + // Let sure anti-entropy will have a chance to bring missing block + waitUntilTrueOrTimeout(t, func() bool { + if block := peer.s.GetBlock(uint64(1)); block != nil { + return true + } + return false + }, defAntiEntropyInterval + 1 * time.Second) + + block := peer.s.GetBlock(uint64(1)) + + assert.NotNil(t, block) +} + +func TestNewGossipStateProvider_RepeatGossipingOneMessage(t *testing.T) { + for i := 0; i < 10; i++ { + TestNewGossipStateProvider_GossipingOneMessage(t) + } +} + +func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { + ledgerPath := "/tmp/tests/ledger/" + defer os.RemoveAll(ledgerPath) + + bootstrapSetSize := 5 + bootstrapSet := make([]*peerNode, 0) + + for i := 0; i < bootstrapSetSize; i++ { + committer := newCommitter(i, ledgerPath + "node/") + bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer)) + } + + defer func() { + for _, p := range bootstrapSet { + p.shutdown() + } + }() + + msgCount := 10 + + for i := 1; i <= msgCount; i++ { + rawblock := &peer.Block2{} + if err := pb.Unmarshal([]byte{}, rawblock); err != nil { + t.Fail() + } + + if bytes, err := pb.Marshal(rawblock); err == nil { + payload := &proto.Payload{uint64(i), "", bytes} + bootstrapSet[0].s.AddPayload(payload) + } else { + t.Fail() + } + } + + standartPeersSize := 10 + peersSet := make([]*peerNode, 0) + + for i := 0; i < standartPeersSize; i++ { + committer := newCommitter(standartPeersSize + i, ledgerPath + "node/") + peersSet = append(peersSet, newPeerNode(newGossipConfig(standartPeersSize + i, 100, 0, 1, 2, 3, 4), committer)) + } + + defer func() { + for _, p := range peersSet { + p.shutdown() + } + }() + + waitUntilTrueOrTimeout(t, func() bool { + for _, p := range peersSet { + if len(p.g.GetPeers()) != bootstrapSetSize + standartPeersSize - 1 { + logger.Debug("[XXXXXXX]: Peer discovery has not finished yet") + return false + } + } + logger.Debug("[AAAAAA]: All peer discovered each other!!!") + return true + }, 30 * time.Second) + + logger.Debug("[!!!!!]: Waiting for all blocks to arrive.") + waitUntilTrueOrTimeout(t, func() bool { + logger.Debug("[*****]: Trying to see all peers get all blocks") + for _, p := range peersSet { + height, err := p.commit.LedgerHeight() + if height != uint64(msgCount) || err != nil { + logger.Debug("[XXXXXXX]: Ledger height is at: ", height) + return false + } + } + logger.Debug("[#####]: All peers have same ledger height!!!") + return true + }, 60 * time.Second) + +} + +func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) { + ch := make(chan interface{}) + defer close(ch) + done := false + go func () { + logger.Debug("[@@@@@]: Started to spin off, until predicate will be satisfied.") + for !done { + if !predicate() { + time.Sleep(1 * time.Second) + continue + } + ch <- struct {}{} + break + } + logger.Debug("[@@@@@]: Done.") + }() + + select { + case <-ch: { } + case <-time.After(timeout): + { + t.Fatal("Timeout has expired") + } + } + done = true + logger.Debug("[>>>>>] Stop wainting until timeout or true") +}