-
Notifications
You must be signed in to change notification settings - Fork 2
/
gatherer.go
439 lines (356 loc) · 12.8 KB
/
gatherer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
// Copyright 2015-2023 Bleemeo
//
// bleemeo.com an infrastructure monitoring solution in the Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/bleemeo/glouton/logger"
	"github.com/bleemeo/glouton/prometheus/model"
	"github.com/bleemeo/glouton/prometheus/registry/internal/ruler"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	prometheusModel "github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"google.golang.org/protobuf/proto"
)
// defaultGatherTimeout bounds a plain Gather() call on a wrappedGatherer,
// which has no caller-supplied context.
const defaultGatherTimeout time.Duration = 10 * time.Second

// errIncorrectType is reported by mergeMFS when two families with the same
// name have different, non-untyped types.
var errIncorrectType = errors.New("incorrect type for gathered metric family")

// errGatherOnNilGatherer is returned by wrappedGatherer.GatherWithState when
// the gatherer is closed or has no source.
var errGatherOnNilGatherer = errors.New("GatherWithState called on nil gatherer")
// queryType selects which kind of metrics a gather should return
// (see GatherState.QueryType).
type queryType int

// Possible values of GatherState.QueryType. NoProbe must stay the zero value
// so that a zero GatherState never triggers probes.
const (
	// NoProbe is the default value. Probes can be very costly as it involves network calls, hence it is disabled by default.
	NoProbe queryType = 0
	// OnlyProbes specifies we only want data from the probes.
	OnlyProbes queryType = 1
	// All specifies we want all the data, including probes.
	All queryType = 2
	// FromStore use the store (queryable) to get recent stored data.
	FromStore queryType = 3
)
// GatherState is an argument given to gatherers that support it. It allows us to give extra information
// to gatherers. When no argument is supplied (i.e. when calling Gather() on a GathererWithState), most
// of the time Gather() directly calls GatherWithState(GatherState{}), so the zero value of every field
// must be a sensible default. For example, NoProbe *must* be the default queryType, as we do not want
// queries on /metrics to always probe the collectors by default.
type GatherState struct {
// QueryType selects which metrics are gathered. Its zero value is NoProbe.
QueryType queryType
// FromScrapeLoop tells whether the gathering is done by the periodic scrape loop or for the /metrics endpoint.
FromScrapeLoop bool
// T0 is the reference time of this gather. When it is zero, Now() returns the current time instead.
T0 time.Time
// NoFilter is set by GatherStateFromMap when the "noFilter" parameter is present.
// NOTE(review): presumably it disables metric filtering; confirm with the consumers of this field.
NoFilter bool
// HintMetricFilter is an optional filter that gather could use to skip metrics that would be filtered later.
// Nothing is mandatory: the HintMetricFilter could be nil and the gatherer could ignore HintMetricFilter even if non-nil.
// If used the filter should be applied after any label alteration.
HintMetricFilter func(lbls labels.Labels) bool
}
// Now returns the reference time of the gather: T0 when it is set, otherwise
// the current wall-clock time.
func (s GatherState) Now() time.Time {
	if t0 := s.T0; !t0.IsZero() {
		return t0
	}

	return time.Now()
}
// GatherStateFromMap creates a GatherState from a state passed as a map
// (for example URL query parameters). Only the presence of a key matters,
// its values are ignored. QueryType is chosen by precedence:
//   - "fromStore" selects FromStore,
//   - otherwise "onlyMonitors" selects OnlyProbes,
//   - otherwise "includeMonitors" selects All,
//   - otherwise it stays NoProbe.
//
// "noFilter" sets NoFilter independently. T0 is set to the current time.
func GatherStateFromMap(params map[string][]string) GatherState {
	has := func(key string) bool {
		_, ok := params[key]

		return ok
	}

	state := GatherState{T0: time.Now()}

	// TODO: add "includeMonitors" and "onlyMonitors" in some user-facing documentation.
	switch {
	case has("fromStore"):
		state.QueryType = FromStore
	case has("onlyMonitors"):
		state.QueryType = OnlyProbes
	case has("includeMonitors"):
		state.QueryType = All
	}

	state.NoFilter = has("noFilter")

	return state
}
// GathererWithState is a generalization of prometheus.Gatherer: the gather receives a context and a
// GatherState describing what the caller wants (which query type, reference time, filtering hints...).
type GathererWithState interface {
GatherWithState(ctx context.Context, state GatherState) ([]*dto.MetricFamily, error)
}
// GathererWithScheduleUpdate is implemented by gatherers that accept a ScheduleUpdate callback
// (like the Probe gatherer).
// The gatherer can use that callback to trigger an additional gather earlier than the default
// scrape interval.
type GathererWithScheduleUpdate interface {
// SetScheduleUpdate gives the gatherer the callback to use. runAt is presumably the time at which
// the additional gather should run — confirm with the implementations.
SetScheduleUpdate(scheduleUpdate func(runAt time.Time))
}
// GathererWithStateWrapper is a wrapper around GathererWithState that allows to specify a state to forward
// to the wrapped gatherer when the caller does not know about GathererWithState and uses raw Gather().
// The main use case is the /metrics HTTP endpoint, where we want to be able to Gather() only some metrics
// (e.g. all metrics/only probes/no probes).
// In the prometheus exporter endpoint, when receiving a request, the (user-provided)
// HTTP handler will:
// - create a new wrapper instance, generate a GatherState accordingly, and call wrapper.SetState(newState).
// - pass the wrapper to a new prometheus HTTP handler.
// - when Gather() is called upon the wrapper by prometheus, the wrapper calls GatherWithState(newState)
// on its internal gatherer.
// The wrapper also carries the metrics allow/deny list (filter) in order to sync the metrics on /metrics
// with the metrics sent to the Bleemeo platform.
// The fields are not protected by any mutex: SetState must not race with Gather (hence one wrapper per request).
type GathererWithStateWrapper struct {
// gatherState is forwarded as-is to gatherer by Gather.
gatherState GatherState
// gatherer is the wrapped GathererWithState.
gatherer GathererWithState
// filter is the metrics allow/deny list.
// NOTE(review): no method in this section reads filter; confirm where it is actually applied.
filter metricFilter
// ctx is the context passed to every GatherWithState call made by Gather.
ctx context.Context //nolint:containedctx
}
// NewGathererWithStateWrapper returns a wrapper that forwards Gather() calls to g
// as GatherWithState(ctx, state), where state is the value given to SetState
// (the zero GatherState until SetState is called).
func NewGathererWithStateWrapper(ctx context.Context, g GathererWithState, filter metricFilter) *GathererWithStateWrapper {
	w := new(GathererWithStateWrapper)
	w.ctx = ctx
	w.gatherer = g
	w.filter = filter

	return w
}
// SetState records the state that Gather will forward to the wrapped gatherer.
// It is not synchronized with Gather, so it must be called before the wrapper
// is handed to its consumer.
func (w *GathererWithStateWrapper) SetState(newState GatherState) {
	w.gatherState = newState
}
// Gather implements prometheus.Gatherer for GathererWithStateWrapper by calling
// GatherWithState on the wrapped gatherer with the stored context and state.
// Errors are logged at verbosity 2 and returned unchanged, together with
// whatever metric families the wrapped gatherer produced.
func (w *GathererWithStateWrapper) Gather() ([]*dto.MetricFamily, error) {
	mfs, err := w.gatherer.GatherWithState(w.ctx, w.gatherState)
	if err == nil {
		return mfs, nil
	}

	logger.V(2).Printf("Error during gather on /metrics: %v", err)

	return mfs, err
}
// gatherModifier post-processes gathered metric families: it receives the families and the error
// returned by the gather, and returns the families to use instead. It cannot change the error.
type gatherModifier func(mfs []*dto.MetricFamily, gatherError error) []*dto.MetricFamily
// wrappedGatherer wraps a gatherer to apply Registry change and apply RegistrationOption.
// For example, it will add provided labels to all metrics and/or change timestamps.
type wrappedGatherer struct {
// labels are merged into every gathered metric; mergeLabelsDTO expects them sorted by name.
labels []*dto.LabelPair
opt RegistrationOption
// ruler applies opt's relabeling rules; it is nil when opt has no rules.
ruler *ruler.SimpleRuler
source prometheus.Gatherer
// l protects closed and running; cond is built on l.
l sync.Mutex
// closed is set by close(); once true, GatherWithState no longer calls source.
closed bool
// running is true while source is being gathered (with l released); other callers wait on cond.
running bool
cond *sync.Cond
}
// newWrappedGatherer wraps g so that every gather applies the registration settings:
//   - extraLabels are added to every metric, except labels whose name starts with
//     prometheusModel.ReservedLabelPrefix, which are skipped;
//   - relabeling rules from opt.rrules are applied when there is at least one;
//   - the rest of opt (GatherModifier, HonorTimestamp, ...) is used by GatherWithState.
//
// NOTE(review): mergeLabelsDTO requires the resulting labels to be sorted by name;
// this assumes extraLabels is sorted, as labels.Labels normally is.
func newWrappedGatherer(g prometheus.Gatherer, extraLabels labels.Labels, opt RegistrationOption) *wrappedGatherer {
	// Named dtoLabels (not "labels") to avoid shadowing the labels package.
	dtoLabels := make([]*dto.LabelPair, 0, len(extraLabels))

	for _, l := range extraLabels {
		if strings.HasPrefix(l.Name, prometheusModel.ReservedLabelPrefix) {
			continue
		}

		// Copy into variables declared inside the loop body, which are fresh on every
		// iteration. Taking &l.Name / &l.Value directly would, with pre-Go 1.22 loop
		// semantics, make every pair point to the last label.
		name, value := l.Name, l.Value

		dtoLabels = append(dtoLabels, &dto.LabelPair{
			Name:  &name,
			Value: &value,
		})
	}

	var sruler *ruler.SimpleRuler

	if len(opt.rrules) > 0 {
		sruler = ruler.New(opt.rrules)
	}

	wrap := &wrappedGatherer{
		source: g,
		labels: dtoLabels,
		ruler:  sruler,
		opt:    opt,
	}
	wrap.cond = sync.NewCond(&wrap.l)

	return wrap
}
// dtoLabelToMap converts protobuf label pairs into a name -> value map.
// When a name appears more than once, the last value wins.
func dtoLabelToMap(lbls []*dto.LabelPair) map[string]string {
	m := make(map[string]string, len(lbls))

	for i := range lbls {
		pair := lbls[i]
		m[pair.GetName()] = pair.GetValue()
	}

	return m
}
// Gather implements prometheus.Gatherer. It gathers with a zero GatherState
// (so no probes) and bounds the call with defaultGatherTimeout.
func (g *wrappedGatherer) Gather() ([]*dto.MetricFamily, error) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultGatherTimeout)
	defer cancel()

	var state GatherState

	return g.GatherWithState(ctx, state)
}
// GatherWithState implements GathererWithState.
//
// Calls are serialized: while one call gathers from the source, other callers
// wait on g.cond. The source is called with g.l released (see
// gatherSourceUnlocked); its result is then post-processed with g.l held:
//   - g.ruler rules are applied with now = state.T0, or the current time
//     truncated to the second when T0 is zero;
//   - g.opt.GatherModifier, when set, receives the families and the source error;
//   - g.labels are merged into every metric (g.labels wins on name conflict);
//   - when g.opt.HonorTimestamp is false, metric timestamps are removed
//     (CallForMetricsEndpoint is currently always treated as true).
//
// It returns errGatherOnNilGatherer when the gatherer is closed or has no
// source, and (nil, nil) for a non-probe source when only probes are requested.
func (g *wrappedGatherer) GatherWithState(ctx context.Context, state GatherState) ([]*dto.MetricFamily, error) {
	g.l.Lock()
	defer g.l.Unlock()

	for g.running {
		g.cond.Wait()
	}

	if g.closed || g.source == nil {
		// Make sure to signal before exiting. If two goroutines were blocked on
		// the condition, we need to make sure the other one gets woken up.
		g.cond.Signal()

		return nil, errGatherOnNilGatherer
	}

	// Do not collect non-probes metrics when the user only wants probes.
	if _, probe := g.source.(*ProbeGatherer); !probe && state.QueryType == OnlyProbes {
		// Same as above: wake up the next waiter before exiting.
		g.cond.Signal()

		return nil, nil
	}

	now := time.Now().Truncate(time.Second)
	if !state.T0.IsZero() {
		now = state.T0
	}

	mfs, err := g.gatherSourceUnlocked(ctx, state)

	if g.ruler != nil {
		mfs = g.ruler.ApplyRulesMFS(ctx, now, mfs)
	}

	if g.opt.GatherModifier != nil {
		mfs = g.opt.GatherModifier(mfs, err)
	}

	// NOTE(review): this early return also skips the HonorTimestamp handling
	// below, so timestamps are only altered when extra labels are configured.
	// Behavior kept as-is; confirm whether that is intended.
	if len(g.labels) == 0 {
		return mfs, err
	}

	for _, mf := range mfs {
		// Metrics are pointers: updating m updates the family in place.
		for _, m := range mf.GetMetric() {
			m.Label = mergeLabelsDTO(m.GetLabel(), g.labels)
		}
	}

	if !g.opt.HonorTimestamp {
		forcedTimestamp := now.UnixMilli()

		// CallForMetricsEndpoint is currently not implemented and it's
		// always enabled on Gatherer (i.e. we always call the Gather() method).
		if g.opt.CallForMetricsEndpoint || true {
			// If the callback is used for all invocation of /metrics,
			// we can use "no timestamp" since metric points will be more recent
			// data.
			forcedTimestamp = 0
		}

		for _, mf := range mfs {
			for _, m := range mf.GetMetric() {
				if forcedTimestamp == 0 {
					m.TimestampMs = nil
				} else {
					m.TimestampMs = proto.Int64(forcedTimestamp)
				}
			}
		}
	}

	return mfs, err
}

