Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/qbft: send qcommit as decided message #508

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions core/qbft/msgtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 60 additions & 20 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ type Transport[I any, V Value[V]] struct {
// processes in the system (including this process).
Broadcast func(typ MsgType, instance I, source int64, round int64, value V, pr int64, pv V, justification []Msg[I, V])

// SendQCommit sends the commit messages to a specific process.
SendQCommit func(target int64, qCommit []Msg[I, V])

// Receive returns a stream of messages received
// from other processes in the system (including this process).
Receive <-chan Msg[I, V]
Expand Down Expand Up @@ -88,7 +85,8 @@ const (
MsgPrepare MsgType = 2
MsgCommit MsgType = 3
MsgRoundChange MsgType = 4
msgSentinel MsgType = 5
MsgDecided MsgType = 5
msgSentinel MsgType = 6
)

func (i MsgType) Valid() bool {
Expand Down Expand Up @@ -129,6 +127,8 @@ const (
uponUnjustRoundChange
uponFPlus1RoundChanges
uponQuorumRoundChanges
uponJustifiedDecided
uponUnjustDecided
)

// Run executes the consensus algorithm until the context closed.
Expand Down Expand Up @@ -177,14 +177,6 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport
zeroVal[V](), preparedRound, preparedValue, preparedJustification)
}

// sendQCommit sends qCommit to the target process.
sendQCommit := func(target int64) {
if len(qCommit) == 0 {
panic("bug: send empty Qcommit")
}
t.SendQCommit(target, qCommit)
}

// bufferMsg returns true if the message is unique and was added to the buffer.
// It returns false if the message is a duplicate and should be discarded.
bufferMsg := func(msg Msg[I, V]) bool {
Expand Down Expand Up @@ -233,7 +225,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport
// Just send Qcommit if consensus already decided
if len(qCommit) > 0 {
if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17
sendQCommit(msg.Source())
broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
}

continue
Expand Down Expand Up @@ -270,8 +262,10 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport

broadcastMsg(MsgCommit, preparedValue, nil)

case uponQuorumCommits: // Algorithm 2:8
case uponQuorumCommits, uponJustifiedDecided: // Algorithm 2:8
// Applicable to any round (since can be justified)
round = msg.Round()

stopTimer()
qCommit = justification

Expand Down Expand Up @@ -299,7 +293,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport

broadcastMsg(MsgPrePrepare, value, justification)

case uponUnjustPrePrepare, uponUnjustRoundChange:
case uponUnjustPrePrepare, uponUnjustRoundChange, uponUnjustDecided:
// Ignore bug or byzantium.

default:
Expand All @@ -324,6 +318,13 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport
// classify returns the rule triggered upon receipt of the last message and its justifications.
func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) {
switch msg.Type() {
case MsgDecided:
if isJustifiedDecided(d, msg) {
return uponJustifiedDecided, msg.Justification()
}

return uponUnjustDecided, nil

case MsgPrePrepare:
if !isJustifiedPrePrepare(d, instance, msg) {
return uponUnjustPrePrepare, nil
Expand Down Expand Up @@ -468,7 +469,11 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]
return false
}

uniq := uniqSource[I, V]()
for _, prepare := range prepares {
if !uniq(prepare) {
Copy link
Contributor Author

@corverroos corverroos May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when counting/filtering messages, ensure that only one message per source/process is counted.

return false
}
if prepare.Type() != MsgPrepare {
return false
}
Expand All @@ -483,6 +488,19 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]
return true
}

// isJustifiedDecided returns true if the decided message is justified by quorum COMMIT messages
// of identical round and value.
func isJustifiedDecided[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bool {
if msg.Type() != MsgDecided {
panic("bug: not a decided message")
}

v := msg.Value()
commits := filterMsgs(msg.Justification(), MsgCommit, msg.Round(), &v, nil, nil)

return len(commits) >= d.Quorum()
}

// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified.
func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, msg Msg[I, V]) bool {
if msg.Type() != MsgPrePrepare {
Expand Down Expand Up @@ -561,11 +579,15 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V],
hasHighestPrepared bool
pr = prepares[0].Round()
pv = prepares[0].Value()
uniq = uniqSource[I, V]()
)
for _, rc := range roundChanges {
if rc.PreparedRound() > pr {
continue
}
if !uniq(rc) {
continue
}
if rc.PreparedRound() == pr && rc.PreparedValue().Equal(pv) {
hasHighestPrepared = true
}
Expand Down Expand Up @@ -695,7 +717,7 @@ func filterRoundChange[I any, V Value[V]](msgs []Msg[I, V], round int64) []Msg[I
func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, value *V, pr *int64, pv *V) []Msg[I, V] {
var (
resp []Msg[I, V]
dups = make(map[dedupKey]bool)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already did this in filterMsgs but since we do not always use this function, generalise the uniqSource[I, V]() helper function

uniq = uniqSource[I, V]()
)
for _, msg := range msgs {
if typ != msg.Type() {
Expand All @@ -718,11 +740,9 @@ func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, v
continue
}

if dups[key(msg)] {
continue
if uniq(msg) {
resp = append(resp, msg)
}
dups[key(msg)] = true
resp = append(resp, msg)
}

return resp
Expand Down Expand Up @@ -766,3 +786,23 @@ func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] {

return resp
}

// uniqSource returns a function that returns true if the message is from a unique source.
func uniqSource[I any, V Value[V]](msgs ...Msg[I, V]) func(Msg[I, V]) bool {
duplicate := make(map[int64]bool)
for _, msg := range msgs {
if duplicate[msg.Source()] {
panic("seeding uniq with duplicates")
}
duplicate[msg.Source()] = true
}

return func(msg Msg[I, V]) bool {
if duplicate[msg.Source()] {
return false
}
duplicate[msg.Source()] = true

return true
}
}
46 changes: 34 additions & 12 deletions core/qbft/qbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ func TestQBFT(t *testing.T) {
RandomRound: true,
})
})

