-
-
Notifications
You must be signed in to change notification settings - Fork 37
/
taskcollectgarbage.go
134 lines (114 loc) · 3.59 KB
/
taskcollectgarbage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package orchestrator
import (
	"context"
	"fmt"
	"sort"
	"time"

	v1 "github.com/garethgeorge/backrest/gen/go/v1"
	"go.uber.org/zap"
)
const (
	// gcStartupDelay is how long after startup the first collection is scheduled.
	gcStartupDelay = 5 * time.Second
	// gcInterval is the delay between subsequent collections.
	gcInterval = 24 * time.Hour

	// An operation is eligible for gc if:
	//   - it has no snapshot associated with it, or
	//   - the snapshot associated with it has been forgotten.
	// Eligible operations are deleted once they are older than gcHistoryAge
	// (gcHistoryStatsAge for stats operations). Independently of age, at most
	// gcHistoryMaxCount eligible operations are retained for any one plan.
	gcHistoryAge      = 30 * 24 * time.Hour
	gcHistoryMaxCount = 1000

	// keep stats operations for 1 year (they're small and useful for long term trends)
	gcHistoryStatsAge = 365 * 24 * time.Hour
)
// CollectGarbageTask prunes gc-eligible operation history from the owning
// orchestrator's oplog on a fixed schedule.
type CollectGarbageTask struct {
	// orchestrator is the owning orchestrator; its OpLog is what gets pruned.
	orchestrator *Orchestrator
	// firstRun is set once Next has handed out the initial, startup-delayed
	// run time; every later call to Next uses the regular interval.
	firstRun bool
}

// Compile-time check that *CollectGarbageTask implements Task.
var _ Task = (*CollectGarbageTask)(nil)
// Name returns the human-readable name of the garbage collection task.
func (t *CollectGarbageTask) Name() string {
	const taskName = "collect garbage"
	return taskName
}
// Next returns the time at which the task should run next. The first call
// schedules a run gcStartupDelay after now; every later call schedules one
// gcInterval after now. It never returns nil.
func (t *CollectGarbageTask) Next(now time.Time) *time.Time {
	delay := gcInterval
	if !t.firstRun {
		// Initial scheduling: run shortly after startup instead of waiting a
		// full interval, and remember that this has happened.
		t.firstRun = true
		delay = gcStartupDelay
	}
	next := now.Add(delay)
	return &next
}
// Run performs a single garbage collection pass over the oplog. Any failure
// is wrapped with "collecting garbage". ctx is not currently consulted.
func (t *CollectGarbageTask) Run(ctx context.Context) error {
	err := t.gcOperations()
	if err == nil {
		return nil
	}
	return fmt.Errorf("collecting garbage: %w", err)
}
// gcOperations deletes operation history that is eligible for garbage
// collection from the orchestrator's oplog.
//
// It works in passes:
//  1. collect the IDs of snapshots whose index operation is marked Forgot;
//  2. group gc-eligible operations by plan: operations with no snapshot, or
//     whose snapshot was forgotten;
//  3. per plan, select operations beyond the newest gcHistoryMaxCount plus any
//     older than their age limit (see expiredOpIDs);
//  4. delete the selected operations and log a summary.
//
// Errors from iterating or deleting are wrapped with the pass that failed.
func (t *CollectGarbageTask) gcOperations() error {
	oplog := t.orchestrator.OpLog

	// pass 1: identify forgotten snapshots.
	snapshotIsForgotten := make(map[string]bool)
	if err := oplog.ForAll(func(op *v1.Operation) error {
		snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot)
		if !ok {
			return nil
		}
		// The generated getters are nil-safe, so an index operation with a
		// missing payload or snapshot is skipped rather than panicking.
		idx := snapshotOp.OperationIndexSnapshot
		if id := idx.GetSnapshot().GetId(); idx.GetForgot() && id != "" {
			snapshotIsForgotten[id] = true
		}
		return nil
	}); err != nil {
		return fmt.Errorf("identifying forgotten snapshots: %w", err)
	}

	// pass 2: identify operations that are gc eligible
	// - any operation that has no snapshot associated with it
	// - any operation that has a forgotten snapshot associated with it
	operationsByPlan := make(map[string][]gcOpInfo)
	if err := oplog.ForAll(func(op *v1.Operation) error {
		if op.SnapshotId == "" || snapshotIsForgotten[op.SnapshotId] {
			_, isStats := op.Op.(*v1.Operation_OperationStats)
			operationsByPlan[op.PlanId] = append(operationsByPlan[op.PlanId], gcOpInfo{
				id:        op.Id,
				timestamp: op.UnixTimeStartMs,
				isStats:   isStats,
			})
		}
		return nil
	}); err != nil {
		return fmt.Errorf("identifying gc eligible operations: %w", err)
	}

	// pass 3: select which eligible operations to remove, plan by plan.
	curTime := curTimeMillis()
	var gcOps []int64
	for _, opInfos := range operationsByPlan {
		gcOps = append(gcOps, expiredOpIDs(opInfos, curTime)...)
	}

	// pass 4: remove gc eligible operations (nothing to do if none selected).
	if len(gcOps) > 0 {
		if err := oplog.Delete(gcOps...); err != nil {
			return fmt.Errorf("removing gc eligible operations: %w", err)
		}
	}

	zap.L().Info("collecting garbage",
		zap.Int("forgotten_snapshots", len(snapshotIsForgotten)),
		zap.Int("operations_removed", len(gcOps)))
	return nil
}

// expiredOpIDs returns the IDs of the gc-eligible operations of a single plan
// that should be deleted at time nowMs (unix milliseconds).
//
// The selection has two parts:
//   - every operation beyond the newest gcHistoryMaxCount;
//   - among the rest, any older than gcHistoryAge (gcHistoryStatsAge for
//     stats operations).
//
// opInfos is sorted in place.
func expiredOpIDs(opInfos []gcOpInfo, nowMs int64) []int64 {
	// Order oldest-first so the count cap drops the oldest operations no
	// matter what order the oplog was iterated in. A stable sort keeps the
	// iteration order for operations with equal timestamps.
	sort.SliceStable(opInfos, func(i, j int) bool {
		return opInfos[i].timestamp < opInfos[j].timestamp
	})

	var ids []int64
	if excess := len(opInfos) - gcHistoryMaxCount; excess > 0 {
		for _, opInfo := range opInfos[:excess] {
			ids = append(ids, opInfo.id)
		}
		opInfos = opInfos[excess:]
	}

	// check if each remaining operation timestamp is old.
	for _, opInfo := range opInfos {
		maxAgeForType := gcHistoryAge.Milliseconds()
		if opInfo.isStats {
			maxAgeForType = gcHistoryStatsAge.Milliseconds()
		}
		if nowMs-opInfo.timestamp > maxAgeForType {
			ids = append(ids, opInfo.id)
		}
	}
	return ids
}
// Cancel does nothing for this task. The requested status is ignored and the
// method always reports success by returning nil.
func (t *CollectGarbageTask) Cancel(withStatus v1.OperationStatus) error {
	var noErr error
	return noErr
}
// OperationId always returns 0. This presumably means the task has no
// associated operation in the oplog; verify against the Task contract.
func (t *CollectGarbageTask) OperationId() int64 {
	const noOperationID int64 = 0
	return noOperationID
}
// gcOpInfo holds the per-operation data needed to decide whether a
// gc-eligible operation should be deleted.
type gcOpInfo struct {
	id        int64 // operation ID
	timestamp int64 // operation start time, unix milliseconds
	isStats   bool  // true if this is a stats operation
}