-
Notifications
You must be signed in to change notification settings - Fork 786
/
user_metrics_metadata.go
110 lines (92 loc) · 3.3 KB
/
user_metrics_metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package ingester
import (
"sync"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/validation"
)
// userMetricsMetadata allows metric metadata of a tenant to be held by the ingester.
// Metadata is kept as a set as it can come from multiple targets that Prometheus scrapes
// with the same metric name.
type userMetricsMetadata struct {
limiter *Limiter
metrics *ingesterMetrics
userID string
mtx sync.RWMutex
metricToMetadata map[string]metricMetadataSet
}
func newMetadataMap(l *Limiter, m *ingesterMetrics, userID string) *userMetricsMetadata {
return &userMetricsMetadata{
metricToMetadata: map[string]metricMetadataSet{},
limiter: l,
metrics: m,
userID: userID,
}
}
func (mm *userMetricsMetadata) add(metric string, metadata *cortexpb.MetricMetadata) error {
mm.mtx.Lock()
defer mm.mtx.Unlock()
// As we get the set, we also validate two things:
// 1. The user is allowed to create new metrics to add metadata to.
// 2. If the metadata set is already present, it hasn't reached the limit of metadata we can append.
set, ok := mm.metricToMetadata[metric]
if !ok {
// Verify that the user can create more metric metadata given we don't have a set for that metric name.
if err := mm.limiter.AssertMaxMetricsWithMetadataPerUser(mm.userID, len(mm.metricToMetadata)); err != nil {
validation.DiscardedMetadata.WithLabelValues(mm.userID, perUserMetadataLimit).Inc()
return makeLimitError(perUserMetadataLimit, mm.limiter.FormatError(mm.userID, err))
}
set = metricMetadataSet{}
mm.metricToMetadata[metric] = set
}
if err := mm.limiter.AssertMaxMetadataPerMetric(mm.userID, len(set)); err != nil {
validation.DiscardedMetadata.WithLabelValues(mm.userID, perMetricMetadataLimit).Inc()
return makeMetricLimitError(perMetricMetadataLimit, labels.FromStrings(labels.MetricName, metric), mm.limiter.FormatError(mm.userID, err))
}
// if we have seen this metadata before, it is a no-op and we don't need to change our metrics.
_, ok = set[*metadata]
if !ok {
mm.metrics.memMetadata.Inc()
mm.metrics.memMetadataCreatedTotal.WithLabelValues(mm.userID).Inc()
}
mm.metricToMetadata[metric][*metadata] = time.Now()
return nil
}
// If deadline is zero, all metadata is purged.
func (mm *userMetricsMetadata) purge(deadline time.Time) {
mm.mtx.Lock()
defer mm.mtx.Unlock()
var deleted int
for m, s := range mm.metricToMetadata {
deleted += s.purge(deadline)
if len(s) <= 0 {
delete(mm.metricToMetadata, m)
}
}
mm.metrics.memMetadata.Sub(float64(deleted))
mm.metrics.memMetadataRemovedTotal.WithLabelValues(mm.userID).Add(float64(deleted))
}
func (mm *userMetricsMetadata) toClientMetadata() []*cortexpb.MetricMetadata {
mm.mtx.RLock()
defer mm.mtx.RUnlock()
r := make([]*cortexpb.MetricMetadata, 0, len(mm.metricToMetadata))
for _, set := range mm.metricToMetadata {
for m := range set {
r = append(r, &m)
}
}
return r
}
type metricMetadataSet map[cortexpb.MetricMetadata]time.Time
// If deadline is zero time, all metrics are purged.
func (mms metricMetadataSet) purge(deadline time.Time) int {
var deleted int
for metadata, t := range mms {
if deadline.IsZero() || deadline.After(t) {
delete(mms, metadata)
deleted++
}
}
return deleted
}