Skip to content

Commit

Permalink
sbft: refactor + document future directions
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit ad540eeb38effe086851c6d9c58a0192e7034b10
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Tue Oct 4 15:09:38 2016 +0200

    address golint concerns

    Change-Id: I59ae215d771a128cac740489a1d6308ebd1fb2a5
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit d60d5f54bb5ca04e5857be13cd5dbfd3d67e5ac6
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Tue Oct 4 14:51:14 2016 +0200

    add comments on backlog and state transfer strategy

    Change-Id: Iedc84f196c76e1cd12c88f2e74505cae64d80752
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 6169fd180918940e622a580a55d3be4243ccd919
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 15:35:00 2016 +0200

    properly process queued checkpoint messages

    Change-Id: If6d8e58f8a02178a4a435542162cba52b233df92
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 017c174700e0a47015a6dfc8967576cd8872de79
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 15:33:57 2016 +0200

    send hello message on connect

    Change-Id: Id76553a7403d730182ca5f8bd445053c28969f91
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit b11ed71a695a96f103626d1db8629a39d39db7c4
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 14:28:52 2016 +0200

    rename message Seq to SeqView

    Change-Id: I2cb6b4082b72642342ca09eeb4c0be7eb382356d
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 12b1d77eb8e7e0e4e8f1a68e04c5183ddf6a40c0
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 14:24:22 2016 +0200

    record signature origin in batch

    Change-Id: I9416e90ae01c35d548eeedb56ac6f345e9a0b63b
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit f18a7896183b94edd0ed1285fe2d8d827435e5bc
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 12:50:47 2016 +0200

    change info to warning

    Change-Id: If25917123731484784deec45af589e84a17d9c26
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 59e79929ffb18f7fd834f9bf9bd667109f6c9e5d
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Mon Oct 3 12:50:28 2016 +0200

    conform to protobuf style guide

    Change-Id: I851476b6449c17c88f31539c78221fc81338cadd
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

Change-Id: Iaa289455d6ebc7c31a1e4130b9d07f8759d05c88
Signed-off-by: Simon Schubert <sis@zurich.ibm.com>
  • Loading branch information
corecode committed Oct 4, 2016
1 parent 52c8407 commit cbd1ea0
Show file tree
Hide file tree
Showing 15 changed files with 307 additions and 212 deletions.
64 changes: 50 additions & 14 deletions consensus/simplebft/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,33 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
if src == s.id {
panic("should never have to backlog my own message")
}
// TODO prevent DoS by limiting the number of messages per replica
// TODO
//
// Prevent DoS by limiting the number of messages per replica.
//
// If the backlog limit is exceeded, discard all messages with
// Seq before the replica's hello message (we can, because we
// can play forward to this batch via state transfer). If
// there is no hello message, we must be really slow or the
// replica must be byzantine. In this case we probably should
// re-establish the connection.
//
// After the connection has been re-established, we will
// receive a hello, and the following messages will trigger
// the pruning of old messages. If this pruning lead us not
// to make progress, the backlog processing algorithm as lined
// out below will take care of starting a state transfer,
// using the hello message we received on reconnect.
s.backLog[src] = append(s.backLog[src], m)
}

