Skip to content

Commit

Permalink
lib/promscrape: reduce memory and CPU usage when Prometheus staleness…
Browse files Browse the repository at this point in the history
… tracking is enabled for metrics from deleted / disappeared scrape targets

Store the scraped response body instead of storing the parsed and relabeld metrics.
This should reduce memory usage, since the response body takes less memory than the parsed and relabeled metrics.
This is especially true for Kubernetes service discovery, which adds many long labels for all the scraped metrics.

This should also reduce CPU usage, since the marshaling of the parsed
and relabeld metrics has been substituted by response body copying.

Updates #1526
  • Loading branch information
valyala committed Aug 21, 2021
1 parent ff4c7c1 commit 67bc407
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ sort: 15

## tip


* FEATURE: vmagent: reduce memory usage and CPU usage when Prometheus staleness tracking is enabled for metrics exported from the deleted or disappeared scrape targets.
* FEATURE: take into account failed queries in `vm_request_duration_seconds` summary at `/metrics`. Previously only successful queries were taken into account. This could result in skewed summary. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1537).
* FEATURE: vmalert: add `-disableAlertgroupLabel` command-line flag for disabling the label with alert group name. This may be needed for proper deduplication in Alertmanager. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1532).

Expand Down
120 changes: 41 additions & 79 deletions lib/promscrape/scrapework.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
Expand Down Expand Up @@ -187,8 +186,9 @@ type scrapeWork struct {
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
prevLabelsLen int

activeSeriesBuf []byte
activeSeries [][]byte
// lastScrape holds the last response from scrape target.
// It is used for generating Prometheus stale markers.
lastScrape []byte
}

func (sw *scrapeWork) run(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -240,7 +240,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
select {
case <-stopCh:
t := time.Now().UnixNano() / 1e6
sw.sendStaleMarkers(t, false)
sw.sendStaleMarkersForLastScrape(t, true)
return
case tt := <-ticker.C:
t := tt.UnixNano() / 1e6
Expand Down Expand Up @@ -284,7 +284,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
}

// Common case: read all the data from scrape target to memory (body) and then process it.
// This case should work more optimally for than stream parse code above for common case when scrape target exposes
// This case should work more optimally than stream parse code for common case when scrape target exposes
// up to a few thousand metrics.
body := leveledbytebufferpool.Get(sw.prevBodyLen)
var err error
Expand All @@ -295,11 +295,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
scrapeResponseSize.Update(float64(len(body.B)))
up := 1
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
bodyString := bytesutil.ToUnsafeString(body.B)
if err != nil {
up = 0
scrapesFailed.Inc()
} else {
bodyString := bytesutil.ToUnsafeString(body.B)
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
}
srcRows := wc.rows.Rows
Expand All @@ -323,10 +323,6 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
if up == 0 {
sw.sendStaleMarkers(scrapeTimestamp, true)
}
sw.updateActiveSeries(wc)
sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
Expand All @@ -335,20 +331,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.prevBodyLen = len(body.B)
leveledbytebufferpool.Put(body)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
return err
}

func isAutogenSeries(name string) bool {
switch name {
case "up",
"scrape_duration_seconds",
"scrape_samples_scraped",
"scrape_samples_post_metric_relabeling",
"scrape_series_added":
return true
default:
return false
if up == 0 {
bodyString = ""
sw.sendStaleMarkersForLastScrape(scrapeTimestamp, false)
}
sw.updateLastScrape(bodyString)
return err
}

func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
Expand Down Expand Up @@ -412,14 +400,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
// Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics.
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
return err
}

Expand Down Expand Up @@ -503,69 +490,44 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
}
}

func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) {
func (sw *scrapeWork) updateLastScrape(response string) {
if *noStaleMarkers {
return
}
b := sw.activeSeriesBuf[:0]
as := sw.activeSeries[:0]
for _, ts := range wc.writeRequest.Timeseries {
bLen := len(b)
for _, label := range ts.Labels {
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Name))
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Value))
}
as = append(as, b[bLen:])
}
sw.activeSeriesBuf = b
sw.activeSeries = as
sw.lastScrape = append(sw.lastScrape[:0], response...)
}

func (sw *scrapeWork) sendStaleMarkers(timestamp int64, skipAutogenSeries bool) {
series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries))
staleMarkSamples := []prompbmarshal.Sample{
{
Value: decimal.StaleNaN,
Timestamp: timestamp,
},
}
for _, b := range sw.activeSeries {
var labels []prompbmarshal.Label
skipSeries := false
for len(b) > 0 {
tail, name, err := encoding.UnmarshalBytes(b)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err)
}
b = tail
tail, value, err := encoding.UnmarshalBytes(b)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err)
}
b = tail
if skipAutogenSeries && string(name) == "__name__" && isAutogenSeries(bytesutil.ToUnsafeString(value)) {
skipSeries = true
}
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(name),
Value: bytesutil.ToUnsafeString(value),
})
}
if skipSeries {
continue
}
series = append(series, prompbmarshal.TimeSeries{
Labels: labels,
Samples: staleMarkSamples,
})
func (sw *scrapeWork) sendStaleMarkersForLastScrape(timestamp int64, addAutoSeries bool) {
bodyString := bytesutil.ToUnsafeString(sw.lastScrape)
if len(bodyString) == 0 && !addAutoSeries {
return
}
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
srcRows := wc.rows.Rows
for i := range srcRows {
sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
}
if addAutoSeries {
sw.addAutoTimeseries(wc, "up", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_samples_scraped", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", 0, timestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", 0, timestamp)
}
series := wc.writeRequest.Timeseries
if len(series) == 0 {
return
}
wr := &prompbmarshal.WriteRequest{
Timeseries: series,
// Substitute all the values with Prometheus stale markers.
for _, tss := range series {
samples := tss.Samples
for i := range samples {
samples[i].Value = decimal.StaleNaN
}
}
sw.pushData(wr)
sw.pushData(&wc.writeRequest)
writeRequestCtxPool.Put(wc)
}

func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {
Expand Down

0 comments on commit 67bc407

Please sign in to comment.