forked from zeromicro/go-zero
/
cache.go
129 lines (107 loc) · 2.82 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package internal
import (
"fmt"
"log"
"time"
"github.com/dylanNew/go-zero/core/errorx"
"github.com/dylanNew/go-zero/core/hash"
"github.com/dylanNew/go-zero/core/syncx"
)
// Cache is the operation set shared by a single cache node and by a cluster
// of nodes; NewCache returns one or the other behind this interface.
//
// NOTE(review): the per-method summaries below are inferred from the method
// names and from how cacheCluster forwards them — confirm against the node
// implementation.
type Cache interface {
	// DelCache removes the entries stored under keys.
	DelCache(keys ...string) error
	// GetCache loads the entry stored under key into v.
	GetCache(key string, v interface{}) error
	// SetCache stores v under key.
	SetCache(key string, v interface{}) error
	// SetCacheWithExpire stores v under key with the given expiry.
	SetCacheWithExpire(key string, v interface{}, expire time.Duration) error
	// Take fills v for key, with query supplied as the loader callback
	// (presumably invoked on a cache miss — TODO confirm).
	Take(v interface{}, key string, query func(v interface{}) error) error
	// TakeWithExpire is the Take variant whose loader callback also receives
	// an expiry duration.
	TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
}

// cacheCluster is a Cache that forwards every operation to the node its
// consistent-hash dispatcher selects for the key.
type cacheCluster struct {
	// dispatcher maps a key to the node responsible for it.
	dispatcher *hash.ConsistentHash
	// errNotFound is returned when the dispatcher yields no node for a key.
	errNotFound error
}
// NewCache builds a Cache from the cluster configuration c.
//
// The process is terminated via log.Fatal when c is empty or its total weight
// is not positive. With exactly one configured node the node itself is
// returned; with more, every node is registered in a consistent-hash
// dispatcher using its configured weight and a cacheCluster wrapping that
// dispatcher is returned. barrier, st, errNotFound and opts are passed through
// unchanged to every NewCacheNode call.
func NewCache(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
	opts ...Option) Cache {
	if len(c) == 0 || TotalWeights(c) <= 0 {
		log.Fatal("no cache nodes")
	}

	// From here on len(c) >= 1, so "more than one" is the only case that
	// needs a routing layer.
	if len(c) > 1 {
		ring := hash.NewConsistentHash()
		for _, conf := range c {
			member := NewCacheNode(conf.NewRedis(), barrier, st, errNotFound, opts...)
			ring.AddWithWeight(member, conf.Weight)
		}

		return cacheCluster{
			errNotFound: errNotFound,
			dispatcher:  ring,
		}
	}

	// A lone node is used directly, without a dispatcher in front of it.
	return NewCacheNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
}
// DelCache deletes keys from the nodes that own them.
//
// No keys is a no-op returning nil. A single key is forwarded straight to its
// node, or yields cc.errNotFound when the dispatcher has no node for it.
// Multiple keys are grouped per node so each node receives one DelCache call;
// keys without a node and per-node failures are collected into one batch
// error. Note that the multi-key path reports a missing node with a formatted
// "key ... not found" error rather than cc.errNotFound.
func (cc cacheCluster) DelCache(keys ...string) error {
	if len(keys) == 0 {
		return nil
	}

	if len(keys) == 1 {
		only := keys[0]
		node, found := cc.dispatcher.Get(only)
		if !found {
			return cc.errNotFound
		}

		return node.(Cache).DelCache(only)
	}

	var batch errorx.BatchError
	keysByNode := make(map[interface{}][]string)
	for i := range keys {
		node, found := cc.dispatcher.Get(keys[i])
		if found {
			keysByNode[node] = append(keysByNode[node], keys[i])
		} else {
			batch.Add(fmt.Errorf("key %q not found", keys[i]))
		}
	}

	// Map iteration order is unspecified, so nodes are visited (and their
	// errors recorded) in no particular order.
	for node, group := range keysByNode {
		if err := node.(Cache).DelCache(group...); err != nil {
			batch.Add(err)
		}
	}

	return batch.Err()
}
// GetCache forwards the lookup of key into v to the node the dispatcher
// selects for key. It returns cc.errNotFound when no node is selected.
func (cc cacheCluster) GetCache(key string, v interface{}) error {
	if node, found := cc.dispatcher.Get(key); found {
		return node.(Cache).GetCache(key, v)
	}

	return cc.errNotFound
}
// SetCache forwards the write of v under key to the node the dispatcher
// selects for key. It returns cc.errNotFound when no node is selected.
func (cc cacheCluster) SetCache(key string, v interface{}) error {
	if node, found := cc.dispatcher.Get(key); found {
		return node.(Cache).SetCache(key, v)
	}

	return cc.errNotFound
}
// SetCacheWithExpire forwards the write of v under key, together with expire,
// to the node the dispatcher selects for key. It returns cc.errNotFound when
// no node is selected.
func (cc cacheCluster) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
	if node, found := cc.dispatcher.Get(key); found {
		return node.(Cache).SetCacheWithExpire(key, v, expire)
	}

	return cc.errNotFound
}
// Take forwards v, key and the query callback to the node the dispatcher
// selects for key. It returns cc.errNotFound when no node is selected.
func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
	if node, found := cc.dispatcher.Get(key); found {
		return node.(Cache).Take(v, key, query)
	}

	return cc.errNotFound
}
// TakeWithExpire forwards v, key and the expiry-aware query callback to the
// node the dispatcher selects for key. It returns cc.errNotFound when no node
// is selected.
func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
	query func(v interface{}, expire time.Duration) error) error {
	if node, found := cc.dispatcher.Get(key); found {
		return node.(Cache).TakeWithExpire(v, key, query)
	}

	return cc.errNotFound
}