-
Notifications
You must be signed in to change notification settings - Fork 796
/
query.go
268 lines (232 loc) · 8.6 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package tripperware
import (
"bytes"
"compress/gzip"
"context"
"io"
"net/http"
"sort"
"strconv"
"strings"
"time"
"unsafe"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/runutil"
)
// json is the package-wide jsoniter API used in place of encoding/json.
// It is frozen once from a config that leaves HTML unescaped, emits map keys
// in sorted order and does not validate json.RawMessage values.
var json = jsoniter.Config{
	ValidateJsonRawMessage: false,
	SortMapKeys:            true,
	EscapeHTML:             false, // No HTML in our responses.
}.Froze()
// Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares.
// It embeds Merger, so every Codec can also merge several responses into one.
type Codec interface {
	Merger
	// DecodeRequest decodes a Request from an http request.
	// NOTE(review): forwardHeaders is presumably the list of header names to
	// carry over from the incoming request — confirm against implementations.
	DecodeRequest(_ context.Context, request *http.Request, forwardHeaders []string) (Request, error)
	// DecodeResponse decodes a Response from an http response.
	// The original request is also passed as a parameter; this is useful for
	// implementations that need the request to merge results or to build the
	// result correctly.
	DecodeResponse(context.Context, *http.Response, Request) (Response, error)
	// EncodeRequest encodes a Request into an http request.
	EncodeRequest(context.Context, Request) (*http.Request, error)
	// EncodeResponse encodes a Response into an http response.
	EncodeResponse(context.Context, Response) (*http.Response, error)
}
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
	// MergeResponse merges the responses from multiple requests into a single
	// Response. The Request argument is the request the responses belong to.
	MergeResponse(context.Context, Request, ...Response) (Response, error)
}
// Response represents a query range response.
// Every Response is also a protobuf message (proto.Message is embedded).
type Response interface {
	proto.Message
	// HTTPHeaders returns the HTTP headers in the response, keyed by header
	// name with one or more values per name.
	HTTPHeaders() map[string][]string
}
// Request represents a query range request that can be processed by middlewares.
// Every Request is also a protobuf message (proto.Message is embedded). The
// With* methods return a modified clone rather than mutating the receiver.
type Request interface {
	// GetStart returns the start timestamp of the request in milliseconds.
	GetStart() int64
	// GetEnd returns the end timestamp of the request in milliseconds.
	GetEnd() int64
	// GetStep returns the step of the request in milliseconds.
	GetStep() int64
	// GetQuery returns the query of the request.
	GetQuery() string
	// WithStartEnd clones the current request with different start and end timestamps.
	WithStartEnd(startTime int64, endTime int64) Request
	// WithQuery clones the current request with a different query.
	WithQuery(string) Request
	proto.Message
	// LogToSpan writes information about this request to an OpenTracing span.
	LogToSpan(opentracing.Span)
	// GetStats returns the stats of the request.
	GetStats() string
	// WithStats clones the current request with new stats.
	WithStats(stats string) Request
}
// decodeSampleStream is the jsoniter decoder for SampleStream. It walks the
// fields of the JSON object under iter, reading "metric" into a label set and
// every element of "values" into a cortexpb.Sample, then overwrites the
// SampleStream that ptr points to with the result.
//
// NOTE(review): fields other than "metric" and "values" are not consumed here;
// confirm that inputs never carry additional keys.
func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
	metric := labels.Labels{}
	values := []cortexpb.Sample{}

	key := iter.ReadObject()
	for key != "" {
		if key == "metric" {
			iter.ReadVal(&metric)
		} else if key == "values" {
			// ReadArray reports whether another array element follows.
			for iter.ReadArray() {
				var sample cortexpb.Sample
				cortexpb.SampleJsoniterDecode(unsafe.Pointer(&sample), iter)
				values = append(values, sample)
			}
		}
		key = iter.ReadObject()
	}

	out := (*SampleStream)(ptr)
	*out = SampleStream{
		Labels:  cortexpb.FromLabelsToLabelAdapters(metric),
		Samples: values,
	}
}
// encodeSampleStream is the jsoniter encoder for SampleStream. It writes the
// stream that ptr points to as a JSON object with a "metric" field (the labels
// as marshalled by their own MarshalJSON) followed by a "values" array holding
// one encoded entry per sample.
//
// If marshalling the labels fails, the error is stored on stream.Error and
// encoding stops; the object start and the "metric" key have already been
// written at that point.
func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
	series := (*SampleStream)(ptr)

	stream.WriteObjectStart()

	stream.WriteObjectField(`metric`)
	metricJSON, err := cortexpb.FromLabelAdaptersToLabels(series.Labels).MarshalJSON()
	if err != nil {
		stream.Error = err
		return
	}
	// Splice the already-serialised labels straight into the output buffer.
	stream.SetBuffer(append(stream.Buffer(), metricJSON...))

	stream.WriteMore()
	stream.WriteObjectField(`values`)
	stream.WriteArrayStart()
	for idx := range series.Samples {
		if idx > 0 {
			stream.WriteMore()
		}
		sample := series.Samples[idx]
		cortexpb.SampleJsoniterEncode(unsafe.Pointer(&sample), stream)
	}
	stream.WriteArrayEnd()

	stream.WriteObjectEnd()
}
// UnmarshalJSON implements json.Unmarshaler.
//
// The payload is decoded with the package-level jsoniter config into an
// intermediate struct with "metric" and "values" fields. On success the
// receiver's Labels and Samples are replaced; on failure the decode error is
// returned and the receiver is left untouched.
func (s *SampleStream) UnmarshalJSON(data []byte) error {
	var decoded struct {
		Metric labels.Labels     `json:"metric"`
		Values []cortexpb.Sample `json:"values"`
	}

	err := json.Unmarshal(data, &decoded)
	if err != nil {
		return err
	}

	s.Samples = decoded.Values
	s.Labels = cortexpb.FromLabelsToLabelAdapters(decoded.Metric)
	return nil
}
// PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode is the jsoniter
// decoder for PrometheusResponseQueryableSamplesStatsPerStep. It expects a JSON
// array of exactly two elements, `[<timestamp>, <value>]`, where the timestamp
// is a float that is multiplied by 1000 (seconds to milliseconds) and the value
// is an integer.
//
// On any structural error the problem is reported on iter and the function
// returns without touching the destination, so callers never observe a
// partially decoded value alongside an error.
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
	if !iter.ReadArray() {
		iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected [")
		return
	}

	// Scale the wire timestamp by the number of milliseconds in a second.
	t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond))

	if !iter.ReadArray() {
		iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected ,")
		return
	}
	v := iter.ReadInt64()

	// A third element means the array is longer than the expected pair.
	if iter.ReadArray() {
		iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected ]")
		return
	}

	*(*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) = PrometheusResponseQueryableSamplesStatsPerStep{
		TimestampMs: int64(t),
		Value:       v,
	}
}
// PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode is the jsoniter
// encoder for PrometheusResponseQueryableSamplesStatsPerStep. It writes the
// value that ptr points to as a two-element JSON array: TimestampMs divided by
// 1000 as a float, followed by Value as an integer.
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
	const millisPerSecond = float64(time.Second / time.Millisecond)

	step := (*PrometheusResponseQueryableSamplesStatsPerStep)(ptr)
	seconds := float64(step.TimestampMs) / millisPerSecond

	stream.WriteArrayStart()
	stream.WriteFloat64(seconds)
	stream.WriteMore()
	stream.WriteInt64(step.Value)
	stream.WriteArrayEnd()
}
// init registers the custom jsoniter encoders and decoders for the
// PrometheusResponseQueryableSamplesStatsPerStep and SampleStream types.
func init() {
	const (
		statsPerStepType = "tripperware.PrometheusResponseQueryableSamplesStatsPerStep"
		sampleStreamType = "tripperware.SampleStream"
	)

	// The is-empty callback always answers false for both types.
	neverEmpty := func(unsafe.Pointer) bool { return false }

	jsoniter.RegisterTypeEncoderFunc(statsPerStepType, PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, neverEmpty)
	jsoniter.RegisterTypeDecoderFunc(statsPerStepType, PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode)

	jsoniter.RegisterTypeEncoderFunc(sampleStreamType, encodeSampleStream, neverEmpty)
	jsoniter.RegisterTypeDecoderFunc(sampleStreamType, decodeSampleStream)
}
// EncodeTime divides t by 1000 and formats the quotient as a plain decimal
// string (no exponent) using the fewest digits that represent it exactly,
// e.g. 1500 becomes "1.5".
func EncodeTime(t int64) string {
	const divisor = 1.0e3
	seconds := float64(t) / divisor
	return strconv.FormatFloat(seconds, 'f', -1, 64)
}
// Buffer can be used to read a response body.
// This avoids reading the body multiple times from the `http.Response.Body`:
// a body that implements Buffer exposes its already-buffered content directly.
type Buffer interface {
	// Bytes returns the buffered body content.
	Bytes() []byte
}
// BodyBuffer returns the full body of res as a byte slice, transparently
// gunzipping it when the Content-Encoding header equals "gzip"
// (case-insensitive).
//
// If res.Body implements Buffer, its already-buffered bytes are used instead of
// reading the body again. Otherwise the body is read to the end; a read failure
// is returned as an httpgrpc error with status 500. Errors from creating the
// gzip reader or from decompressing are returned as-is. A failure to close the
// gzip reader is only logged through logger.
func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
	var body *bytes.Buffer

	switch src := res.Body.(type) {
	case Buffer:
		// The frontend may have already read the body and buffered it.
		body = bytes.NewBuffer(src.Bytes())
	default:
		// Size the buffer up front from Content-Length so it does not have to
		// grow step by step. The extra MinRead of capacity avoids a further
		// allocation caused by how ReadFrom() works internally.
		initialCap := res.ContentLength + bytes.MinRead
		body = bytes.NewBuffer(make([]byte, 0, initialCap))
		if _, err := body.ReadFrom(res.Body); err != nil {
			return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
		}
	}

	// Anything that is not gzip-encoded is handed back untouched.
	if !strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		return body.Bytes(), nil
	}

	gz, err := gzip.NewReader(body)
	if err != nil {
		return nil, err
	}
	defer runutil.CloseWithLogOnErr(logger, gz, "close gzip reader")

	return io.ReadAll(gz)
}
// BodyBufferFromHTTPGRPCResponse returns the body of an httpgrpc response,
// transparently gunzipping it when the Content-Encoding header equals "gzip"
// (case-insensitive). A body that is not gzip-encoded is returned as-is,
// without copying.
//
// Errors from creating the gzip reader or from decompressing are returned
// unchanged. A failure to close the gzip reader is only logged through logger.
func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
	// Rebuild an http.Header so the lookup below can use Get. Keys are stored
	// in canonical form because Get canonicalizes only the key it is asked
	// for: a header sent as "content-encoding" would otherwise never match and
	// the compressed bytes would be returned undecoded.
	headers := make(http.Header, len(res.Headers))
	for _, h := range res.Headers {
		headers[http.CanonicalHeaderKey(h.Key)] = h.Values
	}

	// If the response is gzipped, unzip it here.
	if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") {
		gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body))
		if err != nil {
			return nil, err
		}
		defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader")

		return io.ReadAll(gReader)
	}

	return res.Body, nil
}
// StatsMerge flattens a map of per-step sample stats into a single
// PrometheusResponseStats. Entries are appended to
// TotalQueryableSamplesPerStep in ascending order of their map key, and
// TotalQueryableSamples is the sum of every entry's Value.
//
// The returned stats always carry a non-nil Samples, even for an empty map.
// NOTE(review): map values are dereferenced without a nil check — presumably
// callers never store nil entries; confirm.
func StatsMerge(stats map[int64]*PrometheusResponseQueryableSamplesStatsPerStep) *PrometheusResponseStats {
	// Map iteration order is random, so collect and sort the keys first.
	sortedKeys := make([]int64, 0, len(stats))
	for k := range stats {
		sortedKeys = append(sortedKeys, k)
	}
	sort.Slice(sortedKeys, func(a, b int) bool { return sortedKeys[a] < sortedKeys[b] })

	samples := &PrometheusResponseSamplesStats{}
	for _, k := range sortedKeys {
		step := stats[k]
		samples.TotalQueryableSamplesPerStep = append(samples.TotalQueryableSamplesPerStep, step)
		samples.TotalQueryableSamples += step.Value
	}

	return &PrometheusResponseStats{Samples: samples}
}