Skip to content

Commit

Permalink
lib/promscrape: use the standard net/http.Client instead of fasthttp.…
Browse files Browse the repository at this point in the history
…Client for scraping targets in non-streaming mode

While fasthttp.Client uses less CPU and RAM when scraping targets with small responses (up to 10K metrics),
it doesn't work well when scraping targets with big responses such as kube-state-metrics.
In this case it could use big amounts of additional memory comparing to net/http.Client,
since fasthttp.Client reads the full response in memory and then tries re-using the large buffer
for further scrapes.

Additionally, fasthttp.Client-based scraping had various issues with proxying, redirects
and scrape timeouts like the following ones:

- #1945
- #5425
- #2794
- #1017

This should help reducing memory usage for the case when target returns big response
and this response is scraped by fasthttp.Client at first before switching to stream parsing mode
for subsequent scrapes. Now the switch to stream parsing mode is performed on the first scrape
after reading the response body in memory and noticing that its size exceeds the value passed
to -promscrape.minResponseSizeForStreamParse command-line flag.
Updates #5567

Overrides #4931
  • Loading branch information
valyala committed Jan 30, 2024
1 parent a20c289 commit bc7cf49
Show file tree
Hide file tree
Showing 63 changed files with 151 additions and 15,289 deletions.
13 changes: 7 additions & 6 deletions docs/vmagent.md
Expand Up @@ -771,12 +771,13 @@ e.g. it sets `scrape_series_added` metric to zero. See [these docs](#automatical

## Stream parsing mode

By default, `vmagent` reads the full response body from scrape target into memory, then parses it, applies [relabeling](#relabeling)
and then pushes the resulting metrics to the configured `-remoteWrite.url`. This mode works good for the majority of cases
when the scrape target exposes small number of metrics (e.g. less than 10 thousand). But this mode may take big amounts of memory
when the scrape target exposes big number of metrics. In this case it is recommended enabling stream parsing mode.
When this mode is enabled, then `vmagent` reads response from scrape target in chunks, then immediately processes every chunk
and pushes the processed metrics to remote storage. This allows saving memory when scraping targets that expose millions of metrics.
By default, `vmagent` parses the full response from the scrape target, applies [relabeling](#relabeling)
and then pushes the resulting metrics to the configured `-remoteWrite.url` in one go. This mode works good for the majority of cases
when the scrape target exposes small number of metrics (e.g. less than 10K). But this mode may take big amounts of memory
when the scrape target exposes big number of metrics (for example, when `vmagent` scrapes [`kube-state-metrics`](https://github.com/kubernetes/kube-state-metrics)
in large Kubernetes cluster). It is recommended enabling stream parsing mode for such targets.
When this mode is enabled, `vmagent` processes the response from the scrape target in chunks.
This allows saving memory when scraping targets that expose millions of metrics.

Stream parsing mode is automatically enabled for scrape targets returning response bodies with sizes bigger than
the `-promscrape.minResponseSizeForStreamParse` command-line flag value. Additionally,
Expand Down
6 changes: 1 addition & 5 deletions go.mod
Expand Up @@ -8,10 +8,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.2

// Do not use the original github.com/valyala/fasthttp because of issues
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.2.0
github.com/VictoriaMetrics/metrics v1.31.0
github.com/VictoriaMetrics/metricsql v0.70.0
github.com/aws/aws-sdk-go-v2 v1.24.1
Expand All @@ -34,7 +30,7 @@ require (
github.com/valyala/gozstd v1.20.1
github.com/valyala/histogram v1.2.0
github.com/valyala/quicktemplate v1.7.0
golang.org/x/net v0.20.0
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.16.0
golang.org/x/sys v0.16.0
google.golang.org/api v0.159.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -63,8 +63,6 @@ github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V
github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/VictoriaMetrics/fasthttp v1.2.0 h1:nd9Wng4DlNtaI27WlYh5mGXCJOmee/2c2blTJwfyU9I=
github.com/VictoriaMetrics/fasthttp v1.2.0/go.mod h1:zv5YSmasAoSyv8sBVexfArzFDIGGTN4TfCKAtAw7IfE=
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
github.com/VictoriaMetrics/metrics v1.31.0 h1:X6+nBvAP0UB+GjR0Ht9hhQ3pjL1AN4b8dt9zFfzTsUo=
github.com/VictoriaMetrics/metrics v1.31.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
Expand Down
19 changes: 0 additions & 19 deletions lib/promauth/config.go
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/fasthttp"
"github.com/cespare/xxhash/v2"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
Expand Down Expand Up @@ -343,24 +342,6 @@ func (ac *Config) SetHeaders(req *http.Request, setAuthHeader bool) error {
return nil
}

// SetFasthttpHeaders sets the configured ac headers to req.
func (ac *Config) SetFasthttpHeaders(req *fasthttp.Request, setAuthHeader bool) error {
reqHeaders := &req.Header
for _, h := range ac.headers {
reqHeaders.Set(h.key, h.value)
}
if setAuthHeader {
ah, err := ac.GetAuthHeader()
if err != nil {
return fmt.Errorf("failed to obtain Authorization request header: %w", err)
}
if ah != "" {
reqHeaders.Set("Authorization", ah)
}
}
return nil
}

// GetAuthHeader returns optional `Authorization: ...` http header.
func (ac *Config) GetAuthHeader() (string, error) {
if f := ac.getAuthHeaderCached; f != nil {
Expand Down
27 changes: 0 additions & 27 deletions lib/promauth/config_test.go
Expand Up @@ -5,7 +5,6 @@ import (
"net/http/httptest"
"testing"

"github.com/VictoriaMetrics/fasthttp"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -307,12 +306,6 @@ func TestConfigGetAuthHeaderFailure(t *testing.T) {
t.Fatalf("expecting non-nil error from SetHeaders()")
}

// Verify that cfg.SetFasthttpHeaders() returns error
var fhreq fasthttp.Request
if err := cfg.SetFasthttpHeaders(&fhreq, true); err == nil {
t.Fatalf("expecting non-nil error from SetFasthttpHeaders()")
}

// Verify that the tls cert cannot be loaded properly if it exists
if f := cfg.getTLSCertCached; f != nil {
cert, err := f(nil)
Expand Down Expand Up @@ -421,16 +414,6 @@ func TestConfigGetAuthHeaderSuccess(t *testing.T) {
if ah != ahExpected {
t.Fatalf("unexpected auth header from net/http request; got %q; want %q", ah, ahExpected)
}

// Make sure that cfg.SetFasthttpHeaders() properly set Authorization header
var fhreq fasthttp.Request
if err := cfg.SetFasthttpHeaders(&fhreq, true); err != nil {
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
}
ahb := fhreq.Header.Peek("Authorization")
if string(ahb) != ahExpected {
t.Fatalf("unexpected auth header from fasthttp request; got %q; want %q", ahb, ahExpected)
}
}

// Zero config
Expand Down Expand Up @@ -578,16 +561,6 @@ func TestConfigHeaders(t *testing.T) {
t.Fatalf("unexpected value for net/http header %q; got %q; want %q", h.key, v, h.value)
}
}
var fhreq fasthttp.Request
if err := c.SetFasthttpHeaders(&fhreq, false); err != nil {
t.Fatalf("unexpected error in SetFasthttpHeaders(): %s", err)
}
for _, h := range headersParsed {
v := fhreq.Header.Peek(h.key)
if string(v) != h.value {
t.Fatalf("unexpected value for fasthttp header %q; got %q; want %q", h.key, v, h.value)
}
}
}
f(nil, "")
f([]string{"foo: bar"}, "foo: bar\r\n")
Expand Down

0 comments on commit bc7cf49

Please sign in to comment.