Skip to content

Commit

Permalink
all: add support for Prometheus staleness markers
Browse files Browse the repository at this point in the history
Updates #1526
Updates #748
Updates #1509
Updates #1530
Updates #845
  • Loading branch information
valyala committed Aug 13, 2021
1 parent 9a8d1bc commit 4401464
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 82 deletions.
88 changes: 56 additions & 32 deletions app/vmselect/promql/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,16 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
}
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
return &rollupConfig{
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
CanDropLastSample: name == "default_rollup",
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
CanDropStalePoints: name == "default_rollup",
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
}
}
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
Expand Down Expand Up @@ -402,10 +402,10 @@ type rollupConfig struct {
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool

// Whether the last sample can be dropped during rollup calculations.
// The last sample can be dropped for `default_rollup()` function only.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 .
CanDropLastSample bool
// Whether points after Prometheus stale marks can be dropped during rollup calculations.
// Stale points can be dropped only if `default_rollup()` function is used.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
CanDropStalePoints bool

Timestamps []int64

Expand Down Expand Up @@ -506,6 +506,10 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
// Extend dstValues in order to remove mallocs below.
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))

if !rc.CanDropStalePoints {
// Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values.
values, timestamps = dropStaleNaNs(values, timestamps)
}
scrapeInterval := getScrapeInterval(timestamps)
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
Expand All @@ -519,7 +523,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
window := rc.Window
if window <= 0 {
window = rc.Step
if rc.CanDropLastSample && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
window = rc.LookbackDelta
Expand All @@ -537,10 +541,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
j := 0
ni := 0
nj := 0
stalenessInterval := int64(float64(scrapeInterval) * 0.9)
// Do not drop trailing data points for queries, which return 2 or 1 point (aka instant queries).
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845
canDropLastSample := rc.CanDropLastSample && len(rc.Timestamps) > 2
f := rc.Func
for _, tEnd := range rc.Timestamps {
tStart := tEnd - window
Expand All @@ -560,16 +560,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
if canDropLastSample && j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) {
// Drop trailing data points in the following cases:
// - if the distance between the last raw sample and tEnd exceeds stalenessInterval
// - if time series contains only a single raw sample
// This should prevent from double counting when a label changes in time series (for instance,
// during new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
rfa.prevValue = nan
rfa.values = nil
rfa.timestamps = nil
}
if i > 0 {
rfa.realPrevValue = values[i-1]
} else {
Expand All @@ -590,6 +580,31 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
return dstValues
}

// dropStaleNaNs returns values and timestamps with every sample whose value
// is a Prometheus staleness mark removed.
//
// When values contain no staleness marks, the input slices are returned as is
// without any allocation. Otherwise freshly allocated slices are returned and
// the inputs are left untouched.
func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
	// Look for the first staleness mark in order to decide whether any work is needed.
	firstStale := -1
	for idx := range values {
		if decimal.IsStaleNaN(values[idx]) {
			firstStale = idx
			break
		}
	}
	if firstStale < 0 {
		// Fast path: values have no Prometheus staleness marks.
		return values, timestamps
	}

	// Slow path: copy every sample except staleness marks into new slices.
	keptValues := make([]float64, 0, len(values))
	keptTimestamps := make([]int64, 0, len(timestamps))
	for idx := range values {
		if v := values[idx]; !decimal.IsStaleNaN(v) {
			keptValues = append(keptValues, v)
			keptTimestamps = append(keptTimestamps, timestamps[idx])
		}
	}
	return keptValues, keptTimestamps
}

func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
return 0
Expand Down Expand Up @@ -1826,11 +1841,20 @@ func rollupFirst(rfa *rollupFuncArg) float64 {
return values[0]
}

var rollupDefault = rollupLast
// rollupDefault returns the last value from rfa.values or nan if rfa.values is empty.
//
// The returned value may be a Prometheus staleness mark: it is intentionally
// not skipped here.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
func rollupDefault(rfa *rollupFuncArg) float64 {
	if n := len(rfa.values); n > 0 {
		// The trailing sample is returned as is, even if it is a staleness mark.
		return rfa.values[n-1]
	}
	// rfa.prevValue is deliberately ignored for an empty window, since using it
	// may lead to results inconsistent with Prometheus on broken time series
	// with irregular data points.
	return nan
}

func rollupLast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
Expand Down
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ sort: 15

## tip

