-
-
Notifications
You must be signed in to change notification settings - Fork 576
/
cache.go
69 lines (61 loc) · 1.51 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package rule
import (
"hash/fnv"
"sync/atomic"
"time"
)
// cacheItem is one cache entry: the result stored for a channel together with
// the instant after which the entry must no longer be served.
type cacheItem struct {
// channel is the key the entry was stored under; Get compares it to the
// requested channel.
channel string
// value is the cached result handed back to the caller on a hit.
value channelOptionsResult
// expires is the expiry instant in Unix nanoseconds. An entry is a hit only
// while the current time is strictly before it, so the zero value never hits.
expires int64
}
// cacheShard is one fixed-size group of cache slots that is overwritten in
// rolling order.
type cacheShard struct {
// index is a write counter incremented atomically by every Set; the slot to
// overwrite is the counter modulo size.
index int32
// size is the number of slots in buffer, as given at construction.
size int32
// buffer holds the slots. Each slot stores a cacheItem and is accessed
// through atomic.Value's Load and Store.
buffer []atomic.Value
}
// rollingCache caches channelOptionsResult values keyed by channel name. A key
// is hashed to one shard, and inside a shard new entries overwrite slots in
// rolling order, so capacity is fixed at construction.
type rollingCache struct {
// shards is the set of shards that keys are distributed over by hash.
shards []*cacheShard
}
// newRollingCache builds a cache made of shardCount shards, each holding
// shardSize slots.
//
// Every slot is pre-filled with an empty cacheItem so that readers can
// type-assert the result of Load without first checking for nil.
//
// Arguments below 1 are raised to 1. Without that, a cache with no shards
// divides by zero when a shard is picked for a key, a shard with no slots
// divides by zero when a slot is picked for a write, and negative values
// make the allocations themselves panic.
func newRollingCache(shardSize int, shardCount int) *rollingCache {
	if shardSize < 1 {
		shardSize = 1
	}
	if shardCount < 1 {
		shardCount = 1
	}

	rc := &rollingCache{
		shards: make([]*cacheShard, shardCount),
	}
	for i := range rc.shards {
		shard := &cacheShard{
			size:   int32(shardSize),
			buffer: make([]atomic.Value, shardSize),
		}
		for j := range shard.buffer {
			shard.buffer[j].Store(cacheItem{}) // Initialize with zero value.
		}
		rc.shards[i] = shard
	}
	return rc
}
// shardForKey returns the shard responsible for key: the key's 64-bit FNV-1a
// hash, reduced modulo the number of shards, is used as an index into c.shards.
func (c *rollingCache) shardForKey(key string) *cacheShard {
	hasher := fnv.New64a()
	// The hash.Hash contract states that Write never returns an error, so the
	// results are discarded on purpose.
	_, _ = hasher.Write([]byte(key))
	shardTotal := uint64(len(c.shards))
	return c.shards[hasher.Sum64()%shardTotal]
}
// Get looks channel up in the shard its name hashes to. The shard's slots are
// scanned from the first to the last, and the first slot whose stored channel
// equals the requested one and whose expiry is still in the future wins.
//
// It returns the cached value and true on a hit, or a zero
// channelOptionsResult and false when no live entry exists.
func (c *rollingCache) Get(channel string) (channelOptionsResult, bool) {
	shard := c.shardForKey(channel)
	for slot := int32(0); slot < shard.size; slot++ {
		entry := shard.buffer[slot].Load().(cacheItem)
		if entry.channel != channel {
			continue
		}
		// The clock is consulted only for slots that match the channel.
		if entry.expires <= time.Now().UnixNano() {
			continue
		}
		return entry.value, true
	}
	return channelOptionsResult{}, false
}
// Set stores value for channel in the shard the channel name hashes to. The
// entry expires ttl after the moment of the call.
//
// Each call advances the shard's write counter and overwrites the slot at
// counter modulo size, so the oldest entries are replaced first. A shard with
// no slots stores nothing.
func (c *rollingCache) Set(channel string, value channelOptionsResult, ttl time.Duration) {
	shard := c.shardForKey(channel)
	if shard.size <= 0 {
		// Nothing to write into; this also avoids a modulo by zero below.
		return
	}

	// The counter is an int32 that wraps to negative values after 2^31-1
	// writes. Go's % keeps the sign of the dividend, so reducing the signed
	// value would yield a negative slot and panic on indexing. Reinterpreting
	// the counter as uint32 keeps the slot in [0, size) forever and gives the
	// same slots as before for counters that have not wrapped.
	counter := uint32(atomic.AddInt32(&shard.index, 1))
	slot := int(counter % uint32(shard.size))

	shard.buffer[slot].Store(cacheItem{
		channel: channel,
		value:   value,
		expires: time.Now().Add(ttl).UnixNano(),
	})
}