-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard.go
101 lines (83 loc) · 1.88 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package flache
import (
"sync"
"time"
)
type shard struct {
lock sync.Mutex
indices map[uint64]uint32
ringBuf *ringBuffer
}
func newShard(size int) *shard {
s := &shard{
indices: make(map[uint64]uint32),
ringBuf: newRingBuffer((size+defaultBlockSize-1)/defaultBlockSize*defaultBlockSize, defaultBlockSize),
}
return s
}
func (s *shard) get(key string, hashedKey uint64) ([]byte, error) {
s.lock.Lock()
defer s.lock.Unlock()
index, ok := s.indices[hashedKey]
if !ok {
return nil, ErrKeyNotFound
}
if s.isExpire(index) {
s.ringBuf.remove(index)
delete(s.indices, hashedKey)
return nil, ErrKeyNotFound
}
cachedKey := s.ringBuf.readKey(index)
if key != cachedKey {
return nil, ErrKeyNotFound
}
val := s.ringBuf.readVal(index)
s.ringBuf.moveToHead(index)
return val, nil
}
func (s *shard) isExpire(index uint32) bool {
expireAt := s.ringBuf.readExpireAt(index)
if expireAt.IsZero() {
return false
}
return expireAt.Before(time.Now())
}
func (s *shard) set(key string, hashedKey uint64, value []byte, expiration time.Duration) error {
s.lock.Lock()
defer s.lock.Unlock()
index, ok := s.indices[hashedKey]
if ok {
s.ringBuf.remove(index)
}
entry := newEntry(key, hashedKey, value, expiration)
if !s.ringBuf.hasEnoughSpace(entry) {
return ErrNotEnoughSpace
}
for !s.ringBuf.hasEnoughBlocks(entry) {
tail := s.ringBuf.getTail()
s.remove(tail)
}
index = s.ringBuf.write(entry)
s.ringBuf.insertHead(index)
s.indices[hashedKey] = index
return nil
}
func (s *shard) remove(index uint32) {
hashedKey := s.ringBuf.readHashedKey(index)
s.ringBuf.remove(index)
delete(s.indices, hashedKey)
}
func (s *shard) del(key string, hashedKey uint64) error {
s.lock.Lock()
defer s.lock.Unlock()
index, ok := s.indices[hashedKey]
if !ok {
return nil
}
cachedKey := s.ringBuf.readKey(index)
if key != cachedKey {
return nil
}
s.remove(index)
return nil
}