Round Times: Period 0 deadline timeout #5850

Merged · 5 commits · Dec 7, 2023
Changes from 4 commits
3 changes: 1 addition & 2 deletions agreement/actions.go
@@ -232,8 +232,7 @@ type ensureAction struct {
Payload proposal
// the certificate proving commitment
Certificate Certificate

// The time that the winning proposal-vote was validated, relative to the beginning of the round
// The time that the winning proposal-vote was validated for round credentialRoundLag back from the current one
voteValidatedAt time.Duration
// The dynamic filter timeout calculated for this round, even if not enabled, for reporting to telemetry.
dynamicFilterTimeout time.Duration
12 changes: 10 additions & 2 deletions agreement/player.go
@@ -103,7 +103,7 @@ func (p *player) handle(r routerHandle, e event) []action {
switch p.Step {
case soft:
// precondition: nap = false
actions = p.issueSoftVote(r)
actions = p.issueSoftVote(r, e)
p.Step = cert
// update tracer state to match player
r.t.setMetadata(tracerMetadata{p.Round, p.Period, p.Step})
@@ -158,8 +158,16 @@ func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action {
return p.issueFastVote(r)
}

func (p *player) issueSoftVote(r routerHandle) (actions []action) {
func (p *player) issueSoftVote(r routerHandle, te timeoutEvent) (actions []action) {
defer func() {
var deadlineTimeout time.Duration
if te.Proto.Version == "" || te.Proto.Err != nil {
r.t.log.Errorf("failed to read valid protocol version for timeout event (proto %v): %v. "+
"Falling Back to default deadline timeout.", te.Proto.Version, te.Proto.Err)
deadlineTimeout = defaultDeadlineTimeout
} else {
deadlineTimeout = DeadlineTimeout(p.Period, te.Proto.Version)
}
p.Deadline = Deadline{Duration: deadlineTimeout, Type: TimeoutDeadline}
}()

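Reviewer note: a minimal sketch (not part of this diff) isolating the deferred deadline selection above. It assumes the agreement package scope, so period, timeoutEvent, DeadlineTimeout, and defaultDeadlineTimeout are the identifiers introduced or already present in this PR.

package agreement

import "time"

// deadlineFromTimeoutEvent mirrors the deferred logic in issueSoftVote:
// prefer the period/version-aware DeadlineTimeout, and fall back to the
// legacy defaultDeadlineTimeout when the timeout event carries no usable
// protocol version (empty Version or a lookup error).
func deadlineFromTimeoutEvent(p period, te timeoutEvent) time.Duration {
	if te.Proto.Version == "" || te.Proto.Err != nil {
		return defaultDeadlineTimeout
	}
	return DeadlineTimeout(p, te.Proto.Version)
}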
32 changes: 16 additions & 16 deletions agreement/service_test.go
@@ -1334,7 +1334,7 @@ func TestAgreementFastRecoveryDownMiss(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1435,7 +1435,7 @@ func TestAgreementFastRecoveryLate(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1548,7 +1548,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1589,7 +1589,7 @@ func TestAgreementFastRecoveryRedo(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(1, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
@@ -1681,7 +1681,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1690,7 +1690,7 @@ func TestAgreementBlockReplayBug_b29ea57(t *testing.T) {
triggerGlobalTimeout(FilterTimeout(1, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1743,7 +1743,7 @@ func TestAgreementLateCertBug(t *testing.T) {
closeFn()
baseNetwork.repairAll()

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -1819,7 +1819,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1846,7 +1846,7 @@ func TestAgreementRecoverGlobalStartingValue(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 5, int(zeroes))
}
@@ -1924,7 +1924,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
}
return params
})
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4, int(zeroes))
}
@@ -1950,7 +1950,7 @@ func TestAgreementRecoverGlobalStartingValueBadProposal(t *testing.T) {
panic(errstr)
}
}
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)

}
@@ -2025,7 +2025,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
// generate a bottom quorum; let only one node see it.
baseNetwork.crown(0)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
if clocks[0].(*testingClock).zeroes != zeroes+1 {
errstr := fmt.Sprintf("node 0 did not enter new period from bot quorum")
panic(errstr)
@@ -2076,7 +2076,7 @@ func TestAgreementRecoverBothVAndBotQuorums(t *testing.T) {
}
}

triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(1, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -2190,7 +2190,7 @@ func TestAgreementSlowPayloadsPostDeadline(t *testing.T) {
{
triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
zeroes = expectNoNewPeriod(clocks, zeroes)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
}

@@ -2251,7 +2251,7 @@ func TestAgreementLargePeriods(t *testing.T) {
zeroes = expectNoNewPeriod(clocks, zeroes)

baseNetwork.repairAll()
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(period(p), version), TimeoutDeadline, clocks, activityMonitor)
zeroes = expectNewPeriod(clocks, zeroes)
require.Equal(t, 4+p, int(zeroes))
}
@@ -2363,7 +2363,7 @@ func TestAgreementRegression_WrongPeriodPayloadVerificationCancellation_8ba23942
// release proposed blocks in a controlled manner to prevent oversubscription of verification
pocket1 := make(chan multicastParams, 100)
closeFn = baseNetwork.pocketAllCompound(pocket1)
triggerGlobalTimeout(deadlineTimeout, TimeoutDeadline, clocks, activityMonitor)
triggerGlobalTimeout(DeadlineTimeout(0, version), TimeoutDeadline, clocks, activityMonitor)
baseNetwork.repairAll()
close(pocket1)
{
22 changes: 15 additions & 7 deletions agreement/types.go
@@ -50,7 +50,7 @@
Type TimeoutType
}

var deadlineTimeout = config.Protocol.BigLambda + config.Protocol.SmallLambda
var defaultDeadlineTimeout = config.Protocol.BigLambda + config.Protocol.SmallLambda
var partitionStep = next + 3
var recoveryExtraTimeout = config.Protocol.SmallLambda

@@ -63,9 +63,17 @@
return config.Consensus[v].AgreementFilterTimeout
}

// DeadlineTimeout is the duration of the second agreement step.
func DeadlineTimeout() time.Duration {
return deadlineTimeout
// DeadlineTimeout is the duration of the second agreement step, varying based on period and consensus version.
func DeadlineTimeout(p period, v protocol.ConsensusVersion) time.Duration {
if p == 0 {
return config.Consensus[v].AgreementDeadlineTimeoutPeriod0
}
return defaultDeadlineTimeout
}

// DefaultDeadlineTimeout is the default duration of the second agreement step.
func DefaultDeadlineTimeout() time.Duration {
return defaultDeadlineTimeout

}

type (
@@ -93,9 +101,9 @@
)

func (s step) nextVoteRanges() (lower, upper time.Duration) {
extra := recoveryExtraTimeout // eg 2000 ms
lower = deadlineTimeout // eg 17000 ms (15000 + 2000)
upper = lower + extra // eg 19000 ms
extra := recoveryExtraTimeout // eg 2000 ms
lower = defaultDeadlineTimeout // eg 17000 ms (15000 + 2000)
upper = lower + extra // eg 19000 ms

for i := next; i < s; i++ {
extra *= 2
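Reviewer note: a small sketch (assumption, not part of the diff) of how the reworked helper behaves from a caller's point of view, assuming the agreement package scope. With the values wired up in config/consensus.go below, vFuture would report 4s for period 0 and the usual BigLambda + SmallLambda (roughly 17s per the inline example values in nextVoteRanges) for later periods, while currently released versions report the default for every period.

package agreement

import (
	"time"

	"github.com/algorand/go-algorand/protocol"
)

// deadlineByPeriod is a hypothetical helper listing the deadline the player
// would arm for the first few periods of a round: period 0 reads the new
// per-version AgreementDeadlineTimeoutPeriod0, later periods keep the legacy
// defaultDeadlineTimeout.
func deadlineByPeriod(v protocol.ConsensusVersion) map[period]time.Duration {
	out := make(map[period]time.Duration, 3)
	for p := period(0); p <= 2; p++ {
		out[p] = DeadlineTimeout(p, v)
	}
	return out
}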
16 changes: 8 additions & 8 deletions catchup/service.go
@@ -91,7 +91,7 @@
net network.GossipNode
auth BlockAuthenticator
parallelBlocks uint64
deadlineTimeout time.Duration
roundTimeEstimate time.Duration
prevBlockFetchTime time.Time
blockValidationPool execpool.BacklogPool

@@ -146,7 +146,7 @@
s.unmatchedPendingCertificates = unmatchedPendingCertificates
s.log = log.With("Context", "sync")
s.parallelBlocks = config.CatchupParallelBlocks
s.deadlineTimeout = agreement.DeadlineTimeout()
s.roundTimeEstimate = agreement.DefaultDeadlineTimeout()
s.blockValidationPool = blockValidationPool
s.syncNow = make(chan struct{}, 1)

@@ -556,11 +556,11 @@

// if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
start := time.Now()
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.roundTimeEstimate {
time.Sleep(100 * time.Millisecond)
}

// if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation.
// if ledger is still busy after s.roundTimeEstimate timeout then abort the current pipelinedFetch invocation.

// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
@@ -616,7 +616,7 @@
s.sync()
}
stuckInARow := 0
sleepDuration := s.deadlineTimeout
sleepDuration := s.roundTimeEstimate
for {
currBlock := s.ledger.LastRound()
select {
@@ -627,7 +627,7 @@
stuckInARow = 0
// go to sleep for a short while, for a random duration.
// we want to sleep for a random duration since it would "de-syncronize" us from the ledger advance sync
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
sleepDuration = time.Duration(crypto.RandUint63()) % s.roundTimeEstimate

continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
@@ -637,8 +637,8 @@
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
if sleepDuration < s.deadlineTimeout || s.cfg.DisableNetworking {
sleepDuration = s.deadlineTimeout
if sleepDuration < s.roundTimeEstimate || s.cfg.DisableNetworking {
sleepDuration = s.roundTimeEstimate

continue
}
// if the catchup is disabled in the config file, just skip it.
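Reviewer note: the rename from deadlineTimeout to roundTimeEstimate reads as purely mechanical; catchup only uses the value as a rough per-round duration for pauses and back-off, so it now seeds from agreement.DefaultDeadlineTimeout() rather than a period-aware deadline. A sketch (assumption, not in the diff) of the randomized sleep it drives:

package catchup

import (
	"time"

	"github.com/algorand/go-algorand/crypto"
)

// nextSyncSleep sketches the periodic-sync back-off in service.go: after the
// ledger advances on its own, sleep a random fraction of roundTimeEstimate so
// nodes do not all re-attempt catchup in lockstep.
func nextSyncSleep(roundTimeEstimate time.Duration) time.Duration {
	return time.Duration(crypto.RandUint63()) % roundTimeEstimate
}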
20 changes: 10 additions & 10 deletions catchup/service_test.go
@@ -237,7 +237,7 @@ func TestSyncRound(t *testing.T) {
localCfg := config.GetDefaultLocal()
s := MakeService(logging.Base(), localCfg, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second
s.roundTimeEstimate = 2 * time.Second

// Set disable round success
err = s.SetDisableSyncRound(3)
Expand All @@ -246,14 +246,14 @@ func TestSyncRound(t *testing.T) {
s.Start()
defer s.Stop()
// wait past the initial sync - which is known to fail due to the above "auth"
time.Sleep(s.deadlineTimeout*2 - 200*time.Millisecond)
time.Sleep(s.roundTimeEstimate*2 - 200*time.Millisecond)
require.Equal(t, initialLocalRound, local.LastRound())
auth.alter(-1, false)

// wait until the catchup is done. Since we've might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Since(waitStart) < 2*s.deadlineTimeout {
for time.Since(waitStart) < 2*s.roundTimeEstimate {
if remote.LastRound() == local.LastRound() {
break
}
Expand All @@ -276,7 +276,7 @@ func TestSyncRound(t *testing.T) {
s.UnsetDisableSyncRound()
// wait until the catchup is done
waitStart = time.Now()
for time.Since(waitStart) < 8*s.deadlineTimeout {
for time.Since(waitStart) < 8*s.roundTimeEstimate {
if remote.LastRound() == local.LastRound() {
break
}
Expand Down Expand Up @@ -326,19 +326,19 @@ func TestPeriodicSync(t *testing.T) {
// Make Service
s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second
s.roundTimeEstimate = 2 * time.Second

s.Start()
defer s.Stop()
// wait past the initial sync - which is known to fail due to the above "auth"
time.Sleep(s.deadlineTimeout*2 - 200*time.Millisecond)
time.Sleep(s.roundTimeEstimate*2 - 200*time.Millisecond)
require.Equal(t, initialLocalRound, local.LastRound())
auth.alter(-1, false)

// wait until the catchup is done. Since we've might have missed the sleep window, we need to wait
// until the synchronization is complete.
waitStart := time.Now()
for time.Since(waitStart) < 10*s.deadlineTimeout {
for time.Since(waitStart) < 10*s.roundTimeEstimate {
if remote.LastRound() == local.LastRound() {
break
}
@@ -717,7 +717,7 @@ func helperTestOnSwitchToUnSupportedProtocol(

// Make Service
s := MakeService(logging.Base(), config, net, local, &mockedAuthenticator{errorRound: -1}, nil, nil)
s.deadlineTimeout = 2 * time.Second
s.roundTimeEstimate = 2 * time.Second
s.Start()
defer s.Stop()

@@ -1198,7 +1198,7 @@ func TestServiceLedgerUnavailable(t *testing.T) {
cfg.CatchupParallelBlocks = 2
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second
s.roundTimeEstimate = 2 * time.Second

s.testStart()
defer s.Stop()
@@ -1245,7 +1245,7 @@ func TestServiceNoBlockForRound(t *testing.T) {
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
pl := &periodicSyncDebugLogger{periodicSyncLogger: periodicSyncLogger{Logger: logging.Base()}}
s.log = pl
s.deadlineTimeout = 1 * time.Second
s.roundTimeEstimate = 1 * time.Second

s.testStart()
defer s.Stop()
9 changes: 7 additions & 2 deletions config/consensus.go
@@ -162,6 +162,8 @@ type ConsensusParams struct {
// time for nodes to wait for block proposal headers for period = 0, value should be configured to suit best case
// critical path
AgreementFilterTimeoutPeriod0 time.Duration
// Duration of the second agreement step for period=0, value should be configured to suit best case critical path
AgreementDeadlineTimeoutPeriod0 time.Duration

FastRecoveryLambda time.Duration // time between fast recovery attempts

@@ -848,8 +850,9 @@ func initConsensusProtocols() {
DownCommitteeSize: 10000,
DownCommitteeThreshold: 7750,

AgreementFilterTimeout: 4 * time.Second,
AgreementFilterTimeoutPeriod0: 4 * time.Second,
AgreementFilterTimeout: 4 * time.Second,
AgreementFilterTimeoutPeriod0: 4 * time.Second,
AgreementDeadlineTimeoutPeriod0: Protocol.BigLambda + Protocol.SmallLambda,

FastRecoveryLambda: 5 * time.Minute,

@@ -1389,6 +1392,8 @@ func initConsensusProtocols() {
vFuture.LogicSigVersion = 10 // When moving this to a release, put a new higher LogicSigVersion here
vFuture.EnableLogicSigCostPooling = true

vFuture.AgreementDeadlineTimeoutPeriod0 = 4 * time.Second

vFuture.StateProofBlockHashInLightHeader = true

// Setting DynamicFilterTimeout in vFuture will impact e2e test performance
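Reviewer note: to close the loop, a hedged sketch (hypothetical helper, not part of the diff) showing how the new knob reads out of the consensus table. Released protocols inherit Protocol.BigLambda + Protocol.SmallLambda, so only vFuture, and whatever protocol later adopts this setting, gets the shorter 4-second period-0 deadline.

package main

import (
	"fmt"
	"time"

	"github.com/algorand/go-algorand/config"
	"github.com/algorand/go-algorand/protocol"
)

// period0Deadlines lists the period-0 deadline configured for every known
// consensus version in initConsensusProtocols.
func period0Deadlines() map[protocol.ConsensusVersion]time.Duration {
	out := make(map[protocol.ConsensusVersion]time.Duration, len(config.Consensus))
	for v, params := range config.Consensus {
		out[v] = params.AgreementDeadlineTimeoutPeriod0
	}
	return out
}

func main() {
	for v, d := range period0Deadlines() {
		fmt.Printf("%s: %v\n", v, d)
	}
}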