Skip to content

Commit

Permalink
fix sbft hello msg issue
Browse files Browse the repository at this point in the history
After receiving a hello message, a replica would not clean up its sbft
state (checkpointDone and timer cancellation). This would leave the replica
hanging, unable to process new requests.

Added a test (TestHelloMsg) to demonstrate this. This also required
adding an sbft TestSystem configuration with the Timer interface disabled.

Change-Id: I4f8dc7e57c656a09ad536b229c9709213c5eb6af
Signed-off-by: Marko Vukolic <mvu@zurich.ibm.com>
  • Loading branch information
Marko Vukolic committed Dec 12, 2016
1 parent 3ef851e commit e1467b8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 14 deletions.
6 changes: 1 addition & 5 deletions orderer/sbft/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
cp := s.cur.checkpoint[r]
cpset[r] = cp.Signature
}
s.cur.checkpointDone = true

c = s.cur.checkpoint[replicas[0]]

Expand All @@ -98,10 +97,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
batch := *s.cur.preprep.Batch
batch.Signatures = cpset
s.deliverBatch(&batch)

s.cur.timeout.Cancel()
log.Infof("replica %d: request %s %s completed on %d", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)

log.Infof("replica %d: request %s %s delivered on %d (completed common case)", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)
s.maybeSendNextBatch()
s.processBacklog()
}
3 changes: 3 additions & 0 deletions orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ func (s *SBFT) Connection(replica uint64) {

func (s *SBFT) handleHello(h *Hello, src uint64) {
bh, err := s.checkBatch(h.Batch, false, true)
log.Debugf("replica %d: got hello for batch %d from replica %d", s.id, bh.Seq, src)

if err != nil {
log.Warningf("replica %d: invalid hello batch from %d: %s", s.id, src, err)
return
}

if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq {
log.Debugf("replica %d: delivering batch %d after hello from replica %d", s.id, bh.Seq, src)
s.deliverBatch(h.Batch)
}

Expand Down
1 change: 0 additions & 1 deletion orderer/sbft/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (s *SBFT) maybeDeliverUsingXset(nv *NewView) {
// we just received a signature set for a request which we preprepared, but never delivered.
prevBatch.Payloads = s.cur.preprep.Batch.Payloads
}
s.cur.checkpointDone = true
s.deliverBatch(prevBatch)
}
}
2 changes: 2 additions & 0 deletions orderer/sbft/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) {
}

func (s *SBFT) deliverBatch(batch *Batch) {
s.cur.checkpointDone = true
s.cur.timeout.Cancel()
s.sys.Deliver(batch)

for _, req := range batch.Payloads {
Expand Down
66 changes: 66 additions & 0 deletions orderer/sbft/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,72 @@ func TestFullBacklog(t *testing.T) {
}
}

// TestHelloMsg checks that a replica which missed most of the protocol
// messages for a request can still take part in ordering later requests after
// it has been re-created and reconnected to its peers.
//
// The test system is built with timers disabled, and the run has three phases:
//
//  1. A request is submitted through replica 0. Of all messages destined to
//     replica 1, only the view-0 pre-prepare is delivered; the rest are
//     dropped by the filter.
//  2. Replica 1 is re-created on its existing adapter and connections are
//     re-established in both directions (this is expected to trigger the
//     hello exchange). From here on, nothing is filtered.
//  3. A second request is submitted through replica 1.
//
// At the end every adapter must have recorded exactly two batches.
func TestHelloMsg(t *testing.T) {
	N := uint64(4)
	sys := newTestSystemWOTimers(N)
	var repls []*SBFT
	var adapters []*testSystemAdapter
	for i := uint64(0); i < N; i++ {
		a := sys.NewAdapter(i)
		s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a)
		if err != nil {
			t.Fatal(err)
		}
		repls = append(repls, s)
		adapters = append(adapters, a)
	}

	// phase is captured by the filter closure below and advanced by the test
	// body between runs of the test system.
	phase := 1

	// We are going to deliver only pre-prepare of the first request to replica 1
	// other messages pertaining to first request, destined to 1 will be dropped
	sys.filterFn = func(e testElem) (testElem, bool) {
		if msg, ok := e.ev.(*testMsgEvent); ok {
			if msg.dst == 1 {
				c := msg.msg.GetPreprepare()
				if c != nil && c.Seq.View == 0 && phase == 1 {
					return e, true // letting the first pre-prepare be delivered to 1
				}
				if phase > 1 {
					return e, true // letting msgs outside phase 1 through
				}
				return e, false // dropping other phase 1 msgs
			}
		}
		return e, true
	}

	connectAll(sys)
	r1 := []byte{1, 2, 3}
	repls[0].Request(r1)
	sys.Run()

	phase = 2 // start delivering msgs to replica 1

	testLog.Notice("restarting replica 1")
	// Fail the test right away if the replica cannot be re-created instead of
	// continuing with a nil replica.
	restarted, err := New(1, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[1])
	if err != nil {
		t.Fatal(err)
	}
	repls[1] = restarted
	for _, a := range adapters {
		if a.id != 1 {
			a.receiver.Connection(1)
			adapters[1].receiver.Connection(a.id)
		}
	}

	sys.Run()

	phase = 3

	r3 := []byte{3, 5, 2}
	repls[1].Request(r3)
	sys.Run()

	for _, a := range adapters {
		if len(a.batches) != 2 {
			t.Fatalf("replica %d: expected execution of 2 batches, got %d", a.id, len(a.batches))
		}
	}
}

