Skip to content

Commit

Permalink
core/qbft: refactor value to generic comparable
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed May 10, 2022
1 parent db999b9 commit a739003
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 99 deletions.
106 changes: 44 additions & 62 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,11 @@ import (
"github.com/obolnetwork/charon/app/errors"
)

// Value defines the constraints of the generic value type.
// The only constraint is an equality method.
type Value[V any] interface {
Equal(V) bool
}

// Transport abstracts the transport layer between processes in the consensus system.
//
// Note that broadcasting doesn't return an error. Since this algorithm is idempotent
// it is suggested to just retry broadcasting indefinitely until it succeeds or times out.
type Transport[I any, V Value[V]] struct {
type Transport[I any, V comparable] struct {
// Broadcast sends a message with the provided fields to all other
// processes in the system (including this process).
Broadcast func(typ MsgType, instance I, source int64, round int64, value V, pr int64, pv V, justification []Msg[I, V])
Expand All @@ -48,7 +42,7 @@ type Transport[I any, V Value[V]] struct {

// Definition defines the consensus system parameters that are external to the qbft algorithm.
// This remains constant across multiple instances of consensus (calls to Run).
type Definition[I any, V Value[V]] struct {
type Definition[I any, V comparable] struct {
// IsLeader is a deterministic leader election function.
IsLeader func(instance I, round, process int64) bool
// NewTimer returns a new timer channel and stop function for the round.
Expand Down Expand Up @@ -94,7 +88,7 @@ func (i MsgType) Valid() bool {
}

// Msg defines the inter process messages.
type Msg[I any, V Value[V]] interface {
type Msg[I any, V comparable] interface {
// Type of the message.
Type() MsgType
// Instance identifies the consensus instance.
Expand Down Expand Up @@ -136,7 +130,7 @@ const (
// The generic type V is the arbitrary data value being proposed; it only requires an Equal method.
//
//nolint:gocognit // It is indeed a complex algorithm.
func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport[I, V], instance I, process int64, inputValue V) (err error) {
func Run[I any, V comparable](ctx context.Context, d Definition[I, V], t Transport[I, V], instance I, process int64, inputValue V) (err error) {
if isZeroVal(inputValue) {
return errors.New("zero input value not supported")
}
Expand Down Expand Up @@ -316,7 +310,7 @@ func Run[I any, V Value[V]](ctx context.Context, d Definition[I, V], t Transport
}

// classify returns the rule triggered upon receipt of the last message and its justifications.
func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) {
func classify[I any, V comparable](d Definition[I, V], instance I, round, process int64, buffer []Msg[I, V], msg Msg[I, V]) (uponRule, []Msg[I, V]) {
switch msg.Type() {
case MsgDecided:
if isJustifiedDecided(d, msg) {
Expand Down Expand Up @@ -400,7 +394,7 @@ func classify[I any, V Value[V]](d Definition[I, V], instance I, round, process
// highestPrepared implements algorithm 4:5 and returns
// the highest prepared round (and pv) from the set of quorum
// round change messages (Qrc).
func highestPrepared[I any, V Value[V]](qrc []Msg[I, V]) (int64, V) {
func highestPrepared[I any, V comparable](qrc []Msg[I, V]) (int64, V) {
if len(qrc) == 0 {
// Expect: len(Qrc) >= quorum
panic("bug: qrc empty")
Expand All @@ -422,7 +416,7 @@ func highestPrepared[I any, V Value[V]](qrc []Msg[I, V]) (int64, V) {

// nextMinRound implements algorithm 3:6 and returns the next minimum round
// from received round change messages.
func nextMinRound[I any, V Value[V]](d Definition[I, V], frc []Msg[I, V], round int64) int64 {
func nextMinRound[I any, V comparable](d Definition[I, V], frc []Msg[I, V], round int64) int64 {
// Get all RoundChange messages with round (rj) higher than current round (ri)

if len(frc) < d.Faulty()+1 {
Expand All @@ -448,7 +442,7 @@ func nextMinRound[I any, V Value[V]](d Definition[I, V], frc []Msg[I, V], round

// isJustifiedRoundChange returns true if the ROUND_CHANGE message's
// prepared round and value is justified.
func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bool {
func isJustifiedRoundChange[I any, V comparable](d Definition[I, V], msg Msg[I, V]) bool {
if msg.Type() != MsgRoundChange {
panic("bug: not a round change message")
}
Expand Down Expand Up @@ -480,7 +474,7 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]
if prepare.Round() != pr {
return false
}
if !prepare.Value().Equal(pv) {
if prepare.Value() != pv {
return false
}
}
Expand All @@ -490,7 +484,7 @@ func isJustifiedRoundChange[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]

// isJustifiedDecided returns true if the decided message is justified by quorum COMMIT messages
// of identical round and value.
func isJustifiedDecided[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bool {
func isJustifiedDecided[I any, V comparable](d Definition[I, V], msg Msg[I, V]) bool {
if msg.Type() != MsgDecided {
panic("bug: not a decided message")
}
Expand All @@ -502,7 +496,7 @@ func isJustifiedDecided[I any, V Value[V]](d Definition[I, V], msg Msg[I, V]) bo
}

// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified.
func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, msg Msg[I, V]) bool {
func isJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool {
if msg.Type() != MsgPrePrepare {
panic("bug: not a preprepare message")
}
Expand All @@ -524,12 +518,12 @@ func isJustifiedPrePrepare[I any, V Value[V]](d Definition[I, V], instance I, ms
return true // New value being proposed
}

return msg.Value().Equal(pv) // Ensure Pv is being proposed
return msg.Value() == pv // Ensure Pv is being proposed
}

// containsJustifiedQrc implements algorithm 4:1 and returns true and pv if
// the messages contains a justified quorum ROUND_CHANGEs (Qrc).
func containsJustifiedQrc[I any, V Value[V]](d Definition[I, V], justification []Msg[I, V], round int64) (V, bool) {
func containsJustifiedQrc[I any, V comparable](d Definition[I, V], justification []Msg[I, V], round int64) (V, bool) {
qrc := filterRoundChange(justification, round)
if len(qrc) < d.Quorum() {
return zeroVal[V](), false
Expand Down Expand Up @@ -564,7 +558,7 @@ func containsJustifiedQrc[I any, V Value[V]](d Definition[I, V], justification [
}

// getJustifiedQrc implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc).
func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
func getJustifiedQrc[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
if qrc, ok := quorumNullPrepared(d, buffer, round); ok {
// Return any quorum null pv ROUND_CHANGE messages as Qrc.
return qrc, true
Expand All @@ -588,7 +582,7 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V],
if !uniq(rc) {
continue
}
if rc.PreparedRound() == pr && rc.PreparedValue().Equal(pv) {
if rc.PreparedRound() == pr && rc.PreparedValue() == pv {
hasHighestPrepared = true
}
qrc = append(qrc, rc)
Expand All @@ -604,7 +598,7 @@ func getJustifiedQrc[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V],
// getFPlus1RoundChanges returns true and Faulty+1 ROUND-CHANGE messages (Frc) with
// the rounds higher than the provided round. It returns the highest round
// per process in order to jump furthest.
func getFPlus1RoundChanges[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
func getFPlus1RoundChanges[I any, V comparable](d Definition[I, V], buffer []Msg[I, V], round int64) ([]Msg[I, V], bool) {
highestBySource := make(map[int64]Msg[I, V])
for _, msg := range buffer {
if msg.Type() != MsgRoundChange {
Expand Down Expand Up @@ -636,52 +630,38 @@ func getFPlus1RoundChanges[I any, V Value[V]](d Definition[I, V], buffer []Msg[I
return resp, true
}

// prepareSet defines a set of PREPARE messages (one per process)
// with identical round and value.
type prepareSet[I any, V Value[V]] struct {
// preparedKey defines the round and value of set of identical PREPARE messages.
type preparedKey[I any, V comparable] struct {
round int64
value V
msgs map[int64]Msg[I, V] // map[process]Msg
}

// getPrepareQuorums returns all sets of quorum PREPARE messages
// with identical rounds and values.
func getPrepareQuorums[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V]) [][]Msg[I, V] {
var sets []prepareSet[I, V]
for _, msg := range flatten(buffer) { // Flatten to get PREPARES included as ROUND-CHANGE justifications.
func getPrepareQuorums[I any, V comparable](d Definition[I, V], buffer []Msg[I, V]) [][]Msg[I, V] {
sets := make(map[preparedKey[I, V]]map[int64]Msg[I, V]) // map[preparedKey]map[process]Msg
for _, msg := range flatten(buffer) { // Flatten to get PREPARES included as ROUND-CHANGE justifications.
if msg.Type() != MsgPrepare {
continue
}

var found bool
for _, s := range sets {
if s.round != msg.Round() || !s.value.Equal(msg.Value()) {
continue
}
s.msgs[msg.Source()] = msg
found = true

break
}
if found {
continue
key := preparedKey[I, V]{round: msg.Round(), value: msg.Value()}
msgs, ok := sets[key]
if !ok {
msgs = make(map[int64]Msg[I, V])
}

sets = append(sets, prepareSet[I, V]{
round: msg.Round(),
value: msg.Value(),
msgs: map[int64]Msg[I, V]{msg.Source(): msg},
})
msgs[msg.Source()] = msg
sets[key] = msgs
}

// Return all quorums
var quorums [][]Msg[I, V]
for _, set := range sets {
if len(set.msgs) < d.Quorum() {
for _, msgs := range sets {
if len(msgs) < d.Quorum() {
continue
}
var quorum []Msg[I, V]
for _, msg := range set.msgs {
for _, msg := range msgs {
quorum = append(quorum, msg)
}
quorums = append(quorums, quorum)
Expand All @@ -692,7 +672,7 @@ func getPrepareQuorums[I any, V Value[V]](d Definition[I, V], buffer []Msg[I, V]

// quorumNullPrepared implements condition J1 and returns Qrc and true if a quorum
// of round changes messages (Qrc) for the round have null prepared round and value.
func quorumNullPrepared[I any, V Value[V]](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) {
func quorumNullPrepared[I any, V comparable](d Definition[I, V], all []Msg[I, V], round int64) ([]Msg[I, V], bool) {
var (
nullPr int64
nullPv V
Expand All @@ -703,18 +683,18 @@ func quorumNullPrepared[I any, V Value[V]](d Definition[I, V], all []Msg[I, V],
}

// filterByRoundAndValue returns the messages matching the type and value.
func filterByRoundAndValue[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, value V) []Msg[I, V] {
func filterByRoundAndValue[I any, V comparable](msgs []Msg[I, V], typ MsgType, round int64, value V) []Msg[I, V] {
return filterMsgs(msgs, typ, round, &value, nil, nil)
}

// filterRoundChange returns all round change messages for the provided round.
func filterRoundChange[I any, V Value[V]](msgs []Msg[I, V], round int64) []Msg[I, V] {
func filterRoundChange[I any, V comparable](msgs []Msg[I, V], round int64) []Msg[I, V] {
return filterMsgs(msgs, MsgRoundChange, round, nil, nil, nil)
}

// filterMsgs returns one message per process matching the provided type and round
// and optional value, pr, pv.
func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, value *V, pr *int64, pv *V) []Msg[I, V] {
func filterMsgs[I any, V comparable](msgs []Msg[I, V], typ MsgType, round int64, value *V, pr *int64, pv *V) []Msg[I, V] {
var (
resp []Msg[I, V]
uniq = uniqSource[I, V]()
Expand All @@ -728,11 +708,11 @@ func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, v
continue
}

if value != nil && !msg.Value().Equal(*value) {
if value != nil && msg.Value() != *value {
continue
}

if pv != nil && !msg.PreparedValue().Equal(*pv) {
if pv != nil && msg.PreparedValue() != *pv {
continue
}

Expand All @@ -749,7 +729,7 @@ func filterMsgs[I any, V Value[V]](msgs []Msg[I, V], typ MsgType, round int64, v
}

// key returns the message dedup key.
func key[I any, V Value[V]](msg Msg[I, V]) dedupKey {
func key[I any, V comparable](msg Msg[I, V]) dedupKey {
return dedupKey{
Source: msg.Source(),
Type: msg.Type(),
Expand All @@ -764,18 +744,20 @@ type dedupKey struct {
Round int64
}

func zeroVal[V Value[V]]() V {
// zeroVal returns a zero value.
func zeroVal[V comparable]() V {
var zero V
return zero
}

func isZeroVal[V Value[V]](v V) bool {
return v.Equal(zeroVal[V]())
// isZeroVal returns true if the value is a zero value.
func isZeroVal[V comparable](v V) bool {
return v == zeroVal[V]()
}

// flatten returns a new list of messages containing all the buffered messages
// as well as all their justifications.
func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] {
func flatten[I any, V comparable](buffer []Msg[I, V]) []Msg[I, V] {
var resp []Msg[I, V]
for _, msg := range buffer {
resp = append(resp, msg)
Expand All @@ -788,7 +770,7 @@ func flatten[I any, V Value[V]](buffer []Msg[I, V]) []Msg[I, V] {
}

// uniqSource returns a function that returns true if the message is from a unique source.
func uniqSource[I any, V Value[V]](msgs ...Msg[I, V]) func(Msg[I, V]) bool {
func uniqSource[I any, V comparable](msgs ...Msg[I, V]) func(Msg[I, V]) bool {
dedup := make(map[int64]bool)
for _, msg := range msgs {
if dedup[msg.Source()] {
Expand Down
Loading

0 comments on commit a739003

Please sign in to comment.