-
Notifications
You must be signed in to change notification settings - Fork 796
/
mapper.go
161 lines (131 loc) · 3.89 KB
/
mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package ruler
import (
"crypto/md5"
"net/url"
"path/filepath"
"sort"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/spf13/afero"
"gopkg.in/yaml.v3"
)
// mapper is designed to ensure the provided rule sets are identical
// to the on-disk rules tracked by the prometheus manager
type mapper struct {
	Path string // Path specifies the directory in which rule files will be mapped.
	// FS is the filesystem through which every directory and file operation
	// of the mapper is performed.
	FS afero.Fs
	// logger receives the mapper's informational, warning and error messages.
	logger log.Logger
}
// newMapper builds a mapper rooted at path, backed by afero's OS filesystem
// and logging to logger. Before returning the mapper it runs cleanup on it,
// which removes the user directories already present under path.
func newMapper(path string, logger log.Logger) *mapper {
	mapped := new(mapper)
	mapped.Path = path
	mapped.FS = afero.NewOsFs()
	mapped.logger = logger

	// Start from a clean directory.
	mapped.cleanup()

	return mapped
}
// cleanupUser removes the directory named userID under the mapper's path,
// together with everything inside it. A failure is logged as a warning and
// is not reported to the caller.
func (m *mapper) cleanupUser(userID string) {
	userDir := filepath.Join(m.Path, userID)

	if rmErr := m.FS.RemoveAll(userDir); rmErr != nil {
		level.Warn(m.logger).Log("msg", "unable to remove user directory", "path", userDir, "err", rmErr)
	}
}
// cleanup removes every user directory found directly under the mapper's
// path. If the directory listing fails, the error is logged and nothing is
// removed.
func (m *mapper) cleanup() {
	level.Info(m.logger).Log("msg", "cleaning up mapped rules directory", "path", m.Path)

	userIDs, listErr := m.users()
	if listErr != nil {
		level.Error(m.logger).Log("msg", "unable to read rules directory", "path", m.Path, "err", listErr)
		return
	}

	for i := range userIDs {
		m.cleanupUser(userIDs[i])
	}
}
// users lists the names of the directories located directly under the
// mapper's path; entries that are not directories are skipped. The error
// from reading the path is returned alongside whatever names were collected.
func (m *mapper) users() ([]string, error) {
	entries, err := afero.ReadDir(m.FS, m.Path)

	var names []string
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		names = append(names, entry.Name())
	}

	return names, err
}
// MapRules synchronises the on-disk rule files of user with ruleConfigs.
//
// ruleConfigs maps a rule file name (namespace) to its rule groups. Every
// entry is written to <m.Path>/<user>/<url.PathEscape(name)> via
// writeRuleGroupsIfNewer, and every other entry found in the user's directory
// is removed.
//
// It returns:
//   - whether anything changed on disk (a file was written, or a stale entry
//     was found and its removal attempted),
//   - the full paths of the files corresponding to ruleConfigs, in map
//     iteration order (i.e. not in a stable order),
//   - the first error hit while creating the directory, writing a file or
//     listing the directory. Failures to decode or remove a stale entry are
//     only logged.
func (m *mapper) MapRules(user string, ruleConfigs map[string][]rulefmt.RuleGroup) (bool, []string, error) {
	anyUpdated := false
	filenames := make([]string, 0, len(ruleConfigs))

	// user rule files will be stored as `/<path>/<userid>/<encoded filename>`
	path := filepath.Join(m.Path, user)
	if err := m.FS.MkdirAll(path, 0777); err != nil {
		return false, nil, err
	}

	// write all rule configs to disk
	for filename, groups := range ruleConfigs {
		// Store the encoded file name to better handle `/` characters
		encodedFileName := url.PathEscape(filename)
		fullFileName := filepath.Join(path, encodedFileName)

		fileUpdated, err := m.writeRuleGroupsIfNewer(groups, fullFileName)
		if err != nil {
			return false, nil, err
		}
		filenames = append(filenames, fullFileName)
		anyUpdated = anyUpdated || fileUpdated
	}

	// and clean up any files that shouldn't exist
	existingFiles, err := afero.ReadDir(m.FS, path)
	if err != nil {
		return false, nil, err
	}

	for _, existingFile := range existingFiles {
		fullFileName := filepath.Join(path, existingFile.Name())

		// Ensure the namespace is decoded from a url path encoding to see if it is still required
		decodedNamespace, err := url.PathUnescape(existingFile.Name())
		if err != nil {
			// Nothing was removed here: the name could not be decoded, so the
			// entry is left in place.
			level.Warn(m.logger).Log("msg", "unable to decode rule file name on disk", "file", fullFileName, "err", err)
			continue
		}

		// Key presence, not a non-nil group slice, decides whether the file is
		// still wanted: a namespace present in ruleConfigs was written above
		// and is listed in filenames, so it must not be deleted again.
		if _, wanted := ruleConfigs[decodedNamespace]; wanted {
			continue
		}

		if err := m.FS.Remove(fullFileName); err != nil {
			level.Warn(m.logger).Log("msg", "unable to remove rule file on disk", "file", fullFileName, "err", err)
		}
		anyUpdated = true
	}

	return anyUpdated, filenames, nil
}
// writeRuleGroupsIfNewer serialises groups as YAML and writes the result to
// filename, unless the file already exists with matching content.
//
// groups is sorted in place by group name, in descending order, before being
// marshalled, so the caller's slice is reordered.
//
// It returns true when the file was written and false when the existing file
// already matched. An error is returned if marshalling, reading the existing
// file, or writing the new content fails.
func (m *mapper) writeRuleGroupsIfNewer(groups []rulefmt.RuleGroup, filename string) (bool, error) {
	sort.Slice(groups, func(i, j int) bool {
		return groups[i].Name > groups[j].Name
	})

	rgs := rulefmt.RuleGroups{Groups: groups}
	d, err := yaml.Marshal(&rgs)
	if err != nil {
		return false, err
	}

	// A failing Stat means there is no existing file to compare against, so
	// fall through to the write.
	if _, err := m.FS.Stat(filename); err == nil {
		current, err := afero.ReadFile(m.FS, filename)
		if err != nil {
			return false, err
		}

		// bail out if there is no update. md5.Sum digests its argument and
		// returns a fixed-size array, which is directly comparable with ==.
		// (hash.Hash.Sum, by contrast, only appends the digest of previously
		// written data to its argument.)
		if md5.Sum(current) == md5.Sum(d) {
			return false, nil
		}
	}

	level.Info(m.logger).Log("msg", "updating rule file", "file", filename)
	if err := afero.WriteFile(m.FS, filename, d, 0777); err != nil {
		return false, err
	}
	return true, nil
}