-
Notifications
You must be signed in to change notification settings - Fork 1
/
redis_mgr.go
172 lines (149 loc) · 3.87 KB
/
redis_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/* ######################################################################
# Author: (zfly1207@126.com)
# Created Time: 2020-10-29 21:07:46
# File Name: redis_mgr.go
# Description:
####################################################################### */
package redis
import (
"fmt"
"sync"
"time"
"github.com/ant-libs-go/config"
"github.com/ant-libs-go/config/options"
rds "github.com/gomodule/redigo/redis"
)
var (
once sync.Once
lock sync.RWMutex
pools map[string]*rds.Pool
)
func init() {
pools = map[string]*rds.Pool{}
}
type redisConfig struct {
Cfgs map[string]*Cfg `toml:"redis" antcfg:"redis"`
}
type Cfg struct {
// dial
DialAddr string `toml:"addr"`
DialUsername string `toml:"user"`
DialPassword string `toml:"pawd"`
DialDatabase int `toml:"database"`
DialConnectTimeout time.Duration `toml:"dial_timeout"`
DialReadTimeout time.Duration `toml:"read_timeout"`
DialWriteTimeout time.Duration `toml:"write_timeout"`
// pool
PoolMaxIdle int `toml:"pool_max_idle"` // 最大闲置连接数
PoolMaxActive int `toml:"pool_max_active"` // 最大活跃连接数
PoolIdleTimeout time.Duration `toml:"pool_idle_time"` // 闲置的过期时间,在Get方法中会对过期的连接删除
PoolWait bool `toml:"pool_wait"` // 当活跃连接达到上限,Get时是等待还是返回错误。为false时返回错误,为true时阻塞等待
PoolMaxConnLifetime time.Duration `toml:"pool_max_conn_life_time"` // 连接最长生存时间,当超过时间会被删除
}
// 验证Redis实例的配置正确性与连通性。
// 参数names是实例的名称列表,如果为空则检测所有配置的实例
func Valid(names ...string) (err error) {
if len(names) == 0 {
var cfgs map[string]*Cfg
if cfgs, err = loadCfgs(); err != nil {
return
}
for k, _ := range cfgs {
names = append(names, k)
}
}
for _, name := range names {
var cli rds.Conn
cli, err = SafeClient(name)
if err == nil {
defer cli.Close()
_, err = cli.Do("PING")
}
if err != nil {
err = fmt.Errorf("redis#%s is invalid, %s", name, err)
return
}
}
return
}
func DefaultClient() (r rds.Conn) {
return Client("default")
}
func DefaultPool() (r *rds.Pool) {
return Pool("default")
}
func Client(name string) (r rds.Conn) {
r = Pool(name).Get()
return
}
func SafeClient(name string) (r rds.Conn, err error) {
var pool *rds.Pool
pool, err = SafePool(name)
if err != nil {
return
}
r = pool.Get()
return
}
func Pool(name string) (r *rds.Pool) {
var err error
if r, err = getPool(name); err != nil {
panic(err)
}
return
}
func SafePool(name string) (r *rds.Pool, err error) {
return getPool(name)
}
func getPool(name string) (r *rds.Pool, err error) {
lock.RLock()
r = pools[name]
lock.RUnlock()
if r == nil {
r, err = addPool(name)
}
return
}
func addPool(name string) (r *rds.Pool, err error) {
var cfg *Cfg
if cfg, err = loadCfg(name); err != nil {
return
}
r = NewRedisPool(cfg)
lock.Lock()
pools[name] = r
lock.Unlock()
return
}
func loadCfg(name string) (r *Cfg, err error) {
var cfgs map[string]*Cfg
if cfgs, err = loadCfgs(); err != nil {
return
}
if r = cfgs[name]; r == nil {
err = fmt.Errorf("redis#%s not configed", name)
return
}
return
}
func loadCfgs() (r map[string]*Cfg, err error) {
r = map[string]*Cfg{}
once.Do(func() {
config.Get(&redisConfig{}, options.WithOpOnChangeFn(func(cfg interface{}) {
lock.Lock()
defer lock.Unlock()
pools = map[string]*rds.Pool{}
}))
})
cfg := config.Get(&redisConfig{}).(*redisConfig)
if err == nil && (cfg.Cfgs == nil || len(cfg.Cfgs) == 0) {
err = fmt.Errorf("not configed")
}
if err != nil {
err = fmt.Errorf("redis load cfgs error, %s", err)
return
}
r = cfg.Cfgs
return
}
// vim: set noexpandtab ts=4 sts=4 sw=4 :