-
Notifications
You must be signed in to change notification settings - Fork 793
/
compat.go
101 lines (86 loc) · 2.84 KB
/
compat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package chunkcompat
import (
"bytes"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/cortexproject/cortex/pkg/chunk"
prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)
// StreamsToMatrix converts a slice of QueryStreamResponse to a model.Matrix.
func StreamsToMatrix(from, through model.Time, responses []*client.QueryStreamResponse) (model.Matrix, error) {
result := model.Matrix{}
for _, response := range responses {
series, err := SeriesChunksToMatrix(from, through, response.Chunkseries)
if err != nil {
return nil, err
}
result = append(result, series...)
}
return result, nil
}
// SeriesChunksToMatrix converts slice of []client.TimeSeriesChunk to a model.Matrix.
func SeriesChunksToMatrix(from, through model.Time, serieses []client.TimeSeriesChunk) (model.Matrix, error) {
if serieses == nil {
return nil, nil
}
result := model.Matrix{}
for _, series := range serieses {
metric := cortexpb.FromLabelAdaptersToMetric(series.Labels)
chunks, err := FromChunks("", cortexpb.FromLabelAdaptersToLabels(series.Labels), series.Chunks)
if err != nil {
return nil, err
}
samples := []model.SamplePair{}
for _, chunk := range chunks {
ss, err := chunk.Samples(from, through)
if err != nil {
return nil, err
}
samples = util.MergeSampleSets(samples, ss)
}
result = append(result, &model.SampleStream{
Metric: metric,
Values: samples,
})
}
return result, nil
}
// FromChunks converts []client.Chunk to []chunk.Chunk.
func FromChunks(userID string, metric labels.Labels, in []client.Chunk) ([]chunk.Chunk, error) {
out := make([]chunk.Chunk, 0, len(in))
for _, i := range in {
o, err := prom_chunk.NewForEncoding(prom_chunk.Encoding(byte(i.Encoding)))
if err != nil {
return nil, err
}
if err := o.UnmarshalFromBuf(i.Data); err != nil {
return nil, err
}
firstTime, lastTime := model.Time(i.StartTimestampMs), model.Time(i.EndTimestampMs)
// As the lifetime of this chunk is scopes to this request, we don't need
// to supply a fingerprint.
out = append(out, chunk.NewChunk(userID, 0, metric, o, firstTime, lastTime))
}
return out, nil
}
// ToChunks converts []chunk.Chunk to []client.Chunk.
func ToChunks(in []chunk.Chunk) ([]client.Chunk, error) {
out := make([]client.Chunk, 0, len(in))
for _, i := range in {
wireChunk := client.Chunk{
StartTimestampMs: int64(i.From),
EndTimestampMs: int64(i.Through),
Encoding: int32(i.Data.Encoding()),
}
buf := bytes.NewBuffer(make([]byte, 0, prom_chunk.ChunkLen))
if err := i.Data.Marshal(buf); err != nil {
return nil, err
}
wireChunk.Data = buf.Bytes()
out = append(out, wireChunk)
}
return out, nil
}