-
Notifications
You must be signed in to change notification settings - Fork 13
Avoid loading round state for dropped messages #300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -245,10 +245,15 @@ func (i *instance) Receive(msg *GMessage) error { | |
| if i.terminated() { | ||
| return ErrReceivedAfterTermination | ||
| } | ||
| if err := i.receiveOne(msg); err != nil { | ||
| stateChanged, err := i.receiveOne(msg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| i.postReceive(msg.Vote.Round) | ||
| if stateChanged { | ||
| // Further process the message's round only if it may have had an effect. | ||
| // This avoids loading state for dropped messages (including spam). | ||
| i.postReceive(msg.Vote.Round) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -262,15 +267,18 @@ func (i *instance) ReceiveMany(msgs []*GMessage) error { | |
| // Received each message and remember which rounds were received. | ||
| roundsReceived := map[uint64]struct{}{} | ||
| for _, msg := range msgs { | ||
| if err := i.receiveOne(msg); err != nil { | ||
| stateChanged, err := i.receiveOne(msg) | ||
| if err != nil { | ||
| if errors.Is(err, ErrValidationWrongBase) || errors.Is(err, ErrValidationWrongSupplement) { | ||
| // Drop late-binding validation errors. | ||
| i.log("dropping invalid message: %s", err) | ||
| } else { | ||
| return err | ||
| } | ||
| } | ||
| roundsReceived[msg.Vote.Round] = struct{}{} | ||
| if stateChanged { | ||
| roundsReceived[msg.Vote.Round] = struct{}{} | ||
| } | ||
| } | ||
| // Build unique, ordered list of rounds received. | ||
| rounds := make([]uint64, 0, len(roundsReceived)) | ||
|
|
@@ -294,90 +302,90 @@ func (i *instance) Describe() string { | |
| } | ||
|
|
||
| // Processes a single message. | ||
| func (i *instance) receiveOne(msg *GMessage) error { | ||
| // Returns true if the message might have caused a change in state. | ||
| func (i *instance) receiveOne(msg *GMessage) (bool, error) { | ||
| // Check the message is for this instance, to guard against programming error. | ||
| if msg.Vote.Instance != i.instanceID { | ||
| return fmt.Errorf("%w: message for instance %d, expected %d", | ||
| return false, fmt.Errorf("%w: message for instance %d, expected %d", | ||
| ErrReceivedWrongInstance, msg.Vote.Instance, i.instanceID) | ||
| } | ||
| // Perform validation that could not be done until the instance started. | ||
| // Check supplemental data matches this instance's expectation. | ||
| if !msg.Vote.SupplementalData.Eq(i.supplementalData) { | ||
| return fmt.Errorf("%w: message supplement %s, expected %s", | ||
| return false, fmt.Errorf("%w: message supplement %s, expected %s", | ||
| ErrValidationWrongSupplement, msg.Vote.SupplementalData, i.supplementalData) | ||
| } | ||
| // Check proposal has the expected base chain. | ||
| if !(msg.Vote.Value.IsZero() || msg.Vote.Value.HasBase(i.input.Base())) { | ||
| return fmt.Errorf("%w: message base %s, expected %s", | ||
| return false, fmt.Errorf("%w: message base %s, expected %s", | ||
| ErrValidationWrongBase, &msg.Vote.Value, i.input.Base()) | ||
| } | ||
|
|
||
| if i.phase == TERMINATED_PHASE { | ||
| return nil // No-op | ||
| return false, nil // No-op | ||
| } | ||
| // Ignore QUALITY messages after exiting the QUALITY phase. | ||
| // Ignore CONVERGE and PREPARE messages for prior rounds. | ||
| forPriorRound := msg.Vote.Round < i.round | ||
| if (msg.Vote.Step == QUALITY_PHASE && i.phase != QUALITY_PHASE) || | ||
| (forPriorRound && msg.Vote.Step == CONVERGE_PHASE) || | ||
| (forPriorRound && msg.Vote.Step == PREPARE_PHASE) { | ||
| return nil | ||
| return false, nil | ||
| } | ||
|
|
||
| // Drop message that: | ||
| // * belong to future rounds, beyond the configured max lookahead threshold, and | ||
| // * carry no justification, i.e. are spammable. | ||
| beyondMaxLookaheadRounds := msg.Vote.Round > i.round+i.participant.maxLookaheadRounds | ||
| if beyondMaxLookaheadRounds && isSpammable(msg) { | ||
| return nil | ||
| return false, nil | ||
| } | ||
|
|
||
| round := i.roundState(msg.Vote.Round) | ||
| // Load the round state and process further only valid, non-spammable messages. | ||
| // Equivocations are handled by the quorum state. | ||
| msgRound := i.getRound(msg.Vote.Round) | ||
| switch msg.Vote.Step { | ||
| case QUALITY_PHASE: | ||
| // Receive each prefix of the proposal independently. | ||
| i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value) | ||
| case CONVERGE_PHASE: | ||
| if err := round.converged.Receive(msg.Sender, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil { | ||
| return fmt.Errorf("failed processing CONVERGE message: %w", err) | ||
| if err := msgRound.converged.Receive(msg.Sender, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil { | ||
| return false, fmt.Errorf("failed processing CONVERGE message: %w", err) | ||
| } | ||
| case PREPARE_PHASE: | ||
| round.prepared.Receive(msg.Sender, msg.Vote.Value, msg.Signature) | ||
| msgRound.prepared.Receive(msg.Sender, msg.Vote.Value, msg.Signature) | ||
| case COMMIT_PHASE: | ||
| round.committed.Receive(msg.Sender, msg.Vote.Value, msg.Signature) | ||
| msgRound.committed.Receive(msg.Sender, msg.Vote.Value, msg.Signature) | ||
| // The only justifications that need to be stored for future propagation are for COMMITs | ||
| // to non-bottom values. | ||
| // This evidence can be brought forward to justify a CONVERGE message in the next round. | ||
| if !msg.Vote.Value.IsZero() { | ||
| round.committed.ReceiveJustification(msg.Vote.Value, msg.Justification) | ||
| msgRound.committed.ReceiveJustification(msg.Vote.Value, msg.Justification) | ||
| } | ||
| case DECIDE_PHASE: | ||
| i.decision.Receive(msg.Sender, msg.Vote.Value, msg.Signature) | ||
| if i.phase != DECIDE_PHASE { | ||
| i.skipToDecide(msg.Vote.Value, msg.Justification) | ||
| } | ||
| if err := i.tryDecide(); err != nil { | ||
| return fmt.Errorf("failed to decide: %w", err) | ||
| } | ||
| default: | ||
| i.log("unexpected message %v", msg) | ||
| return false, fmt.Errorf("unexpected message step %s", msg.Vote.Step) | ||
| } | ||
|
|
||
| // Every COMMIT phase stays open to new messages even after the protocol moves on to | ||
| // a new round. Late-arriving COMMITS can still (must) cause a local decision, *in that round*. | ||
| // Try to complete the COMMIT phase for the round specified by the message. | ||
| if msg.Vote.Step == COMMIT_PHASE && i.phase != DECIDE_PHASE { | ||
| return i.tryCommit(msg.Vote.Round) | ||
| return true, i.tryCommit(msg.Vote.Round) | ||
| } | ||
| // Try to complete the current phase in the current round. | ||
| return i.tryCurrentPhase() | ||
| return true, i.tryCurrentPhase() | ||
| } | ||
|
|
||
| func (i *instance) postReceive(roundsReceived ...uint64) { | ||
| // Check whether the instance should skip ahead to future round, in descending order. | ||
| slices.Reverse(roundsReceived) | ||
| for _, r := range roundsReceived { | ||
| round := i.roundState(r) | ||
| round := i.getRound(r) | ||
| if chain, justification, skip := i.shouldSkipToRound(r, round); skip { | ||
| i.skipToRound(r, chain, justification) | ||
| return | ||
|
|
@@ -391,22 +399,21 @@ func (i *instance) postReceive(roundsReceived ...uint64) { | |
| // | ||
| // See: skipToRound. | ||
| func (i *instance) shouldSkipToRound(round uint64, state *roundState) (ECChain, *Justification, bool) { | ||
|
|
||
| // Check if the given round is ahead of current round and this instance is not in | ||
| // DECIDE phase. | ||
| if round <= i.round || i.phase == DECIDE_PHASE { | ||
| return nil, nil, false | ||
| } | ||
| if !state.prepared.ReceivedFromWeakQuorum() { | ||
| return nil, nil, false | ||
| } | ||
|
Comment on lines
+407
to
+409
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this cheaper and harder-to-meet check to happen first |
||
| proposal := state.converged.FindMaxTicketProposal(i.powerTable) | ||
| if proposal.Justification == nil { | ||
| // FindMaxTicketProposal returns a zero-valued ConvergeValue if no such ticket is | ||
| // found. Hence the check for nil. Otherwise, if found such ConvergeValue must | ||
| // have a non-nil justification. | ||
| return nil, nil, false | ||
| } | ||
| if !state.prepared.ReceivedFromWeakQuorum() { | ||
| return nil, nil, false | ||
| } | ||
| return proposal.Chain, proposal.Justification, true | ||
| } | ||
|
|
||
|
|
@@ -644,8 +651,8 @@ func (i *instance) tryConverge() error { | |
| return nil | ||
| } | ||
|
|
||
| possibleDecisionLastRound := !i.roundState(i.round - 1).committed.HasStrongQuorumFor("") | ||
| winner := i.roundState(i.round).converged.FindMaxTicketProposal(i.powerTable) | ||
| possibleDecisionLastRound := !i.getRound(i.round - 1).committed.HasStrongQuorumFor("") | ||
| winner := i.getRound(i.round).converged.FindMaxTicketProposal(i.powerTable) | ||
| if winner.Chain.IsZero() { | ||
| return fmt.Errorf("no values at CONVERGE") | ||
| } | ||
|
|
@@ -661,7 +668,7 @@ func (i *instance) tryConverge() error { | |
| i.log("adopting proposal %s after converge", &winner.Chain) | ||
| } else { | ||
| // Else preserve own proposal. | ||
| fallback, ok := i.roundState(i.round).converged.FindProposalFor(i.proposal) | ||
| fallback, ok := i.getRound(i.round).converged.FindProposalFor(i.proposal) | ||
| if !ok { | ||
| panic("own proposal not found at CONVERGE") | ||
| } | ||
|
|
@@ -690,7 +697,7 @@ func (i *instance) tryPrepare() error { | |
| return fmt.Errorf("unexpected phase %s, expected %s", i.phase, PREPARE_PHASE) | ||
| } | ||
|
|
||
| prepared := i.roundState(i.round).prepared | ||
| prepared := i.getRound(i.round).prepared | ||
| // Optimisation: we could advance phase once a strong quorum on our proposal is not possible. | ||
| foundQuorum := prepared.HasStrongQuorumFor(i.proposal.Key()) | ||
| timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout) && prepared.ReceivedFromStrongQuorum() | ||
|
|
@@ -717,7 +724,7 @@ func (i *instance) beginCommit() { | |
| // No justification is required for committing bottom. | ||
| var justification *Justification | ||
| if !i.value.IsZero() { | ||
| if quorum, ok := i.roundState(i.round).prepared.FindStrongQuorumFor(i.value.Key()); ok { | ||
| if quorum, ok := i.getRound(i.round).prepared.FindStrongQuorumFor(i.value.Key()); ok { | ||
| // Found a strong quorum of PREPARE, build the justification for it. | ||
| justification = i.buildJustification(quorum, i.round, PREPARE_PHASE, i.value) | ||
| } else { | ||
|
|
@@ -732,7 +739,7 @@ func (i *instance) tryCommit(round uint64) error { | |
| // Unlike all other phases, the COMMIT phase stays open to new messages even after an initial quorum is reached, | ||
| // and the algorithm moves on to the next round. | ||
| // A subsequent COMMIT message can cause the node to decide, so there is no check on the current phase. | ||
| committed := i.roundState(round).committed | ||
| committed := i.getRound(round).committed | ||
| quorumValue, foundStrongQuorum := committed.FindStrongQuorumValue() | ||
| timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout) && committed.ReceivedFromStrongQuorum() | ||
|
|
||
|
|
@@ -771,11 +778,9 @@ func (i *instance) tryCommit(round uint64) error { | |
|
|
||
| func (i *instance) beginDecide(round uint64) { | ||
| i.phase = DECIDE_PHASE | ||
| roundState := i.roundState(round) | ||
|
|
||
| var justification *Justification | ||
| // Value cannot be empty here. | ||
| if quorum, ok := roundState.committed.FindStrongQuorumFor(i.value.Key()); ok { | ||
| if quorum, ok := i.getRound(round).committed.FindStrongQuorumFor(i.value.Key()); ok { | ||
| // Build justification for strong quorum of COMMITs for the value. | ||
| justification = i.buildJustification(quorum, round, COMMIT_PHASE, i.value) | ||
| } else { | ||
|
|
@@ -814,7 +819,7 @@ func (i *instance) tryDecide() error { | |
| return nil | ||
| } | ||
|
|
||
| func (i *instance) roundState(r uint64) *roundState { | ||
| func (i *instance) getRound(r uint64) *roundState { | ||
| round, ok := i.rounds[r] | ||
| if !ok { | ||
| round = newRoundState(i.powerTable) | ||
|
|
@@ -827,17 +832,17 @@ func (i *instance) beginNextRound() { | |
| i.round += 1 | ||
| i.log("moving to round %d with %s", i.round, i.proposal.String()) | ||
|
|
||
| prevRoundState := i.roundState(i.round - 1) | ||
| prevRound := i.getRound(i.round - 1) | ||
| // Proposal was updated at the end of COMMIT phase to be some value for which | ||
| // this node received a COMMIT message (bearing justification), if there were any. | ||
| // If there were none, there must have been a strong quorum for bottom instead. | ||
| var justification *Justification | ||
| if quorum, ok := prevRoundState.committed.FindStrongQuorumFor(""); ok { | ||
| if quorum, ok := prevRound.committed.FindStrongQuorumFor(""); ok { | ||
| // Build justification for strong quorum of COMMITs for bottom in the previous round. | ||
| justification = i.buildJustification(quorum, i.round-1, COMMIT_PHASE, ECChain{}) | ||
| } else { | ||
| // Extract the justification received from some participant (possibly this node itself). | ||
| justification, ok = prevRoundState.committed.receivedJustification[i.proposal.Key()] | ||
| justification, ok = prevRound.committed.receivedJustification[i.proposal.Key()] | ||
| if !ok { | ||
| panic("beginConverge called but no justification for proposal") | ||
| } | ||
|
|
@@ -1226,13 +1231,13 @@ func newConvergeState() *convergeState { | |
| // Receives a new CONVERGE value from a sender. | ||
| // Ignores any subsequent value from a sender from which a value has already been received. | ||
| func (c *convergeState) Receive(sender ActorID, value ECChain, ticket Ticket, justification *Justification) error { | ||
| if value.IsZero() { | ||
| return fmt.Errorf("bottom cannot be justified for CONVERGE") | ||
| } | ||
| if _, ok := c.senders[sender]; ok { | ||
| return nil | ||
| } | ||
| c.senders[sender] = struct{}{} | ||
| if value.IsZero() { | ||
| return fmt.Errorf("bottom cannot be justified for CONVERGE") | ||
| } | ||
| key := value.Key() | ||
|
|
||
| // Keep only the first justification and ticket received for a value. | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.