Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/qbft: fix trimming issue #507

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 80 additions & 60 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,21 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport

// trimBuffer drops all older round's buffered messages.
trimBuffer := func() {
// TODO(corver): Fix trimming.
// var selected []Msg[I, V]
// for _, msg := range buffer {
// if msg.Round() >= round-1 {
// selected = append(selected, msg)
// }
// }
// buffer = selected
//
// dedup := make(map[dedupKey]bool)
// for k := range dedupIn {
// if k.Round >= round {
// dedup[k] = true
// }
// }
// dedupIn = dedup
var selected []Msg[I, V]
for _, msg := range buffer {
if msg.Round() >= round-1 {
selected = append(selected, msg)
}
}
buffer = selected

dedup := make(map[dedupKey]bool)
for k := range dedupIn {
if k.Round >= round {
dedup[k] = true
}
}
dedupIn = dedup
}

// === Algorithm ===
Expand Down Expand Up @@ -245,13 +244,6 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport
continue
}

// Buffer justifications
for _, j := range msg.Justification() {
if !bufferMsg(j) {
continue
}
}

rule, justification := classify(d, instance, round, process, buffer, msg)
if rule == uponNothing {
continue
Expand Down Expand Up @@ -386,15 +378,16 @@ func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process
return uponNothing, nil
}