func (s *SBFT) processBacklog() {
processed := true
notReady := uint64(0)

for processed {
processed = false
notReady := uint64(0)
for src, _ := range s.backLog {
for src := range s.backLog {
for len(s.backLog[src]) > 0 {
m, rest := s.backLog[src][0], s.backLog[src][1:]
if s.testBacklog2(m, src) {
Expand All @@ -74,16 +90,36 @@ func (s *SBFT) processBacklog() {
processed = true
}
}

// all minus us
if notReady >= s.config.N-1 {
// This is a problem - we consider all other replicas
// too far ahead for us. We need to do a state transfer
// to get out of this rut.
for src := range s.backLog {
delete(s.backLog, src)
}
// TODO trigger state transfer
}
}

// TODO
//
// Detect when we need to reconsider our options.
//
// We arrived here because either all is fine, we're with the
// pack. Or we have messages in the backlog because we're
// connected asymmetrically, and a close replica already
// started talking about the next batch while we're still
// waiting for rounds to arrive for our current batch. That's
// still fine.
//
// We might also be here because we lost connectivity, and we
// either missed some messages, or our connection is bad and
// we should reconnect to get a working connection going
// again.
//
// If a noFaultyQuorum (-1, because we're not faulty, just
// were disconnected) is backlogged, we know that we need to
// perform a state transfer. Of course, f of these might be
// byzantine, and the remaining f that are not backlogged will
// allow us to get unstuck. To check against that, we need to
// only consider backlogged replicas of which we have a hello
// message that talks about a future Seq.
//
// We need to pick the highest Seq of all the hello messages
// we received, perform a state transfer to that Batch, and
// discard all backlogged messages that refer to a lower Seq.
//
// Do we need to detect that a connection is stuck and we
// should reconnect?
}
1 change: 1 addition & 0 deletions consensus/simplebft/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) {

////////////////////////////////////////

// Hash returns the hash of the Batch.
func (b *Batch) Hash() []byte {
return hash(b.Header)
}
15 changes: 6 additions & 9 deletions consensus/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {

// got a weak checkpoint

cpset := &CheckpointSet{make(map[uint64]*Checkpoint)}
var sigs [][]byte
cpset := make(map[uint64][]byte)
for _, r := range replicas {
cp := s.cur.checkpoint[r]
cpset.CheckpointSet[r] = cp
sigs = append(sigs, cp.Signature)
cpset[r] = cp.Signature
}
s.cur.checkpointDone = true

Expand All @@ -93,11 +91,10 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
}

// ignore null requests
if s.cur.preprep.Batch != nil {
batch := *s.cur.preprep.Batch
batch.Signatures = sigs
s.sys.Deliver(&batch)
}
batch := *s.cur.preprep.Batch
batch.Signatures = cpset
s.sys.Deliver(&batch)

s.cur.timeout.Cancel()
log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)

Expand Down
2 changes: 1 addition & 1 deletion consensus/simplebft/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *SBFT) handleCommit(c *Subject, src uint64) {
}

if !reflect.DeepEqual(c, &s.cur.subject) {
log.Infof("commit does not match expected subject %v, got %v", &s.cur.subject, c)
log.Warningf("commit does not match expected subject %v, got %v", &s.cur.subject, c)
return
}
if _, ok := s.cur.commit[src]; ok {
Expand Down
42 changes: 42 additions & 0 deletions consensus/simplebft/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package simplebft

// Connection is an event from system to notify a new connection with
// replica.
// On connection, we send our latest (weak) checkpoint, and we expect
// to receive one from replica.
func (s *SBFT) Connection(replica uint64) {
batch := *s.sys.LastBatch()
batch.Payloads = nil // don't send the big payload
s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica)

// TODO
//
// A reconnecting replica can play forward its blockchain to
// the batch listed in the hello message. However, the
// currently in-flight batch will not be reflected in the
// Hello message, nor will all messages be present to actually
// commit the in-flight batch at the reconnecting replica.
//
// Therefore we also send the most recent (pre)prepare,
// commit, checkpoint so that the reconnecting replica can
// catch up on the in-flight batch.
}

func (s *SBFT) handleHello(h *Batch, src uint64) {
}
5 changes: 2 additions & 3 deletions consensus/simplebft/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ func merkleHashDigests(digests [][]byte) []byte {
digests = nextDigests
}

if len(digests) > 0 {
return digests[0]
} else {
if len(digests) == 0 {
return nil
}
return digests[0]
}

////////////////////////////////////////////////
Expand Down
8 changes: 4 additions & 4 deletions consensus/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (s *SBFT) maybeSendNewView() {
}

func (s *SBFT) handleNewView(nv *NewView, src uint64) {
if src != s.primaryIdView(nv.View) {
if src != s.primaryIDView(nv.View) {
log.Warningf("invalid new view from %d for %d", src, nv.View)
return
}

if onv, ok := s.newview[s.primaryIdView(nv.View)]; ok && onv.View >= nv.View {
if onv, ok := s.newview[s.primaryIDView(nv.View)]; ok && onv.View >= nv.View {
log.Debugf("discarding duplicate new view for %d", nv.View)
return
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
return
}

s.newview[s.primaryIdView(nv.View)] = nv
s.newview[s.primaryIDView(nv.View)] = nv

s.processNewView()
}
Expand All @@ -138,7 +138,7 @@ func (s *SBFT) processNewView() {
return
}

nv, ok := s.newview[s.primaryIdView(s.seq.View)]
nv, ok := s.newview[s.primaryIDView(s.seq.View)]
if !ok || nv.View != s.seq.View {
return
}
Expand Down
58 changes: 29 additions & 29 deletions consensus/simplebft/newview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ import (
)

func TestXsetNoByz(t *testing.T) {
s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}}
s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}}
vcs := []*ViewChange{
&ViewChange{
View: 3,
Pset: nil,
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")},
&Subject{&Seq{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")},
&Subject{&SeqView{2, 2}, []byte("val2")}},
Executed: 1,
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Executed: 1,
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")},
&Subject{&Seq{2, 2}, []byte("val2")}},
Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")},
&Subject{&SeqView{2, 2}, []byte("val2")}},
Executed: 1,
},
}
Expand All @@ -51,13 +51,13 @@ func TestXsetNoByz(t *testing.T) {
t.Fatal("no xset")
}