// gatherSourceUnlocked calls the wrapped source with g.l released.
//
// It must be called with g.l held and g.running false. It marks the gatherer
// as running, so concurrent GatherWithState calls wait, and it always
// re-acquires g.l, clears g.running and signals g.cond before returning, even
// if the source panics. Without the deferred re-lock, a panicking source would
// leave g.l unlocked: the caller's deferred Unlock would then be a fatal
// "unlock of unlocked mutex", and g.running would stay true forever.
func (g *wrappedGatherer) gatherSourceUnlocked(ctx context.Context, state GatherState) ([]*dto.MetricFamily, error) {
	source := g.source
	g.running = true

	g.l.Unlock()

	defer func() {
		g.l.Lock()
		g.running = false
		g.cond.Signal()
	}()

	if cg, ok := source.(GathererWithState); ok {
		return cg.GatherWithState(ctx, state)
	}

	return source.Gather()
}
// close marks the gatherer as closed. Once close returns, GatherWithState no
// longer calls the wrapped source and returns errGatherOnNilGatherer instead.
//
// If a gather is currently running, close waits for it to finish first, so
// the source is no longer in use when close returns. Consequently close must
// not be called from within the wrapped source's Gather (it would deadlock).
func (g *wrappedGatherer) close() {
	g.l.Lock()
	defer g.l.Unlock()

	// The source is gathered with g.l released, so holding the lock is not
	// enough: wait until no gather is in progress.
	for g.running {
		g.cond.Wait()
	}

	g.closed = true

	// The finished gather signalled a single waiter (possibly us). Wake every
	// other blocked GatherWithState so they observe closed and return.
	g.cond.Broadcast()
}
// getSource returns the gatherer wrapped by g, as given to newWrappedGatherer.
func (g *wrappedGatherer) getSource() prometheus.Gatherer {
	src := g.source

	return src
}
// mergeLabels merges a (labels.Labels) and b (protobuf label pairs), both
// sorted by name, into a new sorted labels.Labels. On a name conflict the
// value from b is kept and the label from a is dropped.
func mergeLabels(a labels.Labels, b []*dto.LabelPair) labels.Labels {
	merged := make(labels.Labels, 0, len(a)+len(b))
	i := 0

	for _, pair := range b {
		name := pair.GetName()

		// Copy every label of a that sorts before the current b label.
		for ; i < len(a) && a[i].Name < name; i++ {
			merged = append(merged, a[i])
		}

		// Same name in both lists: b wins, skip the a entry.
		if i < len(a) && a[i].Name == name {
			i++
		}

		merged = append(merged, labels.Label{Name: name, Value: pair.GetValue()})
	}

	// Whatever remains in a sorts after every label of b.
	return append(merged, a[i:]...)
}
// mergeLabelsDTO merges two lists of protobuf label pairs, both sorted by
// name, into a new sorted list. On a name conflict the pair from b is kept and
// the one from a is dropped. The result reuses the *dto.LabelPair pointers of
// a and b; the input slices themselves are not modified.
func mergeLabelsDTO(a []*dto.LabelPair, b []*dto.LabelPair) []*dto.LabelPair {
	merged := make([]*dto.LabelPair, 0, len(a)+len(b))
	i := 0

	for _, pair := range b {
		name := pair.GetName()

		// Copy every pair of a that sorts before the current b pair.
		for ; i < len(a) && a[i].GetName() < name; i++ {
			merged = append(merged, a[i])
		}

		// Same name in both lists: b wins, skip the a entry.
		if i < len(a) && a[i].GetName() == name {
			i++
		}

		merged = append(merged, pair)
	}

	// Whatever remains in a sorts after every pair of b.
	return append(merged, a[i:]...)
}
// sliceGatherer is a prometheus.Gatherer serving a fixed list of metric families.
type sliceGatherer []*dto.MetricFamily

