Skip to content

Commit

Permalink
sbft: limit backlog
Browse files Browse the repository at this point in the history
Issue: FAB-603
Change-Id: If6ac606018c5ab82eaaf8f16e075417ef2f7e243
Signed-off-by: Simon Schubert <sis@zurich.ibm.com>
  • Loading branch information
corecode committed Nov 3, 2016
1 parent 31b7572 commit 84d1cb1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 20 deletions.
19 changes: 9 additions & 10 deletions consensus/simplebft/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.

package simplebft

// Backlog sizing. The product maxBacklogSeq*msgPerSeq is the largest
// number of messages kept in a single replica's backlog before the
// backlog is discarded and the connection is re-established.
const (
	// maxBacklogSeq is the multiplier applied to msgPerSeq to obtain the
	// backlog cap; presumably the number of sequence numbers a peer may
	// run ahead of us.
	maxBacklogSeq = 4

	// msgPerSeq is the number of messages counted per sequence number:
	// (pre)prepare, commit, checkpoint.
	msgPerSeq = 3
)

func (s *SBFT) testBacklog(m *Msg, src uint64) bool {
if len(s.replicaState[src].backLog) > 0 {
return true
Expand Down Expand Up @@ -52,17 +55,13 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
if src == s.id {
panic("should never have to backlog my own message")
}
// Prevent DoS by limiting the number of backlogged messages per
// replica.
//
// If the backlog limit is exceeded, we discard the backlog and
// re-establish the connection.
//
// After the connection has been re-established, we expect to
// receive a hello, which will advance our state and discard
// old messages.

s.replicaState[src].backLog = append(s.replicaState[src].backLog, m)

if len(s.replicaState[src].backLog) > maxBacklogSeq*msgPerSeq {
s.discardBacklog(src)
s.sys.Reconnect(src)
}
}

func (s *SBFT) discardBacklog(src uint64) {
Expand Down
13 changes: 4 additions & 9 deletions consensus/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type System interface {
LastBatch() *Batch
Sign(data []byte) []byte
CheckSig(data []byte, src uint64, sig []byte) error
Reconnect(replica uint64)
}

// Canceller allows cancelling of a scheduled timer event.
Expand Down Expand Up @@ -192,12 +193,6 @@ func (s *SBFT) broadcast(m *Msg) {
func (s *SBFT) Receive(m *Msg, src uint64) {
log.Debugf("received message from %d: %s", src, m)

if s.testBacklog(m, src) {
log.Debugf("message for future seq, storing for later")
s.recordBacklogMsg(m, src)
return
}

if h := m.GetHello(); h != nil {
s.handleHello(h, src)
return
Expand All @@ -212,9 +207,9 @@ func (s *SBFT) Receive(m *Msg, src uint64) {
return
}

if !s.activeView {
log.Infof("we are not active in view %d, discarding message from %d",
s.seq.View, src)
if s.testBacklog(m, src) {
log.Debugf("message for future seq, storing for later")
s.recordBacklogMsg(m, src)
return
}

Expand Down
41 changes: 41 additions & 0 deletions consensus/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,44 @@ func TestRestartMissedViewChange(t *testing.T) {
}
}
}

// TestFullBacklog checks that the backlog a replica keeps for a peer stays
// within maxBacklogSeq*msgPerSeq messages, and that the network still
// executes every submitted request.
//
// Replica 0 sends replica 1 a Prepare for sequence 100, well beyond the ten
// requests issued by this test, and then submits the requests at 100ms
// intervals.
func TestFullBacklog(t *testing.T) {
	// Number of requests submitted through replica 0. With
	// BatchSizeBytes set to 1 the test expects the last executed batch to
	// carry this sequence number.
	const numRequests = 10

	N := uint64(4)
	sys := newTestSystem(N)
	var repls []*SBFT
	var adapters []*testSystemAdapter
	for i := uint64(0); i < N; i++ {
		a := sys.NewAdapter(i)
		s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a)
		if err != nil {
			t.Fatal(err)
		}
		repls = append(repls, s)
		adapters = append(adapters, a)
	}

	r1 := []byte{1, 2, 3}

	connectAll(sys)

	// Inject the far-ahead Prepare from replica 0 to replica 1.
	sys.enqueue(200*time.Millisecond, &testTimer{id: 999, tf: func() {
		repls[0].sys.Send(&Msg{&Msg_Prepare{&Subject{Seq: &SeqView{Seq: 100}}}}, 1)
	}})
	for i := 0; i < numRequests; i++ {
		sys.enqueue(time.Duration(i)*100*time.Millisecond, &testTimer{id: 999, tf: func() {
			repls[0].Request(r1)
		}})
	}
	sys.Run()

	// The injected message travels from replica 0 to replica 1, so the
	// backlog under test is the one replica 1 keeps for source 0. Check
	// and report the same slice.
	backlog := repls[1].replicaState[0].backLog
	if len(backlog) > maxBacklogSeq*msgPerSeq {
		t.Errorf("backlog too long: %d", len(backlog))
	}

	for _, a := range adapters {
		if len(a.batches) == 0 {
			t.Fatalf("expected execution of batches on %d", a.id)
		}
		bh := a.batches[len(a.batches)-1].DecodeHeader()
		if bh.Seq != numRequests {
			t.Errorf("wrong request executed on %d: %v", a.id, bh)
		}
	}
}
24 changes: 23 additions & 1 deletion consensus/simplebft/testsys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (t *testSystemAdapter) SetReceiver(recv Receiver) {
t.receiver = recv
}

func (t *testSystemAdapter) Send(msg *Msg, dest uint64) {
func (t *testSystemAdapter) getArrival(dest uint64) time.Duration {
// XXX for now, define fixed variance per destination
arr, ok := t.arrivals[dest]
if !ok {
Expand All @@ -78,7 +78,11 @@ func (t *testSystemAdapter) Send(msg *Msg, dest uint64) {
arr = inflight + variance
t.arrivals[dest] = arr
}
return arr
}

func (t *testSystemAdapter) Send(msg *Msg, dest uint64) {
arr := t.getArrival(dest)
ev := &testMsgEvent{
inflight: arr,
src: t.id,
Expand Down Expand Up @@ -202,6 +206,24 @@ func (t *testSystemAdapter) CheckSig(data []byte, src uint64, sig []byte) error
return nil
}

// Reconnect simulates tearing down and re-establishing the connection
// from replica to this adapter.
//
// It filters the system queue with a predicate that rejects message events
// sent by replica and addressed to this adapter. It then schedules a timer,
// at ten times the recorded arrival delay from replica to this adapter,
// which notifies replica's receiver of a new connection to this adapter.
func (t *testSystemAdapter) Reconnect(replica uint64) {
	testLog.Infof("dropping connection from %d to %d", replica, t.id)

	// Reports false only for in-flight messages replica -> t.id;
	// presumably filter discards the elements the predicate rejects.
	keep := func(e testElem) bool {
		msg, isMsg := e.ev.(*testMsgEvent)
		return !(isMsg && msg.dst == t.id && msg.src == replica)
	}
	t.sys.queue.filter(keep)

	delay := t.sys.adapters[replica].arrivals[t.id] * 10
	reconnect := &testTimer{id: t.id, tf: func() {
		testLog.Infof("reconnecting %d to %d", replica, t.id)
		// Look the peer up when the timer fires, not when it is scheduled.
		t.sys.adapters[replica].receiver.Connection(t.id)
	}}
	t.sys.enqueue(delay, reconnect)
}

// ==============================================

type testEvent interface {
Expand Down

0 comments on commit 84d1cb1

Please sign in to comment.