From 84d1cb13bbc59e0497f6060c4ca4f4b15f12a7e6 Mon Sep 17 00:00:00 2001 From: Simon Schubert Date: Tue, 1 Nov 2016 15:45:49 +0100 Subject: [PATCH] sbft: limit backlog Issue: FAB-603 Change-Id: If6ac606018c5ab82eaaf8f16e075417ef2f7e243 Signed-off-by: Simon Schubert --- consensus/simplebft/backlog.go | 19 ++++++------- consensus/simplebft/simplebft.go | 13 +++------ consensus/simplebft/simplebft_test.go | 41 +++++++++++++++++++++++++++ consensus/simplebft/testsys_test.go | 24 +++++++++++++++- 4 files changed, 77 insertions(+), 20 deletions(-) diff --git a/consensus/simplebft/backlog.go b/consensus/simplebft/backlog.go index d266c5730b8..a4919fe4328 100644 --- a/consensus/simplebft/backlog.go +++ b/consensus/simplebft/backlog.go @@ -16,6 +16,9 @@ limitations under the License. package simplebft +const maxBacklogSeq = 4 +const msgPerSeq = 3 // (pre)prepare, commit, checkpoint + func (s *SBFT) testBacklog(m *Msg, src uint64) bool { if len(s.replicaState[src].backLog) > 0 { return true @@ -52,17 +55,13 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) { if src == s.id { panic("should never have to backlog my own message") } - // TODO - // - // Prevent DoS by limiting the number of messages per replica. - // - // If the backlog limit is exceeded, re-establish the - // connection. - // - // After the connection has been re-established, we will - // receive a hello, which will advance our state and discard - // old messages. + s.replicaState[src].backLog = append(s.replicaState[src].backLog, m) + + if len(s.replicaState[src].backLog) > maxBacklogSeq*msgPerSeq { + s.discardBacklog(src) + s.sys.Reconnect(src) + } } func (s *SBFT) discardBacklog(src uint64) { diff --git a/consensus/simplebft/simplebft.go b/consensus/simplebft/simplebft.go index 7198465dfd6..1beb69a74a3 100644 --- a/consensus/simplebft/simplebft.go +++ b/consensus/simplebft/simplebft.go @@ -43,6 +43,7 @@ type System interface { LastBatch() *Batch Sign(data []byte) []byte CheckSig(data []byte, src uint64, sig []byte) error + Reconnect(replica uint64) } // Canceller allows cancelling of a scheduled timer event. @@ -192,12 +193,6 @@ func (s *SBFT) broadcast(m *Msg) { func (s *SBFT) Receive(m *Msg, src uint64) { log.Debugf("received message from %d: %s", src, m) - if s.testBacklog(m, src) { - log.Debugf("message for future seq, storing for later") - s.recordBacklogMsg(m, src) - return - } - if h := m.GetHello(); h != nil { s.handleHello(h, src) return @@ -212,9 +207,9 @@ func (s *SBFT) Receive(m *Msg, src uint64) { return } - if !s.activeView { - log.Infof("we are not active in view %d, discarding message from %d", - s.seq.View, src) + if s.testBacklog(m, src) { + log.Debugf("message for future seq, storing for later") + s.recordBacklogMsg(m, src) return } diff --git a/consensus/simplebft/simplebft_test.go b/consensus/simplebft/simplebft_test.go index 7125d8bcc19..5b4ec954e91 100644 --- a/consensus/simplebft/simplebft_test.go +++ b/consensus/simplebft/simplebft_test.go @@ -746,3 +746,44 @@ func TestRestartMissedViewChange(t *testing.T) { } } } + +func TestFullBacklog(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + r1 := []byte{1, 2, 3} + + connectAll(sys) + sys.enqueue(200*time.Millisecond, &testTimer{id: 999, tf: func() { + repls[0].sys.Send(&Msg{&Msg_Prepare{&Subject{Seq: &SeqView{Seq: 100}}}}, 1) + }}) + for i := 0; i < 10; i++ { + sys.enqueue(time.Duration(i)*100*time.Millisecond, &testTimer{id: 999, tf: func() { + repls[0].Request(r1) + }}) + } + sys.Run() + if len(repls[1].replicaState[2].backLog) > 4*3 { + t.Errorf("backlog too long: %d", len(repls[1].replicaState[0].backLog)) + } + for _, a := range adapters { + if len(a.batches) == 0 { + t.Fatalf("expected execution of batches on %d", a.id) + } + bh := a.batches[len(a.batches)-1].DecodeHeader() + if bh.Seq != 10 { + t.Errorf("wrong request executed on %d: %v", a.id, bh) + } + } +} diff --git a/consensus/simplebft/testsys_test.go b/consensus/simplebft/testsys_test.go index bfbdaab84d7..c4488cc2184 100644 --- a/consensus/simplebft/testsys_test.go +++ b/consensus/simplebft/testsys_test.go @@ -65,7 +65,7 @@ func (t *testSystemAdapter) SetReceiver(recv Receiver) { t.receiver = recv } -func (t *testSystemAdapter) Send(msg *Msg, dest uint64) { +func (t *testSystemAdapter) getArrival(dest uint64) time.Duration { // XXX for now, define fixed variance per destination arr, ok := t.arrivals[dest] if !ok { @@ -78,7 +78,11 @@ func (t *testSystemAdapter) Send(msg *Msg, dest uint64) { arr = inflight + variance t.arrivals[dest] = arr } + return arr +} +func (t *testSystemAdapter) Send(msg *Msg, dest uint64) { + arr := t.getArrival(dest) ev := &testMsgEvent{ inflight: arr, src: t.id, @@ -202,6 +206,24 @@ func (t *testSystemAdapter) CheckSig(data []byte, src uint64, sig []byte) error return nil } +func (t *testSystemAdapter) Reconnect(replica uint64) { + testLog.Infof("dropping connection from %d to %d", replica, t.id) + t.sys.queue.filter(func(e testElem) bool { + switch e := e.ev.(type) { + case *testMsgEvent: + if e.dst == t.id && e.src == replica { + return false + } + } + return true + }) + arr := t.sys.adapters[replica].arrivals[t.id] * 10 + t.sys.enqueue(arr, &testTimer{id: t.id, tf: func() { + testLog.Infof("reconnecting %d to %d", replica, t.id) + t.sys.adapters[replica].receiver.Connection(t.id) + }}) +} + // ============================================== type testEvent interface {