// Gather implements prometheus.Gatherer. It returns the underlying slice
// itself (not a copy) and never fails.
func (s sliceGatherer) Gather() ([]*dto.MetricFamily, error) {
	families := []*dto.MetricFamily(s)

	return families, nil
}
// mergeMFS takes a list of metric families where two entries might be from the
// same family, and merges entries sharing a name into a single family:
//   - the Help text of the first entry is kept;
//   - when one side is UNTYPED and the other is typed, the typed one wins and
//     the UNTYPED side's metrics are converted with model.FixType (this
//     mutates the input families and metrics);
//   - when both are typed with different types, the later entry is dropped
//     and an errIncorrectType error is recorded.
//
// The returned families are sorted by name, as required by the
// prometheus.Gatherer contract, so the output is deterministic. Recorded
// errors are returned through prometheus.MultiError.MaybeUnwrap.
func mergeMFS(mfs []*dto.MetricFamily) ([]*dto.MetricFamily, error) {
	metricFamiliesByName := make(map[string]*dto.MetricFamily, len(mfs))

	var errs prometheus.MultiError

	for _, mf := range mfs {
		existingMF, exists := metricFamiliesByName[mf.GetName()]

		if exists {
			switch {
			case existingMF.GetType() == mf.GetType():
				// Nothing to do.
			case existingMF.GetType() == dto.MetricType_UNTYPED:
				// The new entry brings a type: convert the metrics merged so far.
				existingMF.Type = mf.Type //nolint:protogetter

				for _, metric := range existingMF.GetMetric() {
					model.FixType(metric, mf.GetType())
				}
			case mf.GetType() == dto.MetricType_UNTYPED:
				// Convert the untyped metrics of this entry to the known type.
				mf.Type = existingMF.Type //nolint:protogetter

				for _, metric := range mf.GetMetric() {
					model.FixType(metric, existingMF.GetType())
				}
			default:
				// Both entries are typed but disagree: drop this one.
				errs = append(errs, fmt.Errorf(
					"%w: %s has type %s but should have %s", errIncorrectType,
					mf.GetName(), mf.GetType(), existingMF.GetType(),
				))

				continue
			}
		} else {
			existingMF = &dto.MetricFamily{
				Name: proto.String(mf.GetName()),
				Help: proto.String(mf.GetHelp()),
				Type: mf.Type, //nolint:protogetter
			}
			metricFamiliesByName[mf.GetName()] = existingMF
		}

		existingMF.Metric = append(existingMF.GetMetric(), mf.GetMetric()...)
	}

	result := make([]*dto.MetricFamily, 0, len(metricFamiliesByName))
	for _, f := range metricFamiliesByName {
		result = append(result, f)
	}

	// Map iteration order is random; sort so output is stable between calls.
	sort.Slice(result, func(i, j int) bool {
		return result[i].GetName() < result[j].GetName()
	})

	return result, errs.MaybeUnwrap()
}