/
manager.go
307 lines (268 loc) · 7.81 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
"golang.org/x/sys/unix"
"strconv"
"sync"
)
var SharedManager = NewManager()
func init() {
if !teaconst.IsMain {
return
}
events.OnClose(func() {
remotelogs.Println("CACHE", "quiting cache manager")
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
})
}
// Manager 缓存策略管理器
type Manager struct {
// 全局配置
MaxDiskCapacity *shared.SizeCapacity
MainDiskDir string
SubDiskDirs []*serverconfigs.CacheDir
MaxMemoryCapacity *shared.SizeCapacity
CountFileStorages int
CountMemoryStorages int
policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy
storageMap map[int64]StorageInterface // policyId => *Storage
locker sync.RWMutex
}
// NewManager 获取管理器对象
func NewManager() *Manager {
var m = &Manager{
policyMap: map[int64]*serverconfigs.HTTPCachePolicy{},
storageMap: map[int64]StorageInterface{},
}
return m
}
// UpdatePolicies 重新设置策略
func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) {
this.locker.Lock()
defer this.locker.Unlock()
var newPolicyIds = []int64{}
for _, policy := range newPolicies {
// 使用节点单独的缓存目录
policy.UpdateDiskDir(this.MainDiskDir, this.SubDiskDirs)
newPolicyIds = append(newPolicyIds, policy.Id)
}
// 停止旧有的
for _, oldPolicy := range this.policyMap {
if !lists.ContainsInt64(newPolicyIds, oldPolicy.Id) {
remotelogs.Println("CACHE", "remove policy "+strconv.FormatInt(oldPolicy.Id, 10))
delete(this.policyMap, oldPolicy.Id)
storage, ok := this.storageMap[oldPolicy.Id]
if ok {
storage.Stop()
delete(this.storageMap, oldPolicy.Id)
}
}
}
// 启动新的
for _, newPolicy := range newPolicies {
_, ok := this.policyMap[newPolicy.Id]
if !ok {
remotelogs.Println("CACHE", "add policy "+strconv.FormatInt(newPolicy.Id, 10))
}
// 初始化
err := newPolicy.Init()
if err != nil {
remotelogs.Error("CACHE", "UpdatePolicies: init policy error: "+err.Error())
continue
}
this.policyMap[newPolicy.Id] = newPolicy
}
// 启动存储管理
for _, policy := range this.policyMap {
storage, ok := this.storageMap[policy.Id]
if !ok {
storage = this.NewStorageWithPolicy(policy)
if storage == nil {
remotelogs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
continue
}
err := storage.Init()
if err != nil {
remotelogs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error())
continue
}
this.storageMap[policy.Id] = storage
} else {
// 检查policy是否有变化
if !storage.Policy().IsSame(policy) {
// 检查是否可以直接修改
if storage.CanUpdatePolicy(policy) {
err := policy.Init()
if err != nil {
remotelogs.Error("CACHE", "reload policy '"+types.String(policy.Id)+"' failed: init policy failed: "+err.Error())
continue
}
remotelogs.Println("CACHE", "reload policy '"+types.String(policy.Id)+"'")
storage.UpdatePolicy(policy)
continue
}
remotelogs.Println("CACHE", "restart policy '"+types.String(policy.Id)+"'")
// 停止老的
storage.Stop()
delete(this.storageMap, policy.Id)
// 启动新的
storage = this.NewStorageWithPolicy(policy)
if storage == nil {
remotelogs.Error("CACHE", "can not find storage type '"+policy.Type+"'")
continue
}
err := storage.Init()
if err != nil {
remotelogs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error())
continue
}
this.storageMap[policy.Id] = storage
}
}
}
this.CountFileStorages = 0
this.CountFileStorages = 0
for _, storage := range this.storageMap {
_, isFileStorage := storage.(*FileStorage)
this.CountMemoryStorages++
if isFileStorage {
this.CountFileStorages++
}
}
}
// FindPolicy 获取Policy信息
func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
this.locker.RLock()
defer this.locker.RUnlock()
return this.policyMap[policyId]
}
// FindStorageWithPolicy 根据策略ID查找存储
func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
this.locker.RLock()
defer this.locker.RUnlock()
return this.storageMap[policyId]
}
// NewStorageWithPolicy 根据策略获取存储对象
func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface {
switch policy.Type {
case serverconfigs.CachePolicyStorageFile:
return NewFileStorage(policy)
case serverconfigs.CachePolicyStorageMemory:
return NewMemoryStorage(policy, nil)
}
return nil
}
// StorageMap 获取已有的存储对象
func (this *Manager) StorageMap() map[int64]StorageInterface {
return this.storageMap
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *Manager) TotalDiskSize() int64 {
this.locker.RLock()
defer this.locker.RUnlock()
var total = int64(0)
var sidMap = map[string]bool{} // partition sid => bool
for _, storage := range this.storageMap {
// 这里不能直接用 storage.TotalDiskSize() 相加,因为多个缓存策略缓存目录可能处在同一个分区目录下
fileStorage, ok := storage.(*FileStorage)
if ok {
var options = fileStorage.options // copy
if options != nil {
var dir = options.Dir // copy
if len(dir) == 0 {
continue
}
var stat = &unix.Statfs_t{}
err := unix.Statfs(dir, stat)
if err != nil {
continue
}
var sid = fmt.Sprintf("%d_%d", stat.Fsid.Val[0], stat.Fsid.Val[1])
if sidMap[sid] {
continue
}
sidMap[sid] = true
total += int64(stat.Blocks-stat.Bfree) * int64(stat.Bsize) // we add extra int64() for darwin
}
}
}
if total < 0 {
total = 0
}
return total
}
// TotalMemorySize 消耗的内存尺寸
func (this *Manager) TotalMemorySize() int64 {
this.locker.RLock()
defer this.locker.RUnlock()
total := int64(0)
for _, storage := range this.storageMap {
total += storage.TotalMemorySize()
}
return total
}
// FindAllCachePaths 所有缓存路径
func (this *Manager) FindAllCachePaths() []string {
this.locker.Lock()
defer this.locker.Unlock()
var result = []string{}
for _, policy := range this.policyMap {
if policy.Type == serverconfigs.CachePolicyStorageFile {
if policy.Options != nil {
dir, ok := policy.Options["dir"]
if ok {
var dirString = types.String(dir)
if len(dirString) > 0 {
result = append(result, dirString)
}
}
}
}
}
return result
}
// FindAllStorages 读取所有缓存存储
func (this *Manager) FindAllStorages() []StorageInterface {
this.locker.Lock()
defer this.locker.Unlock()
var storages = []StorageInterface{}
for _, storage := range this.storageMap {
storages = append(storages, storage)
}
return storages
}
// ScanGarbageCaches 清理目录中“失联”的缓存文件
func (this *Manager) ScanGarbageCaches(callback func(path string) error) error {
var storages = this.FindAllStorages()
for _, storage := range storages {
fileStorage, ok := storage.(*FileStorage)
if !ok {
continue
}
err := fileStorage.ScanGarbageCaches(callback)
if err != nil {
return err
}
}
return nil
}
// MaxSystemMemoryBytesPerStorage 计算单个策略能使用的系统最大内存
func (this *Manager) MaxSystemMemoryBytesPerStorage() int64 {
var count = this.CountMemoryStorages
if count < 1 {
count = 1
}
var resultBytes = int64(utils.SystemMemoryBytes()) / 3 / int64(count) // 1/3 of the system memory
if resultBytes < 1<<30 {
resultBytes = 1 << 30
}
return resultBytes
}