-
-
Notifications
You must be signed in to change notification settings - Fork 274
/
updater.go
286 lines (239 loc) Β· 8.15 KB
/
updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
package filterlists
import (
"context"
"fmt"
"sort"
"time"
"github.com/hashicorp/go-version"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/log"
"github.com/safing/portbase/updater"
"github.com/tevino/abool"
)
var updateInProgress = abool.New()
// tryListUpdate wraps performUpdate but ensures the module's
// error state is correctly set or resolved.
func tryListUpdate(ctx context.Context) error {
err := performUpdate(ctx)
if err != nil {
if !isLoaded() {
module.Error(filterlistsDisabled, err.Error())
} else {
module.Warning(filterlistsUpdateFailed, err.Error())
}
return err
}
// if the module is in an error, warning or hint state resolve that right now.
module.Resolve(filterlistsDisabled)
module.Resolve(filterlistsStaleDataSurvived)
module.Resolve(filterlistsUpdateInProgress)
return nil
}
func performUpdate(ctx context.Context) error {
if !updateInProgress.SetToIf(false, true) {
log.Debugf("intel/filterlists: upgrade already in progress")
return nil
}
defer updateInProgress.UnSet()
module.Hint(filterlistsUpdateInProgress, filterlistsUpdateInProgressDescr)
upgradables, err := getUpgradableFiles()
if err != nil {
return err
}
log.Debugf("intel/filterlists: resources to update: %v", upgradables)
if len(upgradables) == 0 {
log.Debugf("intel/filterlists: ignoring update, latest version is already used")
return nil
}
cleanupRequired := false
filterToUpdate := defaultFilter
// perform the actual upgrade by processing each file
// in the returned order.
for idx, file := range upgradables {
log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Identifier(), file.Version())
if file == baseFile {
if idx != 0 {
log.Warningf("intel/filterlists: upgrade order is wrong, base file needs to be updated first not at idx %d", idx)
// we still continue because after processing the base
// file everything is correct again, we just used some
// CPU and IO resources for nothing when processing
// the previous files.
}
cleanupRequired = true
// since we are processing a base update we will create our
// bloom filters from scratch.
filterToUpdate = newScopedBloom()
}
if err := processListFile(ctx, filterToUpdate, file); err != nil {
return fmt.Errorf("failed to process upgrade %s: %w", file.Identifier(), err)
}
}
if filterToUpdate != defaultFilter {
// replace the bloom filters in our default
// filter.
defaultFilter.replaceWith(filterToUpdate)
}
// from now on, the database is ready and can be used if
// it wasn't loaded yet.
if !isLoaded() {
close(filterListsLoaded)
}
if err := defaultFilter.saveToCache(); err != nil {
// just handle the error by logging as it's only consequence
// is that we will need to reprocess all files during the next
// start.
log.Errorf("intel/filterlists: failed to persist bloom filters in cache database: %s", err)
}
// if we processed the base file we need to perform
// some cleanup on filterlists entities that have not
// been updated now. Once we are done, start a worker
// for that purpose.
if cleanupRequired {
if err := module.RunWorker("filterlists:cleanup", removeAllObsoleteFilterEntries); err != nil {
// if we failed to remove all stale cache entries
// we abort now WITHOUT updating the database version. This means
// we'll try again during the next update.
module.Warning(filterlistsStaleDataSurvived, filterlistsStaleDataDescr)
return fmt.Errorf("failed to cleanup stale cache records: %w", err)
}
}
// try to save the highest version of our files.
highestVersion := upgradables[len(upgradables)-1]
if err := setCacheDatabaseVersion(highestVersion.Version()); err != nil {
log.Errorf("intel/filterlists: failed to save cache database version: %s", err)
} else {
log.Infof("intel/filterlists: successfully migrated cache database to %s", highestVersion.Version())
}
return nil
}
func removeAllObsoleteFilterEntries(_ context.Context) error {
log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...")
for {
done, err := removeObsoleteFilterEntries(10000)
if err != nil {
return err
}
if done {
return nil
}
}
}
func removeObsoleteFilterEntries(batchSize int) (bool, error) {
iter, err := cache.Query(
query.New(filterListKeyPrefix).Where(
// TODO(ppacher): remember the timestamp we started the last update
// and use that rather than "one hour ago"
query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
),
)
if err != nil {
return false, err
}
keys := make([]string, 0, batchSize)
var cnt int
for r := range iter.Next {
cnt++
keys = append(keys, r.Key())
if cnt == batchSize {
break
}
}
iter.Cancel()
for _, key := range keys {
if err := cache.Delete(key); err != nil {
log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", key, err)
}
}
log.Debugf("intel/filterlists: successfully removed %d obsolete entries", cnt)
// if we removed less entries that the batch size we
// are done and no more entries exist
return cnt < batchSize, nil
}
// getUpgradableFiles returns a slice of filterlists files
// that should be updated. The files MUST be updated and
// processed in the returned order!
func getUpgradableFiles() ([]*updater.File, error) {
var updateOrder []*updater.File
cacheDBInUse := isLoaded()
if baseFile == nil || baseFile.UpgradeAvailable() || !cacheDBInUse {
var err error
baseFile, err = getFile(baseListFilePath)
if err != nil {
return nil, err
}
log.Tracef("intel/filterlists: base file needs update, selected version %s", baseFile.Version())
updateOrder = append(updateOrder, baseFile)
}
if intermediateFile == nil || intermediateFile.UpgradeAvailable() || !cacheDBInUse {
var err error
intermediateFile, err = getFile(intermediateListFilePath)
if err != nil && err != updater.ErrNotFound {
return nil, err
}
if err == nil {
log.Tracef("intel/filterlists: intermediate file needs update, selected version %s", intermediateFile.Version())
updateOrder = append(updateOrder, intermediateFile)
}
}
if urgentFile == nil || urgentFile.UpgradeAvailable() || !cacheDBInUse {
var err error
urgentFile, err = getFile(urgentListFilePath)
if err != nil && err != updater.ErrNotFound {
return nil, err
}
if err == nil {
log.Tracef("intel/filterlists: urgent file needs update, selected version %s", urgentFile.Version())
updateOrder = append(updateOrder, urgentFile)
}
}
return resolveUpdateOrder(updateOrder)
}
func resolveUpdateOrder(updateOrder []*updater.File) ([]*updater.File, error) {
// sort the update order by ascending version
sort.Sort(byAscVersion(updateOrder))
log.Tracef("intel/filterlists: order of updates: %v", updateOrder)
var cacheDBVersion *version.Version
if !isLoaded() {
cacheDBVersion, _ = version.NewSemver("v0.0.0")
} else {
var err error
cacheDBVersion, err = getCacheDatabaseVersion()
if err != nil {
if err != database.ErrNotFound {
log.Errorf("intel/filterlists: failed to get cache database version: %s", err)
}
cacheDBVersion, _ = version.NewSemver("v0.0.0")
}
}
startAtIdx := -1
for idx, file := range updateOrder {
ver, _ := version.NewSemver(file.Version())
log.Tracef("intel/filterlists: checking file with version %s against %s", ver, cacheDBVersion)
if ver.GreaterThan(cacheDBVersion) && (startAtIdx == -1 || file == baseFile) {
startAtIdx = idx
}
}
// if startAtIdx == -1 we don't have any upgradables to
// process.
if startAtIdx == -1 {
log.Tracef("intel/filterlists: nothing to process, latest version %s already in use", cacheDBVersion)
return nil, nil
}
// skip any files that are lower then the current cache db version
// or after which a base upgrade would be performed.
return updateOrder[startAtIdx:], nil
}
type byAscVersion []*updater.File
func (fs byAscVersion) Len() int { return len(fs) }
func (fs byAscVersion) Less(i, j int) bool {
vi, _ := version.NewSemver(fs[i].Version())
vj, _ := version.NewSemver(fs[j].Version())
return vi.LessThan(vj)
}
func (fs byAscVersion) Swap(i, j int) {
fi := fs[i]
fj := fs[j]
fs[i] = fj
fs[j] = fi
}