func TestViewChangeTimer(t *testing.T) {
N := uint64(4)
sys := newTestSystem(N)
Expand Down
27 changes: 19 additions & 8 deletions orderer/sbft/simplebft/testsys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (t *testTimer) String() string {

// Timer creates a testTimer carrying this adapter's id and the callback tf,
// and returns it as the Canceller.
//
// The timer is placed on the test system's queue with delay d only when the
// system's disableTimers flag is not set. With timers disabled the returned
// timer is never enqueued by this method.
func (t *testSystemAdapter) Timer(d time.Duration, tf func()) Canceller {
	tt := &testTimer{id: t.id, tf: tf}
	// Enqueue exactly once, and only when timers are enabled; an unconditional
	// enqueue here would defeat the disableTimers switch.
	if !t.sys.disableTimers {
		t.sys.enqueue(d, tt)
	}
	return tt
}

Expand Down Expand Up @@ -171,9 +173,8 @@ func (t *testSystemAdapter) Restore(key string, out proto.Message) bool {
// LastBatch returns the most recently recorded entry of t.batches. When no
// batch has been recorded yet, it returns the result of the receiver's
// makeBatch(0, nil, nil) instead.
func (t *testSystemAdapter) LastBatch() *Batch {
	if len(t.batches) == 0 {
		return t.receiver.(*SBFT).makeBatch(0, nil, nil)
	}
	return t.batches[len(t.batches)-1]
}

func (t *testSystemAdapter) Sign(data []byte) []byte {
Expand Down Expand Up @@ -233,11 +234,12 @@ type testEvent interface {
// ==============================================

// testSystem is the shared state of the simulated environment used by the
// sbft tests.
type testSystem struct {
	// rand is the random source of the test system.
	rand *rand.Rand
	// now is a duration-valued clock; presumably the current simulated
	// time - confirm against the run loop.
	now time.Duration
	// queue holds the pending test elements.
	queue *calendarQueue
	// adapters maps a uint64 key (presumably the replica id - confirm
	// against NewAdapter) to its adapter.
	adapters map[uint64]*testSystemAdapter
	// filterFn, when set, is a hook that receives a test element and
	// returns an element together with a bool; tests return false to drop
	// the element.
	filterFn func(testElem) (testElem, bool)
	// disableTimers, when true, stops adapters from enqueueing timers.
	disableTimers bool
}

type testElem struct {
Expand All @@ -257,6 +259,15 @@ func newTestSystem(n uint64) *testSystem {
}
}

// newTestSystemWOTimers builds a testSystem for n replicas with the
// disableTimers flag set.
//
// The random source is seeded with 0, the adapter map starts empty, and the
// calendar queue is created from n*n: that value divides one millisecond for
// the first argument and is passed as an int for the second.
func newTestSystemWOTimers(n uint64) *testSystem {
	sys := new(testSystem)
	sys.rand = rand.New(rand.NewSource(0))
	sys.adapters = make(map[uint64]*testSystemAdapter)

	slots := n * n
	sys.queue = newCalendarQueue(time.Millisecond/time.Duration(slots), int(slots))

	sys.disableTimers = true
	return sys
}

func (t *testSystem) NewAdapter(id uint64) *testSystemAdapter {
key, err := ecdsa.GenerateKey(elliptic.P256(), crand.Reader)
if err != nil {
Expand Down

0 comments on commit e1467b8

Please sign in to comment.