if !d.IsLeader(instance, msg.Round(), process) {
return uponNothing, nil
qrc, ok := getJustifiedQrc(d, buffer, msg.Round())
if !ok {
panic("bug: unjust Qrc")
}

if qrc, ok := getJustifiedQrc(d, buffer, msg.Round()); ok {
return uponQuorumRoundChanges, qrc
if !d.IsLeader(instance, msg.Round(), process) {
return uponNothing, nil
}

panic("bug: unjust Qrc")
return uponQuorumRoundChanges, qrc

default:
panic("bug: invalid type")
Expand Down Expand Up @@ -428,25 +421,27 @@ func highestPrepared[I any, V Value[V]](qrc []Msg[I, V]) (int64, V) {

// nextMinRound implements algorithm 3:6 and returns the next minimum round
// from received round change messages.
func nextMinRound[I any, V Value[V]](d Definition[I, V], msgs []Msg[I, V], round int64) int64 {
func nextMinRound[I any, V Value[V]](d Definition[I, V], frc []Msg[I, V], round int64) int64 {
// Get all RoundChange messages with round (rj) higher than current round (ri)
frc, ok := getFPlus1RoundChanges(d, msgs, round)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to filter again, the input is already Frc

if !ok {
panic("bug: too few round change messages")

if len(frc) < d.Faulty()+1 {
panic("bug: Frc too short")
}

// Get the smallest round in the set.
rmin := int64(math.MaxInt64)
for _, msg := range frc {
if msg.Type() != MsgRoundChange {
panic("bug: Frc contain non-round change")
} else if msg.Round() <= round {
panic("bug: Frc round not in future")
}

if rmin > msg.Round() {
rmin = msg.Round()
}
}

if rmin <= round {
panic("bug: next rmin not after round")
}

return rmin
}

Expand All @@ -457,21 +452,35 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]
panic("bug: not a round change message")
}

if msg.PreparedRound() == 0 && isZeroVal(msg.PreparedValue()) && len(msg.Justification()) == 0 {
// No need to justify null prepared round and value.
return true
// ROUND-CHANGE justification contains quorum PREPARE messages that justifies Pr and Pv.
prepares := msg.Justification()
pr := msg.PreparedRound()
pv := msg.PreparedValue()

if len(prepares) == 0 {
// If no justification, ensure null prepared round and value.
return pr == 0 && isZeroVal(pv)
}

// No need to check for all possible combinations, since justified should only contain a one.

if len(msg.Justification()) < d.Quorum() {
if len(prepares) < d.Quorum() {
return false
}

pv := msg.PreparedValue()
prepares := filterMsgs(msg.Justification(), MsgPrepare, msg.PreparedRound(), &pv, nil, nil)
for _, prepare := range prepares {
if prepare.Type() != MsgPrepare {
return false
}
if prepare.Round() != pr {
return false
}
if !prepare.Value().Equal(pv) {
return false
}
}

return len(msg.Justification()) == len(prepares)
return true
}

// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified.
Expand All @@ -497,11 +506,7 @@ func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, ms
return true // New value being proposed
}

if msg.Value().Equal(pv) {
return true
}

return false
return msg.Value().Equal(pv) // Ensure Pv is being proposed
}

// containsJustifiedQrc implements algorithm 4:1 and returns true and pv if
Expand Down Expand Up @@ -541,29 +546,30 @@ func containsJustifiedQrc[I any, V Value[V]](d Definition[I, V], justification [
}

// getJustifiedQrc implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc).
func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) {
if qrc, ok := quorumNullPrepared(d, all, round); ok {
func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
if qrc, ok := quorumNullPrepared(d, buffer, round); ok {
// Return any quorum null pv ROUND_CHANGE messages as Qrc.
return qrc, true
}

rc := filterRoundChange(all, round)
for _, prepares := range getPrepareQuorums(d, all) {
roundChanges := filterRoundChange(buffer, round)

for _, prepares := range getPrepareQuorums(d, buffer) {
// See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == prepares.Round.
var (
qrc []Msg[I, V]
hasHighestPrepared bool
pr = prepares[0].Round()
pv = prepares[0].Value()
)
for _, msg := range rc {
if msg.PreparedRound() > pr {
for _, rc := range roundChanges {
if rc.PreparedRound() > pr {
continue
}
if msg.PreparedRound() == pr && msg.PreparedValue().Equal(pv) {
if rc.PreparedRound() == pr && rc.PreparedValue().Equal(pv) {
hasHighestPrepared = true
}
qrc = append(qrc, msg)
qrc = append(qrc, rc)
}
if len(qrc) >= d.Quorum() && hasHighestPrepared {
return append(qrc, prepares...), true
Expand All @@ -576,9 +582,9 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], all []Msg[I, V], rou
// getFPlus1RoundChanges returns true and Faulty+1 ROUND-CHANGE messages (Frc) with
// the rounds higher than the provided round. It returns the highest round
// per process in order to jump furthest.
func getFPlus1RoundChanges[I any, V Value[V]](d Definition[I, V], msgs []Msg[I, V], round int64) ([]Msg[I, V], bool) {
func getFPlus1RoundChanges[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
highestBySource := make(map[int64]Msg[I, V])
for _, msg := range msgs {
for _, msg := range buffer {
if msg.Type() != MsgRoundChange {
continue
}
Expand Down Expand Up @@ -618,9 +624,9 @@ type prepareSet[I any, V Value[V]] struct {

// getPrepareQuorums returns all sets of quorum PREPARE messages
// with identical rounds and values.
func getPrepareQuorums[I any, V Value[V]](d Definition[I, V], msgs []Msg[I, V]) [][]Msg[I, V] {
func getPrepareQuorums[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V]) [][]Msg[I, V] {
var sets []prepareSet[I, V]
for _, msg := range msgs {
for _, msg := range flatten(buffer) { // Flatten to get PREPARES included as ROUND-CHANGE justifications.
if msg.Type() != MsgPrepare {
continue
}
Expand Down Expand Up @@ -746,3 +752,17 @@ func zeroVal[V Value[V]]() V {
func isZeroVal[V Value[V]](v V) bool {
return v.Equal(zeroVal[V]())
}

// flatten returns a new list of messages containing all the buffered messages
// as well as all their justifications.
func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] {
var resp []Msg[I, V]
for _, msg := range buffer {
resp = append(resp, msg)
for _, j := range msg.Justification() {
resp = append(resp, j)
}
}

return resp
}
68 changes: 36 additions & 32 deletions core/qbft/qbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,6 @@ import (
"github.com/obolnetwork/charon/core/qbft"
)

func TestFormulas(t *testing.T) {
// assert given N asserts Q and F.
assert := func(t *testing.T, n, q, f int) {
t.Helper()
d := qbft.Definition[any, value]{Nodes: n}
require.Equalf(t, q, d.Quorum(), "Quorum given N=%d", n)
require.Equalf(t, f, d.Faulty(), "Faulty given N=%d", n)
}

assert(t, 1, 1, 0)
assert(t, 2, 2, 0)
assert(t, 3, 2, 0)
assert(t, 4, 3, 1)
assert(t, 5, 4, 1)
assert(t, 6, 4, 1)
assert(t, 7, 5, 2)
assert(t, 8, 6, 2)
assert(t, 9, 6, 2)
assert(t, 10, 7, 3)
assert(t, 11, 8, 3)
assert(t, 12, 8, 3)
assert(t, 13, 9, 4)
assert(t, 15, 10, 4)
assert(t, 17, 12, 5)
assert(t, 19, 13, 6)
assert(t, 21, 14, 6)
}

func TestQBFT(t *testing.T) {
t.Run("happy 0", func(t *testing.T) {
testQBFT(t, test{
Expand Down Expand Up @@ -190,7 +162,7 @@ func testQBFT(t *testing.T, test test) {
var (
ctx, cancel = context.WithCancel(context.Background())
clock = new(fakeClock)
receives []chan qbft.Msg[int64, value]
receives = make(map[int64]chan qbft.Msg[int64, value])
broadcast = make(chan qbft.Msg[int64, value])
resultChan = make(chan []qbft.Msg[int64, value], n)
errChan = make(chan error, n)
Expand All @@ -217,7 +189,7 @@ func testQBFT(t *testing.T, test test) {
if round > 50 {
cancel()
} else if strings.Contains(rule, "Unjust") {
t.Logf("Unjustified PRE-PREPARE: %#v", msg)
t.Logf("%s: %#v", rule, msg)
cancel()
}
},
Expand All @@ -226,12 +198,13 @@ func testQBFT(t *testing.T, test test) {

for i := int64(1); i <= n; i++ {
receive := make(chan qbft.Msg[int64, value], 1000)
receives = append(receives, receive)
receives[i] = receive
trans := qbft.Transport[int64, value]{
Broadcast: func(typ qbft.MsgType, instance int64, source int64, round int64, value value,
pr int64, pv value, justify []qbft.Msg[int64, value],
) {
msg := newMsg(typ, instance, source, round, value, pr, pv, justify)
receive <- msg // Always send to self first (no jitter, no drops).
bcast(broadcast, msg, test.BCastJitterMS, clock)
},
SendQCommit: func(_ int64, qCommit []qbft.Msg[int64, value]) {
Expand Down Expand Up @@ -274,7 +247,10 @@ func testQBFT(t *testing.T, test test) {
select {
case msg := <-broadcast:
t.Logf("%s %v => %v@%d", clock.NowStr(), msg.Source(), msg.Type(), msg.Round())
for _, out := range receives {
for target, out := range receives {
if target == msg.Source() {
continue // Do not broadcast to self, we sent to self already.
}
if p, ok := test.DropProb[msg.Source()]; ok {
if rand.Float64() < p {
continue // Drop
Expand Down Expand Up @@ -406,3 +382,31 @@ type value int64
func (v value) Equal(v2 value) bool {
return int64(v) == int64(v2)
}

func TestFormulas(t *testing.T) {
// assert given N asserts Q and F.
assert := func(t *testing.T, n, q, f int) {
t.Helper()
d := qbft.Definition[any, value]{Nodes: n}
require.Equalf(t, q, d.Quorum(), "Quorum given N=%d", n)
require.Equalf(t, f, d.Faulty(), "Faulty given N=%d", n)
}

assert(t, 1, 1, 0)
assert(t, 2, 2, 0)
assert(t, 3, 2, 0)
assert(t, 4, 3, 1)
assert(t, 5, 4, 1)
assert(t, 6, 4, 1)
assert(t, 7, 5, 2)
assert(t, 8, 6, 2)
assert(t, 9, 6, 2)
assert(t, 10, 7, 3)
assert(t, 11, 8, 3)
assert(t, 12, 8, 3)
assert(t, 13, 9, 4)
assert(t, 15, 10, 4)
assert(t, 17, 12, 5)
assert(t, 19, 13, 6)
assert(t, 21, 14, 6)
}