Skip to content

Commit

Permalink
Implement xrate/xincrease/xdelta functions, as per prometheus#3746.
Browse files Browse the repository at this point in the history
  • Loading branch information
free committed Jan 30, 2018
1 parent 47538cf commit 80132a5
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 20 deletions.
21 changes: 10 additions & 11 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,22 +493,21 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
var maxOffset time.Duration

Inspect(s.Expr, func(node Node) bool {
nodeOffset := LookbackDelta
switch n := node.(type) {
case *VectorSelector:
if maxOffset < LookbackDelta {
maxOffset = LookbackDelta
}
if n.Offset+LookbackDelta > maxOffset {
maxOffset = n.Offset + LookbackDelta
if n.Offset > 0 {
nodeOffset += n.Offset
}
case *MatrixSelector:
if maxOffset < n.Range {
maxOffset = n.Range
}
if n.Offset+n.Range > maxOffset {
maxOffset = n.Offset + n.Range
nodeOffset += n.Range
if n.Offset > 0 {
nodeOffset += n.Offset
}
}
if maxOffset < nodeOffset {
maxOffset = nodeOffset
}
return true
})

Expand Down Expand Up @@ -550,7 +549,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
return false
}
for _, s := range n.series {
it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range))
it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range+LookbackDelta))
n.iterators = append(n.iterators, it)
}
}
Expand Down
108 changes: 108 additions & 0 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,81 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
return resultVector
}

func exactRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value {
ms := arg.(*MatrixSelector)
// XXX: Hack for including at least one non-stale point before the beginning of the range.
ms.Range += LookbackDelta
matrix := ev.evalMatrix(ms)
ms.Range -= LookbackDelta

var (
rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset)
rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset)
resultVector = make(Vector, 0, len(matrix))
)

for _, samples := range matrix {
points := samples.Points
// Skip all points before rangeStart except the last.
firstPoint := 0
for i := 0; i < len(points) && points[i].T <= rangeStart; i++ {
firstPoint = i
}

if len(points)-firstPoint < 2 {
continue
}
sampledInterval := float64(points[len(points)-1].T - points[firstPoint].T)
intervalThreshold := sampledInterval / float64(len(points)-1-firstPoint) * 1.1

// If the last point before the range is too far from rangeStart, drop it.
if float64(rangeStart-points[firstPoint].T) > intervalThreshold {
firstPoint++
if len(points)-firstPoint < 2 {
continue
}
sampledInterval = float64(points[len(points)-1].T - points[firstPoint].T)
intervalThreshold = sampledInterval / float64(len(points)-1-firstPoint) * 1.1
}

var (
counterCorrection float64
lastValue float64
)
if isCounter {
for i := firstPoint; i < len(points); i++ {
sample := points[i]
if sample.V < lastValue {
counterCorrection += lastValue
}
lastValue = sample.V
}
}
resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection

if isRate {
// Duration between last sample and boundary of range.
durationToEnd := float64(rangeEnd - points[len(points)-1].T)
if points[firstPoint].T < rangeStart && durationToEnd < intervalThreshold {
// If the points cover the whole range (i.e. they start just before the
// range start and end just before the range end) the rate is the
// increase divided by the sampled interval.
resultValue = resultValue / (sampledInterval / 1000)
} else {
// If the points don't cover the whole range, the rate is the increase
// divided by the range length.
resultValue = resultValue / ms.Range.Seconds()
}
}

resultVector = append(resultVector, Sample{
Metric: dropMetricName(samples.Metric),
Point: Point{V: resultValue, T: ev.Timestamp},
})
}
return resultVector
}

