Skip to content

Commit

Permalink
lib/promscrape: eliminate data race in stream parse mode
Browse files Browse the repository at this point in the history
Previously `-promscrape.streamParse` mode could result in garbage labels for the scraped metrics because of a data race.
See #825 (comment)
  • Loading branch information
valyala committed Nov 7, 2020
1 parent 55e98e2 commit 188325f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,8 @@
* FEATURE: vmagent: add `/ready` HTTP endpoint, which returns 200 OK status code when all the service discovery has been initialized.
This may be useful during rolling upgrades. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/875

* BUGFIX: vmagent: eliminate data race when `-promscrape.streamParse` command-line flag is set. Previously this mode could result in scraped metrics with garbage labels.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 for details.
* BUGFIX: properly calculate `topk_*` and `bottomk_*` functions from [MetricsQL](https://victoriametrics.github.io/MetricsQL.html) for time series with gaps.
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/883

Expand Down
22 changes: 9 additions & 13 deletions lib/promscrape/scrapework.go
Expand Up @@ -325,18 +325,16 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
samplesScraped += len(rows)
for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
if len(wc.labels) > 40000 {
// Limit the maximum size of wc.writeRequest.
// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
}
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - holding them would result in a data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
return nil
})
scrapedSamples.Update(float64(samplesScraped))
Expand All @@ -352,8 +350,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
}
scrapesFailed.Inc()
}
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
Expand Down
6 changes: 5 additions & 1 deletion lib/uint64set/uint64set.go
Expand Up @@ -927,9 +927,13 @@ func (b *bucket16) delFromSmallPool(x uint16) bool {
func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 {
hi64 := uint64(hi)<<32 | uint64(hi16)<<16
if b.bits == nil {
// Sort a copy of b.smallPool, since b must be readonly in order to prevent data races
// when b.appendTo is called from concurrent goroutines.
smallPool := b.smallPool

// Use uint16Sorter instead of sort.Slice here in order to reduce memory allocations.
a := uint16SorterPool.Get().(*uint16Sorter)
*a = uint16Sorter(b.smallPool[:b.smallPoolLen])
*a = uint16Sorter(smallPool[:b.smallPoolLen])
if len(*a) > 1 && !sort.IsSorted(a) {
sort.Sort(a)
}
Expand Down

0 comments on commit 188325f

Please sign in to comment.