Skip to content

Commit

Permalink
core/qbft: refactor buffering to fifo (#539)
Browse files Browse the repository at this point in the history
Refactors buffering to FIFO per peer to prevent DDoS of future round messages.

category: refactor
ticket: #524
  • Loading branch information
corverroos committed May 15, 2022
1 parent 7bf083e commit 008b1bf
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 74 deletions.
3 changes: 3 additions & 0 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func NewComponent(tcpNode host.Host, peers []p2p.Peer,

// Nodes is the number of nodes.
Nodes: len(peers),

// FIFOLimit caps the max buffered messages per peer.
FIFOLimit: recvBuffer,
}

return c, nil
Expand Down
116 changes: 46 additions & 70 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Definition[I any, V comparable] struct {
LogUponRule func(ctx context.Context, instance I, process, round int64, msg Msg[I, V], uponRule string)
// Nodes is the total number of nodes/processes participating in consensus.
Nodes int
// FIFOLimit limits the amount of message buffered for each peer.
FIFOLimit int
}

// Quorum returns the quorum count for the system.
Expand Down Expand Up @@ -132,9 +134,8 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
return errors.New("zero input value not supported")
}
defer func() {
// Errors are unexpected since this algorithm doesn't do IO
// or have other sources of errors. Panics are used for sanity
// checks to improve readability. Catch them here.
// Panics are used for assertions and sanity checks to reduce lines of code
// and to improve readability. Catch them here.
if r := recover(); r != nil {
err = fmt.Errorf("qbft sanity check: %v", r)
}
Expand All @@ -148,8 +149,7 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
preparedValue V
preparedJustification []Msg[I, V]
qCommit []Msg[I, V]
buffer []Msg[I, V]
dedupMsgs = make(map[dedupKey]bool)
buffer = make(map[int64][]Msg[I, V])
dedupRules = make(map[uponRule]bool)
timerChan <-chan time.Time
stopTimer func()
Expand All @@ -169,39 +169,17 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
zeroVal[V](), preparedRound, preparedValue, preparedJustification)
}

// bufferMsg returns true if the message is unique and was added to the buffer.
// It returns false if the message is a duplicate and should be discarded.
bufferMsg := func(msg Msg[I, V]) bool {
if dedupMsgs[key(msg)] {
return false
}
dedupMsgs[key(msg)] = true
buffer = append(buffer, msg)

return true
}

