Round Times: Period 0 deadline timeout #5850

Merged: 5 commits, Dec 7, 2023
3 changes: 1 addition & 2 deletions agreement/actions.go
@@ -232,8 +232,7 @@ type ensureAction struct {
Payload proposal
// the certificate proving commitment
Certificate Certificate

// The time that the winning proposal-vote was validated, relative to the beginning of the round
// The time that the winning proposal-vote was validated for round credentialRoundLag back from the current one
voteValidatedAt time.Duration
// The dynamic filter timeout calculated for this round, even if not enabled, for reporting to telemetry.
dynamicFilterTimeout time.Duration
23 changes: 16 additions & 7 deletions agreement/player.go
@@ -100,10 +100,19 @@ func (p *player) handle(r routerHandle, e event) []action {
r.t.logTimeout(*p)
}

var deadlineTimeout time.Duration
if e.Proto.Version == "" || e.Proto.Err != nil {
r.t.log.Errorf("failed to read valid protocol version for timeout event (proto %v): %v. "+
"Falling Back to default deadline timeout.", e.Proto.Version, e.Proto.Err)
deadlineTimeout = DefaultDeadlineTimeout()
} else {
deadlineTimeout = DeadlineTimeout(p.Period, e.Proto.Version)
}

switch p.Step {
case soft:
// precondition: nap = false
actions = p.issueSoftVote(r)
actions = p.issueSoftVote(r, deadlineTimeout)
p.Step = cert
// update tracer state to match player
r.t.setMetadata(tracerMetadata{p.Round, p.Period, p.Step})
@@ -113,16 +113,16 @@
p.Step = next
// update tracer state to match player
r.t.setMetadata(tracerMetadata{p.Round, p.Period, p.Step})
return p.issueNextVote(r)
return p.issueNextVote(r, deadlineTimeout)
default:
if p.Napping {
return p.issueNextVote(r) // sets p.Napping to false
return p.issueNextVote(r, deadlineTimeout) // sets p.Napping to false
}
// not napping, so we should enter a new step
p.Step++ // note: this must happen before next timeout setting.
// TODO add unit test to ensure that deadlines increase monotonically here

lower, upper := p.Step.nextVoteRanges()
lower, upper := p.Step.nextVoteRanges(deadlineTimeout)
delta := time.Duration(e.RandomEntropy % uint64(upper-lower))

p.Napping = true
@@ -158,7 +167,7 @@ func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action {
return p.issueFastVote(r)
}

func (p *player) issueSoftVote(r routerHandle) (actions []action) {
func (p *player) issueSoftVote(r routerHandle, deadlineTimeout time.Duration) (actions []action) {
defer func() {
p.Deadline = Deadline{Duration: deadlineTimeout, Type: TimeoutDeadline}
}()
@@ -202,7 +211,7 @@ func (p *player) issueCertVote(r routerHandle, e committableEvent) action {
return pseudonodeAction{T: attest, Round: p.Round, Period: p.Period, Step: cert, Proposal: e.Proposal}
}

func (p *player) issueNextVote(r routerHandle) []action {
func (p *player) issueNextVote(r routerHandle, deadlineTimeout time.Duration) []action {
actions := p.partitionPolicy(r)

a := pseudonodeAction{T: attest, Round: p.Round, Period: p.Period, Step: p.Step, Proposal: bottom}
@@ -226,7 +235,7 @@

r.t.timeR().RecStep(p.Period, p.Step, a.Proposal)

_, upper := p.Step.nextVoteRanges()
_, upper := p.Step.nextVoteRanges(deadlineTimeout)
p.Napping = false
p.Deadline = Deadline{Duration: upper, Type: TimeoutDeadline}
return actions
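Note: the TODO above asks for a unit test checking that next-vote deadlines grow monotonically as the step increases. A minimal sketch of such a test, assuming package-internal access to `step`, `next`, `DefaultDeadlineTimeout`, and the new `nextVoteRanges(deadlineTimeout)` signature; the test name and the bound of ten steps are illustrative only, not part of this PR:

```go
package agreement

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestNextVoteRangesMonotonic checks that both bounds returned by
// nextVoteRanges keep increasing as the step number grows, so each
// recovery step waits strictly longer than the previous one.
func TestNextVoteRangesMonotonic(t *testing.T) {
	deadlineTimeout := DefaultDeadlineTimeout()
	prevLower, prevUpper := next.nextVoteRanges(deadlineTimeout)
	for s := next + 1; s < next+10; s++ {
		lower, upper := s.nextVoteRanges(deadlineTimeout)
		require.Less(t, prevLower, lower)
		require.Less(t, prevUpper, upper)
		require.Less(t, lower, upper)
		prevLower, prevUpper = lower, upper
	}
}
```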
36 changes: 18 additions & 18 deletions agreement/service_test.go
@@ -1334,7 +1334,7 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
Expand Down Expand Up @@ -1435,7 +1435,7 @@ func TestAgreementFastRecoveryLate(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1548,7 +1548,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1589,7 +1589,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(1, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1681,7 +1681,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1690,7 +1690,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(1, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1743,7 +1743,7 @@ func TestAgreementLateCertBug(t *testing.T) {
closeFn()
baseNetwork.repairAll()

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1819,7 +1819,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1846,7 +1846,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 5, int(zeroes))
}
@@ -1924,7 +1924,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
}
return params
})
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1950,7 +1950,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
panic(errstr)
}
}
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)

}
@@ -2025,7 +2025,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
// generate a bottom quorum; let only one node see it.
baseNetwork.crown(0)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
if clocks[0].(*testingClock).zeroes != zeroes+1 {
errstr := fmt.Sprintf("node 0 did not enter new period from bot quorum")
panic(errstr)
@@ -2043,11 +2043,11 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
activityMonitor.waitForQuiet()

// actually create the value quorum
_, upper := (next).nextVoteRanges()
_, upper := (next).nextVoteRanges(DeadlineTimeout(0, version))
triggerGlobalTimeout(upper, TimeoutDeadline, clocks[1:], activityMonitor) // activates next timers
zeroes = expectNoNewPeriod(clocks[1:], zeroes)

lower, upper := (next + 1).nextVoteRanges()
lower, upper := (next + 1).nextVoteRanges(DeadlineTimeout(0, version))
delta := time.Duration(testingRand{}.Uint64() % uint64(upper-lower))
triggerGlobalTimeout(lower+delta, TimeoutDeadline, clocks[1:], activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
@@ -2076,7 +2076,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -2190,7 +2190,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
{
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -2251,7 +2251,7 @@ func TestAgreementLargePeriods(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)

baseNetwork.repairAll()
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(period(p), version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4+p, int(zeroes))
}
@@ -2363,7 +2363,7 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942
// release proposed blocks in a controlled manner to prevent oversubscription of verification
pocket1 := make(chan multicastParams, 100)
closeFn = baseNetwork.pocketAllCompound(pocket1)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
baseNetwork.repairAll()
close(pocket1)
{
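In the quorum-recovery test above, the next-vote timers are now seeded from the period-0 deadline, and the firing time is jittered between the step's lower and upper bounds. A standalone sketch of that jitter pattern (the helper name, the 17s/19s bounds, and the entropy value are made up for illustration; the real code draws entropy from the timeout event):

```go
package main

import (
	"fmt"
	"time"
)

// nextVoteDelay mimics how the player schedules a next vote: a random
// offset (derived from entropy) within the [lower, upper) range for the step.
func nextVoteDelay(lower, upper time.Duration, entropy uint64) time.Duration {
	delta := time.Duration(entropy % uint64(upper-lower))
	return lower + delta
}

func main() {
	lower := 17 * time.Second // e.g. the deadline timeout for the first next step
	upper := 19 * time.Second // lower + recoveryExtraTimeout
	fmt.Println("fire next vote after:", nextVoteDelay(lower, upper, 123456789))
}
```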
22 changes: 15 additions & 7 deletions agreement/types.go
@@ -50,7 +50,7 @@ type Deadline struct {
Type TimeoutType
}

var deadlineTimeout = config.Protocol.BigLambda + config.Protocol.SmallLambda
var defaultDeadlineTimeout = config.Protocol.BigLambda + config.Protocol.SmallLambda
var partitionStep = next + 3
var recoveryExtraTimeout = config.Protocol.SmallLambda

Expand All @@ -63,9 +63,17 @@ func FilterTimeout(p period, v protocol.ConsensusVersion) time.Duration {
return config.Consensus[v].AgreementFilterTimeout
}

// DeadlineTimeout is the duration of the second agreement step.
func DeadlineTimeout() time.Duration {
return deadlineTimeout
// DeadlineTimeout is the duration of the second agreement step, varying based on period and consensus version.
func DeadlineTimeout(p period, v protocol.ConsensusVersion) time.Duration {
if p == 0 {
return config.Consensus[v].AgreementDeadlineTimeoutPeriod0
}
return defaultDeadlineTimeout
}

// DefaultDeadlineTimeout is the default duration of the second agreement step.
func DefaultDeadlineTimeout() time.Duration {
return defaultDeadlineTimeout
}

type (
@@ -92,10 +100,10 @@ const (
down
)

func (s step) nextVoteRanges() (lower, upper time.Duration) {
func (s step) nextVoteRanges(deadlineTimeout time.Duration) (lower, upper time.Duration) {
extra := recoveryExtraTimeout // eg 2000 ms
lower = deadlineTimeout // eg 17000 ms (15000 + 2000)
upper = lower + extra // eg 19000 ms
lower = deadlineTimeout // based on types.DeadlineTimeout()
upper = lower + extra

for i := next; i < s; i++ {
extra *= 2
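For illustration, a small sketch of how the reworked helpers behave from a caller's point of view. This is only a sketch: the printed values depend entirely on the consensus parameters of the chosen version, and `protocol.ConsensusCurrentVersion` is just one possible choice here.

```go
package main

import (
	"fmt"

	"github.com/algorand/go-algorand/agreement"
	"github.com/algorand/go-algorand/protocol"
)

func main() {
	v := protocol.ConsensusCurrentVersion

	// Period 0 now reads AgreementDeadlineTimeoutPeriod0 from the consensus
	// parameters; periods >= 1 keep the big-lambda + small-lambda value.
	fmt.Println("period 0 deadline:", agreement.DeadlineTimeout(0, v))
	fmt.Println("period 1 deadline:", agreement.DeadlineTimeout(1, v))

	// Callers without a protocol version (e.g. the catchup service below)
	// fall back to the version-independent default.
	fmt.Println("default deadline: ", agreement.DefaultDeadlineTimeout())
}
```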
16 changes: 8 additions & 8 deletions catchup/service.go
@@ -91,7 +91,7 @@
net network.GossipNode
auth BlockAuthenticator
parallelBlocks uint64
deadlineTimeout time.Duration
roundTimeEstimate time.Duration
prevBlockFetchTime time.Time
blockValidationPool execpool.BacklogPool

@@ -146,7 +146,7 @@
s.unmatchedPendingCertificates = unmatchedPendingCertificates
s.log = log.With("Context", "sync")
s.parallelBlocks = config.CatchupParallelBlocks
s.deadlineTimeout = agreement.DeadlineTimeout()
s.roundTimeEstimate = agreement.DefaultDeadlineTimeout()
s.blockValidationPool = blockValidationPool
s.syncNow = make(chan struct{}, 1)

@@ -556,11 +556,11 @@

// if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
start := time.Now()
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.roundTimeEstimate {
time.Sleep(100 * time.Millisecond)
}

// if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation.
// if ledger is still busy after s.roundTimeEstimate timeout then abort the current pipelinedFetch invocation.

// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
@@ -616,7 +616,7 @@
s.sync()
}
stuckInARow := 0
sleepDuration := s.deadlineTimeout
sleepDuration := s.roundTimeEstimate
for {
currBlock := s.ledger.LastRound()
select {
@@ -627,7 +627,7 @@
stuckInARow = 0
// go to sleep for a short while, for a random duration.
// we want to sleep for a random duration since it would "de-syncronize" us from the ledger advance sync
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
sleepDuration = time.Duration(crypto.RandUint63()) % s.roundTimeEstimate

[Codecov annotation: added line catchup/service.go#L630 was not covered by tests]
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
@@ -637,8 +637,8 @@
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
if sleepDuration < s.deadlineTimeout || s.cfg.DisableNetworking {
sleepDuration = s.deadlineTimeout
if sleepDuration < s.roundTimeEstimate || s.cfg.DisableNetworking {
sleepDuration = s.roundTimeEstimate

[Codecov annotation: added line catchup/service.go#L641 was not covered by tests]
continue
}
// if the catchup is disabled in the config file, just skip it.
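The rename from deadlineTimeout to roundTimeEstimate makes the intent in catchup explicit: the value is only a rough estimate of how long a round takes, used to bound how long the service waits on a busy ledger and to jitter its periodic resync. A standalone sketch of that jitter, with math/rand standing in for crypto.RandUint63 and the four-second estimate purely illustrative:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextSyncSleep mirrors the pattern in periodicSync above: sleep a random
// fraction of the estimated round time so nodes do not all resync in lockstep.
func nextSyncSleep(roundTimeEstimate time.Duration) time.Duration {
	return time.Duration(rand.Int63()) % roundTimeEstimate
}

func main() {
	estimate := 4 * time.Second // stand-in for agreement.DefaultDeadlineTimeout()
	for i := 0; i < 3; i++ {
		fmt.Println("sleep for:", nextSyncSleep(estimate))
	}
}
```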