-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
value.go
135 lines (116 loc) · 3.58 KB
/
value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package queryrangebase
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/series"
)
// FromResult converts the outcome of a PromQL query into sample streams.
// If the result carries an error, the root cause of that error is returned
// and no streams are produced; otherwise the result value is handed to
// FromValue.
func FromResult(res *promql.Result) ([]SampleStream, error) {
	if err := res.Err; err != nil {
		// The PromQL engine may have wrapped the error. Unwrap to the cause so
		// that callers further up (for example gRPC status code extraction)
		// can interpret it correctly.
		return nil, errors.Cause(err)
	}

	return FromValue(res.Value)
}
// FromValue converts a PromQL parser.Value into a slice of SampleStream.
//
// Supported concrete types:
//   - promql.Scalar: a single stream without labels holding one sample.
//   - promql.Vector: one stream per sample, each holding that one sample.
//   - promql.Matrix: one stream per series, built from the series' Floats.
//
// Any other value, including a nil value, results in an error.
func FromValue(value parser.Value) ([]SampleStream, error) {
	switch v := value.(type) {
	case nil:
		// A nil interface has no dynamic type, so calling value.Type() in the
		// fallthrough path below would panic. Report it as an error instead.
		return nil, errors.New("Unexpected value type: [<nil>]")

	case promql.Scalar:
		return []SampleStream{
			{
				Samples: []logproto.LegacySample{
					{
						Value:       v.V,
						TimestampMs: v.T,
					},
				},
			},
		}, nil

	case promql.Vector:
		res := make([]SampleStream, 0, len(v))
		for _, sample := range v {
			res = append(res, SampleStream{
				Labels: mapLabels(sample.Metric),
				Samples: []logproto.LegacySample{
					{
						Value:       sample.F,
						TimestampMs: sample.T,
					},
				},
			})
		}
		return res, nil

	case promql.Matrix:
		res := make([]SampleStream, 0, len(v))
		// The loop variable is deliberately not called "series" to avoid
		// shadowing the imported series package.
		for _, s := range v {
			res = append(res, SampleStream{
				Labels:  mapLabels(s.Metric),
				Samples: mapPoints(s.Floats...),
			})
		}
		return res, nil
	}

	return nil, errors.Errorf("Unexpected value type: [%s]", value.Type())
}
// mapLabels converts every label in ls to a logproto.LabelAdapter, keeping
// the original order. The returned slice is always non-nil.
func mapLabels(ls labels.Labels) []logproto.LabelAdapter {
	adapters := make([]logproto.LabelAdapter, len(ls))
	for i := range ls {
		adapters[i] = logproto.LabelAdapter(ls[i])
	}
	return adapters
}
// mapPoints converts the given float points into legacy samples, copying each
// point's value (F) and timestamp (T) in order. The returned slice is always
// non-nil.
func mapPoints(pts ...promql.FPoint) []logproto.LegacySample {
	samples := make([]logproto.LegacySample, len(pts))
	for i := range pts {
		samples[i] = logproto.LegacySample{
			Value:       pts[i].F,
			TimestampMs: pts[i].T,
		}
	}
	return samples
}
// ResponseToSamples extracts the underlying series data from an API response.
// The response must be a *PrometheusResponse with an empty Error field and a
// result type of either vector or matrix; anything else yields an error.
func ResponseToSamples(resp Response) ([]SampleStream, error) {
	promRes, isProm := resp.(*PrometheusResponse)
	if !isProm {
		return nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
	}

	if msg := promRes.Error; msg != "" {
		return nil, errors.New(msg)
	}

	resultType := promRes.Data.ResultType
	supported := resultType == string(parser.ValueTypeVector) ||
		resultType == string(parser.ValueTypeMatrix)
	if supported {
		return promRes.Data.Result, nil
	}

	return nil, errors.Errorf(
		"Invalid promql.Value type: [%s]. Only %s and %s supported",
		resultType,
		parser.ValueTypeVector,
		parser.ValueTypeMatrix,
	)
}
// NewSeriesSet builds an in-memory storage.SeriesSet out of a []SampleStream.
// The set is created through NewConcreteSeriesSet, so the result will be
// sorted by label names.
func NewSeriesSet(results []SampleStream) storage.SeriesSet {
	all := make([]storage.Series, len(results))

	for i := range results {
		stream := &results[i]

		// Convert each legacy sample into a model.SamplePair.
		pairs := make([]model.SamplePair, len(stream.Samples))
		for j := range stream.Samples {
			pairs[j] = model.SamplePair{
				Timestamp: model.Time(stream.Samples[j].TimestampMs),
				Value:     model.SampleValue(stream.Samples[j].Value),
			}
		}

		// Convert each label adapter back into a labels.Label.
		lbls := make([]labels.Label, len(stream.Labels))
		for j := range stream.Labels {
			lbls[j] = labels.Label(stream.Labels[j])
		}

		all[i] = series.NewConcreteSeries(lbls, pairs)
	}

	return series.NewConcreteSeriesSet(all)
}