diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index aba32a29fd59b..3066c31754302 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -12,8 +12,6 @@ import ( "sync" "time" - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" @@ -24,6 +22,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" + "github.com/VictoriaMetrics/metrics" ) var ( @@ -484,8 +483,9 @@ var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*ti // calculateRetryAfterDuration calculate the retry duration. // 1. Calculate next retry duration by backoff policy (x2) and max retry duration limit. -// 2. If Retry-After exists in response header, use max(Retry-After duration, next retry duration). -// It returns `retryDuration` if `retryAfterString` does not follows RFC 7231. +// 2. If Retry-After is valid, use max(Retry-After duration, next retry duration). +// +// It returns `retryDuration` if `retryAfterString` does not follow RFC 7231. // Also see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097 func calculateRetryDuration(retryAfterString string, retryDuration, maxRetryDuration time.Duration) time.Duration { // default backoff retry policy @@ -494,12 +494,27 @@ func calculateRetryDuration(retryAfterString string, retryDuration, maxRetryDura retryDuration = maxRetryDuration } - if retryAfterString == "" { + retryAfterDuration := parseRetryAfterHeader(retryAfterString) + + if retryDuration > retryAfterDuration { + // Stick with the default policy when: `retryAfter` is shorter, or `retryAfter` is not valid. return retryDuration } + // Follow retryAfter + return timeutil.AddJitterToDuration(retryAfterDuration) +} + +// parseRetryAfterHeader parse Retry-After value retrieved from HTTP response header. +// `retryAfterString` should be in either HTTP-date or a number of seconds. +// It will return time.Duration(0) if `retryAfterString` does not follow RFC 7231. +func parseRetryAfterHeader(retryAfterString string) time.Duration { var retryAfterDuration time.Duration + if retryAfterString == "" { + return retryAfterDuration + } + // Retry-After could be in "Mon, 02 Jan 2006 15:04:05 GMT" format. parsedTime, err := time.Parse(http.TimeFormat, retryAfterString) if err == nil { @@ -508,15 +523,8 @@ func calculateRetryDuration(retryAfterString string, retryDuration, maxRetryDura // Retry-After could be in seconds. if d, err := strconv.Atoi(retryAfterString); err == nil { retryAfterDuration = time.Duration(d) * time.Second - } else { - // Format does not match RFC 7231. Fallback with retryDuration - return retryDuration } } - if retryDuration > retryAfterDuration { - return retryDuration - } - - return timeutil.AddJitterToDuration(retryAfterDuration) + return retryAfterDuration } diff --git a/app/vmagent/remotewrite/client_test.go b/app/vmagent/remotewrite/client_test.go index 53c03c49097a4..5d5d08748eca7 100644 --- a/app/vmagent/remotewrite/client_test.go +++ b/app/vmagent/remotewrite/client_test.go @@ -1,45 +1,85 @@ package remotewrite import ( + "math" "net/http" "testing" "time" ) func TestCalculateRetryDuration(t *testing.T) { - // testFunc evaluate if the result of calculateRetryDuration is + // `testFunc` call `calculateRetryDuration` for `n` times + // and evaluate if the result of `calculateRetryDuration` is // 1. >= expectMinDuration // 2. <= expectMinDuration + 10% (see timeutil.AddJitterToDuration) - testFunc := func(name string, retryAfterString string, retryDuration, expectMinDuration time.Duration) { + testFunc := func(name string, retryAfterString string, retryDuration time.Duration, n int, expectMinDuration time.Duration) { t.Run(name, func(t *testing.T) { - result := calculateRetryDuration(retryAfterString, retryDuration, time.Minute) + for i := 0; i < n; i++ { + retryDuration = calculateRetryDuration(retryAfterString, retryDuration, time.Minute) + } expectMaxDuration := helper(expectMinDuration) expectMinDuration = expectMinDuration - (1000 * time.Millisecond) // Avoid edge case when calculating time.Until(now) - if !(result >= expectMinDuration && result <= expectMaxDuration) { + if !(retryDuration >= expectMinDuration && retryDuration <= expectMaxDuration) { t.Fatalf( "incorrect retry duration, want (ms): [%d, %d], got (ms): %d", expectMinDuration.Milliseconds(), expectMaxDuration.Milliseconds(), - result.Milliseconds(), + retryDuration.Milliseconds(), ) } }) } - // default timezone + // Call calculateRetryDuration for 1 time. + { + // default backoff policy test cases + testFunc("default backoff policy", "", time.Second, 1, 2*time.Second) + testFunc("default backoff policy exceed max limit", "", 10*time.Minute, 1, time.Minute) + + // retry after header test cases + testFunc("retry after header > default backoff policy", "10", 1*time.Second, 1, 10*time.Second) + testFunc("retry after header < default backoff policy", "1", 10*time.Second, 1, 20*time.Second) + testFunc("retry after header invalid", "in-correct-header", time.Second, 1, 2*time.Second) + } + + // Call calculateRetryDuration for multiple times. + { + testFunc("default backoff policy 2 times", "", time.Second, 2, 4*time.Second) + testFunc("default backoff policy 3 times", "", time.Second, 3, 8*time.Second) + testFunc("default backoff policy N times exceed max limit", "", time.Second, 10, time.Minute) + + testFunc("retry after header 10s 2 times", "10", time.Second, 2, 20*time.Second) + testFunc("retry after header 10s 3 times", "10", time.Second, 3, 40*time.Second) + testFunc("retry after header 10s 4 times exceed max limit", "10", time.Second, 4, time.Minute) + testFunc("retry after header 10s 10 times exceed max limit", "10", time.Second, 10, time.Minute) + + testFunc("retry after header 120s 1 times", "120", time.Second, 1, 120*time.Second) + testFunc("retry after header 120s 2 times", "120", time.Second, 2, 120*time.Second) + testFunc("retry after header 120s 10 times", "120", time.Second, 10, 120*time.Second) + } +} - // default backoff policy test cases - testFunc("default backoff policy", "", time.Second, 2*time.Second) - testFunc("default backoff policy exceed max limit", "", 10*time.Minute, time.Minute) +func TestParseRetryAfterHeader(t *testing.T) { + testFunc := func(name string, retryAfterString string, expectResult time.Duration) { + t.Run(name, func(t *testing.T) { - // retry after header test cases - testFunc("retry after header in seconds", "10", time.Second, 10*time.Second) - testFunc("retry after header in date time", time.Now().Add(10*time.Second).UTC().Format(http.TimeFormat), time.Second, 10*time.Second) - testFunc("retry after header < default backoff policy", "1", 10*time.Second, 20*time.Second) - testFunc("retry after header invalid", "in-correct-header", time.Second, 2*time.Second) - testFunc("retry after header not in GMT", time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), time.Second, 2*time.Second) + result := parseRetryAfterHeader(retryAfterString) + // expect `expectResult == result` when retryAfterString is in seconds or invalid + // expect the difference between result and expectResult to be lower than 10% + if !(expectResult == result || math.Abs(float64(expectResult-result))/float64(expectResult) < 0.10) { + t.Fatalf( + "incorrect retry after duration, want (ms): %d, got (ms): %d", + expectResult.Milliseconds(), result.Milliseconds(), + ) + } + }) + } + testFunc("retry after header in seconds", "10", 10*time.Second) + testFunc("retry after header in date time", time.Now().Add(30*time.Second).UTC().Format(http.TimeFormat), 30*time.Second) + testFunc("retry after header invalid", "invalid-retry-after", 0) + testFunc("retry after header not in GMT", time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0) } // helper calculate the max possible time duration calculated by timeutil.AddJitterToDuration.