From 31b75722391000c9f9e96c3c83757170592bcc4b Mon Sep 17 00:00:00 2001 From: Simon Schubert Date: Mon, 31 Oct 2016 18:53:37 +0100 Subject: [PATCH] sbft: sync state on reconnect Change-Id: I90fc0e866de288dd592b9afb01a806e3c1df5695 Signed-off-by: Simon Schubert --- consensus/simplebft/backlog.go | 50 ++++------ consensus/simplebft/batch.go | 29 +++++- consensus/simplebft/checkpoint.go | 1 + consensus/simplebft/connection.go | 53 +++++++++-- consensus/simplebft/execute.go | 3 +- consensus/simplebft/newview.go | 48 +++++----- consensus/simplebft/preprepare.go | 44 +++++---- consensus/simplebft/simplebft.go | 24 ++--- consensus/simplebft/simplebft.pb.go | 127 ++++++++++++++++---------- consensus/simplebft/simplebft.proto | 7 +- consensus/simplebft/simplebft_test.go | 74 ++++++++++++++- 11 files changed, 310 insertions(+), 150 deletions(-) diff --git a/consensus/simplebft/backlog.go b/consensus/simplebft/backlog.go index 53d29dea78f..d266c5730b8 100644 --- a/consensus/simplebft/backlog.go +++ b/consensus/simplebft/backlog.go @@ -25,22 +25,25 @@ func (s *SBFT) testBacklog(m *Msg, src uint64) bool { } func (s *SBFT) testBacklog2(m *Msg, src uint64) bool { - record := func(seq uint64) bool { - if seq > s.cur.subject.Seq.Seq { + record := func(seq *SeqView) bool { + if !s.activeView { + return true + } + if seq.Seq > s.cur.subject.Seq.Seq || seq.View > s.seq.View { return true } return false } - if pp := m.GetPreprepare(); pp != nil && !s.cur.executed { - return true + if pp := m.GetPreprepare(); pp != nil { + return record(pp.Seq) && !s.cur.checkpointDone } else if p := m.GetPrepare(); p != nil { - return record(p.Seq.Seq) + return record(p.Seq) } else if c := m.GetCommit(); c != nil { - return record(c.Seq.Seq) + return record(c.Seq) } else if cs := m.GetCheckpoint(); cs != nil { - c := &Checkpoint{} - return record(c.Seq) + // use the sequence number of the received checkpoint + return record(&SeqView{Seq: cs.Seq}) } return false } @@ -53,22 +56,19 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) { // // Prevent DoS by
limiting the number of messages per replica. // - // If the backlog limit is exceeded, discard all messages with - // Seq before the replica's hello message (we can, because we - // can play forward to this batch via state transfer). If - // there is no hello message, we must be really slow or the - // replica must be byzantine. In this case we probably should - // re-establish the connection. + // If the backlog limit is exceeded, re-establish the + // connection. // // After the connection has been re-established, we will - // receive a hello, and the following messages will trigger - // the pruning of old messages. If this pruning lead us not - // to make progress, the backlog processing algorithm as lined - // out below will take care of starting a state transfer, - // using the hello message we received on reconnect. + // receive a hello, which will advance our state and discard + // old messages. s.replicaState[src].backLog = append(s.replicaState[src].backLog, m) } +func (s *SBFT) discardBacklog(src uint64) { + s.replicaState[src].backLog = nil +} + func (s *SBFT) processBacklog() { processed := true notReady := uint64(0) @@ -111,18 +111,6 @@ func (s *SBFT) processBacklog() { // we should reconnect to get a working connection going // again. // - // If a noFaultyQuorum (-1, because we're not faulty, just - // were disconnected) is backlogged, we know that we need to - // perform a state transfer. Of course, f of these might be - // byzantine, and the remaining f that are not backlogged will - // allow us to get unstuck. To check against that, we need to - // only consider backlogged replicas of which we have a hello - // message that talks about a future Seq. - // - // We need to pick the highest Seq of all the hello messages - // we received, perform a state transfer to that Batch, and - // discard all backlogged messages that refer to a lower Seq. - // // Do we need to detect that a connection is stuck and we // should reconnect? 
} diff --git a/consensus/simplebft/batch.go b/consensus/simplebft/batch.go index df54e6c4791..80eb9d4e8f6 100644 --- a/consensus/simplebft/batch.go +++ b/consensus/simplebft/batch.go @@ -41,17 +41,26 @@ func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch { } } -func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) { - datahash := merkleHashData(b.Payloads) - +func (s *SBFT) checkBatch(b *Batch, checkData bool) (*BatchHeader, error) { batchheader := &BatchHeader{} err := proto.Unmarshal(b.Header, batchheader) if err != nil { return nil, err } - if !reflect.DeepEqual(datahash, batchheader.DataHash) { - return nil, fmt.Errorf("malformed batch: invalid hash") + if checkData { + datahash := merkleHashData(b.Payloads) + if !reflect.DeepEqual(datahash, batchheader.DataHash) { + return nil, fmt.Errorf("malformed batch: invalid hash") + } + } + + bh := b.Hash() + for r, sig := range b.Signatures { + err = s.sys.CheckSig(bh, r, sig) + if err != nil { + return nil, err + } } return batchheader, nil @@ -63,3 +72,13 @@ func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) { func (b *Batch) Hash() []byte { return hash(b.Header) } + +func (b *Batch) DecodeHeader() *BatchHeader { + batchheader := &BatchHeader{} + err := proto.Unmarshal(b.Header, batchheader) + if err != nil { + panic(err) + } + + return batchheader +} diff --git a/consensus/simplebft/checkpoint.go b/consensus/simplebft/checkpoint.go index 0d0ed62f60e..ed0a5a27748 100644 --- a/consensus/simplebft/checkpoint.go +++ b/consensus/simplebft/checkpoint.go @@ -98,6 +98,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { batch := *s.cur.preprep.Batch batch.Signatures = cpset s.sys.Deliver(&batch) + s.seq = *s.cur.subject.Seq s.cur.timeout.Cancel() log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id) diff --git a/consensus/simplebft/connection.go b/consensus/simplebft/connection.go index 76211ba6f41..463a921f168 100644 --- 
a/consensus/simplebft/connection.go +++ b/consensus/simplebft/connection.go @@ -16,8 +16,6 @@ limitations under the License. package simplebft -import "github.com/golang/protobuf/proto" - // Connection is an event from system to notify a new connection with // replica. // On connection, we send our latest (weak) checkpoint, and we expect @@ -25,7 +23,11 @@ import "github.com/golang/protobuf/proto" func (s *SBFT) Connection(replica uint64) { batch := *s.sys.LastBatch() batch.Payloads = nil // don't send the big payload - s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica) + hello := &Hello{Batch: &batch} + if s.isPrimary() && s.activeView && s.lastNewViewSent != nil { + hello.NewView = s.lastNewViewSent + } + s.sys.Send(&Msg{&Msg_Hello{hello}}, replica) // A reconnecting replica can play forward its blockchain to // the batch listed in the hello message. However, the @@ -43,13 +45,12 @@ func (s *SBFT) Connection(replica uint64) { // connecting right after a new-view message was received, and // its xset batch is in-flight. 
- batchheader := &BatchHeader{} - err := proto.Unmarshal(batch.Header, batchheader) + batchheader, err := s.checkBatch(&batch, false) if err != nil { panic(err) } - if s.cur.subject.Seq.Seq > batchheader.Seq { + if s.cur.subject.Seq.Seq > batchheader.Seq && s.activeView { if s.isPrimary() { s.sys.Send(&Msg{&Msg_Preprepare{s.cur.preprep}}, replica) } else { @@ -64,6 +65,44 @@ func (s *SBFT) Connection(replica uint64) { } } -func (s *SBFT) handleHello(h *Batch, src uint64) { +func (s *SBFT) handleHello(h *Hello, src uint64) { + bh, err := s.checkBatch(h.Batch, false) + if err != nil { + log.Warningf("invalid hello batch from %d: %s", src, err) + return + } + + if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq { + s.sys.Deliver(h.Batch) + s.seq.Seq = bh.Seq + } + + if h.NewView != nil { + if s.primaryIDView(h.NewView.View) != src { + log.Warningf("invalid hello with new view from non-primary %d", src) + return + } + + vcs, err := s.checkNewViewSignatures(h.NewView) + if err != nil { + log.Warningf("invalid hello new view from %d: %s", src, err) + return + } + + _, ok := s.makeXset(vcs) + if !ok { + log.Warningf("invalid hello new view xset from %d", src) + return + } + + if s.seq.View <= h.NewView.View { + s.seq.View = h.NewView.View + } + s.activeView = true + } + s.replicaState[src].hello = h + + s.discardBacklog(src) + s.processBacklog() } diff --git a/consensus/simplebft/execute.go b/consensus/simplebft/execute.go index 0ffe4134bef..987959814d9 100644 --- a/consensus/simplebft/execute.go +++ b/consensus/simplebft/execute.go @@ -21,8 +21,7 @@ func (s *SBFT) maybeExecute() { return } s.cur.executed = true - s.seq = *s.cur.subject.Seq - log.Noticef("executing %v", s.seq) + log.Noticef("executing %v", s.cur.subject) s.sys.Persist("execute", &s.cur.subject) diff --git a/consensus/simplebft/newview.go b/consensus/simplebft/newview.go index 288fce33812..64b715ac20f 100644 --- a/consensus/simplebft/newview.go +++ b/consensus/simplebft/newview.go @@ -22,7 +22,7 @@ import ( 
) func (s *SBFT) maybeSendNewView() { - if s.lastNewViewSent == s.seq.View { + if s.lastNewViewSent != nil && s.lastNewViewSent.View == s.seq.View { return } @@ -63,21 +63,11 @@ func (s *SBFT) maybeSendNewView() { } log.Noticef("sending new view for %d", nv.View) - s.lastNewViewSent = nv.View + s.lastNewViewSent = nv s.broadcast(&Msg{&Msg_NewView{nv}}) } -func (s *SBFT) handleNewView(nv *NewView, src uint64) { - if src != s.primaryIDView(nv.View) { - log.Warningf("invalid new view from %d for %d", src, nv.View) - return - } - - if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View { - log.Debugf("discarding duplicate new view for %d", nv.View) - return - } - +func (s *SBFT) checkNewViewSignatures(nv *NewView) ([]*ViewChange, error) { var vcs []*ViewChange for vcsrc, svc := range nv.Vset { vc := &ViewChange{} @@ -88,13 +78,31 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) { } } if err != nil { - log.Warningf("invalid new view from %d: view change for %d: %s", src, vcsrc, err) - s.sendViewChange() - return + return nil, err } vcs = append(vcs, vc) } + return vcs, nil +} + +func (s *SBFT) handleNewView(nv *NewView, src uint64) { + if src != s.primaryIDView(nv.View) { + log.Warningf("invalid new view from %d for %d", src, nv.View) + return + } + + if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View { + log.Debugf("discarding duplicate new view for %d", nv.View) + return + } + + vcs, err := s.checkNewViewSignatures(nv) + if err != nil { + log.Warningf("invalid new view from %d: %s", src, err) + s.sendViewChange() + return + } xset, ok := s.makeXset(vcs) if xset.Digest == nil { // null request special treatment @@ -120,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) { return } - _, err := s.checkBatch(nv.Batch) + _, err = s.checkBatch(nv.Batch, true) if err != nil { log.Warningf("invalid new view from %d: invalid batch, %s", src, err) @@ -155,9 +163,5 @@ func (s *SBFT)
processNewView() { } s.activeView = true - var h []byte - if nv.Batch != nil { - h = hash(nv.Batch.Header) - } - s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: h}, pp) + s.handleCheckedPreprepare(pp) } diff --git a/consensus/simplebft/preprepare.go b/consensus/simplebft/preprepare.go index c0a29792174..1bd99443913 100644 --- a/consensus/simplebft/preprepare.go +++ b/consensus/simplebft/preprepare.go @@ -54,27 +54,32 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) { log.Infof("duplicate preprepare for %v", *pp.Seq) return } - var blockhash []byte - if pp.Batch != nil { - blockhash = hash(pp.Batch.Header) - - batchheader, err := s.checkBatch(pp.Batch) - if err != nil || batchheader.Seq != pp.Seq.Seq { - log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src) - return - } - - prevhash := hash(s.sys.LastBatch().Header) - if !bytes.Equal(batchheader.PrevHash, prevhash) { - log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash)) - return - } + if pp.Batch == nil { + log.Infof("preprepare without batch") + return + } + + batchheader, err := s.checkBatch(pp.Batch, true) + if err != nil || batchheader.Seq != pp.Seq.Seq { + log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src) + return + } + + prevhash := s.sys.LastBatch().Hash() + if !bytes.Equal(batchheader.PrevHash, prevhash) { + log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash)) + return } - s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: blockhash}, pp) + s.handleCheckedPreprepare(pp) } -func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) { +func (s *SBFT) acceptPreprepare(pp *Preprepare) { + sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()} + + log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest) + s.sys.Persist("preprepare", pp) + s.cur = reqInfo{ subject: sub, timeout: 
s.sys.Timer(time.Duration(s.config.RequestTimeoutNsec)*time.Nanosecond, s.requestTimeout), @@ -83,9 +88,10 @@ func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) { commit: make(map[uint64]*Subject), checkpoint: make(map[uint64]*Checkpoint), } +} - log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest) - s.sys.Persist("preprepare", pp) +func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) { + s.acceptPreprepare(pp) s.cancelViewChangeTimer() if !s.isPrimary() { s.sendPrepare() diff --git a/consensus/simplebft/simplebft.go b/consensus/simplebft/simplebft.go index 7e76a643674..7198465dfd6 100644 --- a/consensus/simplebft/simplebft.go +++ b/consensus/simplebft/simplebft.go @@ -61,7 +61,7 @@ type SBFT struct { batchTimer Canceller cur reqInfo activeView bool - lastNewViewSent uint64 + lastNewViewSent *NewView viewChangeTimeout time.Duration viewChangeTimer Canceller replicaState []replicaInfo @@ -81,7 +81,7 @@ type reqInfo struct { type replicaInfo struct { backLog []*Msg - hello *Batch + hello *Hello signedViewchange *Signed viewchange *ViewChange newview *NewView @@ -109,7 +109,7 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) { s.sys.SetReceiver(s) lastBatch := s.sys.LastBatch() - bh, err := s.checkBatch(lastBatch) + bh, err := s.checkBatch(lastBatch, false) if err != nil { panic(err) } @@ -128,8 +128,7 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) { if pp.Seq.Seq > bh.Seq { s.seq = *pp.Seq s.seq.Seq -= 1 - s.handlePreprepare(pp, s.primaryIDView(pp.Seq.View)) - // ideally we wouldn't send a prepare here + s.acceptPreprepare(pp) } } c := &Subject{} @@ -141,8 +140,9 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) { s.cur.executed = true } - // XXX set active after checking with the network - s.activeView = true + if s.seq.Seq == 0 { + s.activeView = true + } s.cancelViewChangeTimer() return s, nil @@ -198,7 +198,10 @@ func (s *SBFT) Receive(m *Msg, src uint64) { return } - if req := 
m.GetRequest(); req != nil { + if h := m.GetHello(); h != nil { + s.handleHello(h, src) + return + } else if req := m.GetRequest(); req != nil { s.handleRequest(req, src) return } else if vs := m.GetViewChange(); vs != nil { @@ -219,10 +222,7 @@ func (s *SBFT) Receive(m *Msg, src uint64) { } func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) { - if h := m.GetHello(); h != nil { - s.handleHello(h, src) - return - } else if pp := m.GetPreprepare(); pp != nil { + if pp := m.GetPreprepare(); pp != nil { s.handlePreprepare(pp, src) return } else if p := m.GetPrepare(); p != nil { diff --git a/consensus/simplebft/simplebft.pb.go b/consensus/simplebft/simplebft.pb.go index 45a2a7fb897..4c12938bd3e 100644 --- a/consensus/simplebft/simplebft.pb.go +++ b/consensus/simplebft/simplebft.pb.go @@ -21,6 +21,7 @@ It has these top-level messages: Signed NewView Checkpoint + Hello */ package simplebft @@ -96,7 +97,7 @@ type Msg_Checkpoint struct { Checkpoint *Checkpoint `protobuf:"bytes,7,opt,name=checkpoint,oneof"` } type Msg_Hello struct { - Hello *Batch `protobuf:"bytes,8,opt,name=hello,oneof"` + Hello *Hello `protobuf:"bytes,8,opt,name=hello,oneof"` } func (*Msg_Request) isMsg_Type() {} @@ -164,7 +165,7 @@ func (m *Msg) GetCheckpoint() *Checkpoint { return nil } -func (m *Msg) GetHello() *Batch { +func (m *Msg) GetHello() *Hello { if x, ok := m.GetType().(*Msg_Hello); ok { return x.Hello } @@ -299,7 +300,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } - msg := new(Batch) + msg := new(Hello) err := b.DecodeMessage(msg) m.Type = &Msg_Hello{msg} return true, err @@ -528,6 +529,30 @@ func (m *Checkpoint) String() string { return proto.CompactTextString func (*Checkpoint) ProtoMessage() {} func (*Checkpoint) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } +type Hello struct { + Batch *Batch `protobuf:"bytes,1,opt,name=batch" json:"batch,omitempty"` 
+ NewView *NewView `protobuf:"bytes,2,opt,name=new_view,json=newView" json:"new_view,omitempty"` +} + +func (m *Hello) Reset() { *m = Hello{} } +func (m *Hello) String() string { return proto.CompactTextString(m) } +func (*Hello) ProtoMessage() {} +func (*Hello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } + +func (m *Hello) GetBatch() *Batch { + if m != nil { + return m.Batch + } + return nil +} + +func (m *Hello) GetNewView() *NewView { + if m != nil { + return m.NewView + } + return nil +} + func init() { proto.RegisterType((*Config)(nil), "simplebft.Config") proto.RegisterType((*Msg)(nil), "simplebft.Msg") @@ -541,56 +566,58 @@ func init() { proto.RegisterType((*Signed)(nil), "simplebft.Signed") proto.RegisterType((*NewView)(nil), "simplebft.NewView") proto.RegisterType((*Checkpoint)(nil), "simplebft.Checkpoint") + proto.RegisterType((*Hello)(nil), "simplebft.Hello") } func init() { proto.RegisterFile("simplebft.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 736 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x55, 0xdd, 0x6a, 0xdb, 0x4a, - 0x10, 0x8e, 0x22, 0x5b, 0xb6, 0xc7, 0xe1, 0x24, 0xd9, 0x93, 0x13, 0x44, 0x4e, 0x2e, 0x8c, 0x5a, - 0x52, 0x5f, 0x14, 0x27, 0xb8, 0x85, 0x96, 0x40, 0xa1, 0x24, 0x2d, 0x35, 0x85, 0x86, 0xa2, 0x84, - 0x40, 0x7b, 0x23, 0x64, 0x69, 0x6c, 0xa9, 0xb1, 0x25, 0x59, 0xbb, 0xb2, 0xe3, 0x3c, 0x43, 0x9f, - 0xa1, 0xcf, 0xd0, 0x07, 0xe8, 0x1b, 0xf5, 0x25, 0xca, 0xfe, 0x58, 0x52, 0xfd, 0x53, 0x0a, 0xbe, - 0xd8, 0xd9, 0xef, 0x9b, 0x9d, 0xf9, 0xe6, 0x47, 0x86, 0x5d, 0x1a, 0x8e, 0x93, 0x11, 0xf6, 0x07, - 0xac, 0x93, 0xa4, 0x31, 0x8b, 0x49, 0x23, 0xbf, 0xb0, 0xbe, 0x6b, 0x60, 0x5c, 0xc6, 0xd1, 0x20, - 0x1c, 0x92, 0x1d, 0xd0, 0x22, 0x53, 0x6b, 0x69, 0xed, 0x8a, 0xad, 0x45, 0xdc, 0x1a, 0x98, 0xdb, - 0xd2, 0x1a, 0x90, 0x0e, 0xfc, 0xdb, 0x77, 0x99, 0x17, 0x38, 0x7e, 0x96, 0xba, 0x2c, 0x8c, 0x23, - 0x27, 0xa2, 0xe8, 0x99, 0xba, 0xc0, 0xf7, 0x05, 0xf4, 
0x46, 0x21, 0x57, 0x14, 0x3d, 0xd2, 0x86, - 0x3d, 0xc9, 0xa7, 0xe1, 0x03, 0x3a, 0xfd, 0x39, 0x43, 0x6a, 0x56, 0x04, 0xf9, 0x1f, 0x71, 0x7f, - 0x1d, 0x3e, 0xe0, 0x05, 0xbf, 0x25, 0x67, 0x70, 0x90, 0xe2, 0x24, 0x43, 0xca, 0x1c, 0x16, 0x8e, - 0x31, 0xce, 0x98, 0x7c, 0xba, 0x2a, 0xd8, 0x44, 0x61, 0x37, 0x12, 0xe2, 0x6f, 0x5b, 0xdf, 0x74, - 0xd0, 0x3f, 0xd0, 0x21, 0xe9, 0x40, 0x4d, 0xa1, 0x22, 0xeb, 0x66, 0x97, 0x74, 0x0a, 0xa1, 0xb6, - 0x44, 0x7a, 0x5b, 0xf6, 0x82, 0x44, 0x5e, 0x00, 0x24, 0x29, 0xf2, 0x9f, 0x9b, 0xa2, 0x90, 0xd6, - 0xec, 0xfe, 0x57, 0x72, 0xf9, 0x98, 0x83, 0xbd, 0x2d, 0xbb, 0x44, 0xe5, 0x81, 0x16, 0x5e, 0xfa, - 0x4a, 0xa0, 0xeb, 0xac, 0xff, 0x05, 0x3d, 0x11, 0x68, 0xc1, 0x7f, 0x0a, 0x86, 0x17, 0x8f, 0xc7, - 0x21, 0x13, 0x92, 0x37, 0xd1, 0x15, 0x87, 0x3c, 0x87, 0xe6, 0x34, 0xc4, 0x99, 0xe3, 0x05, 0x6e, - 0x34, 0x44, 0xa1, 0xbb, 0xd9, 0xdd, 0x2f, 0xbb, 0x84, 0xc3, 0x08, 0x7d, 0x9e, 0x13, 0xe7, 0x5d, - 0x0a, 0x1a, 0x39, 0x85, 0x7a, 0x84, 0x33, 0x87, 0xdf, 0x98, 0xc6, 0x4a, 0x94, 0x2b, 0x9c, 0xdd, - 0x86, 0x38, 0xe3, 0x49, 0x45, 0xf2, 0xc8, 0xd5, 0x7b, 0x01, 0x7a, 0x77, 0x49, 0x1c, 0x46, 0xcc, - 0xac, 0xad, 0xa8, 0xbf, 0xcc, 0x41, 0x1e, 0xa9, 0xa0, 0x92, 0x36, 0x54, 0x03, 0x1c, 0x8d, 0x62, - 0xb3, 0x2e, 0x7c, 0xf6, 0x4a, 0x3e, 0x17, 0xbc, 0x95, 0xbd, 0x2d, 0x5b, 0x12, 0x2e, 0x0c, 0xa8, - 0xb0, 0x79, 0x82, 0xd6, 0x23, 0xa8, 0xa9, 0xf2, 0x13, 0x13, 0x6a, 0x89, 0x3b, 0x1f, 0xc5, 0xae, - 0x2f, 0x7a, 0xb4, 0x63, 0x2f, 0x4c, 0xeb, 0x14, 0x6a, 0xd7, 0x38, 0x11, 0xa9, 0x11, 0xa8, 0x08, - 0x1d, 0x72, 0xf6, 0xc4, 0x99, 0xec, 0x81, 0x4e, 0x71, 0xa2, 0x06, 0x90, 0x1f, 0xad, 0x4f, 0xd0, - 0x94, 0xf1, 0xd0, 0xf5, 0x31, 0x5d, 0x10, 0xb4, 0x9c, 0x40, 0xfe, 0x87, 0x46, 0x92, 0xe2, 0xd4, - 0x09, 0x5c, 0x1a, 0x08, 0xc7, 0x1d, 0xbb, 0xce, 0x2f, 0x7a, 0x2e, 0x0d, 0x38, 0xe8, 0xbb, 0xcc, - 0x95, 0xa0, 0x2e, 0x41, 0x7e, 0xc1, 0x41, 0xeb, 0x87, 0x06, 0x55, 0xf1, 0x36, 0x39, 0x04, 0x23, - 0x10, 0xef, 0xab, 0x74, 0x95, 0x45, 0x8e, 0xa0, 0xae, 0x12, 0xa7, 0xe6, 0x76, 0x4b, 0x17, 
0x4f, - 0x2b, 0x9b, 0xbc, 0x06, 0xa0, 0xe1, 0x30, 0x72, 0x59, 0x96, 0x22, 0x35, 0xf5, 0x96, 0xde, 0x6e, - 0x76, 0x5b, 0xcb, 0x55, 0x12, 0x5d, 0x94, 0x94, 0xb7, 0x11, 0x4b, 0xe7, 0x76, 0xc9, 0xe7, 0xe8, - 0x15, 0xec, 0x2e, 0xc1, 0x5c, 0xde, 0x1d, 0xce, 0x17, 0xf2, 0xee, 0x70, 0x4e, 0x0e, 0xa0, 0x3a, - 0x75, 0x47, 0x19, 0x2a, 0x69, 0xd2, 0x38, 0xdf, 0x7e, 0xa9, 0x59, 0x9f, 0x01, 0x8a, 0xd9, 0x25, - 0x8f, 0x8b, 0xc2, 0x2c, 0x8d, 0x9e, 0x2c, 0xb7, 0x2c, 0xd6, 0x09, 0x54, 0xc5, 0x22, 0xaa, 0x3d, - 0x58, 0xe9, 0xaa, 0x2d, 0x61, 0xeb, 0x1d, 0xd4, 0xd4, 0xc8, 0xfe, 0xe5, 0xc3, 0x87, 0x60, 0xf8, - 0xe1, 0x90, 0x2f, 0xa5, 0xcc, 0x53, 0x59, 0xd6, 0x57, 0x0d, 0xe0, 0xb6, 0x98, 0xdf, 0x75, 0x3d, - 0x3f, 0x81, 0x4a, 0x42, 0x91, 0x89, 0x02, 0xaf, 0xdd, 0x1a, 0x5b, 0xe0, 0x9c, 0x37, 0xe1, 0x3c, - 0x7d, 0x33, 0x8f, 0xe3, 0xbc, 0x69, 0x78, 0x8f, 0x5e, 0xc6, 0xd0, 0x57, 0x1f, 0x9f, 0xdc, 0xb6, - 0xce, 0xc1, 0x90, 0x7b, 0xc5, 0x33, 0xe1, 0x83, 0xa0, 0x1a, 0x2e, 0xce, 0xe4, 0x18, 0x1a, 0x79, - 0x7b, 0x94, 0x8e, 0xe2, 0xc2, 0xfa, 0xa9, 0x41, 0x4d, 0x6d, 0xd8, 0x5a, 0x1d, 0x67, 0x50, 0x99, - 0x16, 0x3a, 0x8e, 0x57, 0xf7, 0xb2, 0x73, 0x4b, 0x91, 0xc9, 0x31, 0x10, 0x4c, 0xae, 0xe8, 0x5e, - 0x2a, 0xd2, 0x36, 0x29, 0xba, 0x97, 0x3c, 0xd5, 0xb5, 0xca, 0x1f, 0xbb, 0x76, 0xf4, 0x1e, 0x1a, - 0x79, 0x88, 0x35, 0xa3, 0xf4, 0xa4, 0x3c, 0x4a, 0xeb, 0x3e, 0x36, 0xe5, 0xe9, 0xba, 0x01, 0x28, - 0xbe, 0x0d, 0x6b, 0xd6, 0x6e, 0x43, 0xc3, 0x7f, 0xaf, 0xa1, 0xbe, 0x54, 0xc3, 0xbe, 0x21, 0xfe, - 0x89, 0x9e, 0xfd, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x63, 0xf4, 0x2e, 0x0e, 0x9c, 0x06, 0x00, 0x00, + // 752 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x55, 0x5d, 0x6e, 0xd3, 0x40, + 0x10, 0xc6, 0x75, 0x7e, 0x27, 0x15, 0x4d, 0x97, 0x82, 0xac, 0xd2, 0x87, 0xca, 0xa0, 0xd2, 0x07, + 0x48, 0xab, 0x80, 0x04, 0xaa, 0x84, 0x84, 0x5a, 0x10, 0x11, 0x12, 0x15, 0x72, 0xab, 0x4a, 0xf0, + 0x40, 0xe4, 0x38, 0x9b, 0xc4, 0x34, 0xb1, 0x13, 0x7b, 0xdd, 0x36, 0x9c, 0x81, 
0x33, 0x70, 0x06, + 0x0e, 0xc0, 0x8d, 0xb8, 0x04, 0x3b, 0xb3, 0x1b, 0xdb, 0xe4, 0xa7, 0x42, 0xca, 0x83, 0x67, 0xbe, + 0x6f, 0x76, 0xe7, 0x9b, 0x9f, 0x0d, 0x6c, 0xc4, 0xfe, 0x68, 0x3c, 0xe4, 0x9d, 0x9e, 0x68, 0x8c, + 0xa3, 0x50, 0x84, 0xac, 0x9a, 0x3a, 0xec, 0x5f, 0x06, 0x94, 0x4e, 0xc2, 0xa0, 0xe7, 0xf7, 0xd9, + 0x3a, 0x18, 0x81, 0x65, 0xec, 0x1a, 0xfb, 0x05, 0xc7, 0x08, 0xd0, 0xea, 0x59, 0x6b, 0xca, 0xea, + 0xb1, 0x06, 0xdc, 0xeb, 0xb8, 0xc2, 0x1b, 0xb4, 0xbb, 0x49, 0xe4, 0x0a, 0x3f, 0x0c, 0xda, 0x41, + 0xcc, 0x3d, 0xcb, 0x24, 0x7c, 0x93, 0xa0, 0xb7, 0x1a, 0x39, 0x95, 0x00, 0xdb, 0x87, 0xba, 0xe2, + 0xc7, 0xfe, 0x77, 0xde, 0xee, 0x4c, 0x05, 0x8f, 0xad, 0x02, 0x91, 0xef, 0x92, 0xff, 0x4c, 0xba, + 0x8f, 0xd1, 0xcb, 0x0e, 0x61, 0x2b, 0xe2, 0x93, 0x84, 0xc7, 0xa2, 0x2d, 0xfc, 0x11, 0x0f, 0x13, + 0xa1, 0x8e, 0x2e, 0x12, 0x9b, 0x69, 0xec, 0x5c, 0x41, 0x78, 0xb6, 0xfd, 0xd3, 0x04, 0xf3, 0x63, + 0xdc, 0x97, 0x39, 0x95, 0x35, 0x4a, 0x59, 0xd7, 0x9a, 0xac, 0x91, 0x09, 0x75, 0x14, 0xd2, 0xba, + 0xe3, 0xcc, 0x48, 0xec, 0x25, 0xc0, 0x38, 0xe2, 0xf8, 0x73, 0x23, 0x4e, 0xd2, 0x6a, 0xcd, 0xfb, + 0xb9, 0x90, 0x4f, 0x29, 0x28, 0xa3, 0x72, 0x54, 0xbc, 0x68, 0x16, 0x65, 0x2e, 0x5c, 0x74, 0x96, + 0x74, 0xbe, 0x71, 0x8f, 0x2e, 0x9a, 0xf1, 0x9f, 0x42, 0xc9, 0x0b, 0x47, 0x23, 0x5f, 0x90, 0xe4, + 0x55, 0x74, 0xcd, 0x61, 0x2f, 0xa0, 0x76, 0xe5, 0xf3, 0xeb, 0xb6, 0x37, 0x70, 0x83, 0x3e, 0x27, + 0xdd, 0xb5, 0xe6, 0x66, 0x3e, 0xc4, 0xef, 0x07, 0xbc, 0x8b, 0x39, 0x21, 0xef, 0x84, 0x68, 0xec, + 0x00, 0x2a, 0x81, 0x0c, 0x42, 0x8f, 0x55, 0x5a, 0xb8, 0xe5, 0x94, 0x5f, 0x5f, 0x48, 0x04, 0x93, + 0x0a, 0xd4, 0x27, 0xaa, 0xf7, 0x06, 0xdc, 0xbb, 0x1c, 0x87, 0x7e, 0x20, 0xac, 0xf2, 0x82, 0xfa, + 0x93, 0x14, 0xc4, 0x9b, 0x32, 0xaa, 0x6c, 0x65, 0x71, 0xc0, 0x87, 0xc3, 0xd0, 0xaa, 0x50, 0x4c, + 0x3d, 0x17, 0xd3, 0x42, 0xbf, 0xa4, 0x2b, 0xc2, 0x71, 0x09, 0x0a, 0x62, 0x3a, 0xe6, 0xf6, 0x23, + 0x28, 0xeb, 0xf2, 0x33, 0x4b, 0x96, 0xce, 0x9d, 0x0e, 0x43, 0xb7, 0x4b, 0x3d, 0x5a, 0x77, 0x66, + 0xa6, 0x7d, 0x00, 
0xe5, 0x33, 0x3e, 0xa1, 0xd4, 0x18, 0x14, 0x48, 0x87, 0x9a, 0x3d, 0xfa, 0x66, + 0x75, 0x30, 0x63, 0x3e, 0xd1, 0x03, 0x88, 0x9f, 0xf6, 0x67, 0xa8, 0x1d, 0xe3, 0xe8, 0xb4, 0xb8, + 0xdb, 0xe5, 0xd1, 0x8c, 0x60, 0xa4, 0x04, 0xf6, 0x10, 0xaa, 0xb2, 0x03, 0x57, 0xed, 0x81, 0x1b, + 0x0f, 0x28, 0x70, 0xdd, 0xa9, 0xa0, 0xa3, 0x25, 0x6d, 0x04, 0xbb, 0xae, 0x70, 0x15, 0x68, 0x2a, + 0x10, 0x1d, 0x08, 0xda, 0xbf, 0x0d, 0x28, 0xd2, 0xd9, 0xec, 0x01, 0x94, 0x06, 0x74, 0xbe, 0x4e, + 0x57, 0x5b, 0x6c, 0x1b, 0x2a, 0x3a, 0xf1, 0x58, 0x1e, 0x6d, 0xd2, 0xd1, 0xda, 0x66, 0x6f, 0x00, + 0x62, 0xd9, 0x22, 0x57, 0x24, 0x91, 0x9c, 0x72, 0x53, 0xa2, 0xb5, 0xe6, 0x6e, 0xae, 0x4a, 0x74, + 0x32, 0x75, 0x51, 0x51, 0xde, 0x05, 0x22, 0x9a, 0x3a, 0xb9, 0x98, 0xed, 0xd7, 0xb0, 0x31, 0x07, + 0xa3, 0xbc, 0x4b, 0x3e, 0x9d, 0xc9, 0x93, 0x9f, 0x6c, 0x0b, 0x8a, 0x57, 0xee, 0x30, 0xe1, 0x5a, + 0x9a, 0x32, 0x8e, 0xd6, 0x5e, 0x19, 0xf6, 0x17, 0x80, 0x6c, 0x76, 0xd9, 0xe3, 0xac, 0x30, 0x73, + 0xa3, 0xa7, 0xca, 0xad, 0x8a, 0xb5, 0x07, 0x45, 0x5a, 0x44, 0xbd, 0x07, 0xf5, 0xf9, 0x7c, 0x1d, + 0x05, 0xdb, 0xef, 0x65, 0x9b, 0xd4, 0xc8, 0xfe, 0xe7, 0xc1, 0xb2, 0x82, 0x5d, 0xbf, 0x8f, 0x4b, + 0xa9, 0xf2, 0xd4, 0x96, 0xfd, 0xc3, 0x00, 0xb8, 0xc8, 0xe6, 0x77, 0x59, 0xcf, 0xf7, 0xa0, 0x30, + 0x8e, 0xb9, 0xa0, 0x02, 0x2f, 0xdd, 0x1a, 0x87, 0x70, 0xe4, 0x4d, 0x90, 0x67, 0xae, 0xe6, 0x21, + 0x8e, 0x4d, 0xe3, 0x37, 0xdc, 0x4b, 0x04, 0xef, 0xea, 0xc7, 0x27, 0xb5, 0xed, 0x23, 0x28, 0xa9, + 0xbd, 0xc2, 0x4c, 0x70, 0x10, 0x74, 0xc3, 0xe9, 0x9b, 0xed, 0x40, 0x35, 0x6d, 0x8f, 0xd6, 0x91, + 0x39, 0xec, 0x3f, 0x06, 0x94, 0xf5, 0x86, 0x2d, 0xd5, 0x71, 0x28, 0x7d, 0x99, 0x8e, 0x9d, 0xc5, + 0xbd, 0x6c, 0x5c, 0x48, 0x58, 0x8d, 0x01, 0x31, 0x51, 0xd1, 0x8d, 0x52, 0x64, 0xac, 0x52, 0x74, + 0xa3, 0x78, 0xba, 0x6b, 0x85, 0x5b, 0xbb, 0xb6, 0xfd, 0x01, 0xaa, 0xe9, 0x15, 0x4b, 0x46, 0xe9, + 0x49, 0x7e, 0x94, 0x96, 0x3d, 0x36, 0xf9, 0xe9, 0x3a, 0x07, 0xc8, 0xde, 0x86, 0x25, 0x6b, 0xb7, + 0xa2, 0xe1, 0xff, 0xd6, 0xd0, 0x9c, 0xaf, 0xe1, 0x57, 
0x28, 0xd2, 0xeb, 0x91, 0x49, 0x32, 0x6e, + 0x95, 0xc4, 0x9e, 0xe5, 0x1e, 0xbc, 0xb5, 0x55, 0x0f, 0x5e, 0xfa, 0xdc, 0x75, 0x4a, 0xf4, 0x4f, + 0xf7, 0xfc, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8c, 0xf6, 0xc5, 0xe5, 0xfc, 0x06, 0x00, 0x00, } diff --git a/consensus/simplebft/simplebft.proto b/consensus/simplebft/simplebft.proto index b5b8370e985..742e9075fe5 100644 --- a/consensus/simplebft/simplebft.proto +++ b/consensus/simplebft/simplebft.proto @@ -35,7 +35,7 @@ message Msg { Signed view_change = 5; NewView new_view = 6; Checkpoint checkpoint = 7; - Batch hello = 8; + Hello hello = 8; }; }; @@ -94,3 +94,8 @@ message Checkpoint { bytes digest = 2; bytes signature = 3; }; + +message Hello { + Batch batch = 1; + NewView new_view = 2; +}; diff --git a/consensus/simplebft/simplebft_test.go b/consensus/simplebft/simplebft_test.go index c4828b357fa..7125d8bcc19 100644 --- a/consensus/simplebft/simplebft_test.go +++ b/consensus/simplebft/simplebft_test.go @@ -373,7 +373,7 @@ func TestRestart(t *testing.T) { sys.Run() for _, a := range adapters { if len(a.batches) != 3 { - t.Fatal("expected execution of 3 batches") + t.Fatalf("expected execution of 3 batches, %d got %v", a.id, a.batches) } if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) { t.Error("wrong request executed (1)") @@ -410,6 +410,7 @@ func TestRestartAfterPrepare(t *testing.T) { if p := msg.msg.GetPrepare(); p != nil && p.Seq.Seq == 3 && !restarted { restarted = true + testLog.Notice("restarting 0") repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) for _, a := range adapters { if a.id != 0 { @@ -674,3 +675,74 @@ func TestErroneousViewChange(t *testing.T) { } } } + +func TestRestartMissedViewChange(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, 
BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + disconnect := false + + // while disconnect is set, drop all traffic to and from replica 0 + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if disconnect && (msg.src == 0 || msg.dst == 0) { + return e, false + } + } + + return e, true + } + + connectAll(sys) + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + disconnect = true + // move to view 1 + for _, r := range repls { + if r.id != 0 { + r.sendViewChange() + } + } + + r2 := []byte{3, 1, 2} + repls[1].Request(r2) + sys.Run() + + disconnect = false + testLog.Notice("restarting 0") + repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + for _, a := range adapters { + if a.id != 0 { + a.receiver.Connection(0) + adapters[0].receiver.Connection(a.id) + } + } + + r3 := []byte{3, 5, 2} + repls[1].Request(r3) + sys.Run() + + for _, a := range adapters { + if len(a.batches) == 0 { + t.Fatalf("expected execution of some batches on %d", a.id) + } + + if !reflect.DeepEqual([][]byte{r3}, a.batches[len(a.batches)-1].Payloads) { + t.Errorf("wrong request executed on %d: %v", a.id, a.batches[len(a.batches)-1]) + } + } +}