Skip to content

Commit

Permalink
lib/promscrape: properly apply series limit
Browse files Browse the repository at this point in the history
Fix the following issues:

- Series limit wasn't applied when staleness tracking was disabled.
- Series limit didn't prevent from sending staleness markers for new series exceeding the limit.

Updates #3660

Thanks to @hagen1778 for the initial attempt to fix the issue
at #3665
  • Loading branch information
valyala committed Jan 17, 2023
1 parent e06168f commit 289af65
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Expand Up @@ -24,6 +24,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send stale markers to remote storage for series exceeding the configured [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) when [staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is disabled.
* BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636).

## [v1.86.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.1)
Expand Down
13 changes: 9 additions & 4 deletions lib/promscrape/config.go
Expand Up @@ -41,9 +41,10 @@ import (
)

var (
noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series")
strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields")
dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series")
seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info")
strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields")
dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
"Returns non-zero exit code on parsing errors and emits these errors to stderr. "+
"See also -promscrape.config.strictParse command-line flag. "+
"Pass -loggerLevel=ERROR if you don't need to see info messages in the output.")
Expand Down Expand Up @@ -971,6 +972,10 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
if sc.NoStaleMarkers != nil {
noStaleTracking = *sc.NoStaleMarkers
}
seriesLimit := *seriesLimitPerTarget
if sc.SeriesLimit > 0 {
seriesLimit = sc.SeriesLimit
}
swc := &scrapeWorkConfig{
scrapeInterval: scrapeInterval,
scrapeIntervalString: scrapeInterval.String(),
Expand All @@ -995,7 +1000,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
streamParse: sc.StreamParse,
scrapeAlignInterval: sc.ScrapeAlignInterval.Duration(),
scrapeOffset: sc.ScrapeOffset.Duration(),
seriesLimit: sc.SeriesLimit,
seriesLimit: seriesLimit,
noStaleMarkers: noStaleTracking,
}
return swc, nil
Expand Down
42 changes: 25 additions & 17 deletions lib/promscrape/scrapework.go
Expand Up @@ -37,7 +37,6 @@ var (
"See also -promscrape.suppressScrapeErrorsDelay")
suppressScrapeErrorsDelay = flag.Duration("promscrape.suppressScrapeErrorsDelay", 0, "The delay for suppressing repeated scrape errors logging per each scrape targets. "+
"This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors")
seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info")
minResponseSizeForStreamParse = flagutil.NewBytes("promscrape.minResponseSizeForStreamParse", 1e6, "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode")
)

Expand Down Expand Up @@ -451,7 +450,7 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
lastScrape := sw.loadLastScrape()
bodyString := bytesutil.ToUnsafeString(body.B)
areIdenticalSeries := sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString)
areIdenticalSeries := sw.areIdenticalSeries(lastScrape, bodyString)
if err != nil {
up = 0
scrapesFailed.Inc()
Expand Down Expand Up @@ -485,9 +484,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b
samplesDropped := 0
if sw.seriesLimitExceeded || !areIdenticalSeries {
samplesDropped = sw.applySeriesLimit(wc)
if samplesDropped > 0 {
sw.seriesLimitExceeded = true
}
}
am := &autoMetrics{
up: up,
Expand Down Expand Up @@ -577,7 +573,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
err = sbr.Init(sr)
if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString)
areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString)
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
Expand All @@ -594,9 +590,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
}
if sw.seriesLimitExceeded || !areIdenticalSeries {
samplesDropped += sw.applySeriesLimit(wc)
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true
}
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
Expand Down Expand Up @@ -655,6 +648,15 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
return err
}

func (sw *scrapeWork) areIdenticalSeries(prevData, currData string) bool {
if sw.Config.NoStaleMarkers && sw.Config.SeriesLimit <= 0 {
// Do not spend CPU time on tracking the changes in series if stale markers are disabled.
// The check for series_limit is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
return true
}
return parser.AreIdenticalSeriesFast(prevData, currData)
}

// leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx
// structs contain mixed number of labels.
//
Expand Down Expand Up @@ -738,17 +740,13 @@ func (sw *scrapeWork) getSeriesAdded(lastScrape, currScrape string) int {
}

func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
seriesLimit := *seriesLimitPerTarget
if sw.Config.SeriesLimit > 0 {
seriesLimit = sw.Config.SeriesLimit
if sw.Config.SeriesLimit <= 0 {
return 0
}
if sw.seriesLimiter == nil && seriesLimit > 0 {
sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour)
if sw.seriesLimiter == nil {
sw.seriesLimiter = bloomfilter.NewLimiter(sw.Config.SeriesLimit, 24*time.Hour)
}
sl := sw.seriesLimiter
if sl == nil {
return 0
}
dstSeries := wc.writeRequest.Timeseries[:0]
samplesDropped := 0
for _, ts := range wc.writeRequest.Timeseries {
Expand All @@ -761,6 +759,9 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
}
prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):])
wc.writeRequest.Timeseries = dstSeries
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true
}
return samplesDropped
}

Expand All @@ -784,6 +785,13 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp)
}

// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw.seriesLimitExceeded {
sw.applySeriesLimit(wc)
}

series := wc.writeRequest.Timeseries
if len(series) == 0 {
return
Expand Down

0 comments on commit 289af65

Please sign in to comment.