// === delta(Matrix ValueTypeMatrix) Vector ===
func funcDelta(ev *evaluator, args Expressions) Value {
return extrapolatedRate(ev, args[0], false, false)
Expand All @@ -141,6 +216,21 @@ func funcIncrease(ev *evaluator, args Expressions) Value {
return extrapolatedRate(ev, args[0], true, false)
}

// === xdelta(Matrix ValueTypeMatrix) Vector ===
func funcXdelta(ev *evaluator, args Expressions) Value {
return exactRate(ev, args[0], false, false)
}

// === xrate(node ValueTypeMatrix) Vector ===
func funcXrate(ev *evaluator, args Expressions) Value {
return exactRate(ev, args[0], true, true)
}

// === xincrease(node ValueTypeMatrix) Vector ===
func funcXincrease(ev *evaluator, args Expressions) Value {
return exactRate(ev, args[0], true, false)
}

// === irate(node ValueTypeMatrix) Vector ===
func funcIrate(ev *evaluator, args Expressions) Value {
return instantValue(ev, args[0], true)
Expand Down Expand Up @@ -1233,6 +1323,24 @@ var functions = map[string]*Function{
ReturnType: ValueTypeVector,
Call: funcVector,
},
"xdelta": {
Name: "xdelta",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXdelta,
},
"xincrease": {
Name: "xincrease",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXincrease,
},
"xrate": {
Name: "xrate",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXrate,
},
"year": {
Name: "year",
ArgTypes: []ValueType{ValueTypeVector},
Expand Down
67 changes: 58 additions & 9 deletions promql/testdata/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -62,31 +62,68 @@ eval instant at 15m changes(x[15m])

clear

# Tests for increase().
load 5m
# Tests for increase()/xincrease()/xrate().
load 5s
http_requests{path="/foo"} 0+10x10
http_requests{path="/bar"} 0+10x5 0+10x5

# Tests for increase().
eval instant at 50m increase(http_requests[50m])
eval instant at 50s increase(http_requests[50s])
{path="/foo"} 100
{path="/bar"} 90

eval instant at 50m increase(http_requests[100m])
eval instant at 50s increase(http_requests[100s])
{path="/foo"} 100
{path="/bar"} 90

# Tests for xincrease().
eval instant at 50s xincrease(http_requests[50s])
{path="/foo"} 100
{path="/bar"} 90

eval instant at 50s xincrease(http_requests[5s])
{path="/foo"} 10
{path="/bar"} 10

eval instant at 50s xincrease(http_requests[3s])
{path="/foo"} 10
{path="/bar"} 10

eval instant at 49s xincrease(http_requests[3s])

# Tests for xrate().
eval instant at 50s xrate(http_requests[50s])
{path="/foo"} 2
{path="/bar"} 1.8

eval instant at 50s xrate(http_requests[100s])
{path="/foo"} 1
{path="/bar"} 0.9

eval instant at 50s xrate(http_requests[5s])
{path="/foo"} 2
{path="/bar"} 2

eval instant at 50s xrate(http_requests[3s])
{path="/foo"} 2
{path="/bar"} 2

eval instant at 49s xrate(http_requests[3s])

clear

# Test for increase() with counter reset.
# Test for increase()/xincrease with counter reset.
# When the counter is reset, it always starts at 0.
# So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2.
# Prometheus assumes it missed the intermediate values 0 and 1.
load 5m
http_requests{path="/foo"} 0 1 2 3 2 3 4

eval instant at 30m increase(http_requests[30m])
{path="/foo"} 7
{path="/foo"} 7

eval instant at 30m xincrease(http_requests[30m])
{path="/foo"} 7

clear

Expand All @@ -106,15 +143,27 @@ eval instant at 30m irate(http_requests[50m])

clear

# Tests for delta().
# Tests for delta()/xdelta().
load 5m
http_requests{path="/foo"} 0 50 100 150 200
http_requests{path="/bar"} 200 150 100 50 0
http_requests{path="/foo"} 0 50 300 150 200
http_requests{path="/bar"} 200 150 300 50 0

eval instant at 20m delta(http_requests[20m])
{path="/foo"} 200
{path="/bar"} -200

eval instant at 20m xdelta(http_requests[20m])
{path="/foo"} 200
{path="/bar"} -200

eval instant at 20m xdelta(http_requests[19m])
{path="/foo"} 200
{path="/bar"} -200

eval instant at 20m xdelta(http_requests[1m])
{path="/foo"} 50
{path="/bar"} -50

clear

# Tests for idelta().
Expand Down

0 comments on commit 80132a5

Please sign in to comment.