Skip to content

Commit

Permalink
Ignore equivocating and duplicate messages from a sender.
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth committed May 14, 2024
1 parent 93b1910 commit 0014efe
Showing 1 changed file with 64 additions and 28 deletions.
92 changes: 64 additions & 28 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,7 @@ func (i *instance) receiveOne(msg *GMessage) error {
switch msg.Vote.Step {
case QUALITY_PHASE:
// Receive each prefix of the proposal independently.
for j := range msg.Vote.Value.Suffix() {
prefix := msg.Vote.Value.Prefix(j + 1)
i.quality.Receive(msg.Sender, prefix, msg.Signature)
}
i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value)
case CONVERGE_PHASE:
if err := round.converged.Receive(msg.Sender, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil {
return fmt.Errorf("failed processing CONVERGE message: %w", err)
Expand Down Expand Up @@ -856,7 +853,8 @@ func (i *instance) sign(msg []byte) ([]byte, error) {

// Accumulates values from a collection of senders and incrementally calculates
// which values have reached a strong quorum of support.
// Supports receiving multiple values from each sender, and hence multiple strong quorum values.
// Supports receiving multiple values from a sender at once, and hence multiple strong quorum values.
// Subsequent messages from a single sender are dropped.
type quorumState struct {
// Set of senders from which a message has been received.
senders map[ActorID]struct{}
Expand Down Expand Up @@ -890,39 +888,63 @@ func newQuorumState(powerTable PowerTable) *quorumState {
}
}

// Receives a new chain from a sender.
// Receives a chain from a sender.
// Ignores any subsequent value from a sender from which a value has already been received.
func (q *quorumState) Receive(sender ActorID, value ECChain, signature []byte) {
senderPower, _ := q.powerTable.Get(sender)
senderPower, ok := q.receiveSender(sender)
if !ok {
return
}
q.receiveInner(sender, value, senderPower, signature)
}

// Add sender's power to total the first time a value is senders from them.
if _, ok := q.senders[sender]; !ok {
q.senders[sender] = struct{}{}
q.sendersTotalPower.Add(q.sendersTotalPower, senderPower)
// Receives each prefix of a chain as a distinct value from a sender.
// Note that this method does not store signatures, so it is not possible later to
// create an aggregate for these prefixes.
// This is intended for use in the QUALITY phase.
// Ignores any subsequent values from a sender from which a value has already been received.
func (q *quorumState) ReceiveEachPrefix(sender ActorID, values ECChain) {
senderPower, ok := q.receiveSender(sender)
if !ok {
return
}
for j := range values.Suffix() {
prefix := values.Prefix(j + 1)
q.receiveInner(sender, prefix, senderPower, nil)
}
}

// Adds sender's power to total the first time a value is received from them.
// Returns the sender's power, and whether this was the first invocation for this sender.
func (q *quorumState) receiveSender(sender ActorID) (*StoragePower, bool) {
if _, found := q.senders[sender]; found {
return nil, false
}
q.senders[sender] = struct{}{}
senderPower, _ := q.powerTable.Get(sender)
q.sendersTotalPower.Add(q.sendersTotalPower, senderPower)
return senderPower, true
}

// Receives a chain from a sender.
func (q *quorumState) receiveInner(sender ActorID, value ECChain, power *StoragePower, signature []byte) {
key := value.Key()
candidate, ok := q.chainSupport[key]
if ok {
// Don't double-count the same chain for a single participant.
if _, ok := candidate.signatures[sender]; ok {
return
}
} else {
if !ok {
candidate = chainSupport{
chain: value,
power: NewStoragePower(0),
signatures: map[ActorID][]byte{},
hasStrongQuorum: false,
hasWeakQuorum: false,
}
q.chainSupport[key] = candidate
}

candidate.power.Add(candidate.power, power)
if candidate.signatures[sender] != nil {
panic("duplicate message should have been dropped")
}
candidate.signatures[sender] = signature

// Add sender's power to the chain's total power.
candidate.power.Add(candidate.power, senderPower)

candidate.hasStrongQuorum = hasStrongQuorum(candidate.power, q.powerTable.Total)
candidate.hasWeakQuorum = hasWeakQuorum(candidate.power, q.powerTable.Total)
q.chainSupport[key] = candidate
Expand All @@ -933,7 +955,11 @@ func (q *quorumState) ReceiveJustification(value ECChain, justification *Justifi
if justification == nil {
panic("nil justification")
}
q.receivedJustification[value.Key()] = justification
// Keep only the first one received.
key := value.Key()
if _, ok := q.receivedJustification[key]; !ok {
q.receivedJustification[key] = justification
}
}

// Lists all values that have been senders from any sender.
Expand Down Expand Up @@ -967,6 +993,7 @@ type QuorumResult struct {
func (q QuorumResult) Aggregate(v Verifier) ([]byte, error) {
return v.Aggregate(q.PubKeys, q.Signatures)
}

func (q QuorumResult) SignersBitfield() bitfield.BitField {
signers := make([]uint64, 0, len(q.Signers))
for _, s := range q.Signers {
Expand Down Expand Up @@ -1072,7 +1099,9 @@ func (q *quorumState) FindStrongQuorumValue() (quorumValue ECChain, foundQuorum
//// CONVERGE phase helper /////

type convergeState struct {
// Chains indexed by key
// Participants from which a message has been received.
senders map[ActorID]struct{}
// Chains indexed by key.
values map[ChainKey]ConvergeValue
// Tickets provided by proposers of each chain.
tickets map[ChainKey][]ConvergeTicket
Expand All @@ -1090,22 +1119,29 @@ type ConvergeTicket struct {

func newConvergeState() *convergeState {
return &convergeState{
senders: map[ActorID]struct{}{},
values: map[ChainKey]ConvergeValue{},
tickets: map[ChainKey][]ConvergeTicket{},
}
}

// Receives a new CONVERGE value from a sender.
// Ignores any subsequent value from a sender from which a value has already been received.
func (c *convergeState) Receive(sender ActorID, value ECChain, ticket Ticket, justification *Justification) error {
if _, ok := c.senders[sender]; ok {
return nil
}
c.senders[sender] = struct{}{}
if value.IsZero() {
return fmt.Errorf("bottom cannot be justified for CONVERGE")
}
key := value.Key()

// !!! TODO: Drop duplicates (see https://github.com/filecoin-project/go-f3/issues/12)
c.values[key] = ConvergeValue{Chain: value, Justification: justification}
c.tickets[key] = append(c.tickets[key], ConvergeTicket{Sender: sender, Ticket: ticket})

// Keep only the first justification and ticket received for a value.
if _, found := c.values[key]; !found {
c.values[key] = ConvergeValue{Chain: value, Justification: justification}
c.tickets[key] = append(c.tickets[key], ConvergeTicket{Sender: sender, Ticket: ticket})
}
return nil
}

Expand Down

0 comments on commit 0014efe

Please sign in to comment.