Skip to content

Commit

Permalink
sbft: sync state on reconnect
Browse files Browse the repository at this point in the history
Change-Id: I90fc0e866de288dd592b9afb01a806e3c1df5695
Signed-off-by: Simon Schubert <sis@zurich.ibm.com>
  • Loading branch information
corecode committed Nov 3, 2016
1 parent 18a44d0 commit 31b7572
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 149 deletions.
48 changes: 18 additions & 30 deletions consensus/simplebft/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@ func (s *SBFT) testBacklog(m *Msg, src uint64) bool {
}

func (s *SBFT) testBacklog2(m *Msg, src uint64) bool {
record := func(seq uint64) bool {
if seq > s.cur.subject.Seq.Seq {
record := func(seq *SeqView) bool {
if !s.activeView {
return true
}
if seq.Seq > s.cur.subject.Seq.Seq || seq.View > s.seq.View {
return true
}
return false
}

if pp := m.GetPreprepare(); pp != nil && !s.cur.executed {
return true
if pp := m.GetPreprepare(); pp != nil {
return record(pp.Seq) && !s.cur.checkpointDone
} else if p := m.GetPrepare(); p != nil {
return record(p.Seq.Seq)
return record(p.Seq)
} else if c := m.GetCommit(); c != nil {
return record(c.Seq.Seq)
return record(c.Seq)
} else if cs := m.GetCheckpoint(); cs != nil {
c := &Checkpoint{}
return record(c.Seq)
return record(&SeqView{Seq: c.Seq})
}
return false
}
Expand All @@ -53,22 +56,19 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
//
// Prevent DoS by limiting the number of messages per replica.
//
// If the backlog limit is exceeded, discard all messages with
// Seq before the replica's hello message (we can, because we
// can play forward to this batch via state transfer). If
// there is no hello message, we must be really slow or the
// replica must be byzantine. In this case we probably should
// re-establish the connection.
// If the backlog limit is exceeded, re-establish the
// connection.
//
// After the connection has been re-established, we will
// receive a hello, and the following messages will trigger
// the pruning of old messages. If this pruning lead us not
// to make progress, the backlog processing algorithm as lined
// out below will take care of starting a state transfer,
// using the hello message we received on reconnect.
// receive a hello, which will advance our state and discard
// old messages.
s.replicaState[src].backLog = append(s.replicaState[src].backLog, m)
}

func (s *SBFT) discardBacklog(src uint64) {
s.replicaState[src].backLog = nil
}

func (s *SBFT) processBacklog() {
processed := true
notReady := uint64(0)
Expand Down Expand Up @@ -111,18 +111,6 @@ func (s *SBFT) processBacklog() {
// we should reconnect to get a working connection going
// again.
//
// If a noFaultyQuorum (-1, because we're not faulty, just
// were disconnected) is backlogged, we know that we need to
// perform a state transfer. Of course, f of these might be
// byzantine, and the remaining f that are not backlogged will
// allow us to get unstuck. To check against that, we need to
// only consider backlogged replicas of which we have a hello
// message that talks about a future Seq.
//
// We need to pick the highest Seq of all the hello messages
// we received, perform a state transfer to that Batch, and
// discard all backlogged messages that refer to a lower Seq.
//
// Do we need to detect that a connection is stuck and we
// should reconnect?
}
29 changes: 24 additions & 5 deletions consensus/simplebft/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,26 @@ func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch {
}
}

func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) {
datahash := merkleHashData(b.Payloads)

func (s *SBFT) checkBatch(b *Batch, checkData bool) (*BatchHeader, error) {
batchheader := &BatchHeader{}
err := proto.Unmarshal(b.Header, batchheader)
if err != nil {
return nil, err
}

if !reflect.DeepEqual(datahash, batchheader.DataHash) {
return nil, fmt.Errorf("malformed batch: invalid hash")
if checkData {
datahash := merkleHashData(b.Payloads)
if !reflect.DeepEqual(datahash, batchheader.DataHash) {
return nil, fmt.Errorf("malformed batch: invalid hash")
}
}

bh := b.Hash()
for r, sig := range b.Signatures {
err = s.sys.CheckSig(bh, r, sig)
if err != nil {
return nil, err
}
}

return batchheader, nil
Expand All @@ -63,3 +72,13 @@ func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) {
func (b *Batch) Hash() []byte {
return hash(b.Header)
}

func (b *Batch) DecodeHeader() *BatchHeader {
batchheader := &BatchHeader{}
err := proto.Unmarshal(b.Header, batchheader)
if err != nil {
panic(err)
}

return batchheader
}
1 change: 1 addition & 0 deletions consensus/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
batch := *s.cur.preprep.Batch
batch.Signatures = cpset
s.sys.Deliver(&batch)
s.seq = *s.cur.subject.Seq

s.cur.timeout.Cancel()
log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)
Expand Down
53 changes: 46 additions & 7 deletions consensus/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ limitations under the License.

package simplebft

import "github.com/golang/protobuf/proto"

// Connection is an event from system to notify a new connection with
// replica.
// On connection, we send our latest (weak) checkpoint, and we expect
// to receive one from replica.
func (s *SBFT) Connection(replica uint64) {
batch := *s.sys.LastBatch()
batch.Payloads = nil // don't send the big payload
s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica)
hello := &Hello{Batch: &batch}
if s.isPrimary() && s.activeView && s.lastNewViewSent != nil {
hello.NewView = s.lastNewViewSent
}
s.sys.Send(&Msg{&Msg_Hello{hello}}, replica)

// A reconnecting replica can play forward its blockchain to
// the batch listed in the hello message. However, the
Expand All @@ -43,13 +45,12 @@ func (s *SBFT) Connection(replica uint64) {
// connecting right after a new-view message was received, and
// its xset batch is in-flight.

batchheader := &BatchHeader{}
err := proto.Unmarshal(batch.Header, batchheader)
batchheader, err := s.checkBatch(&batch, false)
if err != nil {
panic(err)
}

if s.cur.subject.Seq.Seq > batchheader.Seq {
if s.cur.subject.Seq.Seq > batchheader.Seq && s.activeView {
if s.isPrimary() {
s.sys.Send(&Msg{&Msg_Preprepare{s.cur.preprep}}, replica)
} else {
Expand All @@ -64,6 +65,44 @@ func (s *SBFT) Connection(replica uint64) {
}
}

func (s *SBFT) handleHello(h *Batch, src uint64) {
func (s *SBFT) handleHello(h *Hello, src uint64) {
bh, err := s.checkBatch(h.Batch, false)
if err != nil {
log.Warningf("invalid hello batch from %d: %s", src, err)
return
}

if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq {
s.sys.Deliver(h.Batch)
s.seq.Seq = bh.Seq
}

if h.NewView != nil {
if s.primaryIDView(h.NewView.View) != src {
log.Warningf("invalid hello with new view from non-primary %d", src)
return
}

vcs, err := s.checkNewViewSignatures(h.NewView)
if err != nil {
log.Warningf("invalid hello new view from %d: %s", src, err)
return
}

_, ok := s.makeXset(vcs)
if !ok {
log.Warningf("invalid hello new view xset from %d", src)
return
}

if s.seq.View <= h.NewView.View {
s.seq.View = h.NewView.View
}
s.activeView = true
}

s.replicaState[src].hello = h

s.discardBacklog(src)
s.processBacklog()
}
3 changes: 1 addition & 2 deletions consensus/simplebft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (s *SBFT) maybeExecute() {
return
}
s.cur.executed = true
s.seq = *s.cur.subject.Seq
log.Noticef("executing %v", s.seq)
log.Noticef("executing %v", s.cur.subject)

s.sys.Persist("execute", &s.cur.subject)

Expand Down
48 changes: 26 additions & 22 deletions consensus/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func (s *SBFT) maybeSendNewView() {
if s.lastNewViewSent == s.seq.View {
if s.lastNewViewSent != nil && s.lastNewViewSent.View == s.seq.View {
return
}

Expand Down Expand Up @@ -63,21 +63,11 @@ func (s *SBFT) maybeSendNewView() {
}

log.Noticef("sending new view for %d", nv.View)
s.lastNewViewSent = nv.View
s.lastNewViewSent = nv
s.broadcast(&Msg{&Msg_NewView{nv}})
}

func (s *SBFT) handleNewView(nv *NewView, src uint64) {
if src != s.primaryIDView(nv.View) {
log.Warningf("invalid new view from %d for %d", src, nv.View)
return
}

if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
log.Debugf("discarding duplicate new view for %d", nv.View)
return
}

func (s *SBFT) checkNewViewSignatures(nv *NewView) ([]*ViewChange, error) {
var vcs []*ViewChange
for vcsrc, svc := range nv.Vset {
vc := &ViewChange{}
Expand All @@ -88,13 +78,31 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
}
}
if err != nil {
log.Warningf("invalid new view from %d: view change for %d: %s", src, vcsrc, err)
s.sendViewChange()
return
return nil, err
}
vcs = append(vcs, vc)
}

return vcs, nil
}

func (s *SBFT) handleNewView(nv *NewView, src uint64) {
if src != s.primaryIDView(nv.View) {
log.Warningf("invalid new view from %d for %d", src, nv.View)
return
}

if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
log.Debugf("discarding duplicate new view for %d", nv.View)
return
}

vcs, err := s.checkNewViewSignatures(nv)
if err != nil {
log.Warningf("invalid new view from %d: %s", src, err)
s.sendViewChange()
}

xset, ok := s.makeXset(vcs)
if xset.Digest == nil {
// null request special treatment
Expand All @@ -120,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
return
}

_, err := s.checkBatch(nv.Batch)
_, err = s.checkBatch(nv.Batch, true)
if err != nil {
log.Warningf("invalid new view from %d: invalid batch, %s",
src, err)
Expand Down Expand Up @@ -155,9 +163,5 @@ func (s *SBFT) processNewView() {
}

s.activeView = true
var h []byte
if nv.Batch != nil {
h = hash(nv.Batch.Header)
}
s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: h}, pp)
s.handleCheckedPreprepare(pp)
}
44 changes: 25 additions & 19 deletions consensus/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,32 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
log.Infof("duplicate preprepare for %v", *pp.Seq)
return
}
var blockhash []byte
if pp.Batch != nil {
blockhash = hash(pp.Batch.Header)

batchheader, err := s.checkBatch(pp.Batch)
if err != nil || batchheader.Seq != pp.Seq.Seq {
log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src)
return
}

prevhash := hash(s.sys.LastBatch().Header)
if !bytes.Equal(batchheader.PrevHash, prevhash) {
log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash))
return
}
if pp.Batch == nil {
log.Infof("preprepare without batch")
return
}

batchheader, err := s.checkBatch(pp.Batch, true)
if err != nil || batchheader.Seq != pp.Seq.Seq {
log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src)
return
}

prevhash := s.sys.LastBatch().Hash()
if !bytes.Equal(batchheader.PrevHash, prevhash) {
log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash))
return
}

s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: blockhash}, pp)
s.handleCheckedPreprepare(pp)
}

func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) {
func (s *SBFT) acceptPreprepare(pp *Preprepare) {
sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()}

log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest)
s.sys.Persist("preprepare", pp)

s.cur = reqInfo{
subject: sub,
timeout: s.sys.Timer(time.Duration(s.config.RequestTimeoutNsec)*time.Nanosecond, s.requestTimeout),
Expand All @@ -83,9 +88,10 @@ func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) {
commit: make(map[uint64]*Subject),
checkpoint: make(map[uint64]*Checkpoint),
}
}

log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest)
s.sys.Persist("preprepare", pp)
func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) {
s.acceptPreprepare(pp)
s.cancelViewChangeTimer()
if !s.isPrimary() {
s.sendPrepare()
Expand Down
Loading

0 comments on commit 31b7572

Please sign in to comment.