Skip to content

Commit

Permalink
sbft: consolidate replica state
Browse files Browse the repository at this point in the history
Instead of maintaining the state of a remote replica spread out over
several maps, instead maintain the state in a single structure.

Change-Id: Idbc9915e5e824116675ad485a297caa10b7a7518
Signed-off-by: Simon Schubert <sis@zurich.ibm.com>
  • Loading branch information
corecode committed Oct 28, 2016
1 parent 3b52a9f commit 4274764
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 35 deletions.
15 changes: 9 additions & 6 deletions consensus/simplebft/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package simplebft

func (s *SBFT) testBacklog(m *Msg, src uint64) bool {
if len(s.backLog[src]) > 0 {
if len(s.replicaState[src].backLog) > 0 {
return true
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
// to make progress, the backlog processing algorithm as lined
// out below will take care of starting a state transfer,
// using the hello message we received on reconnect.
s.backLog[src] = append(s.backLog[src], m)
s.replicaState[src].backLog = append(s.replicaState[src].backLog, m)
}

func (s *SBFT) processBacklog() {
Expand All @@ -75,14 +75,17 @@ func (s *SBFT) processBacklog() {

for processed {
processed = false
for src := range s.backLog {
for len(s.backLog[src]) > 0 {
m, rest := s.backLog[src][0], s.backLog[src][1:]
for src := range s.replicaState {
state := &s.replicaState[src]
src := uint64(src)

for len(state.backLog) > 0 {
m, rest := state.backLog[0], state.backLog[1:]
if s.testBacklog2(m, src) {
notReady++
break
}
s.backLog[src] = rest
state.backLog = rest

log.Debugf("processing stored message from %d: %s", src, m)

Expand Down
1 change: 1 addition & 0 deletions consensus/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ func (s *SBFT) Connection(replica uint64) {
}

func (s *SBFT) handleHello(h *Batch, src uint64) {
s.replicaState[src].hello = h
}
16 changes: 8 additions & 8 deletions consensus/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func (s *SBFT) maybeSendNewView() {
vset := make(map[uint64]*Signed)
var vcs []*ViewChange

for src, vc := range s.viewchange {
if vc.vc.View == s.seq.View {
vset[src] = vc.svc
vcs = append(vcs, vc.vc)
for src, state := range s.replicaState {
if state.viewchange != nil && state.viewchange.View == s.seq.View {
vset[uint64(src)] = state.signedViewchange
vcs = append(vcs, state.viewchange)
}
}

Expand Down Expand Up @@ -73,7 +73,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
return
}

if onv, ok := s.newview[s.primaryIDView(nv.View)]; ok && onv.View >= nv.View {
if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
log.Debugf("discarding duplicate new view for %d", nv.View)
return
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
return
}

s.newview[s.primaryIDView(nv.View)] = nv
s.replicaState[s.primaryIDView(nv.View)].newview = nv

s.processNewView()
}
Expand All @@ -138,8 +138,8 @@ func (s *SBFT) processNewView() {
return
}

nv, ok := s.newview[s.primaryIDView(s.seq.View)]
if !ok || nv.View != s.seq.View {
nv := s.replicaState[s.primaryIDView(s.seq.View)].newview
if nv == nil || nv.View != s.seq.View {
return
}

Expand Down
18 changes: 8 additions & 10 deletions consensus/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ type SBFT struct {
batchTimer Canceller
cur reqInfo
activeView bool
viewchange map[uint64]*viewChangeInfo
newview map[uint64]*NewView
lastNewViewSent uint64
viewChangeTimeout time.Duration
viewChangeTimer Canceller

backLog map[uint64][]*Msg
replicaState []replicaInfo
}

type reqInfo struct {
Expand All @@ -82,9 +79,12 @@ type reqInfo struct {
checkpointDone bool
}

type viewChangeInfo struct {
svc *Signed
vc *ViewChange
type replicaInfo struct {
backLog []*Msg
hello *Batch
signedViewchange *Signed
viewchange *ViewChange
newview *NewView
}

var log = logging.MustGetLogger("sbft")
Expand All @@ -103,10 +103,8 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) {
config: *config,
sys: sys,
id: id,
viewchange: make(map[uint64]*viewChangeInfo),
newview: make(map[uint64]*NewView),
viewChangeTimer: dummyCanceller{},
backLog: make(map[uint64][]*Msg),
replicaState: make([]replicaInfo, config.N),
}
s.sys.SetReceiver(s)

Expand Down
2 changes: 1 addition & 1 deletion consensus/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var testLog = logging.MustGetLogger("test")

func init() {
logging.SetLevel(logging.NOTICE, "")
logging.SetLevel(logging.NOTICE, "test")
logging.SetLevel(logging.DEBUG, "test")
logging.SetLevel(logging.DEBUG, "sbft")
}

Expand Down
27 changes: 17 additions & 10 deletions consensus/simplebft/viewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ func (s *SBFT) sendViewChange() {
s.seq.View = s.nextView()
s.cur.timeout.Cancel()
s.activeView = false
for r, vs := range s.viewchange {
if vs.vc.View < s.seq.View {
delete(s.viewchange, r)
for src := range s.replicaState {
state := &s.replicaState[src]
if state.viewchange != nil && state.viewchange.View < s.seq.View {
state.viewchange = nil
}
}
log.Noticef("sending viewchange for view %d", s.seq.View)
Expand Down Expand Up @@ -71,21 +72,27 @@ func (s *SBFT) handleViewChange(svc *Signed, src uint64) {
log.Debugf("old view change from %s for view %d, we are in view %d", src, vc.View, s.seq.View)
return
}
if ovc, ok := s.viewchange[src]; ok && vc.View <= ovc.vc.View {
if ovc := s.replicaState[src].viewchange; ovc != nil && vc.View <= ovc.View {
log.Noticef("duplicate view change for %d from %d", vc.View, src)
return
}

log.Infof("viewchange from %d for view %d", src, vc.View)
s.viewchange[src] = &viewChangeInfo{svc: svc, vc: vc}
s.replicaState[src].viewchange = vc
s.replicaState[src].signedViewchange = svc

if len(s.viewchange) == s.oneCorrectQuorum() {
min := vc.View
for _, vc := range s.viewchange {
if vc.vc.View < min {
min = vc.vc.View
min := vc.View
quorum := 0
for _, state := range s.replicaState {
if state.viewchange != nil {
quorum++
if state.viewchange.View < min {
min = state.viewchange.View
}
}
}

if quorum == s.oneCorrectQuorum() {
// catch up to the minimum view
if s.seq.View < min {
log.Notice("we are behind on view change, resending for newer view")
Expand Down

0 comments on commit 4274764

Please sign in to comment.