-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
332 lines (271 loc) · 8.55 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package immunitycache
import (
"sync"
"github.com/bhagyaraj1208117/andes-core-go/core/atomic"
logger "github.com/bhagyaraj1208117/andes-logger-go"
"github.com/bhagyaraj1208117/andes-storage-go/common"
"github.com/bhagyaraj1208117/andes-storage-go/monitoring"
"github.com/bhagyaraj1208117/andes-storage-go/types"
)
// Compile-time check that *ImmunityCache satisfies the types.Cacher interface.
var _ types.Cacher = (*ImmunityCache)(nil)
// log is the package-level logger, obtained under the name "storage/immunitycache".
var log = logger.GetOrCreate("storage/immunitycache")
const (
	// hospitalityWarnThreshold is the hospitality score at or below which
	// Diagnose logs a warning and resets the score.
	hospitalityWarnThreshold = -10000
	// hospitalityUpperLimit is the value the hospitality score is capped to
	// by Diagnose once the score reaches or exceeds it.
	hospitalityUpperLimit = 10000
	// capacityReachedWarningPeriod controls log throttling when immunization is
	// refused: one refusal out of this many is logged as a warning, the others as debug.
	capacityReachedWarningPeriod = 100
)
// ImmunityCache is a cache-like structure whose items are spread over a number of
// chunks (config.NumChunks); a key is assigned to a chunk by hashing it.
// Keys can be marked as immune to eviction through ImmunizeKeys.
type ImmunityCache struct {
// config holds the settings the cache was created with.
config CacheConfig
// chunks holds the shards of the cache; the slice is replaced as a whole by Clear.
chunks []*immunityChunk
// hospitality is incremented when HasOrAdd adds a new item and decremented
// when a new item could not be added; Diagnose inspects and resets/caps it.
hospitality atomic.Counter
// numCapacityReachedOccurrences counts the ImmunizeKeys calls refused for capacity
// reasons since the last accepted call; it is used to throttle the warning log.
numCapacityReachedOccurrences atomic.Counter
// mutex guards access to the chunks slice.
mutex sync.RWMutex
}
// NewImmunityCache builds an ImmunityCache from the given configuration.
//
// The configuration is logged and reported to the cache monitor, then validated.
// If validation fails, the validation error is returned together with a nil cache.
// Otherwise all chunks are created before the cache is handed back.
func NewImmunityCache(config CacheConfig) (*ImmunityCache, error) {
	log.Debug("NewImmunityCache", "config", config.String())
	// NOTE(review): the monitor is notified before the config is verified, so an
	// invalid config is still reported to it — confirm this ordering is intended.
	monitoring.MonitorNewCache(config.Name, uint64(config.MaxNumBytes))

	if err := config.Verify(); err != nil {
		return nil, err
	}

	cache := &ImmunityCache{config: config}
	cache.initializeChunksWithLock()
	return cache, nil
}
// initializeChunksWithLock replaces the chunk slice with config.NumChunks newly
// created chunks, all built from the same chunk configuration.
// The write lock is held for the whole operation.
func (ic *ImmunityCache) initializeChunksWithLock() {
	ic.mutex.Lock()
	defer ic.mutex.Unlock()

	cfg := ic.config
	perChunkConfig := cfg.getChunkConfig()

	fresh := make([]*immunityChunk, cfg.NumChunks)
	for idx := range fresh {
		fresh[idx] = newImmunityChunk(perChunkConfig)
	}
	ic.chunks = fresh
}
// ImmunizeKeys marks the given keys as immune to eviction.
//
// If the current number of immune items plus len(keys) would exceed
// config.MaxNumItems, nothing is immunized: the refusal is logged (at a throttled
// level) and (0, 0) is returned. Otherwise the keys are grouped by chunk, each chunk
// immunizes its own keys, and the per-chunk results are summed.
//
// Returns the summed (numNow, numFuture) counters reported by the chunks —
// presumably "already cached" vs. "to be immunized once added"; confirm against
// immunityChunk.ImmunizeKeys.
func (ic *ImmunityCache) ImmunizeKeys(keys [][]byte) (numNowTotal, numFutureTotal int) {
	maxItems := int(ic.config.MaxNumItems)
	if ic.CountImmune()+len(keys) > maxItems {
		level := ic.decideLogLevelOnCapacityReached()
		log.Log(level, "ImmunityCache.ImmunizeKeys(): will not immunize", "err", common.ErrImmuneItemsCapacityReached)
		return 0, 0
	}

	// A request was accepted: restart the warning-throttling counter.
	ic.forgetCapacityHadBeenReachedInThePast()

	for index, keysOfChunk := range ic.groupKeysByChunk(keys) {
		now, future := ic.getChunkByIndexWithLock(index).ImmunizeKeys(keysOfChunk)
		numNowTotal += now
		numFutureTotal += future
	}

	return numNowTotal, numFutureTotal
}
// decideLogLevelOnCapacityReached picks the log level for a "capacity reached"
// message and records the occurrence. The level is Warning when the number of
// occurrences recorded so far is a multiple of capacityReachedWarningPeriod
// (including zero), and Debug otherwise.
func (ic *ImmunityCache) decideLogLevelOnCapacityReached() logger.LogLevel {
	seenSoFar := ic.numCapacityReachedOccurrences.GetUint64()
	ic.numCapacityReachedOccurrences.Increment()

	if seenSoFar%capacityReachedWarningPeriod == 0 {
		return logger.LogWarning
	}
	return logger.LogDebug
}
// forgetCapacityHadBeenReachedInThePast resets the counter of "capacity reached"
// occurrences, so that the next refusal is logged as a warning again.
func (ic *ImmunityCache) forgetCapacityHadBeenReachedInThePast() {
ic.numCapacityReachedOccurrences.Reset()
}
// groupKeysByChunk partitions keys by the index of the chunk each key hashes to.
// Within a group, keys keep the relative order they had in the input.
// Chunks that receive no key are absent from the returned map.
func (ic *ImmunityCache) groupKeysByChunk(keys [][]byte) map[uint32][][]byte {
	byChunk := make(map[uint32][][]byte)

	for i := range keys {
		index := ic.getChunkIndexByKey(string(keys[i]))
		byChunk[index] = append(byChunk[index], keys[i])
	}

	return byChunk
}
// getChunkIndexByKey maps a key to a chunk index: the 32-bit FNV hash of the key
// modulo config.NumChunks.
func (ic *ImmunityCache) getChunkIndexByKey(key string) uint32 {
return fnv32Hash(key) % ic.config.NumChunks
}
// fnv32Hash computes the 32-bit Fowler–Noll–Vo hash of key, see
// https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function.
//
// This is the FNV-1 variant: for every byte of the key, the running hash is first
// multiplied by the FNV prime and then XOR-ed with the byte. The key is processed
// byte by byte (not rune by rune); the empty key hashes to the offset basis.
func fnv32Hash(key string) uint32 {
	const (
		offsetBasis32 = uint32(2166136261)
		prime32       = uint32(16777619)
	)

	sum := offsetBasis32
	for _, b := range []byte(key) {
		sum = (sum * prime32) ^ uint32(b)
	}
	return sum
}
// getChunkByIndexWithLock returns the chunk stored at the given index, reading the
// chunks slice under the read lock. The index is not range-checked here.
func (ic *ImmunityCache) getChunkByIndexWithLock(index uint32) *immunityChunk {
ic.mutex.RLock()
defer ic.mutex.RUnlock()
return ic.chunks[index]
}
// getChunkByKeyWithLock returns the chunk responsible for the given key.
// The chunk index is derived from the key's hash; the chunks slice is then read
// under the read lock.
func (ic *ImmunityCache) getChunkByKeyWithLock(key string) *immunityChunk {
	return ic.getChunkByIndexWithLock(ic.getChunkIndexByKey(key))
}
// Get looks up the item stored under key and returns its payload.
// When no item is found, it returns (nil, false).
func (ic *ImmunityCache) Get(key []byte) (value interface{}, ok bool) {
	item, found := ic.getItem(key)
	if !found {
		return nil, false
	}
	return item.payload, true
}
// getItem fetches the cache item stored under key from the chunk the key maps to.
// The boolean result is the one reported by the chunk's GetItem.
func (ic *ImmunityCache) getItem(key []byte) (*cacheItem, bool) {
	keyAsString := string(key)
	return ic.getChunkByKeyWithLock(keyAsString).GetItem(keyAsString)
}
// Has reports whether an item is stored under the given key.
func (ic *ImmunityCache) Has(key []byte) bool {
	keyAsString := string(key)
	_, found := ic.getChunkByKeyWithLock(keyAsString).GetItem(keyAsString)
	return found
}
// Peek returns the payload stored under key; it simply delegates to Get.
func (ic *ImmunityCache) Peek(key []byte) (value interface{}, ok bool) {
return ic.Get(key)
}
// HasOrAdd wraps value in a cache item of the declared size and offers it to the
// chunk the key maps to.
//
// Returns the (has, added) pair reported by the chunk. When the key was not already
// present, the hospitality score is adjusted: incremented if the item was added,
// decremented if it was not. An already-present key leaves the score untouched.
func (ic *ImmunityCache) HasOrAdd(key []byte, value interface{}, sizeInBytes int) (has, added bool) {
	keyAsString := string(key)
	candidate := newCacheItem(value, keyAsString, sizeInBytes)
	has, added = ic.getChunkByKeyWithLock(keyAsString).AddItem(candidate)

	switch {
	case has:
		// Key already cached: hospitality is not affected.
	case added:
		ic.hospitality.Increment()
	default:
		ic.hospitality.Decrement()
	}

	return has, added
}
// Put offers an item to the cache by delegating to HasOrAdd and discarding its result.
// It always returns false, whether or not the item was actually added.
func (ic *ImmunityCache) Put(key []byte, value interface{}, sizeInBytes int) (evicted bool) {
ic.HasOrAdd(key, value, sizeInBytes)
return false
}
// Remove removes the item stored under key, ignoring the result of RemoveWithResult.
func (ic *ImmunityCache) Remove(key []byte) {
_ = ic.RemoveWithResult(key)
}
// RemoveWithResult asks the chunk the key maps to to remove the item stored under
// key, and returns the boolean reported by the chunk's RemoveItem.
// TODO: In the future, add this method to the "storage.Cacher" interface. EN-6739.
func (ic *ImmunityCache) RemoveWithResult(key []byte) bool {
	keyAsString := string(key)
	return ic.getChunkByKeyWithLock(keyAsString).RemoveItem(keyAsString)
}
// RemoveOldest is not implemented: it removes nothing and only logs an error.
func (ic *ImmunityCache) RemoveOldest() {
log.Error("ImmunityCache.RemoveOldest is not implemented")
}
// Clear empties the cache by replacing all chunks with newly created ones.
// The hospitality and capacity-reached counters are not reset by this call.
func (ic *ImmunityCache) Clear() {
// There is no need to explicitly remove each item for each chunk
// The garbage collector will remove the data from memory
ic.initializeChunksWithLock()
}
// MaxSize returns the capacity of the cache, expressed as a number of items
// (config.MaxNumItems converted to int).
func (ic *ImmunityCache) MaxSize() int {
return int(ic.config.MaxNumItems)
}
// Len is an alias for Count: it returns the number of items held by the cache.
func (ic *ImmunityCache) Len() int {
return ic.Count()
}
// SizeInBytesContained always returns 0 for this cacher implementation.
// Use NumBytes to obtain an estimate of the cache size.
func (ic *ImmunityCache) SizeInBytesContained() uint64 {
return 0
}
// Count returns the total number of items in the cache, obtained by summing the
// item counts of all chunks.
func (ic *ImmunityCache) Count() int {
	chunks := ic.getChunksWithLock()

	total := 0
	for i := range chunks {
		total += chunks[i].Count()
	}
	return total
}
// getChunksWithLock returns the current chunks slice, read under the read lock.
// The slice itself is returned (not a copy); the lock is released before the caller uses it.
func (ic *ImmunityCache) getChunksWithLock() []*immunityChunk {
ic.mutex.RLock()
defer ic.mutex.RUnlock()
return ic.chunks
}
// CountImmune returns the total number of immunized (current or future) elements,
// obtained by summing the immune counts of all chunks.
func (ic *ImmunityCache) CountImmune() int {
	chunks := ic.getChunksWithLock()

	total := 0
	for i := range chunks {
		total += chunks[i].CountImmune()
	}
	return total
}
// NumBytes estimates the size of the cache in bytes, as the sum of the sizes
// reported by all chunks.
func (ic *ImmunityCache) NumBytes() int {
	chunks := ic.getChunksWithLock()

	total := 0
	for i := range chunks {
		total += chunks[i].NumBytes()
	}
	return total
}
// Keys returns the keys of all chunks, collected chunk by chunk.
// The result slice is pre-sized using the item count taken just before collecting.
func (ic *ImmunityCache) Keys() [][]byte {
	collected := make([][]byte, 0, ic.Count())

	chunks := ic.getChunksWithLock()
	for i := range chunks {
		collected = chunks[i].AppendKeys(collected)
	}

	return collected
}
// RegisterHandler is not implemented: the handler and its id are ignored and an
// error is logged.
func (ic *ImmunityCache) RegisterHandler(func(key []byte, value interface{}), string) {
log.Error("ImmunityCache.RegisterHandler is not implemented")
}
// UnRegisterHandler is not implemented: the id is ignored, no handler is removed,
// and an error is logged.
func (ic *ImmunityCache) UnRegisterHandler(_ string) {
log.Error("ImmunityCache.UnRegisterHandler is not implemented")
}
// ForEachItem applies function to the items of the cache by delegating to each
// chunk's ForEachItem, one chunk after another.
func (ic *ImmunityCache) ForEachItem(function types.ForEachItem) {
	chunks := ic.getChunksWithLock()
	for i := range chunks {
		chunks[i].ForEachItem(function)
	}
}
// Diagnose logs a summary of the internal state of the cache: name, item count,
// immune item count, estimated size in bytes and the hospitality score.
//
// When the score is at or below hospitalityWarnThreshold, the summary is logged as
// a warning, the score is reset, and the function returns. Otherwise the summary is
// logged at trace level; a score at or above hospitalityUpperLimit is first capped
// to that limit (the logged value is the one read before capping).
// The boolean argument is ignored.
func (ic *ImmunityCache) Diagnose(_ bool) {
	numItems := ic.Count()
	numImmune := ic.CountImmune()
	sizeInBytes := ic.NumBytes()
	score := ic.hospitality.Get()

	if score <= hospitalityWarnThreshold {
		log.Warn("ImmunityCache.Diagnose(): cache is not hospitable",
			"name", ic.config.Name,
			"count", numItems,
			"countImmune", numImmune,
			"numBytes", sizeInBytes,
			"hospitality", score,
		)
		// Once the warning has been emitted, start measuring hospitality afresh.
		ic.hospitality.Reset()
		return
	}

	if score >= hospitalityUpperLimit {
		ic.hospitality.Set(hospitalityUpperLimit)
	}

	log.Trace("ImmunityCache.Diagnose()",
		"name", ic.config.Name,
		"count", numItems,
		"countImmune", numImmune,
		"numBytes", sizeInBytes,
		"hospitality", score,
	)
}
// Close does nothing for this cacher implementation and always returns nil.
func (ic *ImmunityCache) Close() error {
return nil
}
// IsInterfaceNil returns true if the receiver pointer is nil, i.e. there is no
// value under the interface holding it.
func (ic *ImmunityCache) IsInterfaceNil() bool {
return ic == nil
}