Skip to content

Commit

Permalink
Merge "Make Sbft log messages better"
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborh-da authored and Gerrit Code Review committed Nov 30, 2016
2 parents af0cd3e + df2cc3f commit a89b8cc
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 60 deletions.
6 changes: 4 additions & 2 deletions orderer/sbft/simplebft/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package simplebft

import "fmt"

const maxBacklogSeq = 4
const msgPerSeq = 3 // (pre)prepare, commit, checkpoint

Expand Down Expand Up @@ -53,7 +55,7 @@ func (s *SBFT) testBacklogMessage(m *Msg, src uint64) bool {

func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) {
if src == s.id {
panic("should never have to backlog my own message")
panic(fmt.Sprintf("should never have to backlog my own message (replica ID: %d)", src))
}

s.replicaState[src].backLog = append(s.replicaState[src].backLog, m)
Expand Down Expand Up @@ -86,7 +88,7 @@ func (s *SBFT) processBacklog() {
}
state.backLog = rest

log.Debugf("processing stored message from %d: %s", src, m)
log.Debugf("replica %d: processing stored message from %d: %s", s.id, src, m)

s.handleQueueableMessage(m, src)
processed = true
Expand Down
12 changes: 6 additions & 6 deletions orderer/sbft/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {

err := s.checkBytesSig(c.Digest, src, c.Signature)
if err != nil {
log.Infof("checkpoint signature invalid for %d from %d", c.Seq, src)
log.Infof("replica %d: checkpoint signature invalid for %d from %d", s.id, c.Seq, src)
return
}

// TODO should we always accept checkpoints?
if c.Seq != s.cur.subject.Seq.Seq {
log.Infof("checkpoint does not match expected subject %v, got %v", &s.cur.subject, c)
log.Infof("replica %d: checkpoint does not match expected subject %v, got %v", s.id, &s.cur.subject, c)
return
}
if _, ok := s.cur.checkpoint[src]; ok {
log.Infof("duplicate checkpoint for %d from %d", c.Seq, src)
log.Infof("replica %d: duplicate checkpoint for %d from %d", s.id, c.Seq, src)
}
s.cur.checkpoint[src] = c

Expand Down Expand Up @@ -89,8 +89,8 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
c = s.cur.checkpoint[replicas[0]]

if !reflect.DeepEqual(c.Digest, s.cur.subject.Digest) {
log.Fatalf("weak checkpoint %x does not match our state %x",
c.Digest, s.cur.subject.Digest)
log.Fatalf("replica %d: weak checkpoint %x does not match our state %x",
s.id, c.Digest, s.cur.subject.Digest)
// NOT REACHED
}

Expand All @@ -100,7 +100,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
s.deliverBatch(&batch)

s.cur.timeout.Cancel()
log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)
log.Infof("replica %d: request %s %s completed on %d", s.id, s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)

s.maybeSendNextBatch()
s.processBacklog()
Expand Down
6 changes: 3 additions & 3 deletions orderer/sbft/simplebft/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func (s *SBFT) handleCommit(c *Subject, src uint64) {
}

if !reflect.DeepEqual(c, &s.cur.subject) {
log.Warningf("commit does not match expected subject %v %x, got %v %x",
s.cur.subject.Seq, s.cur.subject.Digest, c.Seq, c.Digest)
log.Warningf("replica %d: commit does not match expected subject %v %x, got %v %x",
s.id, s.cur.subject.Seq, s.cur.subject.Digest, c.Seq, c.Digest)
return
}
if _, ok := s.cur.commit[src]; ok {
log.Infof("duplicate commit for %v from %d", *c.Seq, src)
log.Infof("replica %d: duplicate commit for %v from %d", s.id, *c.Seq, src)
return
}
s.cur.commit[src] = c
Expand Down
8 changes: 4 additions & 4 deletions orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *SBFT) Connection(replica uint64) {
func (s *SBFT) handleHello(h *Hello, src uint64) {
bh, err := s.checkBatch(h.Batch, false, true)
if err != nil {
log.Warningf("invalid hello batch from %d: %s", src, err)
log.Warningf("replica %d: invalid hello batch from %d: %s", s.id, src, err)
return
}

Expand All @@ -77,19 +77,19 @@ func (s *SBFT) handleHello(h *Hello, src uint64) {

if h.NewView != nil {
if s.primaryIDView(h.NewView.View) != src {
log.Warningf("invalid hello with new view from non-primary %d", src)
log.Warningf("replica %d: invalid hello with new view from non-primary %d", s.id, src)
return
}

vcs, err := s.checkNewViewSignatures(h.NewView)
if err != nil {
log.Warningf("invalid hello new view from %d: %s", src, err)
log.Warningf("replica %d: invalid hello new view from %d: %s", s.id, src, err)
return
}

_, _, ok := s.makeXset(vcs)
if !ok {
log.Warningf("invalid hello new view xset from %d", src)
log.Warningf("replica %d: invalid hello new view xset from %d", s.id, src)
return
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (s *SBFT) maybeExecute() {
return
}
s.cur.executed = true
log.Noticef("%d is executing %v %x", s.id, s.cur.subject.Seq, s.cur.subject.Digest)
log.Noticef("replica %d: executing %v %x", s.id, s.cur.subject.Seq, s.cur.subject.Digest)

s.sys.Persist("execute", &s.cur.subject)

Expand Down
26 changes: 13 additions & 13 deletions orderer/sbft/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *SBFT) maybeSendNewView() {

xset, _, ok := s.makeXset(vcs)
if !ok {
log.Debug("xset not yet sufficient")
log.Debugf("replica %d: xset not yet sufficient", s.id)
return
}

Expand All @@ -49,7 +49,7 @@ func (s *SBFT) maybeSendNewView() {
} else if reflect.DeepEqual(s.cur.subject.Digest, xset.Digest) {
batch = s.cur.preprep.Batch
} else {
log.Warningf("forfeiting primary - do not have request in store for %d %x", xset.Seq.Seq, xset.Digest)
log.Warningf("replica %d: forfeiting primary - do not have request in store for %d %x", s.id, xset.Seq.Seq, xset.Digest)
xset = nil
}

Expand All @@ -60,7 +60,7 @@ func (s *SBFT) maybeSendNewView() {
Batch: batch,
}

log.Noticef("sending new view for %d", nv.View)
log.Noticef("replica %d: sending new view for %d", s.id, nv.View)
s.lastNewViewSent = nv
s.broadcast(&Msg{&Msg_NewView{nv}})
}
Expand All @@ -87,48 +87,48 @@ func (s *SBFT) checkNewViewSignatures(nv *NewView) ([]*ViewChange, error) {

func (s *SBFT) handleNewView(nv *NewView, src uint64) {
if src != s.primaryIDView(nv.View) {
log.Warningf("invalid new view from %d for %d", src, nv.View)
log.Warningf("replica %d: invalid new view from %d for %d", s.id, src, nv.View)
return
}

if onv := s.replicaState[s.primaryIDView(nv.View)].newview; onv != nil && onv.View >= nv.View {
log.Debugf("discarding duplicate new view for %d", nv.View)
log.Debugf("replica %d: discarding duplicate new view for %d", s.id, nv.View)
return
}

vcs, err := s.checkNewViewSignatures(nv)
if err != nil {
log.Warningf("invalid new view from %d: %s", src, err)
log.Warningf("replica %d: invalid new view from %d: %s", s.id, src, err)
s.sendViewChange()
return
}

xset, _, ok := s.makeXset(vcs)

if !ok || !reflect.DeepEqual(nv.Xset, xset) {
log.Warningf("invalid new view from %d: xset incorrect: %v, %v", src, nv.Xset, xset)
log.Warningf("replica %d: invalid new view from %d: xset incorrect: %v, %v", s.id, src, nv.Xset, xset)
s.sendViewChange()
return
}

if nv.Xset == nil {
if nv.Batch != nil {
log.Warningf("invalid new view from %d: null request should come with null batch", src)
log.Warningf("replica %d: invalid new view from %d: null request should come with null batch", s.id, src)
s.sendViewChange()
return
}
} else if nv.Batch == nil || !bytes.Equal(nv.Batch.Hash(), nv.Xset.Digest) {
log.Warningf("invalid new view from %d: batch head hash does not match xset: %x, %x, %v",
src, hash(nv.Batch.Header), nv.Xset.Digest, nv)
log.Warningf("replica %d: invalid new view from %d: batch head hash does not match xset: %x, %x, %v",
s.id, src, hash(nv.Batch.Header), nv.Xset.Digest, nv)
s.sendViewChange()
return
}

if nv.Batch != nil {
_, err = s.checkBatch(nv.Batch, true, false)
if err != nil {
log.Warningf("invalid new view from %d: invalid batch, %s",
src, err)
log.Warningf("replica %d: invalid new view from %d: invalid batch, %s",
s.id, src, err)
s.sendViewChange()
return
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *SBFT) processNewView() {

s.handleCheckedPreprepare(pp)
} else {
log.Debugf("%+v", s)
log.Debugf("replica %d: %+v", s.id, s)
s.cancelViewChangeTimer()
s.maybeSendNextBatch()
}
Expand Down
4 changes: 2 additions & 2 deletions orderer/sbft/simplebft/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func (s *SBFT) handlePrepare(p *Subject, src uint64) {
}

if !reflect.DeepEqual(p, &s.cur.subject) {
log.Infof("prepare does not match expected subject %v, got %v", &s.cur.subject, p)
log.Infof("replica %d: prepare does not match expected subject %v, got %v", s.id, &s.cur.subject, p)
return
}
if _, ok := s.cur.prep[src]; ok {
log.Infof("duplicate prepare for %v from %d", *p.Seq, src)
log.Infof("replica %d: duplicate prepare for %v from %d", s.id, *p.Seq, src)
return
}
s.cur.prep[src] = p
Expand Down
18 changes: 9 additions & 9 deletions orderer/sbft/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,36 @@ func (s *SBFT) sendPreprepare(batch []*Request) {

func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
if src == s.id {
log.Infof("Ignoring preprepare from self: %d", src)
log.Infof("replica %d: ignoring preprepare from self: %d", s.id, src)
return
}
if src != s.primaryID() {
log.Infof("preprepare from non-primary %d", src)
log.Infof("replica %d: preprepare from non-primary %d", s.id, src)
return
}
nextSeq := s.nextSeq()
if *pp.Seq != nextSeq {
log.Infof("preprepare does not match expected %v, got %v", nextSeq, *pp.Seq)
log.Infof("replica %d: preprepare does not match expected %v, got %v", s.id, nextSeq, *pp.Seq)
return
}
if s.cur.subject.Seq.Seq == pp.Seq.Seq {
log.Infof("duplicate preprepare for %v", *pp.Seq)
log.Infof("replica %d: duplicate preprepare for %v", s.id, *pp.Seq)
return
}
if pp.Batch == nil {
log.Infof("preprepare without batch")
log.Infof("replica %d: preprepare without batch", s.id)
return
}

batchheader, err := s.checkBatch(pp.Batch, true, false)
if err != nil || batchheader.Seq != pp.Seq.Seq {
log.Infof("preprepare %v batch head inconsistent from %d: %s", pp.Seq, src, err)
log.Infof("replica %d: preprepare %v batch head inconsistent from %d: %s", s.id, pp.Seq, src, err)
return
}

prevhash := s.sys.LastBatch().Hash()
if !bytes.Equal(batchheader.PrevHash, prevhash) {
log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash))
log.Infof("replica %d: preprepare batch prev hash does not match expected %s, got %s", s.id, hash2str(batchheader.PrevHash), hash2str(prevhash))
return
}

Expand All @@ -82,7 +82,7 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
func (s *SBFT) acceptPreprepare(pp *Preprepare) {
sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()}

log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest)
log.Infof("replica %d: accepting preprepare for %v, %x", s.id, sub.Seq, sub.Digest)
s.sys.Persist("preprepare", pp)

s.cur = reqInfo{
Expand All @@ -107,6 +107,6 @@ func (s *SBFT) handleCheckedPreprepare(pp *Preprepare) {
////////////////////////////////////////////////

func (s *SBFT) requestTimeout() {
log.Infof("request timed out: %s", s.cur.subject.Seq)
log.Infof("replica %d: request timed out: %s", s.id, s.cur.subject.Seq)
s.sendViewChange()
}
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *SBFT) Request(req []byte) {

func (s *SBFT) handleRequest(req *Request, src uint64) {
key := hash2str(hash(req.Payload))
log.Infof("replica %d inserting %x into pending", s.id, key)
log.Infof("replica %d: inserting %x into pending", s.id, key)
s.pending[key] = req
if s.isPrimary() && s.activeView {
s.batch = append(s.batch, req)
Expand Down
8 changes: 4 additions & 4 deletions orderer/sbft/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *SBFT) broadcast(m *Msg) {

// Receive is the ingress method for SBFT messages.
func (s *SBFT) Receive(m *Msg, src uint64) {
log.Debugf("received message from %d: %s", src, m)
log.Debugf("replica %d: received message from %d: %s", s.id, src, m)

if h := m.GetHello(); h != nil {
s.handleHello(h, src)
Expand All @@ -203,7 +203,7 @@ func (s *SBFT) Receive(m *Msg, src uint64) {
}

if s.testBacklog(m, src) {
log.Debugf("message for future seq, storing for later")
log.Debugf("replica %d: message for future seq, storing for later", s.id)
s.recordBacklogMsg(m, src)
return
}
Expand All @@ -226,15 +226,15 @@ func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) {
return
}

log.Warningf("received invalid message from %d", src)
log.Warningf("replica %d: received invalid message from %d", s.id, src)
}

func (s *SBFT) deliverBatch(batch *Batch) {
s.sys.Deliver(batch)

for _, req := range batch.Payloads {
key := hash2str(hash(req))
log.Infof("replica %d attempting to remove %x from pending", s.id, key)
log.Infof("replica %d: attempting to remove %x from pending", s.id, key)
delete(s.pending, key)
}
}
16 changes: 8 additions & 8 deletions orderer/sbft/simplebft/viewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *SBFT) sendViewChange() {
state.viewchange = nil
}
}
log.Noticef("sending viewchange for view %d", s.view)
log.Noticef("replica %d: sending viewchange for view %d", s.id, s.view)

var q, p []*Subject
if s.cur.sentCommit {
Expand Down Expand Up @@ -67,19 +67,19 @@ func (s *SBFT) handleViewChange(svc *Signed, src uint64) {
_, err = s.checkBatch(vc.Checkpoint, false, true)
}
if err != nil {
log.Noticef("invalid viewchange: %s", err)
log.Noticef("replica %d: invalid viewchange: %s", s.id, err)
return
}
if vc.View < s.view {
log.Debugf("old view change from %d for view %d, we are in view %d", src, vc.View, s.view)
log.Debugf("replica %d: old view change from %d for view %d, we are in view %d", s.id, src, vc.View, s.view)
return
}
if ovc := s.replicaState[src].viewchange; ovc != nil && vc.View <= ovc.View {
log.Noticef("duplicate view change for %d from %d", vc.View, src)
log.Noticef("replica %d: duplicate view change for %d from %d", s.id, vc.View, src)
return
}

log.Infof("viewchange from %d: %v", src, vc)
log.Infof("replica %d: viewchange from %d: %v", s.id, src, vc)
s.replicaState[src].viewchange = vc
s.replicaState[src].signedViewchange = svc

Expand All @@ -97,18 +97,18 @@ func (s *SBFT) handleViewChange(svc *Signed, src uint64) {
if quorum == s.oneCorrectQuorum() {
// catch up to the minimum view
if s.view < min {
log.Notice("we are behind on view change, resending for newer view")
log.Noticef("replica %d: we are behind on view change, resending for newer view", s.id)
s.view = min - 1
s.sendViewChange()
return
}
}

if quorum == s.noFaultyQuorum() {
log.Notice("received 2f+1 view change messages, starting view change timer")
log.Noticef("replica %d: received 2f+1 view change messages, starting view change timer", s.id)
s.viewChangeTimer = s.sys.Timer(s.viewChangeTimeout, func() {
s.viewChangeTimeout *= 2
log.Notice("view change timed out, sending next")
log.Noticef("replica %d: view change timed out, sending next", s.id)
s.sendViewChange()
})
}
Expand Down
Loading

0 comments on commit a89b8cc

Please sign in to comment.