Skip to content

Commit

Permalink
sbft: fix restart bug and test
Browse files Browse the repository at this point in the history
commit 24514a027377631cc4b3c53a56d7fbcad75b7e5b
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Wed Oct 26 18:59:33 2016 +0200

    sbft: resend in-flight messages on reconnect

    This allows a recently disconnected replica to catch up and cancel its
    request timer.

    Change-Id: I6e32176777fcf74e48bd2a6402793cf4712637d5
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 08232f825ae54569c8fd296717a8ad242d9ad84d
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Wed Oct 26 18:59:19 2016 +0200

    sbft: ignore duplicate preprepare messages

    Change-Id: Ibfb47ecf1e005bdd23ed40f3753b01f49dfffd94
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

commit 8ba9a9548727c36b8e276003d9a0eb855b2a6afb
Author: Simon Schubert <sis@zurich.ibm.com>
Date:   Wed Oct 26 18:56:40 2016 +0200

    sbft: do not use non-deterministic map iteration in tests

    This change hides a bug that was occasionally exposed by a specific
    map iteration order.  We add TestErroneousViewChange to test for this
    bug explicitly; the bug itself is fixed in a follow-up commit.

    Change-Id: I192903dd57d637ec000a628f9f61b44749caf602
    Signed-off-by: Simon Schubert <sis@zurich.ibm.com>

Change-Id: I32a3c424dd385704b77ed884b2ec0f6d04f6ac6c
Signed-off-by: Simon Schubert <sis@zurich.ibm.com>
  • Loading branch information
corecode committed Oct 28, 2016
1 parent 4274764 commit bb6bc8d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 13 deletions.
8 changes: 6 additions & 2 deletions consensus/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import (
"reflect"
)

func (s *SBFT) sendCheckpoint() {
func (s *SBFT) makeCheckpoint() *Checkpoint {
sig := s.sys.Sign(s.cur.subject.Digest)
c := &Checkpoint{
Seq: s.cur.subject.Seq.Seq,
Digest: s.cur.subject.Digest,
Signature: sig,
}
s.broadcast(&Msg{&Msg_Checkpoint{c}})
return c
}

// sendCheckpoint builds a checkpoint with makeCheckpoint and broadcasts it
// wrapped in a Msg.
func (s *SBFT) sendCheckpoint() {
	checkpoint := s.makeCheckpoint()
	s.broadcast(&Msg{&Msg_Checkpoint{checkpoint}})
}

func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
Expand Down
30 changes: 28 additions & 2 deletions consensus/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package simplebft

import "github.com/golang/protobuf/proto"

// Connection is an event from the system that notifies us of a new
// connection with a replica.
// On connection, we send our latest (weak) checkpoint, and we expect
Expand All @@ -25,8 +27,6 @@ func (s *SBFT) Connection(replica uint64) {
batch.Payloads = nil // don't send the big payload
s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica)

// TODO
//
// A reconnecting replica can play forward its blockchain to
// the batch listed in the hello message. However, the
// currently in-flight batch will not be reflected in the
Expand All @@ -36,6 +36,32 @@ func (s *SBFT) Connection(replica uint64) {
// Therefore we also send the most recent (pre)prepare,
// commit, checkpoint so that the reconnecting replica can
// catch up on the in-flight batch.
//
// TODO We need to communicate the latest view to the
// connecting replica. The new view message is not signed, so
// we cannot send that message. The worst corner case is
// connecting right after a new-view message was received, and
// its xset batch is in-flight.

batchheader := &BatchHeader{}
err := proto.Unmarshal(batch.Header, batchheader)
if err != nil {
panic(err)
}

if s.cur.subject.Seq.Seq > batchheader.Seq {
if s.isPrimary() {
s.sys.Send(&Msg{&Msg_Preprepare{s.cur.preprep}}, replica)
} else {
s.sys.Send(&Msg{&Msg_Prepare{&s.cur.subject}}, replica)
}
if s.cur.sentCommit {
s.sys.Send(&Msg{&Msg_Commit{&s.cur.subject}}, replica)
}
if s.cur.executed {
s.sys.Send(&Msg{&Msg_Checkpoint{s.makeCheckpoint()}}, replica)
}
}
}

func (s *SBFT) handleHello(h *Batch, src uint64) {
Expand Down
4 changes: 4 additions & 0 deletions consensus/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
log.Infof("preprepare does not match expected %v, got %v", nextSeq, *pp.Seq)
return
}
if s.cur.subject.Seq.Seq == pp.Seq.Seq {
log.Infof("duplicate preprepare for %v", *pp.Seq)
return
}
var blockhash []byte
if pp.Batch != nil {
blockhash = hash(pp.Batch.Header)
Expand Down
3 changes: 1 addition & 2 deletions consensus/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,16 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) {
s.seq = *pp.Seq
s.seq.Seq -= 1
s.handlePreprepare(pp, s.primaryIDView(pp.Seq.View))
// ideally we wouldn't send a prepare here
}
}
c := &Subject{}
if s.sys.Restore("commit", c) && reflect.DeepEqual(c, &s.cur.subject) {
s.cur.sentCommit = true
s.sendCommit()
}
ex := &Subject{}
if s.sys.Restore("execute", ex) && reflect.DeepEqual(c, &s.cur.subject) {
s.cur.executed = true
s.sendCheckpoint()
}

// XXX set active after checking with the network
Expand Down
117 changes: 110 additions & 7 deletions consensus/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,25 @@ func init() {
}

