From 80132a575cba5c4c95b6a2651fc50326b8eafd00 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Tue, 30 Jan 2018 11:04:52 +0100 Subject: [PATCH] Implement xrate/xincrease/xdelta functions, as per #3746. --- promql/engine.go | 21 +++---- promql/functions.go | 108 +++++++++++++++++++++++++++++++++ promql/testdata/functions.test | 67 +++++++++++++++++--- 3 files changed, 176 insertions(+), 20 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index a4503f45e35..c4e602954ad 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -493,22 +493,21 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q var maxOffset time.Duration Inspect(s.Expr, func(node Node) bool { + nodeOffset := LookbackDelta switch n := node.(type) { case *VectorSelector: - if maxOffset < LookbackDelta { - maxOffset = LookbackDelta - } - if n.Offset+LookbackDelta > maxOffset { - maxOffset = n.Offset + LookbackDelta + if n.Offset > 0 { + nodeOffset += n.Offset } case *MatrixSelector: - if maxOffset < n.Range { - maxOffset = n.Range - } - if n.Offset+n.Range > maxOffset { - maxOffset = n.Offset + n.Range + nodeOffset += n.Range + if n.Offset > 0 { + nodeOffset += n.Offset } } + if maxOffset < nodeOffset { + maxOffset = nodeOffset + } return true }) @@ -550,7 +549,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q return false } for _, s := range n.series { - it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range)) + it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range+LookbackDelta)) n.iterators = append(n.iterators, it) } } diff --git a/promql/functions.go b/promql/functions.go index 242220f636a..135d9d520c0 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -126,6 +126,81 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu return resultVector } +func exactRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value { + ms := arg.(*MatrixSelector) + // XXX: Hack for including at least one non-stale point before the beginning of the range. + ms.Range += LookbackDelta + matrix := ev.evalMatrix(ms) + ms.Range -= LookbackDelta + + var ( + rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset) + rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset) + resultVector = make(Vector, 0, len(matrix)) + ) + + for _, samples := range matrix { + points := samples.Points + // Skip all points before rangeStart except the last. + firstPoint := 0 + for i := 0; i < len(points) && points[i].T <= rangeStart; i++ { + firstPoint = i + } + + if len(points)-firstPoint < 2 { + continue + } + sampledInterval := float64(points[len(points)-1].T - points[firstPoint].T) + intervalThreshold := sampledInterval / float64(len(points)-1-firstPoint) * 1.1 + + // If the last point before the range is too far from rangeStart, drop it. + if float64(rangeStart-points[firstPoint].T) > intervalThreshold { + firstPoint++ + if len(points)-firstPoint < 2 { + continue + } + sampledInterval = float64(points[len(points)-1].T - points[firstPoint].T) + intervalThreshold = sampledInterval / float64(len(points)-1-firstPoint) * 1.1 + } + + var ( + counterCorrection float64 + lastValue float64 + ) + if isCounter { + for i := firstPoint; i < len(points); i++ { + sample := points[i] + if sample.V < lastValue { + counterCorrection += lastValue + } + lastValue = sample.V + } + } + resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection + + if isRate { + // Duration between last sample and boundary of range. + durationToEnd := float64(rangeEnd - points[len(points)-1].T) + if points[firstPoint].T < rangeStart && durationToEnd < intervalThreshold { + // If the points cover the whole range (i.e. they start just before the + // range start and end just before the range end) the rate is the + // increase divided by the sampled interval. + resultValue = resultValue / (sampledInterval / 1000) + } else { + // If the points don't cover the whole range, the rate is the increase + // divided by the range length. + resultValue = resultValue / ms.Range.Seconds() + } + } + + resultVector = append(resultVector, Sample{ + Metric: dropMetricName(samples.Metric), + Point: Point{V: resultValue, T: ev.Timestamp}, + }) + } + return resultVector +} + // === delta(Matrix ValueTypeMatrix) Vector === func funcDelta(ev *evaluator, args Expressions) Value { return extrapolatedRate(ev, args[0], false, false) @@ -141,6 +216,21 @@ func funcIncrease(ev *evaluator, args Expressions) Value { return extrapolatedRate(ev, args[0], true, false) } +// === xdelta(Matrix ValueTypeMatrix) Vector === +func funcXdelta(ev *evaluator, args Expressions) Value { + return exactRate(ev, args[0], false, false) +} + +// === xrate(node ValueTypeMatrix) Vector === +func funcXrate(ev *evaluator, args Expressions) Value { + return exactRate(ev, args[0], true, true) +} + +// === xincrease(node ValueTypeMatrix) Vector === +func funcXincrease(ev *evaluator, args Expressions) Value { + return exactRate(ev, args[0], true, false) +} + // === irate(node ValueTypeMatrix) Vector === func funcIrate(ev *evaluator, args Expressions) Value { return instantValue(ev, args[0], true) @@ -1233,6 +1323,24 @@ var functions = map[string]*Function{ ReturnType: ValueTypeVector, Call: funcVector, }, + "xdelta": { + Name: "xdelta", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXdelta, + }, + "xincrease": { + Name: "xincrease", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXincrease, + }, + "xrate": { + Name: "xrate", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXrate, + }, "year": { Name: "year", ArgTypes: []ValueType{ValueTypeVector}, diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index 9b03055ed87..28fb7cbe0ff 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -62,23 +62,57 @@ eval instant at 15m changes(x[15m]) clear -# Tests for increase(). -load 5m +# Tests for increase()/xincrease()/xrate(). +load 5s http_requests{path="/foo"} 0+10x10 http_requests{path="/bar"} 0+10x5 0+10x5 # Tests for increase(). -eval instant at 50m increase(http_requests[50m]) +eval instant at 50s increase(http_requests[50s]) {path="/foo"} 100 {path="/bar"} 90 -eval instant at 50m increase(http_requests[100m]) +eval instant at 50s increase(http_requests[100s]) {path="/foo"} 100 {path="/bar"} 90 +# Tests for xincrease(). +eval instant at 50s xincrease(http_requests[50s]) + {path="/foo"} 100 + {path="/bar"} 90 + +eval instant at 50s xincrease(http_requests[5s]) + {path="/foo"} 10 + {path="/bar"} 10 + +eval instant at 50s xincrease(http_requests[3s]) + {path="/foo"} 10 + {path="/bar"} 10 + +eval instant at 49s xincrease(http_requests[3s]) + +# Tests for xrate(). +eval instant at 50s xrate(http_requests[50s]) + {path="/foo"} 2 + {path="/bar"} 1.8 + +eval instant at 50s xrate(http_requests[100s]) + {path="/foo"} 1 + {path="/bar"} 0.9 + +eval instant at 50s xrate(http_requests[5s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 50s xrate(http_requests[3s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 49s xrate(http_requests[3s]) + clear -# Test for increase() with counter reset. +# Test for increase()/xincrease with counter reset. # When the counter is reset, it always starts at 0. # So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2. # Prometheus assumes it missed the intermediate values 0 and 1. @@ -86,7 +120,10 @@ load 5m http_requests{path="/foo"} 0 1 2 3 2 3 4 eval instant at 30m increase(http_requests[30m]) - {path="/foo"} 7 + {path="/foo"} 7 + +eval instant at 30m xincrease(http_requests[30m]) + {path="/foo"} 7 clear @@ -106,15 +143,27 @@ eval instant at 30m irate(http_requests[50m]) clear -# Tests for delta(). +# Tests for delta()/xdelta(). load 5m - http_requests{path="/foo"} 0 50 100 150 200 - http_requests{path="/bar"} 200 150 100 50 0 + http_requests{path="/foo"} 0 50 300 150 200 + http_requests{path="/bar"} 200 150 300 50 0 eval instant at 20m delta(http_requests[20m]) {path="/foo"} 200 {path="/bar"} -200 +eval instant at 20m xdelta(http_requests[20m]) + {path="/foo"} 200 + {path="/bar"} -200 + +eval instant at 20m xdelta(http_requests[19m]) + {path="/foo"} 200 + {path="/bar"} -200 + +eval instant at 20m xdelta(http_requests[1m]) + {path="/foo"} 50 + {path="/bar"} -50 + clear # Tests for idelta().