Skip to content

Commit

Permalink
app/vmselect/promql: drop staleness marks before calling rollupConfig.Do
Browse files Browse the repository at this point in the history
This allows dropping staleness marks only once and then calculate multiple rollup functions on the result.

Updates #1526
  • Loading branch information
valyala committed Aug 15, 2021
1 parent 6c4c54e commit 5420c3d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 45 deletions.
34 changes: 34 additions & 0 deletions app/vmselect/promql/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
Expand Down Expand Up @@ -785,6 +786,10 @@ func getRollupMemoryLimiter() *memoryLimiter {
func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
if name != "default_rollup" {
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps)
}
preFunc(rs.Values, rs.Timestamps)
ts := getTimeseries()
defer putTimeseries(ts)
Expand Down Expand Up @@ -818,6 +823,10 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
var tssLock sync.Mutex
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
if name != "default_rollup" {
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps)
}
preFunc(rs.Values, rs.Timestamps)
for _, rc := range rcs {
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
Expand Down Expand Up @@ -915,3 +924,28 @@ func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
dst.IsRegexp = src.IsRegexp
dst.IsNegative = src.IsNegative
}

func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
hasStaleSamples := false
for _, v := range values {
if decimal.IsStaleNaN(v) {
hasStaleSamples = true
break
}
}
if !hasStaleSamples {
// Fast path: values have no Prometheus staleness marks.
return values, timestamps
}
// Slow path: drop Prometheus staleness marks from values.
dstValues := values[:0]
dstTimestamps := timestamps[:0]
for i, v := range values {
if decimal.IsStaleNaN(v) {
continue
}
dstValues = append(dstValues, v)
dstTimestamps = append(dstTimestamps, timestamps[i])
}
return dstValues, dstTimestamps
}
59 changes: 14 additions & 45 deletions app/vmselect/promql/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,16 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
}
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
return &rollupConfig{
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
CanDropStalePoints: name == "default_rollup",
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
isDefaultRollup: name == "default_rollup",
}
}
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
Expand Down Expand Up @@ -402,15 +402,13 @@ type rollupConfig struct {
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool

// Whether points after Prometheus stale marks can be dropped during rollup calculations.
// Stale points can be dropped only if `default_rollup()` function is used.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
CanDropStalePoints bool

Timestamps []int64

// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
LookbackDelta int64

// Whether default_rollup is used.
isDefaultRollup bool
}

var (
Expand Down Expand Up @@ -506,10 +504,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
// Extend dstValues in order to remove mallocs below.
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))

if !rc.CanDropStalePoints {
// Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values.
values, timestamps = dropStaleNaNs(values, timestamps)
}
scrapeInterval := getScrapeInterval(timestamps)
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
Expand All @@ -523,7 +517,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
window := rc.Window
if window <= 0 {
window = rc.Step
if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
if rc.isDefaultRollup && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
window = rc.LookbackDelta
Expand Down Expand Up @@ -580,31 +574,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
return dstValues
}

func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
hasStaleSamples := false
for _, v := range values {
if decimal.IsStaleNaN(v) {
hasStaleSamples = true
break
}
}
if !hasStaleSamples {
// Fast path: values have noe Prometheus staleness marks.
return values, timestamps
}
// Slow path: drop Prometheus staleness marks from values.
dstValues := make([]float64, 0, len(values))
dstTimestamps := make([]int64, 0, len(timestamps))
for i, v := range values {
if decimal.IsStaleNaN(v) {
continue
}
dstValues = append(dstValues, v)
dstTimestamps = append(dstTimestamps, timestamps[i])
}
return dstValues, dstTimestamps
}

func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
return 0
Expand Down

0 comments on commit 5420c3d

Please sign in to comment.