func connectAll(sys *testSystem) {
// map iteration is non-deterministic, so use linear iteration instead
max := uint64(0)
for _, a := range sys.adapters {
for _, b := range sys.adapters {
if a.id > max {
max = a.id
}
}

for i := uint64(0); i <= max; i++ {
a, ok := sys.adapters[i]
if !ok {
continue
}

for j := uint64(0); j <= max; j++ {
b, ok := sys.adapters[j]
if !ok {
continue
}
if a.id != b.id {
a.receiver.Connection(b.id)
}
Expand Down Expand Up @@ -342,7 +359,7 @@ func TestRestart(t *testing.T) {

testLog.Notice("restarting 0")
repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0])
for _, a := range sys.adapters {
for _, a := range adapters {
if a.id != 0 {
a.receiver.Connection(0)
adapters[0].receiver.Connection(a.id)
Expand Down Expand Up @@ -394,7 +411,7 @@ func TestRestartAfterPrepare(t *testing.T) {
if p := msg.msg.GetPrepare(); p != nil && p.Seq.Seq == 3 && !restarted {
restarted = true
repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0])
for _, a := range sys.adapters {
for _, a := range adapters {
if a.id != 0 {
a.receiver.Connection(0)
adapters[0].receiver.Connection(a.id)
Expand Down Expand Up @@ -462,7 +479,7 @@ func TestRestartAfterCommit(t *testing.T) {
restarted = true
testLog.Notice("restarting 0")
repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0])
for _, a := range sys.adapters {
for _, a := range adapters {
if a.id != 0 {
a.receiver.Connection(0)
adapters[0].receiver.Connection(a.id)
Expand Down Expand Up @@ -503,8 +520,6 @@ func TestRestartAfterCommit(t *testing.T) {
}

func TestRestartAfterCheckpoint(t *testing.T) {
// TODO re-enable this test after https://jira.hyperledger.org/browse/FAB-624 has been resolved
t.Skip()
N := uint64(4)
sys := newTestSystem(N)
var repls []*SBFT
Expand Down Expand Up @@ -532,7 +547,7 @@ func TestRestartAfterCheckpoint(t *testing.T) {
restarted = true
testLog.Notice("restarting 0")
repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0])
for _, a := range sys.adapters {
for _, a := range adapters {
if a.id != 0 {
a.receiver.Connection(0)
adapters[0].receiver.Connection(a.id)
Expand Down Expand Up @@ -571,3 +586,91 @@ func TestRestartAfterCheckpoint(t *testing.T) {
}
}
}

// TestErroneousViewChange explicitly exercises a bug that previously showed
// up only when the replicas happened to be connected in one particular
// order. It starts four replicas, re-creates replica 0 the first time that
// replica sends a checkpoint for sequence 3 to a peer, connects the replicas
// in a fixed order, triggers a view change on every replica, and then checks
// that all adapters executed the same three batches.
func TestErroneousViewChange(t *testing.T) {
	N := uint64(4)
	sys := newTestSystem(N)
	var repls []*SBFT
	var adapters []*testSystemAdapter
	for i := uint64(0); i < N; i++ {
		a := sys.NewAdapter(i)
		s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a)
		if err != nil {
			t.Fatal(err)
		}
		repls = append(repls, s)
		adapters = append(adapters, a)
	}

	restarted := false

	// Restart replica 0 exactly once: the first time it sends a checkpoint
	// for sequence 3 to another replica. Every event is passed through
	// unchanged; the filter is only used as a hook to trigger the restart.
	sys.filterFn = func(e testElem) (testElem, bool) {
		if msg, ok := e.ev.(*testMsgEvent); ok {
			// Only messages from replica 0 to a different replica matter.
			if msg.src == msg.dst || msg.src != 0 {
				return e, true
			}

			if c := msg.msg.GetCheckpoint(); c != nil && c.Seq == 3 && !restarted {
				restarted = true
				testLog.Notice("restarting 0")
				s, err := New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0])
				if err != nil {
					// Report the failure instead of silently storing a
					// nil replica, which would only surface later as a
					// nil dereference. Errorf is used rather than Fatal
					// because this callback is invoked by the test system.
					t.Errorf("restarting replica 0: %v", err)
					return e, true
				}
				repls[0] = s
				// Re-establish the connections in both directions
				// between the restarted replica and every other one.
				for _, a := range adapters {
					if a.id != 0 {
						a.receiver.Connection(0)
						adapters[0].receiver.Connection(a.id)
					}
				}
			}
		}

		return e, true
	}

	// iteration order here is essential to trigger the bug
	outer := []uint64{2, 3, 0, 1}
	inner := []uint64{0, 1, 2, 3}
	for _, i := range outer {
		a, ok := sys.adapters[i]
		if !ok {
			continue
		}

		for _, j := range inner {
			b, ok := sys.adapters[j]
			if !ok {
				continue
			}
			if a.id != b.id {
				a.receiver.Connection(b.id)
			}
		}
	}
	sys.Run()

	// move to view 1
	for _, r := range repls {
		r.sendViewChange()
	}

	r1 := []byte{1, 2, 3}
	repls[0].Request(r1)
	sys.Run()

	r2 := []byte{3, 1, 2}
	r3 := []byte{3, 5, 2}
	repls[1].Request(r2)
	repls[1].Request(r3)
	sys.Run()

	// Every adapter must have executed exactly three batches, with r1 alone
	// in the second batch and r2 and r3 together in the third.
	for _, a := range adapters {
		if len(a.batches) != 3 {
			t.Fatal("expected execution of 3 batches")
		}
		if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) {
			t.Error("wrong request executed (1)")
		}
		if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) {
			t.Error("wrong request executed (2)")
		}
	}
}

0 comments on commit bb6bc8d

Please sign in to comment.