Skip to content

Commit

Permalink
feat: [retry after] vmagent remote write Respect Retry-After header f…
Browse files Browse the repository at this point in the history
…or non-(2xx/409/400) response
  • Loading branch information
jiekun committed May 22, 2024
1 parent ac836bc commit b90ae7d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 4 deletions.
47 changes: 43 additions & 4 deletions app/vmagent/remotewrite/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -456,10 +457,9 @@ again:

// Unexpected status code returned
retriesCount++
retryDuration *= 2
if retryDuration > maxRetryDuration {
retryDuration = maxRetryDuration
}
retryDuration = calculateRetryDuration(resp.Header.Get("Retry-After"), retryDuration, maxRetryDuration)

// Handle response
body, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
Expand All @@ -481,3 +481,42 @@ again:
}

var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)

// calculateRetryAfterDuration calculate the retry duration.
// 1. Calculate next retry duration by backoff policy (x2) and max retry duration limit.
// 2. If Retry-After exists in response header, use max(Retry-After duration, next retry duration).
// It returns `retryDuration` if `retryAfterString` does not follows RFC 7231.
// Also see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097
func calculateRetryDuration(retryAfterString string, retryDuration, maxRetryDuration time.Duration) time.Duration {
// default backoff retry policy
retryDuration *= 2
if retryDuration > maxRetryDuration {
retryDuration = maxRetryDuration
}

if retryAfterString == "" {
return retryDuration
}

var retryAfterDuration time.Duration

// Retry-After could be in "Mon, 02 Jan 2006 15:04:05 GMT" format.
parsedTime, err := time.Parse(http.TimeFormat, retryAfterString)
if err == nil {
retryAfterDuration = time.Duration(time.Until(parsedTime).Seconds()) * time.Second
} else {
// Retry-After could be in seconds.
if d, err := strconv.Atoi(retryAfterString); err == nil {
retryAfterDuration = time.Duration(d) * time.Second
} else {
// Format does not match RFC 7231. Fallback with retryDuration
return retryDuration
}
}

if retryDuration > retryAfterDuration {
return retryDuration
}

return timeutil.AddJitterToDuration(retryAfterDuration)
}
53 changes: 53 additions & 0 deletions app/vmagent/remotewrite/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package remotewrite

import (
"net/http"
"testing"
"time"
)

func TestCalculateRetryDuration(t *testing.T) {
// testFunc evaluate if the result of calculateRetryDuration is
// 1. >= expectMinDuration
// 2. <= expectMinDuration + 10% (see timeutil.AddJitterToDuration)
testFunc := func(name string, retryAfterString string, retryDuration, expectMinDuration time.Duration) {
t.Run(name, func(t *testing.T) {
result := calculateRetryDuration(retryAfterString, retryDuration, time.Minute)

expectMaxDuration := helper(expectMinDuration)
expectMinDuration = expectMinDuration - (1000 * time.Millisecond) // Avoid edge case when calculating time.Until(now)

if !(result >= expectMinDuration && result <= expectMaxDuration) {
t.Fatalf(
"incorrect retry duration, want (ms): [%d, %d], got (ms): %d",
expectMinDuration.Milliseconds(), expectMaxDuration.Milliseconds(),
result.Milliseconds(),
)
}
})
}

// default timezone

// default backoff policy test cases
testFunc("default backoff policy", "", time.Second, 2*time.Second)
testFunc("default backoff policy exceed max limit", "", 10*time.Minute, time.Minute)

// retry after header test cases
testFunc("retry after header in seconds", "10", time.Second, 10*time.Second)
testFunc("retry after header in date time", time.Now().Add(10*time.Second).UTC().Format(http.TimeFormat), time.Second, 10*time.Second)
testFunc("retry after header < default backoff policy", "1", 10*time.Second, 20*time.Second)
testFunc("retry after header invalid", "in-correct-header", time.Second, 2*time.Second)
testFunc("retry after header not in GMT", time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), time.Second, 2*time.Second)

}

// helper calculate the max possible time duration calculated by timeutil.AddJitterToDuration.
func helper(d time.Duration) time.Duration {
dv := d / 10
if dv > 10*time.Second {
dv = 10 * time.Second
}

return d + dv
}
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

## tip

* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): supported any status codes from the range 200-299 from alertmanager. Previously, only 200 status code considered a successful action. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6110).
**Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).**
**Update note 2: the `-streamAggr.dropInputLabels` command-line flag at `vmagent` was renamed to `-remoteWrite.streamAggr.dropInputLabels`. `-streamAggr.dropInputLabels` is now used for global streaming aggregation.**

Expand All @@ -44,6 +45,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion!
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): respect `Retry-After` header in remote write response for non 2xx/409/400 response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): speed up retrieving rules files from object storages by skipping unchanged objects during reloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6210).
Expand Down

0 comments on commit b90ae7d

Please sign in to comment.