diff --git a/core/consensus/component.go b/core/consensus/component.go index a49392b00..d358a79f8 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -100,6 +100,9 @@ func NewComponent(tcpNode host.Host, peers []p2p.Peer, // Nodes is the number of nodes. Nodes: len(peers), + + // FIFOLimit caps the max buffered messages per peer. + FIFOLimit: recvBuffer, } return c, nil diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index 9c8f6ba6c..246ca9f88 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -52,6 +52,8 @@ type Definition[I any, V comparable] struct { LogUponRule func(ctx context.Context, instance I, process, round int64, msg Msg[I, V], uponRule string) // Nodes is the total number of nodes/processes participating in consensus. Nodes int + // FIFOLimit limits the amount of message buffered for each peer. + FIFOLimit int } // Quorum returns the quorum count for the system. @@ -132,9 +134,8 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo return errors.New("zero input value not supported") } defer func() { - // Errors are unexpected since this algorithm doesn't do IO - // or have other sources of errors. Panics are used for sanity - // checks to improve readability. Catch them here. + // Panics are used for assertions and sanity checks to reduce lines of code + // and to improve readability. Catch them here. if r := recover(); r != nil { err = fmt.Errorf("qbft sanity check: %v", r) } @@ -148,8 +149,7 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo preparedValue V preparedJustification []Msg[I, V] qCommit []Msg[I, V] - buffer []Msg[I, V] - dedupMsgs = make(map[dedupKey]bool) + buffer = make(map[int64][]Msg[I, V]) dedupRules = make(map[uponRule]bool) timerChan <-chan time.Time stopTimer func() @@ -169,39 +169,17 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo zeroVal[V](), preparedRound, preparedValue, preparedJustification) } - // bufferMsg returns true if the message is unique and was added to the buffer. - // It returns false if the message is a duplicate and should be discarded. - bufferMsg := func(msg Msg[I, V]) bool { - if dedupMsgs[key(msg)] { - return false - } - dedupMsgs[key(msg)] = true - buffer = append(buffer, msg) - - return true - } - - // trimBuffer drops all older round's buffered messages. - trimBuffer := func() { - var selected []Msg[I, V] - for _, msg := range buffer { - if msg.Round() >= round-1 { - selected = append(selected, msg) - } - } - buffer = selected - - dedup := make(map[dedupKey]bool) - for k := range dedupMsgs { - if k.Round >= round { - dedup[k] = true - } + // bufferMsg adds the message to each process' FIFO queue. + bufferMsg := func(msg Msg[I, V]) { + fifo := buffer[msg.Source()] + fifo = append(fifo, msg) + if len(fifo) > d.FIFOLimit { + fifo = fifo[len(fifo)-d.FIFOLimit:] } - dedupMsgs = dedup + buffer[msg.Source()] = fifo } // isDuplicatedRule returns true if the rule has been already executed since last round change. - // As an exception for uponJustifiedDecided always returns false, so it can be executed multiple times per round. isDuplicatedRule := func(rule uponRule) bool { if dedupRules[rule] { return true @@ -246,19 +224,16 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo break } - if !isJustified(d, instance, msg) { + if !isJustified(d, instance, msg) { // Drop unjust messages + d.LogUponRule(ctx, instance, process, round, msg, "unjust"+msg.Type().String()) break } - if !bufferMsg(msg) { - break - } + bufferMsg(msg) rule, justification := classify(d, instance, round, process, buffer, msg) - if rule == uponNothing { - break - } - if isDuplicatedRule(rule) { + if rule == uponNothing || isDuplicatedRule(rule) { + // Do nothing more if no rule or duplicate rule was triggered break } @@ -268,7 +243,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo case uponJustifiedPrePrepare: // Algorithm 2:1 // Applicable to current or future rounds (since justified) changeRound(msg.Round()) - trimBuffer() stopTimer() timerChan, stopTimer = d.NewTimer(round) @@ -296,7 +270,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo case uponFPlus1RoundChanges: // Algorithm 3:5 // Only applicable to future rounds changeRound(nextMinRound(d, justification, round /* < msg.Round */)) - trimBuffer() stopTimer() timerChan, stopTimer = d.NewTimer(round) @@ -324,7 +297,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo case <-timerChan: // Algorithm 3:1 round++ - trimBuffer() stopTimer() timerChan, stopTimer = d.NewTimer(round) @@ -335,14 +307,14 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo return ctx.Err() } - if err != nil { + if err != nil { // Errors are considered fatal. return err } } } // classify returns the rule triggered upon receipt of the last message and its justifications. -func classify[I any, V comparable](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) { +func classify[I any, V comparable](d Definition[I, V], instance I, round, process int64, buffer map[int64][]Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) { switch msg.Type() { case MsgDecided: return uponJustifiedDecided, msg.Justification() @@ -361,7 +333,7 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces if msg.Round() != round { return uponNothing, nil } - prepares := filterByRoundAndValue(buffer, MsgPrepare, msg.Round(), msg.Value()) + prepares := filterByRoundAndValue(flatten(buffer), MsgPrepare, msg.Round(), msg.Value()) if len(prepares) >= d.Quorum() { return uponQuorumPrepares, prepares } @@ -371,7 +343,7 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces if msg.Round() != round { return uponNothing, nil } - commits := filterByRoundAndValue(buffer, MsgCommit, msg.Round(), msg.Value()) + commits := filterByRoundAndValue(flatten(buffer), MsgCommit, msg.Round(), msg.Value()) if len(commits) >= d.Quorum() { return uponQuorumCommits, commits } @@ -382,9 +354,11 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces return uponNothing, nil } + all := flatten(buffer) + if msg.Round() > round { // Jump ahead if we received F+1 higher ROUND-CHANGEs. - if frc, ok := getFPlus1RoundChanges(d, buffer, round); ok { + if frc, ok := getFPlus1RoundChanges(d, all, round); ok { return uponFPlus1RoundChanges, frc } @@ -393,11 +367,11 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces /* else msg.Round == round */ - if qrc := filterRoundChange(buffer, msg.Round()); len(qrc) < d.Quorum() { + if qrc := filterRoundChange(all, msg.Round()); len(qrc) < d.Quorum() { return uponNothing, nil } - qrc, ok := getJustifiedQrc(d, buffer, msg.Round()) + qrc, ok := getJustifiedQrc(d, all, msg.Round()) if !ok { return uponUnjustQuorumRoundChanges, nil } @@ -468,7 +442,7 @@ func nextMinRound[I any, V comparable](d Definition[I, V], frc []Msg[I, V], roun func isJustified[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { switch msg.Type() { case MsgPrePrepare: - return isJustifiedPrePrepare(d, instance, msg) + return IsJustifiedPrePrepare(d, instance, msg) case MsgPrepare: return true case MsgCommit: @@ -478,7 +452,7 @@ func isJustified[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, case MsgDecided: return isJustifiedDecided(d, msg) default: - panic("invalid message type") + panic("bug: invalid message type") } } @@ -537,8 +511,8 @@ func isJustifiedDecided[I any, V comparable](d Definition[I, V], msg Msg[I, V]) return len(commits) >= d.Quorum() } -// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. -func isJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { +// IsJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. +func IsJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { if msg.Type() != MsgPrePrepare { panic("bug: not a preprepare message") } @@ -600,15 +574,15 @@ func containsJustifiedQrc[I any, V comparable](d Definition[I, V], justification } // getJustifiedQrc implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc). -func getJustifiedQrc[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) { - if qrc, ok := quorumNullPrepared(d, buffer, round); ok { +func getJustifiedQrc[I any, V comparable](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) { + if qrc, ok := quorumNullPrepared(d, all, round); ok { // Return any quorum null pv ROUND_CHANGE messages as Qrc. return qrc, true } - roundChanges := filterRoundChange(buffer, round) + roundChanges := filterRoundChange(all, round) - for _, prepares := range getPrepareQuorums(d, buffer) { + for _, prepares := range getPrepareQuorums(d, all) { // See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == prepares.Round. var ( qrc []Msg[I, V] @@ -640,9 +614,9 @@ func getJustifiedQrc[I any, V comparable](d Definition[I, V], buffer []Msg[I, V] // getFPlus1RoundChanges returns true and Faulty+1 ROUND-CHANGE messages (Frc) with // the rounds higher than the provided round. It returns the highest round // per process in order to jump furthest. -func getFPlus1RoundChanges[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) { +func getFPlus1RoundChanges[I any, V comparable](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) { highestBySource := make(map[int64]Msg[I, V]) - for _, msg := range buffer { + for _, msg := range all { if msg.Type() != MsgRoundChange { continue } @@ -680,9 +654,9 @@ type preparedKey[I any, V comparable] struct { // getPrepareQuorums returns all sets of quorum PREPARE messages // with identical rounds and values. -func getPrepareQuorums[I any, V comparable](d Definition[I, V], buffer []Msg[I, V]) [][]Msg[I, V] { +func getPrepareQuorums[I any, V comparable](d Definition[I, V], all []Msg[I, V]) [][]Msg[I, V] { sets := make(map[preparedKey[I, V]]map[int64]Msg[I, V]) // map[preparedKey]map[process]Msg - for _, msg := range flatten(buffer) { // Flatten to get PREPARES included as ROUND-CHANGE justifications. + for _, msg := range all { // Flatten to get PREPARES included as ROUND-CHANGE justifications. if msg.Type() != MsgPrepare { continue } @@ -797,14 +771,16 @@ func isZeroVal[V comparable](v V) bool { return v == zeroVal[V]() } -// flatten returns a new list of messages containing all the buffered messages +// flatten returns the buffer as a list containing all the buffered messages // as well as all their justifications. -func flatten[I any, V comparable](buffer []Msg[I, V]) []Msg[I, V] { +func flatten[I any, V comparable](buffer map[int64][]Msg[I, V]) []Msg[I, V] { var resp []Msg[I, V] - for _, msg := range buffer { - resp = append(resp, msg) - for _, j := range msg.Justification() { - resp = append(resp, j) + for _, msgs := range buffer { + for _, msg := range msgs { + resp = append(resp, msg) + for _, j := range msg.Justification() { + resp = append(resp, j) + } } } diff --git a/core/qbft/qbft_test.go b/core/qbft/qbft_test.go index 3d0a6c438..41bf8fbd6 100644 --- a/core/qbft/qbft_test.go +++ b/core/qbft/qbft_test.go @@ -210,8 +210,9 @@ func testQBFT(t *testing.T, test test) { t.Helper() const ( - n = 4 - maxRound = 50 + n = 4 + maxRound = 50 + fifoLimit = 100 ) var ( @@ -243,12 +244,13 @@ func testQBFT(t *testing.T, test test) { t.Logf("%s %d => %v@%d -> %v@%d ~= %v", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), process, round, rule) if round > maxRound { cancel() - } else if !test.Fuzz && strings.Contains(rule, "Unjust") { + } else if !test.Fuzz && strings.Contains(strings.ToLower(rule), "unjust") { t.Logf("%s: %#v", rule, msg) cancel() } }, - Nodes: n, + Nodes: n, + FIFOLimit: fifoLimit, } for i := int64(1); i <= n; i++ { @@ -404,6 +406,7 @@ func newMsg(typ qbft.MsgType, instance int64, source int64, round int64, value i var msgs []msg for _, j := range justify { m := j.(msg) + m.justify = nil // Clear nested justifications. msgs = append(msgs, m) }