From b90ae7d766d0cf14eb9ec632fefae91b60bda67b Mon Sep 17 00:00:00 2001 From: Jiekun Date: Wed, 17 Apr 2024 10:58:08 +0800 Subject: [PATCH] feat: [retry after] vmagent remote write Respect Retry-After header for non-(2xx/409/400) response --- app/vmagent/remotewrite/client.go | 47 +++++++++++++++++++++-- app/vmagent/remotewrite/client_test.go | 53 ++++++++++++++++++++++++++ docs/CHANGELOG.md | 2 + 3 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 app/vmagent/remotewrite/client_test.go diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 49e2071d6168..aba32a29fd59 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/url" + "strconv" "strings" "sync" "time" @@ -456,10 +457,9 @@ again: // Unexpected status code returned retriesCount++ - retryDuration *= 2 - if retryDuration > maxRetryDuration { - retryDuration = maxRetryDuration - } + retryDuration = calculateRetryDuration(resp.Header.Get("Retry-After"), retryDuration, maxRetryDuration) + + // Handle response body, err := io.ReadAll(resp.Body) _ = resp.Body.Close() if err != nil { @@ -481,3 +481,42 @@ again: } var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second) + +// calculateRetryAfterDuration calculate the retry duration. +// 1. Calculate next retry duration by backoff policy (x2) and max retry duration limit. +// 2. If Retry-After exists in response header, use max(Retry-After duration, next retry duration). +// It returns `retryDuration` if `retryAfterString` does not follows RFC 7231. +// Also see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097 +func calculateRetryDuration(retryAfterString string, retryDuration, maxRetryDuration time.Duration) time.Duration { + // default backoff retry policy + retryDuration *= 2 + if retryDuration > maxRetryDuration { + retryDuration = maxRetryDuration + } + + if retryAfterString == "" { + return retryDuration + } + + var retryAfterDuration time.Duration + + // Retry-After could be in "Mon, 02 Jan 2006 15:04:05 GMT" format. + parsedTime, err := time.Parse(http.TimeFormat, retryAfterString) + if err == nil { + retryAfterDuration = time.Duration(time.Until(parsedTime).Seconds()) * time.Second + } else { + // Retry-After could be in seconds. + if d, err := strconv.Atoi(retryAfterString); err == nil { + retryAfterDuration = time.Duration(d) * time.Second + } else { + // Format does not match RFC 7231. Fallback with retryDuration + return retryDuration + } + } + + if retryDuration > retryAfterDuration { + return retryDuration + } + + return timeutil.AddJitterToDuration(retryAfterDuration) +} diff --git a/app/vmagent/remotewrite/client_test.go b/app/vmagent/remotewrite/client_test.go new file mode 100644 index 000000000000..53c03c49097a --- /dev/null +++ b/app/vmagent/remotewrite/client_test.go @@ -0,0 +1,53 @@ +package remotewrite + +import ( + "net/http" + "testing" + "time" +) + +func TestCalculateRetryDuration(t *testing.T) { + // testFunc evaluate if the result of calculateRetryDuration is + // 1. >= expectMinDuration + // 2. <= expectMinDuration + 10% (see timeutil.AddJitterToDuration) + testFunc := func(name string, retryAfterString string, retryDuration, expectMinDuration time.Duration) { + t.Run(name, func(t *testing.T) { + result := calculateRetryDuration(retryAfterString, retryDuration, time.Minute) + + expectMaxDuration := helper(expectMinDuration) + expectMinDuration = expectMinDuration - (1000 * time.Millisecond) // Avoid edge case when calculating time.Until(now) + + if !(result >= expectMinDuration && result <= expectMaxDuration) { + t.Fatalf( + "incorrect retry duration, want (ms): [%d, %d], got (ms): %d", + expectMinDuration.Milliseconds(), expectMaxDuration.Milliseconds(), + result.Milliseconds(), + ) + } + }) + } + + // default timezone + + // default backoff policy test cases + testFunc("default backoff policy", "", time.Second, 2*time.Second) + testFunc("default backoff policy exceed max limit", "", 10*time.Minute, time.Minute) + + // retry after header test cases + testFunc("retry after header in seconds", "10", time.Second, 10*time.Second) + testFunc("retry after header in date time", time.Now().Add(10*time.Second).UTC().Format(http.TimeFormat), time.Second, 10*time.Second) + testFunc("retry after header < default backoff policy", "1", 10*time.Second, 20*time.Second) + testFunc("retry after header invalid", "in-correct-header", time.Second, 2*time.Second) + testFunc("retry after header not in GMT", time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), time.Second, 2*time.Second) + +} + +// helper calculate the max possible time duration calculated by timeutil.AddJitterToDuration. +func helper(d time.Duration) time.Duration { + dv := d / 10 + if dv > 10*time.Second { + dv = 10 * time.Second + } + + return d + dv +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 16bab3d6d7c8..1e17c861ed0f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110). **Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).** **Update note 2: the `-streamAggr.dropInputLabels` command-line flag at `vmagent` was renamed to `-remoteWrite.streamAggr.dropInputLabels`. `-streamAggr.dropInputLabels` is now used for global streaming aggregation.** @@ -44,6 +45,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion! * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`. +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): respect `Retry-After` header in remote write response for non 2xx/409/400 response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): speed up retrieving rules files from object storages by skipping unchanged objects during reloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6210).