if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) {
if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val2")}) {
t.Error(xset)
}
}

func TestXsetByz0(t *testing.T) {
s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}}
s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}}
vcs := []*ViewChange{
&ViewChange{
View: 3,
Expand All @@ -67,15 +67,15 @@ func TestXsetByz0(t *testing.T) {
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Executed: 1,
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")},
&Subject{&Seq{2, 2}, []byte("val2")}},
Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")},
&Subject{&SeqView{2, 2}, []byte("val2")}},
Executed: 1,
},
}
Expand All @@ -87,41 +87,41 @@ func TestXsetByz0(t *testing.T) {

vcs = append(vcs, &ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")},
&Subject{&Seq{2, 2}, []byte("val2")}},
Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")},
&Subject{&SeqView{2, 2}, []byte("val2")}},
Executed: 2,
})

xset, ok = s.makeXset(vcs)
if !ok {
t.Error("no xset")
}
if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) {
if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val2")}) {
t.Error(xset)
}
}

func TestXsetByz2(t *testing.T) {
s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}}
s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}}
vcs := []*ViewChange{
&ViewChange{
View: 3,
Pset: nil,
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Executed: 1,
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Executed: 1,
},
&ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")},
&Subject{&Seq{2, 2}, []byte("val2")}},
Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")},
&Subject{&SeqView{2, 2}, []byte("val2")}},
Executed: 1,
},
}
Expand All @@ -133,16 +133,16 @@ func TestXsetByz2(t *testing.T) {

vcs = append(vcs, &ViewChange{
View: 3,
Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}},
Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}},
Executed: 2,
})

xset, ok = s.makeXset(vcs)
if !ok {
t.Error("no xset")
}
if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val1")}) {
if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val1")}) {
t.Error(xset)
}
}
2 changes: 1 addition & 1 deletion consensus/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *SBFT) sendPreprepare(batch []*Request) {
}

func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
if src != s.primaryId() {
if src != s.primaryID() {
log.Infof("preprepare from non-primary %d", src)
return
}
Expand Down
1 change: 1 addition & 0 deletions consensus/simplebft/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package simplebft

import "time"

// Request proposes a new request to the BFT network.
func (s *SBFT) Request(req []byte) {
s.broadcast(&Msg{&Msg_Request{&Request{req}}})
}
Expand Down
Loading

0 comments on commit cbd1ea0

Please sign in to comment.