Skip to content

Commit 18e2797

Browse files
authored
Make item public. Add a new onReject call for rejected items. (#180)
- Making Item public makes the onEvict and onReject function calls more readable. - Adding onReject allows us to tightly track every Set that happens, so we can avoid memory leaks in manually allocated memory.
1 parent 7822bcf commit 18e2797

File tree

6 files changed

+174
-162
lines changed

6 files changed

+174
-162
lines changed

cache.go

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var (
3434
setBufSize = 32 * 1024
3535
)
3636

37-
type onEvictFunc func(uint64, uint64, interface{}, int64)
37+
type itemCallback func(*Item)
3838

3939
// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
4040
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
@@ -49,9 +49,11 @@ type Cache struct {
4949
getBuf *ringBuffer
5050
// setBuf is a buffer allowing us to batch/drop Sets during times of high
5151
// contention.
52-
setBuf chan *item
52+
setBuf chan *Item
5353
// onEvict is called for item evictions.
54-
onEvict onEvictFunc
54+
onEvict itemCallback
55+
// onReject is called when an item is rejected via admission policy.
56+
onReject itemCallback
5557
// KeyToHash function is used to customize the key hashing algorithm.
5658
// Each key will be hashed using the provided function. If keyToHash value
5759
// is not set, the default keyToHash function is used.
@@ -99,7 +101,9 @@ type Config struct {
99101
Metrics bool
100102
// OnEvict is called for every eviction and passes the hashed key, value,
101103
// and cost to the function.
102-
OnEvict func(key, conflict uint64, value interface{}, cost int64)
104+
OnEvict func(item *Item)
105+
// OnReject is called for every rejection done via the policy.
106+
OnReject func(item *Item)
103107
// KeyToHash function is used to customize the key hashing algorithm.
104108
// Each key will be hashed using the provided function. If keyToHash value
105109
// is not set, the default keyToHash function is used.
@@ -118,14 +122,14 @@ const (
118122
itemUpdate
119123
)
120124

121-
// item is passed to setBuf so items can eventually be added to the cache.
122-
type item struct {
125+
// Item is passed to setBuf so items can eventually be added to the cache.
126+
type Item struct {
123127
flag itemFlag
124-
key uint64
125-
conflict uint64
126-
value interface{}
127-
cost int64
128-
expiration time.Time
128+
Key uint64
129+
Conflict uint64
130+
Value interface{}
131+
Cost int64
132+
Expiration time.Time
129133
}
130134

131135
// NewCache returns a new Cache instance and any configuration errors, if any.
@@ -143,8 +147,9 @@ func NewCache(config *Config) (*Cache, error) {
143147
store: newStore(),
144148
policy: policy,
145149
getBuf: newRingBuffer(policy, config.BufferItems),
146-
setBuf: make(chan *item, setBufSize),
150+
setBuf: make(chan *Item, setBufSize),
147151
onEvict: config.OnEvict,
152+
onReject: config.OnReject,
148153
keyToHash: config.KeyToHash,
149154
stop: make(chan struct{}),
150155
cost: config.Cost,
@@ -216,13 +221,13 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
216221
}
217222

218223
keyHash, conflictHash := c.keyToHash(key)
219-
i := &item{
224+
i := &Item{
220225
flag: itemNew,
221-
key: keyHash,
222-
conflict: conflictHash,
223-
value: value,
224-
cost: cost,
225-
expiration: expiration,
226+
Key: keyHash,
227+
Conflict: conflictHash,
228+
Value: value,
229+
Cost: cost,
230+
Expiration: expiration,
226231
}
227232
// cost is eventually updated. The expiration must also be immediately updated
228233
// to prevent items from being prematurely removed from the map.
@@ -257,10 +262,10 @@ func (c *Cache) Del(key interface{}) {
257262
// So we must push the same item to `setBuf` with the deletion flag.
258263
// This ensures that if a set is followed by a delete, it will be
259264
// applied in the correct order.
260-
c.setBuf <- &item{
265+
c.setBuf <- &Item{
261266
flag: itemDelete,
262-
key: keyHash,
263-
conflict: conflictHash,
267+
Key: keyHash,
268+
Conflict: conflictHash,
264269
}
265270
}
266271

@@ -314,29 +319,31 @@ func (c *Cache) processItems() {
314319
select {
315320
case i := <-c.setBuf:
316321
// Calculate item cost value if new or update.
317-
if i.cost == 0 && c.cost != nil && i.flag != itemDelete {
318-
i.cost = c.cost(i.value)
322+
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
323+
i.Cost = c.cost(i.Value)
319324
}
320325
switch i.flag {
321326
case itemNew:
322-
victims, added := c.policy.Add(i.key, i.cost)
327+
victims, added := c.policy.Add(i.Key, i.Cost)
323328
if added {
324329
c.store.Set(i)
325-
c.Metrics.add(keyAdd, i.key, 1)
330+
c.Metrics.add(keyAdd, i.Key, 1)
331+
} else if c.onReject != nil {
332+
c.onReject(i)
326333
}
327334
for _, victim := range victims {
328-
victim.conflict, victim.value = c.store.Del(victim.key, 0)
335+
victim.Conflict, victim.Value = c.store.Del(victim.Key, 0)
329336
if c.onEvict != nil {
330-
c.onEvict(victim.key, victim.conflict, victim.value, victim.cost)
337+
c.onEvict(victim)
331338
}
332339
}
333340

334341
case itemUpdate:
335-
c.policy.Update(i.key, i.cost)
342+
c.policy.Update(i.Key, i.Cost)
336343

337344
case itemDelete:
338-
c.policy.Del(i.key) // Deals with metrics updates.
339-
c.store.Del(i.key, i.conflict)
345+
c.policy.Del(i.Key) // Deals with metrics updates.
346+
c.store.Del(i.Key, i.Conflict)
340347
}
341348
case <-c.cleanupTicker.C:
342349
c.store.Cleanup(c.policy, c.onEvict)

cache_test.go

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,10 @@ func TestCacheProcessItems(t *testing.T) {
155155
Cost: func(value interface{}) int64 {
156156
return int64(value.(int))
157157
},
158-
OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
158+
OnEvict: func(item *Item) {
159159
m.Lock()
160160
defer m.Unlock()
161-
evicted[key] = struct{}{}
161+
evicted[item.Key] = struct{}{}
162162
},
163163
})
164164
require.NoError(t, err)
@@ -167,33 +167,33 @@ func TestCacheProcessItems(t *testing.T) {
167167
var conflict uint64
168168

169169
key, conflict = z.KeyToHash(1)
170-
c.setBuf <- &item{
170+
c.setBuf <- &Item{
171171
flag: itemNew,
172-
key: key,
173-
conflict: conflict,
174-
value: 1,
175-
cost: 0,
172+
Key: key,
173+
Conflict: conflict,
174+
Value: 1,
175+
Cost: 0,
176176
}
177177
time.Sleep(wait)
178178
require.True(t, c.policy.Has(1))
179179
require.Equal(t, int64(1), c.policy.Cost(1))
180180

181181
key, conflict = z.KeyToHash(1)
182-
c.setBuf <- &item{
182+
c.setBuf <- &Item{
183183
flag: itemUpdate,
184-
key: key,
185-
conflict: conflict,
186-
value: 2,
187-
cost: 0,
184+
Key: key,
185+
Conflict: conflict,
186+
Value: 2,
187+
Cost: 0,
188188
}
189189
time.Sleep(wait)
190190
require.Equal(t, int64(2), c.policy.Cost(1))
191191

192192
key, conflict = z.KeyToHash(1)
193-
c.setBuf <- &item{
193+
c.setBuf <- &Item{
194194
flag: itemDelete,
195-
key: key,
196-
conflict: conflict,
195+
Key: key,
196+
Conflict: conflict,
197197
}
198198
time.Sleep(wait)
199199
key, conflict = z.KeyToHash(1)
@@ -203,36 +203,36 @@ func TestCacheProcessItems(t *testing.T) {
203203
require.False(t, c.policy.Has(1))
204204

205205
key, conflict = z.KeyToHash(2)
206-
c.setBuf <- &item{
206+
c.setBuf <- &Item{
207207
flag: itemNew,
208-
key: key,
209-
conflict: conflict,
210-
value: 2,
211-
cost: 3,
208+
Key: key,
209+
Conflict: conflict,
210+
Value: 2,
211+
Cost: 3,
212212
}
213213
key, conflict = z.KeyToHash(3)
214-
c.setBuf <- &item{
214+
c.setBuf <- &Item{
215215
flag: itemNew,
216-
key: key,
217-
conflict: conflict,
218-
value: 3,
219-
cost: 3,
216+
Key: key,
217+
Conflict: conflict,
218+
Value: 3,
219+
Cost: 3,
220220
}
221221
key, conflict = z.KeyToHash(4)
222-
c.setBuf <- &item{
222+
c.setBuf <- &Item{
223223
flag: itemNew,
224-
key: key,
225-
conflict: conflict,
226-
value: 3,
227-
cost: 3,
224+
Key: key,
225+
Conflict: conflict,
226+
Value: 3,
227+
Cost: 3,
228228
}
229229
key, conflict = z.KeyToHash(5)
230-
c.setBuf <- &item{
230+
c.setBuf <- &Item{
231231
flag: itemNew,
232-
key: key,
233-
conflict: conflict,
234-
value: 3,
235-
cost: 5,
232+
Key: key,
233+
Conflict: conflict,
234+
Value: 3,
235+
Cost: 5,
236236
}
237237
time.Sleep(wait)
238238
m.Lock()
@@ -243,7 +243,7 @@ func TestCacheProcessItems(t *testing.T) {
243243
require.NotNil(t, recover())
244244
}()
245245
c.Close()
246-
c.setBuf <- &item{flag: itemNew}
246+
c.setBuf <- &Item{flag: itemNew}
247247
}
248248

249249
func TestCacheGet(t *testing.T) {
@@ -256,10 +256,10 @@ func TestCacheGet(t *testing.T) {
256256
require.NoError(t, err)
257257

258258
key, conflict := z.KeyToHash(1)
259-
i := item{
260-
key: key,
261-
conflict: conflict,
262-
value: 1,
259+
i := Item{
260+
Key: key,
261+
Conflict: conflict,
262+
Value: 1,
263263
}
264264
c.store.Set(&i)
265265
val, ok := c.Get(1)
@@ -315,12 +315,12 @@ func TestCacheSet(t *testing.T) {
315315
c.stop <- struct{}{}
316316
for i := 0; i < setBufSize; i++ {
317317
key, conflict := z.KeyToHash(1)
318-
c.setBuf <- &item{
318+
c.setBuf <- &Item{
319319
flag: itemUpdate,
320-
key: key,
321-
conflict: conflict,
322-
value: 1,
323-
cost: 1,
320+
Key: key,
321+
Conflict: conflict,
322+
Value: 1,
323+
Cost: 1,
324324
}
325325
}
326326
require.False(t, c.Set(2, 2, 1))
@@ -381,10 +381,10 @@ func TestCacheSetWithTTL(t *testing.T) {
381381
MaxCost: 10,
382382
BufferItems: 64,
383383
Metrics: true,
384-
OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
384+
OnEvict: func(item *Item) {
385385
m.Lock()
386386
defer m.Unlock()
387-
evicted[key] = struct{}{}
387+
evicted[item.Key] = struct{}{}
388388
},
389389
})
390390
require.NoError(t, err)
@@ -655,8 +655,8 @@ func TestDropUpdates(t *testing.T) {
655655
MaxCost: 10,
656656
BufferItems: 64,
657657
Metrics: true,
658-
OnEvict: func(_, _ uint64, value interface{}, _ int64) {
659-
handler(nil, value)
658+
OnEvict: func(item *Item) {
659+
handler(nil, item.Value)
660660
},
661661
})
662662
require.NoError(t, err)

policy.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type policy interface {
3838
// Add attempts to Add the key-cost pair to the Policy. It returns a slice
3939
// of evicted keys and a bool denoting whether or not the key-cost pair
4040
// was added. If it returns true, the key should be stored in cache.
41-
Add(uint64, int64) ([]*item, bool)
41+
Add(uint64, int64) ([]*Item, bool)
4242
// Has returns true if the key exists in the Policy.
4343
Has(uint64) bool
4444
// Del deletes the key from the Policy.
@@ -121,7 +121,7 @@ func (p *defaultPolicy) Push(keys []uint64) bool {
121121
// Add decides whether the item with the given key and cost should be accepted by
122122
// the policy. It returns the list of victims that have been evicted and a boolean
123123
// indicating whether the incoming item should be accepted.
124-
func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
124+
func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) {
125125
p.Lock()
126126
defer p.Unlock()
127127

@@ -155,7 +155,7 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
155155
// O(lg N).
156156
sample := make([]*policyPair, 0, lfuSample)
157157
// As items are evicted they will be appended to victims.
158-
victims := make([]*item, 0)
158+
victims := make([]*Item, 0)
159159

160160
// Delete victims until there's enough space or a minKey is found that has
161161
// more hits than incoming item.
@@ -185,10 +185,10 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
185185
sample[minId] = sample[len(sample)-1]
186186
sample = sample[:len(sample)-1]
187187
// Store victim in evicted victims slice.
188-
victims = append(victims, &item{
189-
key: minKey,
190-
conflict: 0,
191-
cost: minCost,
188+
victims = append(victims, &Item{
189+
Key: minKey,
190+
Conflict: 0,
191+
Cost: minCost,
192192
})
193193
}
194194

0 commit comments

Comments
 (0)