From 4237ed34c46a3e91607a8617530c41205215377b Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 13 Dec 2023 17:37:55 +0000
Subject: [PATCH 1/2] pcr: extend retries to a few minutes

Previously we'd only retry for a total of about 20us, which is likely
too short for most network issues to resolve themselves.

Release note (enterprise change): PCR now retries for just over 3
minutes before failing.

Epic: none.
---
 .../streamingccl/streamingest/stream_ingestion_job.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index 7eaf5a2d2dd..a8eed68d66c 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -196,11 +196,12 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
 	if knobs != nil && knobs.DistSQLRetryPolicy != nil {
 		return *knobs.DistSQLRetryPolicy
 	}
-	return retry.Options{
-		InitialBackoff: time.Microsecond,
-		Multiplier:     1,
-		MaxBackoff:     2 * time.Microsecond,
-		MaxRetries:     20}
+
+	// This feature is potentially running over WAN network links / the public
+	// internet, so we want to recover on our own from hiccups that could last a
+	// few seconds or even minutes. Thus we allow a relatively long MaxBackoff and
+	// number of retries that should cause us to retry for a few minutes.
+	return retry.Options{MaxBackoff: 15 * time.Second, MaxRetries: 20} // 205.5s.
 }
 
 func ingestWithRetries(

From 878aada3237769403206f30acf2cf2c583eb8de9 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 13 Dec 2023 17:38:51 +0000
Subject: [PATCH 2/2] pcr: clarify status during retry

Previously the status implied the job was still waiting even when it
had already resumed the retry.

Release note: none.

Epic: none.
---
 pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index a8eed68d66c..2112b683bf5 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -223,8 +223,12 @@ func ingestWithRetries(
 		if jobs.IsPermanentJobError(err) || errors.Is(err, context.Canceled) {
 			break
 		}
-		status := redact.Sprintf("waiting before retrying error: %s", err)
-		updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
+		// If we're retrying repeatedly, update the status to reflect the error we
+		// are hitting.
+		if i := r.CurrentAttempt(); i > 5 {
+			status := redact.Sprintf("retrying after error on attempt %d: %s", i, err)
+			updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
+		}
 		newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
 		if lastReplicatedTime.Less(newReplicatedTime) {
 			r.Reset()
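
For reviewers, a quick back-of-the-envelope check of the 205.5s figure in
the new comment in patch 1. This is a standalone sketch, not CRDB code:
the 100ms initial backoff, 2x multiplier, and one-wait-per-retry model are
assumptions chosen to reproduce the comment's figure (the diff leaves
those Options fields at their defaults), and jitter is ignored.

package main

import (
	"fmt"
	"time"
)

// Standalone sketch: sums the backoff waits implied by the new policy.
// The initial backoff and multiplier below are assumptions, not values
// taken from the diff.
func main() {
	const (
		initialBackoff = 100 * time.Millisecond // assumed default
		multiplier     = 2                      // assumed default
		maxBackoff     = 15 * time.Second       // from the patch
		maxRetries     = 20                     // from the patch
	)
	var total time.Duration
	backoff := initialBackoff
	for i := 0; i < maxRetries; i++ {
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		total += backoff
		backoff *= multiplier
	}
	fmt.Println(total) // prints 3m25.5s, i.e. the 205.5s in the comment
}

Under these assumptions the doubling phase contributes 25.5s (waits of
0.1s through 12.8s) and the remaining 12 waits are capped at 15s each,
giving 25.5 + 180 = 205.5s.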
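
Similarly, a standalone sketch of the status gating added in patch 2; the
loop and attempt counter here are illustrative stand-ins for
ingestWithRetries and r.CurrentAttempt(), not the real job code, though
the threshold of 5 matches the patch.

package main

import (
	"errors"
	"fmt"
)

// Standalone sketch: a transient blip that resolves within the first
// five attempts never changes the job's visible status, while a
// persistent error surfaces along with the attempt number it is on.
func main() {
	err := errors.New("connection reset by peer")
	for attempt := 1; attempt <= 8; attempt++ {
		if attempt > 5 {
			// Persistent failure: surface the error and attempt number.
			fmt.Printf("status: retrying after error on attempt %d: %s\n", attempt, err)
		} else {
			// Early attempts stay quiet so a brief hiccup never shows
			// up as a scary job status.
			fmt.Printf("attempt %d: retrying without a status update\n", attempt)
		}
	}
}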