This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
garbage_collector.go
193 lines (172 loc) · 7.3 KB
/
garbage_collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package controller
import (
"context"
"runtime/pprof"
"time"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"strings"
"github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
// gcMetrics bundles the metrics emitted by the garbage collector.
type gcMetrics struct {
	// gcRoundSuccess / gcRoundFailure are incremented once per namespace delete request, depending on whether the
	// request returned an error.
	gcRoundSuccess, gcRoundFailure labeled.Counter
	// gcTime is started and stopped around every garbage collection round.
	gcTime labeled.StopWatch
}
// GarbageCollector is an active background cleanup service, that deletes all workflows that are completed and older
// than the configured TTL
type GarbageCollector struct {
// wfClient issues the DeleteCollection calls against FlyteWorkflows.
wfClient v1alpha1.FlyteworkflowV1alpha1Interface
// namespaceClient lists namespaces when collection is not limited to a single namespace.
namespaceClient corev1.NamespaceInterface
// ttlHours is the retention period in hours; StartGC does not start the collector when it is <= 0.
ttlHours int
// interval is the period of the ticker that triggers each garbage collection round.
interval time.Duration
// clk supplies the current time (Now) and the ticker (NewTicker).
clk clock.Clock
// metrics holds the success/failure counters and the round stopwatch.
metrics *gcMetrics
// namespace limits collection to one namespace. Empty, "all" or "all-namespaces" (case-insensitive) means every
// namespace returned by namespaceClient.
namespace string
// labelSelectorRequirements are extra match expressions appended to the retention selector on every round.
labelSelectorRequirements []v1.LabelSelectorRequirement
}
// deleteWorkflows issues a background deletion command with label selector for all completed workflows outside of the
// retention period.
//
// When g.namespace is empty, "all" or "all-namespaces" (case-insensitive) the delete is issued once per namespace
// returned by the namespace client; otherwise it is issued for g.namespace only. A failure for an individual
// namespace is logged and counted in gcRoundFailure but does not abort the round. The returned error is non-nil only
// when the namespaces cannot be listed or when ctx is cancelled while iterating over them.
func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error {
	s := CompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now())
	if len(g.labelSelectorRequirements) != 0 {
		s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
	}

	// collect triggers the delete for a single namespace and records the outcome in the metrics and the log.
	collect := func(namespace string) {
		namespaceCtx := contextutils.WithNamespace(ctx, namespace)
		logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)
		if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
			g.metrics.gcRoundFailure.Inc(namespaceCtx)
			logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)
			return
		}
		g.metrics.gcRoundSuccess.Inc(namespaceCtx)
	}

	allNamespaces := g.namespace == "" ||
		strings.EqualFold(g.namespace, "all") ||
		strings.EqualFold(g.namespace, "all-namespaces")
	if !allNamespaces {
		collect(g.namespace)
		return nil
	}

	// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
	namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
	if err != nil {
		return err
	}
	// Index instead of ranging by value so that each namespace object is not copied.
	for i := range namespaceList.Items {
		// Stop once the context is cancelled instead of issuing further requests with a dead context, which would
		// only produce one failure metric and one error log per remaining namespace.
		if err := ctx.Err(); err != nil {
			return err
		}
		collect(namespaceList.Items[i].GetName())
	}
	return nil
}
// deprecatedDeleteWorkflows issues a background deletion command for all workflows matched by
// DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod, combined with the configured label selector
// requirements.
//
// When g.namespace is empty, "all" or "all-namespaces" (case-insensitive) the delete is issued once per namespace
// returned by the namespace client; otherwise it is issued for g.namespace only. A failure for an individual
// namespace is logged and counted in gcRoundFailure but does not abort the round. The returned error is non-nil only
// when the namespaces cannot be listed or when ctx is cancelled while iterating over them.
//
// Deprecated: Please use deleteWorkflows instead
func (g *GarbageCollector) deprecatedDeleteWorkflows(ctx context.Context) error {
	s := DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now())
	if len(g.labelSelectorRequirements) != 0 {
		s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
	}

	// collect triggers the delete for a single namespace and records the outcome in the metrics and the log.
	collect := func(namespace string) {
		namespaceCtx := contextutils.WithNamespace(ctx, namespace)
		logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)
		if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
			g.metrics.gcRoundFailure.Inc(namespaceCtx)
			logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)
			return
		}
		g.metrics.gcRoundSuccess.Inc(namespaceCtx)
	}

	allNamespaces := g.namespace == "" ||
		strings.EqualFold(g.namespace, "all") ||
		strings.EqualFold(g.namespace, "all-namespaces")
	if !allNamespaces {
		collect(g.namespace)
		return nil
	}

	// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
	namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
	if err != nil {
		return err
	}
	// Index instead of ranging by value so that each namespace object is not copied.
	for i := range namespaceList.Items {
		// Stop once the context is cancelled instead of issuing further requests with a dead context, which would
		// only produce one failure metric and one error log per remaining namespace.
		if err := ctx.Err(); err != nil {
			return err
		}
		collect(namespaceList.Items[i].GetName())
	}
	return nil
}
// deleteWorkflowsForNamespace sends one DeleteCollection request for the FlyteWorkflows in namespace that match
// labelSelector. The request uses a zero-second grace period and background propagation. The error returned by the
// client is passed back unchanged.
func (g *GarbageCollector) deleteWorkflowsForNamespace(ctx context.Context, namespace string, labelSelector *v1.LabelSelector) error {
	workflows := g.wfClient.FlyteWorkflows(namespace)

	var noGracePeriod int64
	policy := v1.DeletePropagationBackground
	deleteOpts := v1.DeleteOptions{
		GracePeriodSeconds: &noGracePeriod,
		PropagationPolicy:  &policy,
	}
	listOpts := v1.ListOptions{
		LabelSelector: v1.FormatLabelSelector(labelSelector),
	}

	return workflows.DeleteCollection(ctx, deleteOpts, listOpts)
}
// runGC is the body of the garbage collection goroutine. It labels the goroutine as "gc-worker", then on every tick
// runs the deprecated delete pass followed by the current one, timing both together with the gcTime stopwatch. Errors
// returned by a pass are logged and do not stop the loop. The loop exits, stopping the ticker, when ctx is done.
func (g *GarbageCollector) runGC(ctx context.Context, ticker clock.Ticker) {
	logger.Infof(ctx, "Background workflow garbage collection started, with duration [%s], TTL [%d] hours", g.interval.String(), g.ttlHours)

	ctx = contextutils.WithGoroutineLabel(ctx, "gc-worker")
	pprof.SetGoroutineLabels(ctx)
	defer ticker.Stop()

	// Passes executed on every tick, in order.
	passes := []func(context.Context) error{
		g.deprecatedDeleteWorkflows,
		g.deleteWorkflows,
	}

	for {
		select {
		case <-ctx.Done():
			logger.Infof(ctx, "Garbage collector stopping")
			return
		case <-ticker.C():
			logger.Infof(ctx, "Garbage collector running...")
			stopwatch := g.metrics.gcTime.Start(ctx)
			for _, pass := range passes {
				if err := pass(ctx); err != nil {
					logger.Errorf(ctx, "Garbage collection failed in this round.Error : [%v]", err)
				}
			}
			stopwatch.Stop()
		}
	}
}
// StartGC starts a background garbage collection routine. Use the context to signal an exit signal.
//
// The collector is not started, and a warning is logged, when the TTL or the interval is not positive. The returned
// error is always nil.
func (g *GarbageCollector) StartGC(ctx context.Context) error {
	if g.ttlHours <= 0 {
		logger.Warningf(ctx, "Garbage collector is disabled, as ttl [%d] is <=0", g.ttlHours)
		return nil
	}
	// A ticker cannot be created with a non-positive period: time.NewTicker panics for d <= 0, and the real clock
	// implementation is expected to delegate to it. Treat it like the ttl case and leave the collector off.
	if g.interval <= 0 {
		logger.Warningf(ctx, "Garbage collector is disabled, as interval [%s] is <=0", g.interval.String())
		return nil
	}

	ticker := g.clk.NewTicker(g.interval)
	go g.runGC(ctx, ticker)
	return nil
}
// NewGarbageCollector builds a GarbageCollector from cfg.
//
// The TTL is taken from cfg.MaxTTLInHours and capped at 23 hours; a warning is logged only when the configured value
// actually exceeds the cap. The GC interval, the namespace limit and the sharding label selector requirements are
// read from cfg, and the metrics are registered under scope. The returned error is always nil.
func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Clock, namespaceClient corev1.NamespaceInterface, wfClient v1alpha1.FlyteworkflowV1alpha1Interface) (*GarbageCollector, error) {
	// Upper bound applied to the configured TTL.
	const maxTTLHours = 23

	ttl := cfg.MaxTTLInHours
	if ttl > maxTTLHours {
		logger.Warningf(context.TODO(), "defaulting max ttl for workflows to 23 hours, since configured duration is larger than 23 [%d]", cfg.MaxTTLInHours)
		ttl = maxTTLHours
	}

	labelSelectorRequirements := getShardedLabelSelectorRequirements(cfg)
	return &GarbageCollector{
		wfClient:        wfClient,
		ttlHours:        ttl,
		interval:        cfg.GCInterval.Duration,
		namespaceClient: namespaceClient,
		metrics: &gcMetrics{
			gcTime:         labeled.NewStopWatch("gc_latency", "time taken to issue a delete for TTL'ed workflows", time.Millisecond, scope),
			gcRoundSuccess: labeled.NewCounter("gc_success", "successful executions of delete request", scope),
			gcRoundFailure: labeled.NewCounter("gc_failure", "failure to delete workflows", scope),
		},
		clk:                       clk,
		namespace:                 cfg.LimitNamespace,
		labelSelectorRequirements: labelSelectorRequirements,
	}, nil
}