forked from influxdata/influxdb
/
cache.go
195 lines (170 loc) · 5.92 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package tsi1
import (
"container/list"
"sync"
"github.com/influxdata/influxdb/tsdb"
)
// TagValueSeriesIDCache is an LRU cache for series id sets associated with
// name -> key -> value mappings. The purpose of the cache is to provide
// efficient means to get sets of series ids that would otherwise involve merging
// many individual bitmaps at query time.
//
// When initialising a TagValueSeriesIDCache a capacity must be provided. When
// more than c items are added to the cache, the least recently used item is
// evicted from the cache.
//
// A TagValueSeriesIDCache comprises a linked list implementation to track the
// order by which items should be evicted from the cache, and a hashmap implementation
// to provide constant time retrievals of items from the cache.
type TagValueSeriesIDCache struct {
sync.RWMutex
cache map[string]map[string]map[string]*list.Element
evictor *list.List
capacity int
}
// NewTagValueSeriesIDCache returns a TagValueSeriesIDCache with capacity c.
func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
return &TagValueSeriesIDCache{
cache: map[string]map[string]map[string]*list.Element{},
evictor: list.New(),
capacity: c,
}
}
// Get returns the SeriesIDSet associated with the {name, key, value} tuple if it
// exists.
func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet {
c.Lock()
defer c.Unlock()
return c.get(name, key, value)
}
func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
c.evictor.MoveToFront(ele) // This now becomes most recently used.
return ele.Value.(*seriesIDCacheElement).SeriesIDSet
}
}
}
return nil
}
// exists returns true if the an item exists for the tuple {name, key, value}.
func (c *TagValueSeriesIDCache) exists(name, key, value []byte) bool {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
_, ok := tkmap[string(value)]
return ok
}
}
return false
}
// addToSet adds x to the SeriesIDSet associated with the tuple {name, key, value}
// if it exists. This method takes a lock on the underlying SeriesIDSet.
//
// NB this does not count as an access on the set—therefore the set is not promoted
// within the LRU cache.
func (c *TagValueSeriesIDCache) addToSet(name, key, value []byte, x uint64) {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet
if ss == nil {
ele.Value.(*seriesIDCacheElement).SeriesIDSet = tsdb.NewSeriesIDSet(x)
return
}
ele.Value.(*seriesIDCacheElement).SeriesIDSet.Add(x)
}
}
}
}
// measurementContainsSets returns true if there are sets cached for the provided measurement.
func (c *TagValueSeriesIDCache) measurementContainsSets(name []byte) bool {
_, ok := c.cache[string(name)]
return ok
}
// Put adds the SeriesIDSet to the cache under the tuple {name, key, value}. If
// the cache is at its limit, then the least recently used item is evicted.
func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSet) {
c.Lock()
// Check under the write lock if the relevant item is now in the cache.
if c.exists(name, key, value) {
c.Unlock()
return
}
defer c.Unlock()
// Create list item, and add to the front of the eviction list.
listElement := c.evictor.PushFront(&seriesIDCacheElement{
name: name,
key: key,
value: value,
SeriesIDSet: ss,
})
// Add the listElement to the set of items.
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if _, ok := tkmap[string(value)]; ok {
goto EVICT
}
// Add the set to the map
tkmap[string(value)] = listElement
goto EVICT
}
// No series set map for the tag key - first tag value for the tag key.
mmap[string(key)] = map[string]*list.Element{string(value): listElement}
goto EVICT
}
// No map for the measurement - first tag key for the measurment.
c.cache[string(name)] = map[string]map[string]*list.Element{
string(key): map[string]*list.Element{string(value): listElement},
}
EVICT:
c.checkEviction()
}
// Delete removes x from the tuple {name, key, value} if it exists.
// This method takes a lock on the underlying SeriesIDSet.
func (c *TagValueSeriesIDCache) Delete(name, key, value []byte, x uint64) {
c.Lock()
c.delete(name, key, value, x)
c.Unlock()
}
// delete removes x from the tuple {name, key, value} if it exists.
func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x uint64) {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
if ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet; ss != nil {
ele.Value.(*seriesIDCacheElement).SeriesIDSet.Remove(x)
}
}
}
}
}
// checkEviction checks if the cache is too big, and evicts the least recently used
// item if it is.
func (c *TagValueSeriesIDCache) checkEviction() {
if c.evictor.Len() <= c.capacity {
return
}
e := c.evictor.Back() // Least recently used item.
listElement := e.Value.(*seriesIDCacheElement)
name := listElement.name
key := listElement.key
value := listElement.value
c.evictor.Remove(e) // Remove from evictor
delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items.
// Check if there are no more tag values for the tag key.
if len(c.cache[string(name)][string(key)]) == 0 {
delete(c.cache[string(name)], string(key))
}
// Check there are no more tag keys for the measurement.
if len(c.cache[string(name)]) == 0 {
delete(c.cache, string(name))
}
}
// seriesIDCacheElement is an item stored within a cache.
type seriesIDCacheElement struct {
name []byte
key []byte
value []byte
SeriesIDSet *tsdb.SeriesIDSet
}