diff --git a/orderer/sbft/simplebft/checkpoint.go b/orderer/sbft/simplebft/checkpoint.go index ac5bc0f0b13..b6130e476a9 100644 --- a/orderer/sbft/simplebft/checkpoint.go +++ b/orderer/sbft/simplebft/checkpoint.go @@ -84,7 +84,6 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { cp := s.cur.checkpoint[r] cpset[r] = cp.Signature } - s.cur.checkpointDone = true c = s.cur.checkpoint[replicas[0]] @@ -98,10 +97,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { batch := *s.cur.preprep.Batch batch.Signatures = cpset s.deliverBatch(&batch) - - s.cur.timeout.Cancel() - log.Infof("replica %d: request %s %s completed on %d", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id) - + log.Infof("replica %d: request %s %s delivered on %d (completed common case)", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id) s.maybeSendNextBatch() s.processBacklog() } diff --git a/orderer/sbft/simplebft/connection.go b/orderer/sbft/simplebft/connection.go index af13d37c57f..8d054b001e4 100644 --- a/orderer/sbft/simplebft/connection.go +++ b/orderer/sbft/simplebft/connection.go @@ -66,12 +66,15 @@ func (s *SBFT) Connection(replica uint64) { func (s *SBFT) handleHello(h *Hello, src uint64) { bh, err := s.checkBatch(h.Batch, false, true) + log.Debugf("replica %d: got hello for batch %d from replica %d", s.id, bh.Seq, src) + if err != nil { log.Warningf("replica %d: invalid hello batch from %d: %s", s.id, src, err) return } if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq { + log.Debugf("replica %d: delivering batch %d after hello from replica %d", s.id, bh.Seq, src) s.deliverBatch(h.Batch) } diff --git a/orderer/sbft/simplebft/newview.go b/orderer/sbft/simplebft/newview.go index 3a0406541a4..40ac9f8eecb 100644 --- a/orderer/sbft/simplebft/newview.go +++ b/orderer/sbft/simplebft/newview.go @@ -199,7 +199,6 @@ func (s *SBFT) maybeDeliverUsingXset(nv *NewView) { // we just received a signature set for a request which we preprepared, but never delivered. prevBatch.Payloads = s.cur.preprep.Batch.Payloads } - s.cur.checkpointDone = true s.deliverBatch(prevBatch) } } diff --git a/orderer/sbft/simplebft/simplebft.go b/orderer/sbft/simplebft/simplebft.go index 13f4df2b236..628f711e71e 100644 --- a/orderer/sbft/simplebft/simplebft.go +++ b/orderer/sbft/simplebft/simplebft.go @@ -230,6 +230,8 @@ func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) { } func (s *SBFT) deliverBatch(batch *Batch) { + s.cur.checkpointDone = true + s.cur.timeout.Cancel() s.sys.Deliver(batch) for _, req := range batch.Payloads { diff --git a/orderer/sbft/simplebft/simplebft_test.go b/orderer/sbft/simplebft/simplebft_test.go index 877bf203051..fba08f83f36 100644 --- a/orderer/sbft/simplebft/simplebft_test.go +++ b/orderer/sbft/simplebft/simplebft_test.go @@ -940,6 +940,72 @@ func TestFullBacklog(t *testing.T) { } } +func TestHelloMsg(t *testing.T) { + N := uint64(4) + sys := newTestSystemWOTimers(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + phase := 1 + + // We are going to deliver only pre-prepare of the first request to replica 1 + // other messages pertaining to first request, destined to 1 will be dropped + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if msg.dst == 1 { + c := msg.msg.GetPreprepare() + if c != nil && c.Seq.View == 0 && phase == 1 { + return e, true // letting the first pre-prepare be delivered to 1 + } + if phase > 1 { + return e, true //letting msgs outside phase 1 through + } + return e, false //dropping other phase 1 msgs + } + } + return e, true + } + + connectAll(sys) + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + phase = 2 //start delivering msgs to replica 1 + + testLog.Notice("restarting replica 1") + repls[1], _ = New(1, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[1]) + for _, a := range adapters { + if a.id != 1 { + a.receiver.Connection(1) + adapters[1].receiver.Connection(a.id) + } + } + + sys.Run() + + phase = 3 + + r3 := []byte{3, 5, 2} + repls[1].Request(r3) + sys.Run() + + for _, a := range adapters { + if len(a.batches) != 2 { + t.Fatal("expected execution of 2 batches") + } + } +} + func TestViewChangeTimer(t *testing.T) { N := uint64(4) sys := newTestSystem(N) diff --git a/orderer/sbft/simplebft/testsys_test.go b/orderer/sbft/simplebft/testsys_test.go index f70e2f17e87..9bb0bf6fa4c 100644 --- a/orderer/sbft/simplebft/testsys_test.go +++ b/orderer/sbft/simplebft/testsys_test.go @@ -139,7 +139,9 @@ func (t *testTimer) String() string { func (t *testSystemAdapter) Timer(d time.Duration, tf func()) Canceller { tt := &testTimer{id: t.id, tf: tf} - t.sys.enqueue(d, tt) + if !t.sys.disableTimers { + t.sys.enqueue(d, tt) + } return tt } @@ -171,9 +173,8 @@ func (t *testSystemAdapter) Restore(key string, out proto.Message) bool { func (t *testSystemAdapter) LastBatch() *Batch { if len(t.batches) == 0 { return t.receiver.(*SBFT).makeBatch(0, nil, nil) - } else { - return t.batches[len(t.batches)-1] } + return t.batches[len(t.batches)-1] } func (t *testSystemAdapter) Sign(data []byte) []byte { @@ -233,11 +234,12 @@ type testEvent interface { // ============================================== type testSystem struct { - rand *rand.Rand - now time.Duration - queue *calendarQueue - adapters map[uint64]*testSystemAdapter - filterFn func(testElem) (testElem, bool) + rand *rand.Rand + now time.Duration + queue *calendarQueue + adapters map[uint64]*testSystemAdapter + filterFn func(testElem) (testElem, bool) + disableTimers bool } type testElem struct { @@ -257,6 +259,15 @@ func newTestSystem(n uint64) *testSystem { } } +func newTestSystemWOTimers(n uint64) *testSystem { + return &testSystem{ + rand: rand.New(rand.NewSource(0)), + adapters: make(map[uint64]*testSystemAdapter), + queue: newCalendarQueue(time.Millisecond/time.Duration(n*n), int(n*n)), + disableTimers: true, + } +} + func (t *testSystem) NewAdapter(id uint64) *testSystemAdapter { key, err := ecdsa.GenerateKey(elliptic.P256(), crand.Reader) if err != nil {