t.Run("drop 30% const", func(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this issue was uncovered when running multiple 30% lossy tests

testQBFT(t, test{
Instance: 1,
DropProb: map[int64]float64{
0: 0.3,
1: 0.3,
2: 0.3,
3: 0.3,
},
ConstPeriod: true,
RandomRound: true,
})
})
}

type test struct {
Expand All @@ -158,7 +172,10 @@ type test struct {
func testQBFT(t *testing.T, test test) {
t.Helper()

const n = 4
const (
n = 4
maxRound = 50
)

var (
ctx, cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -187,7 +204,7 @@ func testQBFT(t *testing.T, test test) {
},
LogUponRule: func(instance int64, process, round int64, msg qbft.Msg[int64, value], rule string) {
t.Logf("%s %d => %v@%d -> %v@%d ~= %v", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), process, round, rule)
if round > 50 {
if round > maxRound {
cancel()
} else if strings.Contains(rule, "Unjust") {
t.Logf("%s: %#v", rule, msg)
Expand All @@ -204,22 +221,24 @@ func testQBFT(t *testing.T, test test) {
Broadcast: func(typ qbft.MsgType, instance int64, source int64, round int64, value value,
pr int64, pv value, justify []qbft.Msg[int64, value],
) {
if round > maxRound {
cancel()
return
}
t.Logf("%s %v => %v@%d", clock.NowStr(), source, typ, round)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more test logs

msg := newMsg(typ, instance, source, round, value, pr, pv, justify)
receive <- msg // Always send to self first (no jitter, no drops).
bcast(broadcast, msg, test.BCastJitterMS, clock)
},
SendQCommit: func(_ int64, qCommit []qbft.Msg[int64, value]) {
for _, msg := range qCommit {
broadcast <- msg // Just broadcast
}
bcast(t, broadcast, msg, test.BCastJitterMS, clock)
},
Receive: receive,
}

go func(i int64) {
if d, ok := test.StartDelay[i]; ok {
t.Logf("%s Node %d start delay %s", clock.NowStr(), i, d)
ch, _ := clock.NewTimer(d)
<-ch
t.Logf("%s Node %d starting %s", clock.NowStr(), i, d)

// Drain any buffered messages
for {
Expand Down Expand Up @@ -247,13 +266,13 @@ func testQBFT(t *testing.T, test test) {
for {
select {
case msg := <-broadcast:
t.Logf("%s %v => %v@%d", clock.NowStr(), msg.Source(), msg.Type(), msg.Round())
for target, out := range receives {
if target == msg.Source() {
continue // Do not broadcast to self, we sent to self already.
}
if p, ok := test.DropProb[msg.Source()]; ok {
if rand.Float64() < p {
t.Logf("%s %v => %v@%d => %d (dropped)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), target)
continue // Drop
}
}
Expand Down Expand Up @@ -291,16 +310,19 @@ func testQBFT(t *testing.T, test test) {
}

// bcast delays the message broadcast by between 1x and 2x jitterMS and drops messages.
func bcast[I any, V qbft.Value[V]](broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) {
func bcast[I any, V qbft.Value[V]](t *testing.T, broadcast chan qbft.Msg[I, V], msg qbft.Msg[I, V], jitterMS int, clock *fakeClock) {
t.Helper()

if jitterMS == 0 {
broadcast <- msg

return
}

go func() {
deltaMS := int(float64(jitterMS) * rand.Float64())
ch, _ := clock.NewTimer(time.Duration(jitterMS+deltaMS) * time.Millisecond)
delay := time.Duration(jitterMS+deltaMS) * time.Millisecond
t.Logf("%s %v => %v@%d (bcast delay %s)", clock.NowStr(), msg.Source(), msg.Type(), msg.Round(), delay)
ch, _ := clock.NewTimer(delay)
<-ch
broadcast <- msg
}()
Expand Down
6 changes: 4 additions & 2 deletions core/qbft/uponrule_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.