Skip to content

Commit 5f615bf

Browse files
authored
Improve memory performance (#195)
- Use an int64 instead of a time.Time struct to represent the time. - By default, include the cost of the storeItem in the cost calculation. Related to DGRAPH-1378
1 parent a1c354a commit 5f615bf

File tree

5 files changed

+104
-60
lines changed

5 files changed

+104
-60
lines changed

cache.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sync"
2727
"sync/atomic"
2828
"time"
29+
"unsafe"
2930

3031
"github.com/dgraph-io/ristretto/z"
3132
)
@@ -37,6 +38,8 @@ var (
3738

3839
type itemCallback func(*Item)
3940

41+
const itemSize = int64(unsafe.Sizeof(storeItem{}))
42+
4043
// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
4144
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
4245
// from as many goroutines as you want.
@@ -65,6 +68,9 @@ type Cache struct {
6568
stop chan struct{}
6669
// cost calculates cost from a value.
6770
cost func(value interface{}) int64
71+
// ignoreInternalCost dictates whether to ignore the cost of internally storing
72+
// the item in the cost calculation.
73+
ignoreInternalCost bool
6874
// cleanupTicker is used to periodically check for entries whose TTL has passed.
6975
cleanupTicker *time.Ticker
7076
// Metrics contains a running log of important statistics like hits, misses,
@@ -119,6 +125,11 @@ type Config struct {
119125
// is run after Set is called for a new item or an item update with a cost
120126
// param of 0.
121127
Cost func(value interface{}) int64
128+
// IgnoreInternalCost set to true indicates to the cache that the cost of
129+
// internally storing the value should be ignored. This is useful when the
130+
// cost passed to Set is not using bytes as units. Keep in mind that setting
131+
// this to true will increase the memory usage.
132+
IgnoreInternalCost bool
122133
}
123134

124135
type itemFlag byte
@@ -136,7 +147,7 @@ type Item struct {
136147
Conflict uint64
137148
Value interface{}
138149
Cost int64
139-
Expiration time.Time
150+
Expiration int64
140151
wg *sync.WaitGroup
141152
}
142153

@@ -152,14 +163,15 @@ func NewCache(config *Config) (*Cache, error) {
152163
}
153164
policy := newPolicy(config.NumCounters, config.MaxCost)
154165
cache := &Cache{
155-
store: newStore(),
156-
policy: policy,
157-
getBuf: newRingBuffer(policy, config.BufferItems),
158-
setBuf: make(chan *Item, setBufSize),
159-
keyToHash: config.KeyToHash,
160-
stop: make(chan struct{}),
161-
cost: config.Cost,
162-
cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
166+
store: newStore(),
167+
policy: policy,
168+
getBuf: newRingBuffer(policy, config.BufferItems),
169+
setBuf: make(chan *Item, setBufSize),
170+
keyToHash: config.KeyToHash,
171+
stop: make(chan struct{}),
172+
cost: config.Cost,
173+
ignoreInternalCost: config.IgnoreInternalCost,
174+
cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
163175
}
164176
cache.onExit = func(val interface{}) {
165177
if config.OnExit != nil && val != nil {
@@ -241,7 +253,7 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
241253
return false
242254
}
243255

244-
var expiration time.Time
256+
var expiration int64
245257
switch {
246258
case ttl == 0:
247259
// No expiration.
@@ -250,7 +262,7 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration
250262
// Treat this as a no-op.
251263
return false
252264
default:
253-
expiration = time.Now().Add(ttl)
265+
expiration = time.Now().Add(ttl).Unix()
254266
}
255267

256268
keyHash, conflictHash := c.keyToHash(key)
@@ -393,6 +405,11 @@ func (c *Cache) processItems() {
393405
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
394406
i.Cost = c.cost(i.Value)
395407
}
408+
if !c.ignoreInternalCost {
409+
// Add the cost of internally storing the object.
410+
i.Cost += itemSize
411+
}
412+
396413
switch i.flag {
397414
case itemNew:
398415
victims, added := c.policy.Add(i.Key, i.Cost)

cache_test.go

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ var wait = time.Millisecond * 10
1919
func TestCacheKeyToHash(t *testing.T) {
2020
keyToHashCount := 0
2121
c, err := NewCache(&Config{
22-
NumCounters: 100,
23-
MaxCost: 10,
24-
BufferItems: 64,
22+
NumCounters: 10,
23+
MaxCost: 1000,
24+
BufferItems: 64,
25+
IgnoreInternalCost: true,
2526
KeyToHash: func(key interface{}) (uint64, uint64) {
2627
keyToHashCount++
2728
return z.KeyToHash(key)
@@ -150,9 +151,10 @@ func TestCacheProcessItems(t *testing.T) {
150151
m := &sync.Mutex{}
151152
evicted := make(map[uint64]struct{})
152153
c, err := NewCache(&Config{
153-
NumCounters: 100,
154-
MaxCost: 10,
155-
BufferItems: 64,
154+
NumCounters: 100,
155+
MaxCost: 10,
156+
BufferItems: 64,
157+
IgnoreInternalCost: true,
156158
Cost: func(value interface{}) int64 {
157159
return int64(value.(int))
158160
},
@@ -249,10 +251,11 @@ func TestCacheProcessItems(t *testing.T) {
249251

250252
func TestCacheGet(t *testing.T) {
251253
c, err := NewCache(&Config{
252-
NumCounters: 100,
253-
MaxCost: 10,
254-
BufferItems: 64,
255-
Metrics: true,
254+
NumCounters: 100,
255+
MaxCost: 10,
256+
BufferItems: 64,
257+
IgnoreInternalCost: true,
258+
Metrics: true,
256259
})
257260
require.NoError(t, err)
258261

@@ -299,10 +302,11 @@ func retrySet(t *testing.T, c *Cache, key, value int, cost int64, ttl time.Durat
299302

300303
func TestCacheSet(t *testing.T) {
301304
c, err := NewCache(&Config{
302-
NumCounters: 100,
303-
MaxCost: 10,
304-
BufferItems: 64,
305-
Metrics: true,
305+
NumCounters: 100,
306+
MaxCost: 10,
307+
IgnoreInternalCost: true,
308+
BufferItems: 64,
309+
Metrics: true,
306310
})
307311
require.NoError(t, err)
308312

@@ -333,13 +337,31 @@ func TestCacheSet(t *testing.T) {
333337
require.False(t, c.Set(1, 1, 1))
334338
}
335339

336-
func TestRecacheWithTTL(t *testing.T) {
340+
func TestCacheInternalCost(t *testing.T) {
337341
c, err := NewCache(&Config{
338342
NumCounters: 100,
339343
MaxCost: 10,
340344
BufferItems: 64,
341345
Metrics: true,
342346
})
347+
require.NoError(t, err)
348+
349+
// Get should return false because the cache's cost is too small to store the item
350+
// when accounting for the internal cost.
351+
c.SetWithTTL(1, 1, 1, 0)
352+
time.Sleep(wait)
353+
_, ok := c.Get(1)
354+
require.False(t, ok)
355+
}
356+
357+
func TestRecacheWithTTL(t *testing.T) {
358+
c, err := NewCache(&Config{
359+
NumCounters: 100,
360+
MaxCost: 10,
361+
IgnoreInternalCost: true,
362+
BufferItems: 64,
363+
Metrics: true,
364+
})
343365

344366
require.NoError(t, err)
345367

@@ -378,10 +400,11 @@ func TestCacheSetWithTTL(t *testing.T) {
378400
m := &sync.Mutex{}
379401
evicted := make(map[uint64]struct{})
380402
c, err := NewCache(&Config{
381-
NumCounters: 100,
382-
MaxCost: 10,
383-
BufferItems: 64,
384-
Metrics: true,
403+
NumCounters: 100,
404+
MaxCost: 10,
405+
IgnoreInternalCost: true,
406+
BufferItems: 64,
407+
Metrics: true,
385408
OnEvict: func(item *Item) {
386409
m.Lock()
387410
defer m.Unlock()
@@ -451,9 +474,10 @@ func TestCacheDel(t *testing.T) {
451474

452475
func TestCacheDelWithTTL(t *testing.T) {
453476
c, err := NewCache(&Config{
454-
NumCounters: 100,
455-
MaxCost: 10,
456-
BufferItems: 64,
477+
NumCounters: 100,
478+
MaxCost: 10,
479+
IgnoreInternalCost: true,
480+
BufferItems: 64,
457481
})
458482
require.NoError(t, err)
459483
retrySet(t, c, 3, 1, 1, 10*time.Second)
@@ -468,10 +492,11 @@ func TestCacheDelWithTTL(t *testing.T) {
468492

469493
func TestCacheClear(t *testing.T) {
470494
c, err := NewCache(&Config{
471-
NumCounters: 100,
472-
MaxCost: 10,
473-
BufferItems: 64,
474-
Metrics: true,
495+
NumCounters: 100,
496+
MaxCost: 10,
497+
IgnoreInternalCost: true,
498+
BufferItems: 64,
499+
Metrics: true,
475500
})
476501
require.NoError(t, err)
477502

@@ -493,10 +518,11 @@ func TestCacheClear(t *testing.T) {
493518

494519
func TestCacheMetrics(t *testing.T) {
495520
c, err := NewCache(&Config{
496-
NumCounters: 100,
497-
MaxCost: 10,
498-
BufferItems: 64,
499-
Metrics: true,
521+
NumCounters: 100,
522+
MaxCost: 10,
523+
IgnoreInternalCost: true,
524+
BufferItems: 64,
525+
Metrics: true,
500526
})
501527
require.NoError(t, err)
502528

store.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type storeItem struct {
2626
key uint64
2727
conflict uint64
2828
value interface{}
29-
expiration time.Time
29+
expiration int64
3030
}
3131

3232
// store is the interface fulfilled by all hash map implementations in this
@@ -39,7 +39,7 @@ type store interface {
3939
// Get returns the value associated with the key parameter.
4040
Get(uint64, uint64) (interface{}, bool)
4141
// Expiration returns the expiration time for this key.
42-
Expiration(uint64) time.Time
42+
Expiration(uint64) int64
4343
// Set adds the key-value pair to the Map or updates the value if it's
4444
// already present. The key-value pair is passed as a pointer to an
4545
// item object.
@@ -82,7 +82,7 @@ func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) {
8282
return sm.shards[key%numShards].get(key, conflict)
8383
}
8484

85-
func (sm *shardedMap) Expiration(key uint64) time.Time {
85+
func (sm *shardedMap) Expiration(key uint64) int64 {
8686
return sm.shards[key%numShards].Expiration(key)
8787
}
8888

@@ -138,13 +138,13 @@ func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) {
138138
}
139139

140140
// Handle expired items.
141-
if !item.expiration.IsZero() && time.Now().After(item.expiration) {
141+
if item.expiration != 0 && time.Now().Unix() > item.expiration {
142142
return nil, false
143143
}
144144
return item.value, true
145145
}
146146

147-
func (m *lockedMap) Expiration(key uint64) time.Time {
147+
func (m *lockedMap) Expiration(key uint64) int64 {
148148
m.RLock()
149149
defer m.RUnlock()
150150
return m.data[key].expiration
@@ -193,7 +193,7 @@ func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) {
193193
return 0, nil
194194
}
195195

196-
if !item.expiration.IsZero() {
196+
if item.expiration != 0 {
197197
m.em.del(key, item.expiration)
198198
}
199199

stress_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515

1616
func TestStressSetGet(t *testing.T) {
1717
c, err := NewCache(&Config{
18-
NumCounters: 1000,
19-
MaxCost: 100,
20-
BufferItems: 64,
21-
Metrics: true,
18+
NumCounters: 1000,
19+
MaxCost: 100,
20+
IgnoreInternalCost: true,
21+
BufferItems: 64,
22+
Metrics: true,
2223
})
2324
require.NoError(t, err)
2425

ttl.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ var (
2626
bucketDurationSecs = int64(5)
2727
)
2828

29-
func storageBucket(t time.Time) int64 {
30-
return (t.Unix() / bucketDurationSecs) + 1
29+
func storageBucket(t int64) int64 {
30+
return (t / bucketDurationSecs) + 1
3131
}
3232

33-
func cleanupBucket(t time.Time) int64 {
33+
func cleanupBucket(t int64) int64 {
3434
// The bucket to cleanup is always behind the storage bucket by one so that
3535
// no elements in that bucket (which might not have expired yet) are deleted.
3636
return storageBucket(t) - 1
@@ -51,13 +51,13 @@ func newExpirationMap() *expirationMap {
5151
}
5252
}
5353

54-
func (m *expirationMap) add(key, conflict uint64, expiration time.Time) {
54+
func (m *expirationMap) add(key, conflict uint64, expiration int64) {
5555
if m == nil {
5656
return
5757
}
5858

5959
// Items that don't expire don't need to be in the expiration map.
60-
if expiration.IsZero() {
60+
if expiration == 0 {
6161
return
6262
}
6363

@@ -73,7 +73,7 @@ func (m *expirationMap) add(key, conflict uint64, expiration time.Time) {
7373
b[key] = conflict
7474
}
7575

76-
func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
76+
func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime int64) {
7777
if m == nil {
7878
return
7979
}
@@ -96,7 +96,7 @@ func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time
9696
newBucket[key] = conflict
9797
}
9898

99-
func (m *expirationMap) del(key uint64, expiration time.Time) {
99+
func (m *expirationMap) del(key uint64, expiration int64) {
100100
if m == nil {
101101
return
102102
}
@@ -120,15 +120,15 @@ func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback
120120
}
121121

122122
m.Lock()
123-
now := time.Now()
123+
now := time.Now().Unix()
124124
bucketNum := cleanupBucket(now)
125125
keys := m.buckets[bucketNum]
126126
delete(m.buckets, bucketNum)
127127
m.Unlock()
128128

129129
for key, conflict := range keys {
130130
// Sanity check. Verify that the store agrees that this key is expired.
131-
if store.Expiration(key).After(now) {
131+
if store.Expiration(key) > now {
132132
continue
133133
}
134134

0 commit comments

Comments
 (0)