Merge pull request #116402 from cockroachdb/blathers/backport-release-23.2-116341

release-23.2: pcr: extend retries to a few minutes
dt committed Dec 16, 2023
2 parents 7d89a1c + 878aada commit 9a13104
Showing 1 changed file with 12 additions and 7 deletions.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go: 12 additions & 7 deletions
@@ -196,11 +196,12 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
 	if knobs != nil && knobs.DistSQLRetryPolicy != nil {
 		return *knobs.DistSQLRetryPolicy
 	}
-	return retry.Options{
-		InitialBackoff: time.Microsecond,
-		Multiplier:     1,
-		MaxBackoff:     2 * time.Microsecond,
-		MaxRetries:     20}
+
+	// This feature is potentially running over WAN network links / the public
+	// internet, so we want to recover on our own from hiccups that could last a
+	// few seconds or even minutes. Thus we allow a relatively long MaxBackoff and
+	// number of retries that should cause us to retry for a few minutes.
+	return retry.Options{MaxBackoff: 15 * time.Second, MaxRetries: 20} // 205.5s.
 }
 
 func ingestWithRetries(
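The trailing "// 205.5s." note is the total wait the new options imply. A minimal sketch of that arithmetic, assuming the fields left zero here fall back to an initial backoff of 100ms and a multiplier of 2, and ignoring any jitter the retry package may apply (both assumptions about pkg/util/retry defaults, not part of this diff):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed defaults for the fields the new policy leaves zero.
	initial := 100 * time.Millisecond
	multiplier := 2.0
	// Values set by the new policy in getRetryPolicy.
	maxBackoff := 15 * time.Second
	maxRetries := 20

	total := time.Duration(0)
	backoff := initial
	for i := 0; i < maxRetries; i++ {
		total += backoff
		backoff = time.Duration(float64(backoff) * multiplier)
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	// 0.1+0.2+...+12.8 (25.5s) plus 12 waits capped at 15s (180s) = 205.5s.
	fmt.Println(total) // 3m25.5s
}

Under those assumptions the backoff schedule grows for eight retries and then sits at the 15-second cap, which is where the "few minutes" in the commit message comes from.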
@@ -222,8 +223,12 @@ func ingestWithRetries(
 		if jobs.IsPermanentJobError(err) || errors.Is(err, context.Canceled) {
 			break
 		}
-		status := redact.Sprintf("waiting before retrying error: %s", err)
-		updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
+		// If we're retrying repeatedly, update the status to reflect the error we
+		// are hitting.
+		if i := r.CurrentAttempt(); i > 5 {
+			status := redact.Sprintf("retrying after error on attempt %d: %s", i, err)
+			updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
+		}
 		newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
 		if lastReplicatedTime.Less(newReplicatedTime) {
 			r.Reset()
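The surrounding loop treats an error as terminal only if it is permanent, resets the retrier whenever the replicated timestamp advances, and now reflects the error in the running status only after more than five consecutive failed attempts. A standalone sketch of that pattern, using a hypothetical runIngestion placeholder and a hand-rolled attempt counter instead of pkg/util/retry and the job-status helpers in the diff:

package main

import (
	"fmt"
	"time"
)

// runIngestion stands in for the distributed ingestion flow; it is a
// hypothetical placeholder, not part of the actual change.
func runIngestion(last time.Time) (time.Time, error) {
	return last, fmt.Errorf("transient network error")
}

func main() {
	const maxRetries = 20
	attempt := 0
	var lastReplicatedTime time.Time

	for attempt <= maxRetries {
		replicatedTime, err := runIngestion(lastReplicatedTime)
		if err == nil {
			return
		}
		// Only a run of consecutive failures is worth surfacing in the
		// user-visible job status (mirrors the i > 5 check in the diff).
		if attempt > 5 {
			fmt.Printf("retrying after error on attempt %d: %v\n", attempt, err)
		}
		// Progress since the last attempt means the link is basically healthy,
		// so restart the backoff schedule (mirrors r.Reset()).
		if replicatedTime.After(lastReplicatedTime) {
			lastReplicatedTime = replicatedTime
			attempt = 0
			continue
		}
		attempt++
		time.Sleep(10 * time.Millisecond) // stands in for the exponential backoff wait
	}
}

Because the counter resets whenever the replicated time advances, the retry budget only bounds how long the job tolerates a stall with no progress at all, not the total lifetime of the replication stream.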
