This repository has been archived by the owner on Dec 4, 2023. It is now read-only.
/
metric_calculator.go
129 lines (111 loc) · 2.91 KB
/
metric_calculator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Copyright 2020, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*
*/
package metric
import (
	"container/ring"
	"context"
	"fmt"
	"math"
	"time"

	"github.com/enmasseproject/enmasse/pkg/consolegraphql"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)
// Calculator reduces a ring of data points to a single float64 value.
type Calculator interface {
// Calc computes the value for the data points held in timeSeries, or
// returns an error if the value cannot be computed.
Calc(timeSeries *ring.Ring) (float64, error)
}
// promQLCalculator is a Calculator that obtains its value by running a PromQL
// expression through its own Prometheus query engine.
type promQLCalculator struct {
// engine is the PromQL engine used to create and execute queries.
engine *promql.Engine
// promQLExpression is the expression text handed to the engine on every
// Calc call.
promQLExpression string
}
// New builds a promQLCalculator that evaluates promQLExpression whenever Calc
// is called.
//
// The calculator gets its own PromQL engine, created with MaxConcurrent set
// to 1, MaxSamples set to 100 and a Timeout of 10 seconds.
func New(promQLExpression string) (p *promQLCalculator) {
	opts := promql.EngineOpts{
		MaxConcurrent: 1,
		MaxSamples:    100,
		Timeout:       10 * time.Second,
	}
	p = &promQLCalculator{
		engine:           promql.NewEngine(opts),
		promQLExpression: promQLExpression,
	}
	return p
}
// Calc evaluates the calculator's PromQL expression as an instant query at the
// current time, with the data points held in timeSeries exposed to the engine
// through an adaptingQueryable.
//
// It returns 0 when the query yields an empty vector. Otherwise it returns the
// value of the first sample of the resulting vector; values greater than 1 are
// rounded to the nearest integer, other values are returned unchanged.
//
// An error is returned if the query cannot be created, if its execution
// fails, or if the expression evaluates to something other than a vector.
func (p *promQLCalculator) Calc(timeSeries *ring.Ring) (float64, error) {
	queryable := &adaptingQueryable{dataPointRing: timeSeries}
	query, err := p.engine.NewInstantQuery(queryable, p.promQLExpression, time.Now())
	if err != nil {
		return 0, err
	}
	// Release the query's resources once the value has been copied out.
	defer query.Close()

	result := query.Exec(context.TODO())
	if result.Err != nil {
		return 0, result.Err
	}

	// An expression may evaluate to a scalar, string or matrix; report that as
	// an error instead of panicking on the type assertion.
	vector, ok := result.Value.(promql.Vector)
	if !ok {
		return 0, fmt.Errorf("promQL expression %q evaluated to %T, expected a vector", p.promQLExpression, result.Value)
	}
	if len(vector) == 0 {
		return 0, nil
	}

	v := vector[0].V
	if v > 1 {
		v = math.Round(v)
	}
	return v, nil
}
// adaptingQueryable makes a ring of data points available to the PromQL
// engine; Calc passes it to the engine as the queryable for its instant query.
type adaptingQueryable struct {
// dataPointRing holds the data points; the Querier method expects every
// non-nil element to be a consolegraphql.DataPointTimePair.
dataPointRing *ring.Ring
}
// adaptingQuerier is the querier produced by adaptingQueryable.Querier. It
// serves a pre-built query result and a fixed list of labels.
type adaptingQuerier struct {
// queryResults is converted into a series set by Select.
queryResults *prompb.QueryResult
// labels is the list reported by LabelNames and LabelValues.
labels []prompb.Label
}
// Select ignores the select parameters and matchers it is given and always
// returns the series set built from the querier's stored query result, with
// nil warnings and a nil error.
func (aqr adaptingQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
	seriesSet := remote.FromQueryResult(aqr.queryResults)
	return seriesSet, nil, nil
}
// LabelValues returns the values of the querier's labels whose name equals
// name. The returned slice is empty (never nil) when no label matches; the
// warnings and error results are always nil.
func (aqr adaptingQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
	values := make([]string, 0, len(aqr.labels))
	for _, l := range aqr.labels {
		// Only report values that belong to the requested label name rather
		// than the values of every label.
		if l.Name == name {
			values = append(values, l.Value)
		}
	}
	return values, nil, nil
}
// LabelNames returns the name of every label held by the querier, in the
// order the labels are stored. The warnings and error results are always nil.
func (aqr adaptingQuerier) LabelNames() ([]string, storage.Warnings, error) {
	names := make([]string, 0, len(aqr.labels))
	for _, label := range aqr.labels {
		names = append(names, label.Name)
	}
	return names, nil, nil
}
// Close does nothing and always returns nil.
func (aqr adaptingQuerier) Close() error {
return nil
}
// Querier converts the data points in the ring into a single time series and
// returns an adaptingQuerier serving it. The context and the two int64
// arguments are ignored, and the returned error is always nil.
//
// Nil ring elements are skipped. Every other element must be a
// consolegraphql.DataPointTimePair; any other type makes the type assertion
// panic. The series carries one placeholder label,
// unused_label="unused_value".
func (aq adaptingQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) {
	points := []prompb.Sample{}
	collect := func(elem interface{}) {
		if elem == nil {
			// Ring slot without a value.
			return
		}
		dp := elem.(consolegraphql.DataPointTimePair)
		points = append(points, prompb.Sample{
			Timestamp: timestamp.FromTime(dp.Timestamp),
			Value:     dp.DataPoint,
		})
	}
	aq.dataPointRing.Do(collect)

	seriesLabels := []prompb.Label{{Name: "unused_label", Value: "unused_value"}}
	series := &prompb.TimeSeries{Labels: seriesLabels, Samples: points}

	return &adaptingQuerier{
		queryResults: &prompb.QueryResult{Timeseries: []*prompb.TimeSeries{series}},
		labels:       seriesLabels,
	}, nil
}