-
Notifications
You must be signed in to change notification settings - Fork 0
/
tinygocache.go
133 lines (117 loc) · 2.76 KB
/
tinygocache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package tinyGoCache
import (
"fmt"
"github.com/Rand01ph/tinyGoCache/singleflight"
pb "github.com/Rand01ph/tinyGoCache/tinygocachepb"
"log"
"sync"
)
// Getter supplies the source data for a key. A Group calls it (through
// getLocally) when a value has to be loaded from the underlying source.
type Getter interface {
// Get returns the bytes stored for key, or an error.
Get(key string) ([]byte, error)
}
// GetterFunc is a function type that implements the Getter interface.
type GetterFunc func(key string) ([]byte, error)
// Get satisfies Getter by calling f itself.
// Declaring a function type and giving it the interface's method, which simply
// calls the function, is a common Go technique for converting any function with
// a matching signature into that interface.
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
// Group is the core data structure: a named cache together with the Getter
// used to load values that are not cached yet.
type Group struct {
name string // key under which the group is stored in groups; also sent as Group in peer requests
getter Getter // fetches the source data on a cache miss
mainCache cache // concurrent cache
peers PeerPicker // set by RegisterPeers; load consults it when non-nil
// loader guards against cache breakdown; load routes every fetch through
// loader.Do (presumably collapsing concurrent loads of one key — confirm in
// the singleflight package).
loader *singleflight.Group
}
// Package-level registry of groups created by NewGroup.
var (
mu sync.RWMutex // guards groups: write-locked in NewGroup, read-locked in GetGroup
groups = make(map[string]*Group) // created groups, keyed by name
)
// NewGroup builds a Group called name whose cache is configured with
// cacheBytes and whose missing values are loaded through getter, then stores
// it in the package-level registry under name (replacing any group already
// stored under that name).
//
// It panics if getter is nil.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	if getter == nil {
		panic("nil Getter")
	}

	group := &Group{
		name:      name,
		getter:    getter,
		mainCache: cache{cacheBytes: cacheBytes},
		loader:    new(singleflight.Group),
	}

	// Only the registry write needs the lock; the group is not shared yet.
	mu.Lock()
	groups[name] = group
	mu.Unlock()

	return group
}
// GetGroup returns the Group stored under name by NewGroup, or nil when no
// group has that name.
func GetGroup(name string) *Group {
	mu.RLock()
	defer mu.RUnlock()
	return groups[name]
}
// Get returns the value for key. A value found in mainCache is returned
// directly (and a hit is logged); otherwise the value is obtained via load.
//
// An empty key is rejected with an error.
func (g *Group) Get(key string) (ByteView, error) {
	if len(key) == 0 {
		return ByteView{}, fmt.Errorf("key is required")
	}

	cached, hit := g.mainCache.get(key)
	if !hit {
		return g.load(key)
	}

	log.Println("[tinyGoCache] hit")
	return cached, nil
}
// load obtains the value for key on a cache miss, routing the fetch through
// g.loader.Do.
//
// The fetch first asks the registered PeerPicker (if any) for a peer and tries
// that peer. If no picker is registered, no peer is picked, or the peer
// returns an error (which is logged), it falls back to getLocally.
//
// It returns the loaded ByteView, or a zero ByteView together with the error
// reported by g.loader.Do.
func (g *Group) load(key string) (ByteView, error) {
	fetch := func() (interface{}, error) {
		if g.peers != nil {
			if peer, ok := g.peers.PickPeer(key); ok {
				// The peer result lives in closure-local variables so that a
				// failed remote attempt cannot leak into load's own results.
				view, err := g.getFromPeer(peer, key)
				if err == nil {
					return view, nil
				}
				log.Println("[tinyGoCache] Failed to get from peer", err)
			}
		}
		// Fall back to loading the value on this node.
		return g.getLocally(key)
	}

	result, err := g.loader.Do(key, fetch)
	if err != nil {
		return ByteView{}, err
	}
	return result.(ByteView), nil
}
// getFromPeer fetches key from the given peer. The request carries this
// group's name and the key; on success the Value of the peer's response is
// wrapped in a ByteView. Any error from peer.Get is returned unchanged with a
// zero ByteView.
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	var (
		request  = &pb.Request{Group: g.name, Key: key}
		response = new(pb.Response)
	)

	if err := peer.Get(request, response); err != nil {
		return ByteView{}, err
	}
	return ByteView{b: response.Value}, nil
}
// getLocally loads key through the group's Getter, stores the result in the
// cache via populateCache, and returns it. An error from the Getter is
// returned unchanged with a zero ByteView, and nothing is cached.
func (g *Group) getLocally(key string) (ByteView, error) {
	data, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err
	}

	// The view wraps cloneBytes(data) rather than data itself (presumably a
	// copy, so the cached value does not alias the getter's slice — confirm in
	// cloneBytes).
	view := ByteView{b: cloneBytes(data)}
	g.populateCache(key, view)
	return view, nil
}
// populateCache stores value under key in the group's mainCache.
func (g *Group) populateCache(key string, value ByteView) {
	g.mainCache.add(key, value)
}
// RegisterPeers installs the PeerPicker that load consults to choose a remote
// peer for a key.
//
// It panics if a picker has already been registered on this group. The panic
// message names this method so the failure can be traced to the right call.
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeers called more than once")
	}
	g.peers = peers
}