* FEATURE: add support for Prometheus staleness markers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526).
* FEATURE: vmagent: automatically generate Prometheus staleness markers for the scraped metrics when scrape targets disappear in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526).
* FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has at least a single sample over the previous duration `d`. This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
* FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491).
* FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
Expand Down
100 changes: 74 additions & 26 deletions lib/decimal/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
}
upExp -= downExp
for i, v := range a {
if v == vInfPos || v == vInfNeg {
if isSpecialValue(v) {
// Do not take into account special values.
continue
}
adjExp := upExp
Expand All @@ -48,7 +49,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
}
if downExp > 0 {
for i, v := range b {
if v == vInfPos || v == vInfNeg {
if isSpecialValue(v) {
// Do not take into account special values.
continue
}
adjExp := downExp
Expand Down Expand Up @@ -106,13 +108,17 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 {
}
_ = a[len(va)-1]
for i, v := range va {
f := float64(v)
a[i] = float64(v)
if !isSpecialValue(v) {
continue
}
if v == vInfPos {
f = infPos
a[i] = infPos
} else if v == vInfNeg {
f = infNeg
a[i] = infNeg
} else {
a[i] = StaleNaN
}
a[i] = f
}
return dst[:len(dst)+len(va)]
}
Expand All @@ -122,26 +128,34 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 {
e10 := math.Pow10(int(-e))
_ = a[len(va)-1]
for i, v := range va {
f := float64(v) / e10
a[i] = float64(v) / e10
if !isSpecialValue(v) {
continue
}
if v == vInfPos {
f = infPos
a[i] = infPos
} else if v == vInfNeg {
f = infNeg
a[i] = infNeg
} else {
a[i] = StaleNaN
}
a[i] = f
}
return dst[:len(dst)+len(va)]
}
e10 := math.Pow10(int(e))
_ = a[len(va)-1]
for i, v := range va {
f := float64(v) * e10
a[i] = float64(v) * e10
if !isSpecialValue(v) {
continue
}
if v == vInfPos {
f = infPos
a[i] = infPos
} else if v == vInfNeg {
f = infNeg
a[i] = infNeg
} else {
a[i] = StaleNaN
}
a[i] = f
}
return dst[:len(dst)+len(va)]
}
Expand Down Expand Up @@ -184,7 +198,7 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
v, exp := FromFloat(f)
va[i] = v
ea[i] = exp
if exp < minExp && v != vInfPos && v != vInfNeg {
if exp < minExp && !isSpecialValue(v) {
minExp = exp
}
}
Expand All @@ -211,7 +225,8 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
_ = a[len(va)-1]
_ = ea[len(va)-1]
for i, v := range va {
if v == vInfPos || v == vInfNeg {
if isSpecialValue(v) {
// There is no need in scaling special values.
a[i] = v
continue
}
Expand Down Expand Up @@ -245,8 +260,8 @@ var vaeBufPool sync.Pool
const int64Max = int64(1<<63 - 1)

func maxUpExponent(v int64) int16 {
if v == 0 || v == vInfPos || v == vInfNeg {
// Any exponent allowed.
if v == 0 || isSpecialValue(v) {
// Any exponent allowed for zeroes and special values.
return 1024
}
if v < 0 {
Expand Down Expand Up @@ -302,6 +317,10 @@ func maxUpExponent(v int64) int16 {
//
// See also RoundToSignificantFigures.
func RoundToDecimalDigits(f float64, digits int) float64 {
if IsStaleNaN(f) {
// Do not modify stale nan mark value.
return f
}
if digits <= -100 || digits >= 100 {
return f
}
Expand All @@ -313,6 +332,10 @@ func RoundToDecimalDigits(f float64, digits int) float64 {
//
// See also RoundToDecimalDigits.
func RoundToSignificantFigures(f float64, digits int) float64 {
if IsStaleNaN(f) {
// Do not modify stale nan mark value.
return f
}
if digits <= 0 || digits >= 18 {
return f
}
Expand Down Expand Up @@ -345,11 +368,14 @@ func RoundToSignificantFigures(f float64, digits int) float64 {

// ToFloat returns f=v*10^e.
func ToFloat(v int64, e int16) float64 {
if v == vInfPos {
return infPos
}
if v == vInfNeg {
return infNeg
if isSpecialValue(v) {
if v == vInfPos {
return infPos
}
if v == vInfNeg {
return infNeg
}
return StaleNaN
}
f := float64(v)
// increase conversion precision for negative exponents by dividing by e10
Expand All @@ -364,24 +390,46 @@ var (
infNeg = math.Inf(-1)
)

// StaleNaN is a special NaN value, which is used as Prometheus staleness mark.
// See https://www.robustperception.io/staleness-and-promql
//
// The value is built from the staleNaNBits bit pattern. Since it is a NaN,
// it never compares equal to anything via ==, so use IsStaleNaN for detecting it.
var StaleNaN = math.Float64frombits(staleNaNBits)

const (
vInfPos = 1<<63 - 1
vInfNeg = -1 << 63
vInfPos = 1<<63 - 1
vInfNeg = -1 << 63
vStaleNaN = 1<<63 - 2

vMax = 1<<63 - 3
vMin = -1<<63 + 1

// staleNaNBits is the bit representation of Prometheus staleness mark (aka stale NaN).
// This mark is put by Prometheus at the end of time series for improving staleness detection.
// See https://www.robustperception.io/staleness-and-promql
staleNaNBits uint64 = 0x7ff0000000000002
)

func isSpecialValue(v int64) bool {
return v > vMax || v < vMin
}

// IsStaleNaN reports whether f is the Prometheus staleness mark.
//
// The check compares the raw IEEE 754 bits of f with staleNaNBits,
// so other NaN values are not treated as staleness marks.
func IsStaleNaN(f float64) bool {
	bits := math.Float64bits(f)
	return bits == staleNaNBits
}

// FromFloat converts f to v*10^e.
//
// It tries minimizing v.
// For instance, for f = -1.234 it returns v = -1234, e = -3.
//
// FromFloat doesn't work properly with NaN values, so don't pass them here.
// FromFloat doesn't work properly with NaN values other than Prometheus staleness mark, so don't pass them here.
func FromFloat(f float64) (int64, int16) {
if f == 0 {
return 0, 0
}
if IsStaleNaN(f) {
return vStaleNaN, 0
}
if math.IsInf(f, 0) {
return fromFloatInf(f)
}
Expand Down
Loading

0 comments on commit 4401464

Please sign in to comment.