/
value.go
129 lines (112 loc) · 3.64 KB
/
value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package queryrange
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)
// FromResult transforms a promql query result into a samplestream
func FromResult(res *promql.Result) ([]tripperware.SampleStream, error) {
if res.Err != nil {
// The error could be wrapped by the PromQL engine. We get the error's cause in order to
// correctly parse the error in parent callers (eg. gRPC response status code extraction).
return nil, errors.Cause(res.Err)
}
switch v := res.Value.(type) {
case promql.Scalar:
return []tripperware.SampleStream{
{
Samples: []cortexpb.Sample{
{
Value: v.V,
TimestampMs: v.T,
},
},
},
}, nil
case promql.Vector:
res := make([]tripperware.SampleStream, 0, len(v))
for _, sample := range v {
res = append(res, tripperware.SampleStream{
Labels: mapLabels(sample.Metric),
Samples: mapPoints(promql.FPoint{
T: sample.T,
F: sample.F,
}),
})
}
return res, nil
case promql.Matrix:
res := make([]tripperware.SampleStream, 0, len(v))
for _, series := range v {
res = append(res, tripperware.SampleStream{
Labels: mapLabels(series.Metric),
Samples: mapPoints(series.Floats...),
})
}
return res, nil
}
return nil, errors.Errorf("Unexpected value type: [%s]", res.Value.Type())
}
func mapLabels(ls labels.Labels) []cortexpb.LabelAdapter {
result := make([]cortexpb.LabelAdapter, 0, len(ls))
for _, l := range ls {
result = append(result, cortexpb.LabelAdapter(l))
}
return result
}
func mapPoints(pts ...promql.FPoint) []cortexpb.Sample {
result := make([]cortexpb.Sample, 0, len(pts))
for _, pt := range pts {
result = append(result, cortexpb.Sample{
Value: pt.F,
TimestampMs: pt.T,
})
}
return result
}
// ResponseToSamples is needed to map back from api response to the underlying series data
func ResponseToSamples(resp tripperware.Response) ([]tripperware.SampleStream, error) {
promRes, ok := resp.(*PrometheusResponse)
if !ok {
return nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
}
if promRes.Error != "" {
return nil, errors.New(promRes.Error)
}
switch promRes.Data.ResultType {
case string(parser.ValueTypeVector), string(parser.ValueTypeMatrix):
return promRes.Data.Result, nil
}
return nil, errors.Errorf(
"Invalid promql.Value type: [%s]. Only %s and %s supported",
promRes.Data.ResultType,
parser.ValueTypeVector,
parser.ValueTypeMatrix,
)
}
// NewSeriesSet returns an in memory storage.SeriesSet from a []SampleStream
// As NewSeriesSet uses NewConcreteSeriesSet to implement SeriesSet, result will be sorted by label names.
func NewSeriesSet(sortSeries bool, results []tripperware.SampleStream) storage.SeriesSet {
set := make([]storage.Series, 0, len(results))
for _, stream := range results {
samples := make([]model.SamplePair, 0, len(stream.Samples))
for _, sample := range stream.Samples {
samples = append(samples, model.SamplePair{
Timestamp: model.Time(sample.TimestampMs),
Value: model.SampleValue(sample.Value),
})
}
ls := make([]labels.Label, 0, len(stream.Labels))
for _, l := range stream.Labels {
ls = append(ls, labels.Label(l))
}
set = append(set, series.NewConcreteSeries(ls, samples))
}
return series.NewConcreteSeriesSet(sortSeries, set)
}