active_series.go
package ingester

import (
	"math"
	"sync"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"go.uber.org/atomic"
)

const (
	numActiveSeriesStripes = 512
)

// ActiveSeries keeps track of recently active series for a single tenant.
type ActiveSeries struct {
	stripes [numActiveSeriesStripes]activeSeriesStripe
}

// activeSeriesStripe holds a subset of the series timestamps for a single tenant.
type activeSeriesStripe struct {
	// Unix nanoseconds. Only used by purge. Zero = unknown.
	// Updated by purge, and reset to zero when a series update writes a timestamp older than the current
	// value (in that case oldestEntryTs is updated without holding the lock -- hence the atomic).
	oldestEntryTs atomic.Int64

	mu     sync.RWMutex
	refs   map[uint64][]activeSeriesEntry
	active int // Number of active entries in this stripe. Only decreased during purge or clear.
}

// activeSeriesEntry holds a timestamp for a single series.
type activeSeriesEntry struct {
	lbs   labels.Labels
	nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
}

// NewActiveSeries returns an empty ActiveSeries tracker.
func NewActiveSeries() *ActiveSeries {
	c := &ActiveSeries{}

	// Stripes are pre-allocated so that we only ever read from them and no lock is required.
	for i := 0; i < numActiveSeriesStripes; i++ {
		c.stripes[i].refs = map[uint64][]activeSeriesEntry{}
	}

	return c
}

// UpdateSeries updates the series' timestamp to 'now'. labelsCopy is called to make a copy of the
// labels if the entry doesn't exist yet.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
	stripeID := hash % numActiveSeriesStripes

	c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy)
}
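
// trackActive is an illustrative sketch, not part of the original file: it shows how a caller is
// expected to drive UpdateSeries, using the label set's own hash and a copy function that detaches
// the labels from any buffer the caller may reuse afterwards. The name trackActive is hypothetical.
func trackActive(as *ActiveSeries, series labels.Labels) {
	as.UpdateSeries(series, series.Hash(), time.Now(), func(l labels.Labels) labels.Labels {
		// Copy is only invoked when the entry is created, so the allocation is paid once per series.
		return l.Copy()
	})
}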

// Purge removes expired entries from the cache. This function should be called
// periodically to avoid memory leaks.
func (c *ActiveSeries) Purge(keepUntil time.Time) {
	for s := 0; s < numActiveSeriesStripes; s++ {
		c.stripes[s].purge(keepUntil)
	}
}
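
// purgeLoop is an illustrative sketch, not part of the original file, of the periodic purge the
// comment on Purge asks for. The 1-minute tick and the 10-minute idle window are assumptions, not
// values taken from this file.
func purgeLoop(as *ActiveSeries, stop <-chan struct{}) {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case now := <-ticker.C:
			// Drop series that have not been updated within the idle window.
			as.Purge(now.Add(-10 * time.Minute))
		case <-stop:
			return
		}
	}
}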

// nolint // Linter reports that this method is unused, but it is.
func (c *ActiveSeries) clear() {
	for s := 0; s < numActiveSeriesStripes; s++ {
		c.stripes[s].clear()
	}
}

// Active returns the total number of active series across all stripes.
func (c *ActiveSeries) Active() int {
	total := 0
	for s := 0; s < numActiveSeriesStripes; s++ {
		total += c.stripes[s].getActive()
	}
	return total
}

func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, labelsCopy func(labels.Labels) labels.Labels) {
	nowNanos := now.UnixNano()

	// Fast path: take only the read lock, and bump the entry's timestamp atomically if it exists.
	e := s.findEntryForSeries(fingerprint, series)
	entryTimeSet := false
	if e == nil {
		e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, labelsCopy)
	}

	if !entryTimeSet {
		if prev := e.Load(); nowNanos > prev {
			entryTimeSet = e.CompareAndSwap(prev, nowNanos)
		}
	}

	if entryTimeSet {
		for prevOldest := s.oldestEntryTs.Load(); nowNanos < prevOldest; prevOldest = s.oldestEntryTs.Load() {
			// If a recent purge already removed entries older than "oldest entry timestamp", setting this to 0 will
			// make sure that the next purge doesn't take the shortcut route. Reload prevOldest after a failed CAS,
			// otherwise the loop would spin forever on a stale value.
			if s.oldestEntryTs.CompareAndSwap(prevOldest, 0) {
				break
			}
		}
	}
}

func (s *activeSeriesStripe) findEntryForSeries(fingerprint uint64, series labels.Labels) *atomic.Int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Check if the series already exists within the entries.
	for ix, entry := range s.refs[fingerprint] {
		if labels.Equal(entry.lbs, series) {
			return s.refs[fingerprint][ix].nanos
		}
	}

	return nil
}

func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Check if the series already exists within the entries.
	for ix, entry := range s.refs[fingerprint] {
		if labels.Equal(entry.lbs, series) {
			return s.refs[fingerprint][ix].nanos, false
		}
	}

	s.active++
	e := activeSeriesEntry{
		lbs:   labelsCopy(series),
		nanos: atomic.NewInt64(nowNanos),
	}

	s.refs[fingerprint] = append(s.refs[fingerprint], e)

	return e.nanos, true
}

// nolint // Linter reports that this method is unused, but it is.
func (s *activeSeriesStripe) clear() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.oldestEntryTs.Store(0)
	s.refs = map[uint64][]activeSeriesEntry{}
	s.active = 0
}

func (s *activeSeriesStripe) purge(keepUntil time.Time) {
	keepUntilNanos := keepUntil.UnixNano()
	if oldest := s.oldestEntryTs.Load(); oldest > 0 && keepUntilNanos <= oldest {
		// Nothing to do.
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	active := 0
	oldest := int64(math.MaxInt64)
	for fp, entries := range s.refs {
		// Since we do expect very few fingerprint collisions, we
		// have an optimized implementation for the common case.
		if len(entries) == 1 {
			ts := entries[0].nanos.Load()
			if ts < keepUntilNanos {
				delete(s.refs, fp)
				continue
			}

			active++
			if ts < oldest {
				oldest = ts
			}
			continue
		}

		// We have more entries, which means there's a collision,
		// so we have to iterate over the entries.
		for i := 0; i < len(entries); {
			ts := entries[i].nanos.Load()
			if ts < keepUntilNanos {
				entries = append(entries[:i], entries[i+1:]...)
			} else {
				if ts < oldest {
					oldest = ts
				}
				i++
			}
		}

		// Either update or delete the entries in the map.
		if cnt := len(entries); cnt == 0 {
			delete(s.refs, fp)
		} else {
			active += cnt
			s.refs[fp] = entries
		}
	}

	if oldest == math.MaxInt64 {
		s.oldestEntryTs.Store(0)
	} else {
		s.oldestEntryTs.Store(oldest)
	}

	s.active = active
}

func (s *activeSeriesStripe) getActive() int {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.active
}
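
// exampleLifecycle is an illustrative sketch, not part of the original file, tying the pieces
// together: create the tracker, record one series, purge entries idle for longer than the assumed
// 10-minute window, and read back the active count.
func exampleLifecycle() int {
	as := NewActiveSeries()

	series := labels.FromStrings("__name__", "http_requests_total", "job", "api")
	as.UpdateSeries(series, series.Hash(), time.Now(), func(l labels.Labels) labels.Labels {
		return l.Copy()
	})

	as.Purge(time.Now().Add(-10 * time.Minute))

	return as.Active() // 1: the series was updated just now, so it survives the purge.
}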