-
Notifications
You must be signed in to change notification settings - Fork 795
/
ref_cache.go
160 lines (130 loc) · 4.29 KB
/
ref_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package tsdb
import (
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"go.uber.org/atomic"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)
const (
	// numRefCacheStripes is the number of independently locked stripes a
	// RefCache is split into.
	numRefCacheStripes = 512

	// DefaultRefCacheTTL is the default TTL after which RefCache entries are
	// purged. It is a reasonable value that should cover most use cases; note
	// that the cache would be ineffective for a series whose scrape interval
	// is greater than this TTL.
	DefaultRefCacheTTL = 10 * time.Minute
)
// RefCache is a single-tenant cache mapping a label set to its series
// reference ID in TSDB, in order to be able to append samples to the TSDB head
// without having to copy the write request series labels each time (because
// the memory buffers used to unmarshal the write request are reused).
type RefCache struct {
// The cache is split into numRefCacheStripes stripes, each one with a
// dedicated lock, in order to reduce lock contention. The stripe of a series
// is picked from the hash of its fingerprint.
stripes [numRefCacheStripes]refCacheStripe
}
// refCacheStripe holds a subset of the series references for a single tenant.
type refCacheStripe struct {
// refsMu guards refs. Lookups hold the read lock, while inserts, updates
// and purges hold the write lock.
refsMu sync.RWMutex
// refs maps a series fingerprint to the entries sharing that fingerprint.
// More than one entry under the same key means a fingerprint collision.
refs map[model.Fingerprint][]refCacheEntry
}
// refCacheEntry holds a single series reference.
type refCacheEntry struct {
// lbs is the label set of the series; lookups match on it via labels.Equal.
lbs labels.Labels
// ref is the cached series reference.
ref uint64
// touchedAt is the last time the entry was looked up or set. It is atomic
// because lookups update it while holding only the stripe's read lock.
touchedAt atomic.Int64 // Unix nano time.
}
// NewRefCache returns an empty, ready to use RefCache.
func NewRefCache() *RefCache {
	cache := new(RefCache)

	// Initialise the map of every stripe up front, so that afterwards the
	// stripes array is only ever read and needs no lock of its own.
	for idx := range cache.stripes {
		cache.stripes[idx].refs = make(map[model.Fingerprint][]refCacheEntry)
	}

	return cache
}
// Ref looks up the cached reference of the input series. The boolean result
// reports whether the series was found; on a hit the entry is also marked as
// touched at the given time. The input labels set is NOT retained.
func (c *RefCache) Ref(now time.Time, series labels.Labels) (uint64, bool) {
	fp := client.Fingerprint(series)

	// The hash of the fingerprint selects the stripe owning the series.
	stripe := &c.stripes[util.HashFP(fp)%numRefCacheStripes]

	return stripe.ref(now, series, fp)
}
// SetRef stores the reference of the input series, or updates it if the series
// is already cached, and marks the entry as touched at the given time. The
// input labels set IS retained whenever a new entry gets created.
func (c *RefCache) SetRef(now time.Time, series labels.Labels, ref uint64) {
	fp := client.Fingerprint(series)

	// The hash of the fingerprint selects the stripe owning the series.
	stripe := &c.stripes[util.HashFP(fp)%numRefCacheStripes]

	stripe.setRef(now, series, fp, ref)
}
// Purge drops from the cache every entry last touched before keepUntil. It
// should be called periodically, otherwise entries of series which are no
// longer looked up are never removed and memory leaks.
func (c *RefCache) Purge(keepUntil time.Time) {
	// Stripes are purged one at a time, each under its own lock.
	for idx := range c.stripes {
		c.stripes[idx].purge(keepUntil)
	}
}
// ref returns the reference cached in this stripe for the input series, and
// whether it was found. fp is expected to be the fingerprint of series (the
// callers in this file pass client.Fingerprint(series)). On a hit the entry's
// touchedAt is set to now.
func (s *refCacheStripe) ref(now time.Time, series labels.Labels, fp model.Fingerprint) (uint64, bool) {
	s.refsMu.RLock()
	defer s.refsMu.RUnlock()

	// An unknown fingerprint yields a nil slice, so the loop body never runs.
	candidates := s.refs[fp]
	for i := range candidates {
		entry := &candidates[i]
		if !labels.Equal(entry.lbs, series) {
			// Fingerprint collision with a different series.
			continue
		}

		// Only the read lock is held here, so the timestamp has to be
		// updated atomically.
		entry.touchedAt.Store(now.UnixNano())
		return entry.ref, true
	}

	return 0, false
}
// setRef stores ref for the input series in this stripe and sets the entry's
// touchedAt to now. If the series is already cached its entry is updated in
// place (keeping the labels already stored), otherwise a new entry retaining
// the input labels is appended under fp.
func (s *refCacheStripe) setRef(now time.Time, series labels.Labels, fp model.Fingerprint, ref uint64) {
	s.refsMu.Lock()
	defer s.refsMu.Unlock()

	nowNanos := now.UnixNano()

	// Look for the series among the entries sharing the fingerprint.
	existing := s.refs[fp]
	for i := range existing {
		if labels.Equal(existing[i].lbs, series) {
			existing[i].ref = ref
			existing[i].touchedAt.Store(nowNanos)
			return
		}
	}

	// Not cached yet: append a new entry for the series.
	fresh := refCacheEntry{lbs: series, ref: ref}
	fresh.touchedAt.Store(nowNanos)
	s.refs[fp] = append(existing, fresh)
}
// purge removes from this stripe every entry whose touchedAt is before
// keepUntil. Fingerprints left with no entries are deleted from the map.
func (s *refCacheStripe) purge(keepUntil time.Time) {
	s.refsMu.Lock()
	defer s.refsMu.Unlock()

	keepUntilNanos := keepUntil.UnixNano()

	for fp, entries := range s.refs {
		// Since we do expect very few fingerprint collisions, we
		// have an optimized implementation for the common case.
		if len(entries) == 1 {
			if entries[0].touchedAt.Load() < keepUntilNanos {
				delete(s.refs, fp)
			}

			continue
		}

		// We have more entries, which means there's a collision, so we
		// filter them in a single pass, reusing the backing array.
		kept := entries[:0]
		for i := range entries {
			if entries[i].touchedAt.Load() >= keepUntilNanos {
				kept = append(kept, entries[i])
			}
		}

		// The slots past the kept entries still hold stale copies. Drop their
		// labels, otherwise the backing array would keep purged label sets
		// reachable for as long as the slice stays in the map.
		for i := len(kept); i < len(entries); i++ {
			entries[i].lbs = nil
		}

		// Either update or delete the entries in the map.
		if len(kept) == 0 {
			delete(s.refs, fp)
		} else {
			s.refs[fp] = kept
		}
	}
}