-
Notifications
You must be signed in to change notification settings - Fork 3.2k
/
artifact_gc.go
657 lines (571 loc) · 24 KB
/
artifact_gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
package controller
import (
"context"
"fmt"
"hash/fnv"
"sort"
"golang.org/x/exp/maps"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/env"
"k8s.io/utils/pointer"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/slice"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)
// artifactGCComponent is the value of the component label placed on Artifact GC Pods.
// The same value is used to recognise those Pods when completed Pods are reconciled.
const artifactGCComponent = "artifact-gc"
// artifactGCEnabled is a feature flag to globally disable artifact GC in case of emergency.
// It is read from the ARGO_ARTIFACT_GC_ENABLED environment variable with a fallback of true;
// the error returned by env.GetBool is deliberately discarded.
var artifactGCEnabled, _ = env.GetBool("ARGO_ARTIFACT_GC_ENABLED", true)
// garbageCollectArtifacts performs one pass of Artifact GC for the Workflow.
//
// It does nothing when the feature flag is off. When the Workflow does not carry the
// Artifact GC finalizer, it either records that Artifact GC is not specified for this
// Workflow or adds the finalizer, and returns without doing any further work in this pass.
// Otherwise it processes every strategy that is currently ready and then reconciles any
// Artifact GC Pods that have completed. The first error encountered is returned.
func (woc *wfOperationCtx) garbageCollectArtifacts(ctx context.Context) error {
	if !artifactGCEnabled {
		return nil
	}

	if woc.wf.Status.ArtifactGCStatus == nil {
		woc.wf.Status.ArtifactGCStatus = &wfv1.ArtGCStatus{}
	}
	gcStatus := woc.wf.Status.ArtifactGCStatus

	// The finalizer is only present when Artifact GC is configured for this Workflow
	// and there is still work left to do for it.
	hasFinalizer := slice.ContainsString(woc.wf.Finalizers, common.FinalizerArtifactGC)
	if !hasFinalizer {
		switch {
		case gcStatus.NotSpecified:
			// a previous pass already established that GC isn't required for this workflow
		case woc.execWf.HasArtifactGC():
			woc.log.Info("adding artifact GC finalizer")
			woc.wf.SetFinalizers(append(woc.wf.GetFinalizers(), common.FinalizerArtifactGC))
			gcStatus.NotSpecified = false
		default:
			gcStatus.NotSpecified = true
		}
		return nil
	}

	// Given the current state of the Workflow, handle each strategy that can run now.
	for strategy := range woc.artifactGCStrategiesReady() {
		woc.log.Debugf("processing Artifact GC Strategy %s", strategy)
		if err := woc.processArtifactGCStrategy(ctx, strategy); err != nil {
			return err
		}
	}

	return woc.processArtifactGCCompletion(ctx)
}
// artifactGCStrategiesReady returns the set of Artifact GC Strategies that can be processed
// right now and have not been marked as processed yet:
//   - OnWorkflowCompletion once the Workflow is labelled completed or is being deleted
//   - OnWorkflowDeletion once the Workflow is being deleted
func (woc *wfOperationCtx) artifactGCStrategiesReady() map[wfv1.ArtifactGCStrategy]struct{} {
	ready := make(map[wfv1.ArtifactGCStrategy]struct{}) // used as a set

	beingDeleted := woc.wf.DeletionTimestamp != nil
	completed := woc.wf.Labels[common.LabelKeyCompleted] == "true"

	if (completed || beingDeleted) &&
		!woc.wf.Status.ArtifactGCStatus.IsArtifactGCStrategyProcessed(wfv1.ArtifactGCOnWorkflowCompletion) {
		ready[wfv1.ArtifactGCOnWorkflowCompletion] = struct{}{}
	}
	if beingDeleted &&
		!woc.wf.Status.ArtifactGCStatus.IsArtifactGCStrategyProcessed(wfv1.ArtifactGCOnWorkflowDeletion) {
		ready[wfv1.ArtifactGCOnWorkflowDeletion] = struct{}{}
	}

	// todo: look at implementing "OnWorkflowSuccessOrDeletion" and "OnWorkflowFailureOrDeletion" instead of these:
	/*
		if woc.wf.Status.Successful() {
			if !woc.wf.Status.ArtifactGCStatus.IsArtifactGCStrategyProcessed(wfv1.ArtifactGCOnWorkflowSuccess) {
				strategies[wfv1.ArtifactGCOnWorkflowSuccess] = struct{}{}
			}
		}
		if woc.wf.Status.Failed() {
			if !woc.wf.Status.ArtifactGCStatus.IsArtifactGCStrategyProcessed(wfv1.ArtifactGCOnWorkflowFailure) {
				strategies[wfv1.ArtifactGCOnWorkflowFailure] = struct{}{}
			}
		}*/

	return ready
}
// templatesToArtifacts maps a template name to the artifact search results that were
// collected for nodes using that template.
type templatesToArtifacts map[string]wfv1.ArtifactSearchResults
// processArtifactGCStrategy starts the Pods that delete the artifacts selected by the given
// Artifact GC Strategy.
//
// It searches the Workflow for not-yet-deleted artifacts of Pod nodes that use the strategy,
// groups them by the Pod that has to delete them (one Pod per distinct Service Account / Pod
// metadata combination) and, within each Pod, by template. For every Pod it then creates the
// WorkflowArtifactGCTask objects followed by the Pod itself.
//
// The strategy is marked as processed in the Workflow status when the function returns,
// including when it returns an error. An error is returned when a node or template referenced
// by an artifact cannot be resolved, or when creating a task or a Pod fails.
func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strategy wfv1.ArtifactGCStrategy) error {
	defer func() {
		woc.wf.Status.ArtifactGCStatus.SetArtifactGCStrategyProcessed(strategy, true)
		woc.updated = true
	}()

	woc.log.Debugf("processing Artifact GC Strategy %s", strategy)

	// Search for artifacts
	artifactSearchResults := woc.wf.SearchArtifacts(&wfv1.ArtifactSearchQuery{
		ArtifactGCStrategies: map[wfv1.ArtifactGCStrategy]bool{strategy: true},
		Deleted:              pointer.BoolPtr(false),
		NodeTypes:            map[wfv1.NodeType]bool{wfv1.NodeTypePod: true},
	})
	if len(artifactSearchResults) == 0 {
		woc.log.Debugf("No Artifact Search Results returned from strategy %s", strategy)
		return nil
	}

	// cache the templates by name so we can find them easily
	templatesByName := make(map[string]*wfv1.Template)

	/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	// We need to create a separate Pod for each set of Artifacts that require special permissions
	// (i.e. Service Account and Pod Metadata)
	// So first group artifacts that need to be deleted by permissions
	/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	groupedByPod := make(map[string]templatesToArtifacts)

	// a mapping from the name we'll use for the Pod to the actual metadata and Service Account
	// that need to be applied for that Pod
	podNames := make(map[string]podInfo)

	for _, artifactSearchResult := range artifactSearchResults {
		// get the permissions required for this artifact and create a unique Pod name from them
		podAccessInfo := woc.getArtifactGCPodInfo(&artifactSearchResult.Artifact)
		podName, err := woc.artGCPodName(strategy, podAccessInfo)
		if err != nil {
			return err
		}
		if _, found := podNames[podName]; !found {
			podNames[podName] = podAccessInfo
		}
		if _, found := groupedByPod[podName]; !found {
			groupedByPod[podName] = make(templatesToArtifacts)
		}

		// get the Template for the Artifact
		node, found := woc.wf.Status.Nodes[artifactSearchResult.NodeID]
		if !found {
			return fmt.Errorf("can't process Artifact GC Strategy %s: node ID '%s' not found in Status??", strategy, artifactSearchResult.NodeID)
		}
		// the template name either sits directly on the node or comes from its template ref
		templateName := node.TemplateName
		if templateName == "" && node.GetTemplateRef() != nil {
			templateName = node.GetTemplateRef().Name
		}
		if templateName == "" {
			return fmt.Errorf("can't process Artifact GC Strategy %s: node %+v has an unnamed template", strategy, node)
		}
		template, found := templatesByName[templateName]
		if !found {
			template = woc.wf.GetTemplateByName(templateName)
			if template == nil {
				// report the name that was actually looked up; node.TemplateName is empty
				// when the name was taken from the template ref
				return fmt.Errorf("can't process Artifact GC Strategy %s: template name '%s' belonging to node %+v not found??", strategy, templateName, node)
			}
			templatesByName[templateName] = template
		}

		// appending to a missing (nil) entry creates it
		groupedByPod[podName][template.Name] = append(groupedByPod[podName][template.Name], artifactSearchResult)
	}

	// start up a separate Pod with a separate set of ArtifactGCTasks for it to use for each
	// unique Service Account/metadata
	for podName, templatesToArtList := range groupedByPod {
		tasks := make([]*wfv1.WorkflowArtifactGCTask, 0)
		for templateName, artifacts := range templatesToArtList {
			woc.addTemplateArtifactsToTasks(podName, &tasks, templatesByName[templateName], artifacts)
		}
		if len(tasks) == 0 {
			continue
		}

		// create the K8s WorkflowArtifactGCTask objects
		for i, task := range tasks {
			createdTask, err := woc.createWorkflowArtifactGCTask(ctx, task)
			if err != nil {
				return err
			}
			tasks[i] = createdTask
		}

		// create the pod
		podAccessInfo, found := podNames[podName]
		if !found {
			return fmt.Errorf("can't find podInfo for podName '%s'??", podName)
		}
		if _, err := woc.createArtifactGCPod(ctx, strategy, tasks, podAccessInfo, podName, templatesToArtList, templatesByName); err != nil {
			return err
		}
	}

	return nil
}
// podInfo holds the access-related settings for an Artifact GC Pod: the Service Account it
// runs as and the extra labels/annotations to put on it. Pods are named from a hash of these
// values, so artifacts with identical settings share a Pod.
type podInfo struct {
// serviceAccount is applied as the Pod's ServiceAccountName when it is non-empty
serviceAccount string
// podMetadata carries the labels and annotations that are copied onto the Pod
podMetadata wfv1.Metadata
}
// artGCPodName derives the name of the Artifact GC Pod for a strategy and a set of access
// requirements. There is one Pod per Artifact GC Strategy and Service Account/Metadata
// combination: the name is "<workflow name>-artgc-<strategy abbreviation>-<hash>", where the
// hash is a 32-bit FNV-1a sum over the service account, labels and annotations.
// An error is returned for a strategy that has no abbreviation.
func (woc *wfOperationCtx) artGCPodName(strategy wfv1.ArtifactGCStrategy, podAccessInfo podInfo) (string, error) {
	var strategyTag string
	switch strategy {
	case wfv1.ArtifactGCOnWorkflowCompletion:
		strategyTag = "wfcomp"
	case wfv1.ArtifactGCOnWorkflowDeletion:
		strategyTag = "wfdel"
	default:
		return "", fmt.Errorf("ArtifactGCStrategy '%s' not valid", strategy)
	}

	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(podAccessInfo.serviceAccount))

	// The result must not depend on map iteration order, so keys are fed to the hash in
	// alphabetical order, each one followed by its value.
	hashInKeyOrder := func(entries map[string]string) {
		keys := maps.Keys(entries)
		sort.Strings(keys)
		for _, key := range keys {
			_, _ = hasher.Write([]byte(key))
			_, _ = hasher.Write([]byte(entries[key]))
		}
	}
	hashInKeyOrder(podAccessInfo.podMetadata.Labels)
	hashInKeyOrder(podAccessInfo.podMetadata.Annotations)

	return fmt.Sprintf("%s-artgc-%s-%v", woc.wf.Name, strategyTag, hasher.Sum32()), nil
}
// artGCTaskName builds the name of a WorkflowArtifactGCTask belonging to an Artifact GC Pod:
// the Pod name followed by "-" and the index of the task.
func (woc *wfOperationCtx) artGCTaskName(podName string, taskIndex int) string {
	const taskNameFormat = "%s-%d"
	return fmt.Sprintf(taskNameFormat, podName, taskIndex)
}
// addTemplateArtifactsToTasks records the given artifacts, which all belong to one template,
// in the last WorkflowArtifactGCTask of *tasks, creating that task first when the list is
// empty. Artifacts are stored per node ID, together with the archive location to use: the
// template's own location when it has one, otherwise the default artifact repository.
// Nothing happens when artifactSearchResults is empty.
func (woc *wfOperationCtx) addTemplateArtifactsToTasks(podName string, tasks *[]*wfv1.WorkflowArtifactGCTask, template *wfv1.Template, artifactSearchResults wfv1.ArtifactSearchResults) {
	if len(artifactSearchResults) == 0 {
		return
	}
	if tasks == nil {
		// fall back to a list that only lives inside this call
		tasks = &[]*wfv1.WorkflowArtifactGCTask{}
	}

	// do we need to generate a new WorkflowArtifactGCTask or can we use current?
	// todo: currently we're only handling one but may require more in the future if we start to reach 1 MB in the CRD
	if len(*tasks) == 0 {
		// owned by the Workflow, so that the task gets deleted together with it
		workflowRef := *metav1.NewControllerRef(woc.wf, wfv1.SchemeGroupVersion.WithKind(workflow.WorkflowKind))
		*tasks = append(*tasks, &wfv1.WorkflowArtifactGCTask{
			TypeMeta: metav1.TypeMeta{
				Kind:       workflow.WorkflowArtifactGCTaskKind,
				APIVersion: workflow.APIVersion,
			},
			ObjectMeta: metav1.ObjectMeta{
				Namespace:       woc.wf.Namespace,
				Name:            woc.artGCTaskName(podName, 0),
				Labels:          map[string]string{common.LabelKeyArtifactGCPodName: podName},
				OwnerReferences: []metav1.OwnerReference{workflowRef},
			},
			Spec: wfv1.ArtifactGCSpec{
				ArtifactsByNode: make(map[string]wfv1.ArtifactNodeSpec),
			},
		})
	} /*else if hitting 1 MB on CRD { //todo: handle multiple WorkflowArtifactGCTasks
		// add a new WorkflowArtifactGCTask to *tasks
	}*/

	activeTask := (*tasks)[len(*tasks)-1]
	byNode := activeTask.Spec.ArtifactsByNode

	// if ArchiveLocation is specified for the Template use that, otherwise use default
	location := template.ArchiveLocation
	if !location.HasLocation() {
		location = woc.artifactRepository.ToArtifactLocation()
	}

	// build the node ID -> artifacts mapping; a node seen for the first time gets a fresh
	// ArtifactNodeSpec carrying the archive location chosen above
	for _, result := range artifactSearchResults {
		nodeSpec, known := byNode[result.NodeID]
		if !known {
			nodeSpec = wfv1.ArtifactNodeSpec{
				ArchiveLocation: location,
				Artifacts:       make(map[string]wfv1.Artifact),
			}
			byNode[result.NodeID] = nodeSpec
		}
		// the Artifacts map is shared with the entry stored in byNode
		nodeSpec.Artifacts[result.Name] = result.Artifact
	}

	woc.log.Debugf("list of artifacts pertaining to template %s to WorkflowArtifactGCTask '%s': %+v", template.Name, activeTask.Name, byNode)
}
// getArtifactTask looks up a WorkflowArtifactGCTask by name in the Workflow's namespace using
// the controller's task informer. It returns (nil, nil) when no such task is in the informer's
// store, and an error when the lookup itself fails.
func (woc *wfOperationCtx) getArtifactTask(taskName string) (*wfv1.WorkflowArtifactGCTask, error) {
	key := fmt.Sprintf("%s/%s", woc.wf.Namespace, taskName)
	indexer := woc.controller.artGCTaskInformer.Informer().GetIndexer()

	obj, present, err := indexer.GetByKey(key)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to get WorkflowArtifactGCTask by key '%s': %w", key, err)
	case !present:
		return nil, nil
	}
	return obj.(*wfv1.WorkflowArtifactGCTask), nil
}
// createWorkflowArtifactGCTask makes sure a WorkflowArtifactGCTask with the name of the given
// task exists and returns the stored object.
//
// When the informer already knows a task with that name, that existing object is returned.
// It carries the UID that callers need when they build owner references from the task, which
// the locally built task does not. The returned object comes from the informer's store and
// must be treated as read-only. Otherwise the task is created through the API and the created
// object is returned.
//
// An error is returned when the lookup or the creation fails.
func (woc *wfOperationCtx) createWorkflowArtifactGCTask(ctx context.Context, task *wfv1.WorkflowArtifactGCTask) (*wfv1.WorkflowArtifactGCTask, error) {
	// first make sure it doesn't already exist
	existingTask, err := woc.getArtifactTask(task.Name)
	if err != nil {
		return nil, err
	}
	if existingTask != nil {
		woc.log.Debugf("Artifact GC Task %s already exists", task.Name)
		return existingTask, nil
	}

	woc.log.Infof("Creating Artifact GC Task %s", task.Name)
	// keep the result in its own variable: on failure the client's return value must not
	// replace the task whose name goes into the error message
	createdTask, err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace).Create(ctx, task, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to Create WorkflowArtifactGCTask '%s' for Garbage Collection: %w", task.Name, err)
	}
	return createdTask, nil
}
// createArtifactGCPod creates the Pod which will do the deletions for one strategy and one
// set of access requirements.
//
// Parameters:
//   - strategy: stored on the Pod as an annotation
//   - tasks: the WorkflowArtifactGCTasks the Pod works on; each one becomes an owner of the Pod
//   - podAccessInfo: Service Account and extra labels/annotations to apply to the Pod
//   - podName: name of the Pod, also passed to the container through an environment variable
//   - templatesToArtList / templatesByName: the artifacts per template and the templates
//     themselves, used to collect the artifact locations from which volumes and mounts are built
//
// It returns the locally built Pod object (the object returned by the API server is discarded).
// A Pod that already exists is logged as a warning and not treated as a failure; any other
// creation error, or a template missing from templatesByName, is returned as an error.
func (woc *wfOperationCtx) createArtifactGCPod(ctx context.Context, strategy wfv1.ArtifactGCStrategy, tasks []*wfv1.WorkflowArtifactGCTask,
podAccessInfo podInfo, podName string, templatesToArtList templatesToArtifacts, templatesByName map[string]*wfv1.Template) (*corev1.Pod, error) {
woc.log.
WithField("strategy", strategy).
Infof("creating pod to delete artifacts: %s", podName)
// Pod is owned by WorkflowArtifactGCTasks, so it will die automatically when all of them have died
ownerReferences := make([]metav1.OwnerReference, len(tasks))
for i, task := range tasks {
// make sure pod gets deleted with the WorkflowArtifactGCTasks
ownerReferences[i] = *metav1.NewControllerRef(task, wfv1.SchemeGroupVersion.WithKind(workflow.WorkflowArtifactGCTaskKind))
}
// collect every artifact location involved: per template its archive location (or the default
// repository when it has none), plus the location of each individual artifact
artifactLocations := make([]*wfv1.ArtifactLocation, 0)
for templateName, artifacts := range templatesToArtList {
template, found := templatesByName[templateName]
if !found {
return nil, fmt.Errorf("can't find template with name %s???", templateName)
}
if template.ArchiveLocation.HasLocation() {
artifactLocations = append(artifactLocations, template.ArchiveLocation)
} else {
artifactLocations = append(artifactLocations, woc.artifactRepository.ToArtifactLocation())
}
for i := range artifacts {
artifactLocations = append(artifactLocations, &artifacts[i].ArtifactLocation)
}
}
// the collected locations determine the volumes and volume mounts of the Pod
volumes, volumeMounts := createSecretVolumesAndMountsFromArtifactLocations(artifactLocations)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: map[string]string{
common.LabelKeyWorkflow: woc.wf.Name,
common.LabelKeyComponent: artifactGCComponent,
common.LabelKeyCompleted: "false",
},
Annotations: map[string]string{
common.AnnotationKeyArtifactGCStrategy: string(strategy),
},
OwnerReferences: ownerReferences,
},
Spec: corev1.PodSpec{
Volumes: volumes,
Containers: []corev1.Container{
{
Name: common.MainContainerName,
Image: woc.controller.executorImage(),
ImagePullPolicy: woc.controller.executorImagePullPolicy(),
Args: []string{"artifact", "delete", "--loglevel", getExecutorLogLevel()},
Env: []corev1.EnvVar{
{Name: common.EnvVarArtifactPodName, Value: podName},
},
// if this pod is breached by an attacker we:
// * prevent installation of any new packages
// * modification of the file-system
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{Drop: []corev1.Capability{"ALL"}},
Privileged: pointer.Bool(false),
RunAsNonRoot: pointer.Bool(true),
RunAsUser: pointer.Int64Ptr(8737),
ReadOnlyRootFilesystem: pointer.Bool(true),
AllowPrivilegeEscalation: pointer.Bool(false),
},
// if this pod is breached by an attacker these limits prevent excessive CPU and memory usage
Resources: corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("64Mi"),
},
Requests: map[corev1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("50m"),
"memory": resource.MustParse("32Mi"),
},
},
VolumeMounts: volumeMounts,
},
},
AutomountServiceAccountToken: pointer.Bool(true),
RestartPolicy: corev1.RestartPolicyNever,
},
}
// Use the Service Account and/or Labels and Annotations specified for our Pod, if they exist
if podAccessInfo.serviceAccount != "" {
pod.Spec.ServiceAccountName = podAccessInfo.serviceAccount
}
// user-supplied labels and annotations are applied last, so they replace the defaults set
// above when the same key is used
for label, labelVal := range podAccessInfo.podMetadata.Labels {
pod.ObjectMeta.Labels[label] = labelVal
}
for annotation, annotationVal := range podAccessInfo.podMetadata.Annotations {
pod.ObjectMeta.Annotations[annotation] = annotationVal
}
// NOTE(review): the label key used here is a constant named EnvVarInstanceID — confirm that
// this, rather than a dedicated label-key constant, is the intended key for the instance ID.
if v := woc.controller.Config.InstanceID; v != "" {
pod.Labels[common.EnvVarInstanceID] = v
}
_, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
// an already existing Pod is only logged; every other failure is returned to the caller
if apierr.IsAlreadyExists(err) {
woc.log.Warningf("Artifact GC Pod %s already exists?", pod.Name)
} else {
return nil, fmt.Errorf("failed to create pod: %w", err)
}
}
return pod, nil
}
// processArtifactGCCompletion reconciles the Artifact GC Pods of this Workflow that have
// finished (succeeded or failed) and have not been reconciled before. Each such Pod has its
// results processed and is then marked as recouped in the Workflow status. When at least one
// of them succeeded and no artifact remains to be deleted, the Artifact GC finalizer is
// removed from the Workflow.
func (woc *wfOperationCtx) processArtifactGCCompletion(ctx context.Context) error {
	// look up the Pods belonging to this Workflow
	workflowKey := woc.wf.GetNamespace() + "/" + woc.wf.GetName()
	podObjs, err := woc.controller.podInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, workflowKey)
	if err != nil {
		return fmt.Errorf("failed to get pods from informer: %w", err)
	}

	sawSucceededPod := false
	for _, podObj := range podObjs {
		pod := podObj.(*corev1.Pod)

		// only Artifact GC Pods are of interest here
		if pod.Labels[common.LabelKeyComponent] != artifactGCComponent {
			continue
		}
		// skip Pods whose results were already taken over
		if woc.wf.Status.ArtifactGCStatus.IsArtifactGCPodRecouped(pod.Name) {
			continue
		}
		// skip Pods that aren't done yet
		phase := pod.Status.Phase
		if phase != corev1.PodSucceeded && phase != corev1.PodFailed {
			continue
		}

		woc.log.WithField("pod", pod.Name).
			WithField("phase", phase).
			WithField("message", pod.Status.Message).
			Info("reconciling artifact-gc pod")

		if err = woc.processCompletedArtifactGCPod(ctx, pod); err != nil {
			return err
		}
		woc.wf.Status.ArtifactGCStatus.SetArtifactGCPodRecouped(pod.Name, true)
		woc.updated = true
		if phase == corev1.PodSucceeded {
			sawSucceededPod = true
		}
	}

	// once everything has been deleted the Finalizer is no longer needed
	if sawSucceededPod && woc.allArtifactsDeleted() {
		woc.log.Info("no remaining artifacts to GC, removing artifact GC finalizer")
		woc.wf.Finalizers = slice.RemoveString(woc.wf.Finalizers, common.FinalizerArtifactGC)
		woc.updated = true
	}
	return nil
}
// allArtifactsDeleted reports whether no output artifact of any Pod node is still waiting to
// be garbage collected. An artifact is waiting when it is not marked as deleted and its
// Artifact GC Strategy is neither "never" nor undefined.
func (woc *wfOperationCtx) allArtifactsDeleted() bool {
	for _, node := range woc.wf.Status.Nodes {
		if node.Type != wfv1.NodeTypePod {
			continue
		}
		for _, artifact := range node.GetOutputs().GetArtifacts() {
			if artifact.Deleted {
				continue
			}
			switch woc.execWf.GetArtifactGCStrategy(&artifact) {
			case wfv1.ArtifactGCNever, wfv1.ArtifactGCStrategyUndefined:
				// this artifact is not subject to garbage collection
			default:
				return false
			}
		}
	}
	return true
}
// processCompletedArtifactGCPod processes the results of a finished Artifact GC Pod: it lists
// the WorkflowArtifactGCTasks labelled with the Pod's name and processes each of them for the
// strategy recorded in the Pod's annotations. An error is returned when the tasks cannot be
// listed, when the Pod lacks the strategy annotation, or when processing a task fails.
func (woc *wfOperationCtx) processCompletedArtifactGCPod(ctx context.Context, pod *corev1.Pod) error {
	woc.log.Infof("processing completed Artifact GC Pod '%s'", pod.Name)

	// get associated WorkflowArtifactGCTasks
	selector := fmt.Sprintf("%s = %s", common.LabelKeyArtifactGCPodName, pod.Name)
	taskClient := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace)
	taskList, err := taskClient.List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return fmt.Errorf("failed to List WorkflowArtifactGCTasks: %w", err)
	}

	annotationValue, hasAnnotation := pod.Annotations[common.AnnotationKeyArtifactGCStrategy]
	if !hasAnnotation {
		return fmt.Errorf("Artifact GC Pod '%s' doesn't have annotation '%s'?", pod.Name, common.AnnotationKeyArtifactGCStrategy)
	}
	strategy := wfv1.ArtifactGCStrategy(annotationValue)

	for i := range taskList.Items {
		if err := woc.processCompletedWorkflowArtifactGCTask(ctx, &taskList.Items[i], strategy); err != nil {
			return err
		}
	}
	return nil
}
// processCompletedWorkflowArtifactGCTask transfers the results stored in the Status of a
// completed WorkflowArtifactGCTask into the Workflow Status and then deletes the task object.
//
// For every artifact result the matching output artifact's Deleted flag is set to the reported
// success. A reported error is recorded as an ArtifactGCError condition on the Workflow, and
// for the first one only, a warning Event is issued. A task with reported errors is kept for
// inspection; a failure to delete a clean task is only logged.
//
// An error is returned when a node named by the task is missing from the Workflow Status or
// has no Outputs.
func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(ctx context.Context, artifactGCTask *wfv1.WorkflowArtifactGCTask, strategy wfv1.ArtifactGCStrategy) error {
	woc.log.Debugf("processing WorkflowArtifactGCTask %s", artifactGCTask.Name)

	sawFailure := false
	for nodeKey, resultsForNode := range artifactGCTask.Status.ArtifactResultsByNode {
		// locate the node in the Workflow Status
		node, ok := woc.wf.Status.Nodes[nodeKey]
		if !ok {
			return fmt.Errorf("node named '%s' returned by WorkflowArtifactGCTask '%s' wasn't found in Workflow '%s' Status", nodeKey, artifactGCTask.Name, woc.wf.Name)
		}
		if node.Outputs == nil {
			return fmt.Errorf("node named '%s' returned by WorkflowArtifactGCTask '%s' doesn't seem to have Outputs in Workflow Status", nodeKey, artifactGCTask.Name)
		}

		for idx, outputArtifact := range node.Outputs.Artifacts {
			// an artifact without a result here could be in a different WorkflowArtifactGCTask
			result, ok := resultsForNode.ArtifactResults[outputArtifact.Name]
			if !ok {
				continue
			}

			woc.wf.Status.Nodes[nodeKey].Outputs.Artifacts[idx].Deleted = result.Success
			if result.Error == nil {
				continue
			}

			woc.wf.Status.Conditions.UpsertCondition(wfv1.Condition{
				Type:    wfv1.ConditionTypeArtifactGCError,
				Status:  metav1.ConditionTrue,
				Message: fmt.Sprintf("%s (artifactGCTask: %s)", *result.Error, artifactGCTask.Name),
			})

			// a single Event per task is enough; more would flood the system with Events
			if sawFailure {
				continue
			}
			sawFailure = true
			failureMsg := *result.Error
			woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "ArtifactGCFailed",
				fmt.Sprintf("Artifact Garbage Collection failed for strategy %s, err:%s", strategy, failureMsg))
		}
	}

	// a task with failures is left in place so that it can be inspected
	if sawFailure {
		return nil
	}

	woc.log.Debugf("deleting WorkflowArtifactGCTask: %s", artifactGCTask.Name)
	taskClient := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace)
	if err := taskClient.Delete(ctx, artifactGCTask.Name, metav1.DeleteOptions{}); err != nil {
		woc.log.Errorf("error deleting WorkflowArtifactGCTask: %s: %v", artifactGCTask.Name, err)
	}
	return nil
}
// getArtifactGCPodInfo works out the Service Account and Pod metadata to use for the Pod that
// deletes the given artifact. Workflow-level ArtifactGC settings are applied first and the
// artifact's own ArtifactGC settings are applied on top of them.
func (woc *wfOperationCtx) getArtifactGCPodInfo(artifact *wfv1.Artifact) podInfo {
	var info podInfo

	// order matters: later entries override earlier ones
	sources := []*wfv1.ArtifactGC{
		woc.execWf.Spec.ArtifactGC,
		artifact.ArtifactGC,
	}
	for _, source := range sources {
		if source != nil {
			woc.updateArtifactGCPodInfo(source, &info)
		}
	}
	return info
}
// updateArtifactGCPodInfo merges the settings of artifactGC into podAccessInfo: a non-empty
// Service Account name replaces the current one, and every label and annotation of the Pod
// metadata is copied over, overwriting entries that have the same key.
func (woc *wfOperationCtx) updateArtifactGCPodInfo(artifactGC *wfv1.ArtifactGC, podAccessInfo *podInfo) {
	if name := artifactGC.ServiceAccountName; name != "" {
		podAccessInfo.serviceAccount = name
	}

	source := artifactGC.PodMetadata
	if source == nil {
		return
	}
	target := &podAccessInfo.podMetadata

	if len(source.Labels) > 0 {
		// the target map is only allocated when there is something to copy
		if target.Labels == nil {
			target.Labels = make(map[string]string)
		}
		for key, value := range source.Labels {
			target.Labels[key] = value
		}
	}

	if len(source.Annotations) > 0 {
		if target.Annotations == nil {
			target.Annotations = make(map[string]string)
		}
		for key, value := range source.Annotations {
			target.Annotations[key] = value
		}
	}
}