// trimBuffer drops all older round's buffered messages.
trimBuffer := func() {
var selected []Msg[I, V]
for _, msg := range buffer {
if msg.Round() >= round-1 {
selected = append(selected, msg)
}
}
buffer = selected

dedup := make(map[dedupKey]bool)
for k := range dedupMsgs {
if k.Round >= round {
dedup[k] = true
}
// bufferMsg adds the message to each process' FIFO queue.
bufferMsg := func(msg Msg[I, V]) {
fifo := buffer[msg.Source()]
fifo = append(fifo, msg)
if len(fifo) > d.FIFOLimit {
fifo = fifo[len(fifo)-d.FIFOLimit:]
}
dedupMsgs = dedup
buffer[msg.Source()] = fifo
}

// isDuplicatedRule returns true if the rule has been already executed since last round change.
// As an exception for uponJustifiedDecided always returns false, so it can be executed multiple times per round.
isDuplicatedRule := func(rule uponRule) bool {
if dedupRules[rule] {
return true
Expand Down Expand Up @@ -246,19 +224,16 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
break
}

if !isJustified(d, instance, msg) {
if !isJustified(d, instance, msg) { // Drop unjust messages
d.LogUponRule(ctx, instance, process, round, msg, "unjust"+msg.Type().String())
break
}

if !bufferMsg(msg) {
break
}
bufferMsg(msg)

rule, justification := classify(d, instance, round, process, buffer, msg)
if rule == uponNothing {
break
}
if isDuplicatedRule(rule) {
if rule == uponNothing || isDuplicatedRule(rule) {
// Do nothing more if no rule or duplicate rule was triggered
break
}

Expand All @@ -268,7 +243,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
case uponJustifiedPrePrepare: // Algorithm 2:1
// Applicable to current or future rounds (since justified)
changeRound(msg.Round())
trimBuffer()

stopTimer()
timerChan, stopTimer = d.NewTimer(round)
Expand Down Expand Up @@ -296,7 +270,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
case uponFPlus1RoundChanges: // Algorithm 3:5
// Only applicable to future rounds
changeRound(nextMinRound(d, justification, round /* < msg.Round */))
trimBuffer()

stopTimer()
timerChan, stopTimer = d.NewTimer(round)
Expand Down Expand Up @@ -324,7 +297,6 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo

case <-timerChan: // Algorithm 3:1
round++
trimBuffer()

stopTimer()
timerChan, stopTimer = d.NewTimer(round)
Expand All @@ -335,14 +307,14 @@ func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transpo
return ctx.Err()
}

if err != nil {
if err != nil { // Errors are considered fatal.
return err
}
}
}

// classify returns the rule triggered upon receipt of the last message and its justifications.
func classify[I any, V comparable](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) {
func classify[I any, V comparable](d Definition[I, V], instance I, round, process int64, buffer map[int64][]Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) {
switch msg.Type() {
case MsgDecided:
return uponJustifiedDecided, msg.Justification()
Expand All @@ -361,7 +333,7 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces
if msg.Round() != round {
return uponNothing, nil
}
prepares := filterByRoundAndValue(buffer, MsgPrepare, msg.Round(), msg.Value())
prepares := filterByRoundAndValue(flatten(buffer), MsgPrepare, msg.Round(), msg.Value())
if len(prepares) >= d.Quorum() {
return uponQuorumPrepares, prepares
}
Expand All @@ -371,7 +343,7 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces
if msg.Round() != round {
return uponNothing, nil
}
commits := filterByRoundAndValue(buffer, MsgCommit, msg.Round(), msg.Value())
commits := filterByRoundAndValue(flatten(buffer), MsgCommit, msg.Round(), msg.Value())
if len(commits) >= d.Quorum() {
return uponQuorumCommits, commits
}
Expand All @@ -382,9 +354,11 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces
return uponNothing, nil
}

all := flatten(buffer)

if msg.Round() > round {
// Jump ahead if we received F+1 higher ROUND-CHANGEs.
if frc, ok := getFPlus1RoundChanges(d, buffer, round); ok {
if frc, ok := getFPlus1RoundChanges(d, all, round); ok {
return uponFPlus1RoundChanges, frc
}

Expand All @@ -393,11 +367,11 @@ func classify[I any, V comparable](d Definition[I, V], instance I, round, proces

/* else msg.Round == round */

if qrc := filterRoundChange(buffer, msg.Round()); len(qrc) < d.Quorum() {
if qrc := filterRoundChange(all, msg.Round()); len(qrc) < d.Quorum() {
return uponNothing, nil
}

qrc, ok := getJustifiedQrc(d, buffer, msg.Round())
qrc, ok := getJustifiedQrc(d, all, msg.Round())
if !ok {
return uponUnjustQuorumRoundChanges, nil
}
Expand Down Expand Up @@ -468,7 +442,7 @@ func nextMinRound[I any, V comparable](d Definition[I, V], frc []Msg[I, V], roun
func isJustified[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool {
switch msg.Type() {
case MsgPrePrepare:
return isJustifiedPrePrepare(d, instance, msg)
return IsJustifiedPrePrepare(d, instance, msg)
case MsgPrepare:
return true
case MsgCommit:
Expand All @@ -478,7 +452,7 @@ func isJustified[I any, V comparable](d Definition[I, V], instance I, msg Msg[I,
case MsgDecided:
return isJustifiedDecided(d, msg)
default:
panic("invalid message type")
panic("bug: invalid message type")
}
}

Expand Down Expand Up @@ -537,8 +511,8 @@ func isJustifiedDecided[I any, V comparable](d Definition[I, V], msg Msg[I, V])
return len(commits) >= d.Quorum()
}

// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified.
func isJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool {
// IsJustifiedPrePrepare returns true if the PRE-PREPARE message is justified.
func IsJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool {
if msg.Type() != MsgPrePrepare {
panic("bug: not a preprepare message")
}
Expand Down Expand Up @@ -600,15 +574,15 @@ func containsJustifiedQrc[I any, V comparable](d Definition[I, V], justification
}

// getJustifiedQrc implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc).
func getJustifiedQrc[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
if qrc, ok := quorumNullPrepared(d, buffer, round); ok {
func getJustifiedQrc[I any, V comparable](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) {
if qrc, ok := quorumNullPrepared(d, all, round); ok {
// Return any quorum null pv ROUND_CHANGE messages as Qrc.
return qrc, true
}

roundChanges := filterRoundChange(buffer, round)
roundChanges := filterRoundChange(all, round)

for _, prepares := range getPrepareQuorums(d, buffer) {
for _, prepares := range getPrepareQuorums(d, all) {
// See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == prepares.Round.
var (
qrc []Msg[I, V]
Expand Down Expand Up @@ -640,9 +614,9 @@ func getJustifiedQrc[I any, V comparable](d Definition[I, V], buffer []Msg[I, V]
// getFPlus1RoundChanges returns true and Faulty+1 ROUND-CHANGE messages (Frc) with
// the rounds higher than the provided round. It returns the highest round
// per process in order to jump furthest.
func getFPlus1RoundChanges[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
func getFPlus1RoundChanges[I any, V comparable](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) {
highestBySource := make(map[int64]Msg[I, V])
for _, msg := range buffer {
for _, msg := range all {
if msg.Type() != MsgRoundChange {
continue
}
Expand Down Expand Up @@ -680,9 +654,9 @@ type preparedKey[I any, V comparable] struct {

// getPrepareQuorums returns all sets of quorum PREPARE messages
// with identical rounds and values.
func getPrepareQuorums[I any, V comparable](d Definition[I, V], buffer []Msg[I, V]) [][]Msg[I, V] {
func getPrepareQuorums[I any, V comparable](d Definition[I, V], all []Msg[I, V]) [][]Msg[I, V] {
sets := make(map[preparedKey[I, V]]map[int64]Msg[I, V]) // map[preparedKey]map[process]Msg
for _, msg := range flatten(buffer) { // Flatten to get PREPARES included as ROUND-CHANGE justifications.
for _, msg := range all { // Flatten to get PREPARES included as ROUND-CHANGE justifications.
if msg.Type() != MsgPrepare {
continue
}
Expand Down Expand Up @@ -797,14 +771,16 @@ func isZeroVal[V comparable](v V) bool {
return v == zeroVal[V]()
}

// flatten returns a new list of messages containing all the buffered messages
// flatten returns the buffer as a list containing all the buffered messages
// as well as all their justifications.
func flatten[I any, V comparable](buffer []Msg[I, V]) []Msg[I, V] {
func flatten[I any, V comparable](buffer map[int64][]Msg[I, V]) []Msg[I, V] {
var resp []Msg[I, V]
for _, msg := range buffer {
resp = append(resp, msg)
for _, j := range msg.Justification() {
resp = append(resp, j)
for _, msgs := range buffer {
for _, msg := range msgs {
resp = append(resp, msg)
for _, j := range msg.Justification() {
resp = append(resp, j)
}
}
}

Expand Down
11 changes: 7 additions & 4 deletions core/qbft/qbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ func testQBFT(t *testing.T, test test) {
t.Helper()

const (
n = 4
maxRound = 50
n = 4
maxRound = 50
fifoLimit = 100
)

var (
Expand Down Expand Up @@ -243,12 +244,13 @@ func testQBFT(t *testing.T, test test) {
t.Logf("%s %d => %v@%d -> %v@%d ~= %v", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), process, round, rule)
if round > maxRound {
cancel()
} else if !test.Fuzz && strings.Contains(rule, "Unjust") {
} else if !test.Fuzz && strings.Contains(strings.ToLower(rule), "unjust") {
t.Logf("%s: %#v", rule, msg)
cancel()
}
},
Nodes: n,
Nodes: n,
FIFOLimit: fifoLimit,
}

for i := int64(1); i <= n; i++ {
Expand Down Expand Up @@ -404,6 +406,7 @@ func newMsg(typ qbft.MsgType, instance int64, source int64, round int64, value i
var msgs []msg
for _, j := range justify {
m := j.(msg)
m.justify = nil // Clear nested justifications.
msgs = append(msgs, m)
}

Expand Down

0 comments on commit 008b1bf

Please sign in to comment.