Skip to content

Commit

Permalink
fix: if artifact GC Pod fails make sure error is propagated as a Cond…
Browse files Browse the repository at this point in the history
…ition (#10019)
  • Loading branch information
juliev0 committed Nov 14, 2022
1 parent acab9b5 commit 55ad680
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 39 deletions.
60 changes: 60 additions & 0 deletions test/e2e/artifacts_test.go
Expand Up @@ -4,11 +4,13 @@
package e2e

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -178,6 +180,64 @@ func (s *ArtifactsSuite) TestArtifactGC() {
}
}

// TestArtifactGC_InsufficientRole creates a ServiceAccount which is not tied to the
// artifactgc role and configures the Workflow's artifact GC to run under it.
// It then verifies that the Workflow ends up with the ArtifactGCError Condition.
func (s *ArtifactsSuite) TestArtifactGC_InsufficientRole() {
	// Single source of truth for the ServiceAccount name: it is created here,
	// referenced by the Workflow manifest below, and deleted on cleanup.
	const saName = "artgc-role-test-sa"

	ctx := context.Background()
	serviceAccounts := s.KubeClient.CoreV1().ServiceAccounts(fixtures.Namespace)

	_, err := serviceAccounts.Create(ctx, &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: saName}}, metav1.CreateOptions{})
	if !assert.NoError(s.T(), err) {
		// The test has already been marked as failed; without the ServiceAccount
		// the rest of the scenario would not be exercising what it claims to.
		return
	}
	s.T().Cleanup(func() {
		// Best-effort cleanup: a failure to delete must not mask the test result.
		_ = serviceAccounts.Delete(ctx, saName, metav1.DeleteOptions{})
	})

	s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: art-gc-simple-
spec:
  entrypoint: main
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
        command:
          - sh
          - -c
        args:
          - |
            echo "can throw this away" > /tmp/temporary-artifact-on-completion.txt
      outputs:
        artifacts:
          - name: temporary-artifact-on-completion
            path: /tmp/temporary-artifact-on-completion.txt
            s3:
              key: temporary-artifact-on-completion.txt
            artifactGC:
              strategy: OnWorkflowCompletion
              serviceAccountName: ` + saName).
		When().
		SubmitWorkflow().
		WaitForWorkflow(
			fixtures.WorkflowCompletionOkay(true),
			// Wait until the Workflow status reports exactly one recouped GC Pod.
			fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
				return wf.Status.ArtifactGCStatus != nil &&
					len(wf.Status.ArtifactGCStatus.PodsRecouped) == 1, "for pod to have been recouped"
			})).
		Then().
		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			// The GC failure must be visible as a Condition on the Workflow.
			foundGCError := false
			for _, c := range status.Conditions {
				if c.Type == wfv1.ConditionTypeArtifactGCError {
					foundGCError = true
					break
				}
			}
			assert.True(t, foundGCError, "expected a %s Condition on the Workflow", wfv1.ConditionTypeArtifactGCError)
		}).
		When().
		// NOTE(review): presumably needed so the Workflow can be deleted even though GC failed — confirm.
		RemoveFinalizers(true)
}

func (s *ArtifactsSuite) TestDefaultParameterOutputs() {
s.Given().
Workflow(`
Expand Down
88 changes: 51 additions & 37 deletions workflow/controller/artifact_gc.go
Expand Up @@ -147,7 +147,7 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate
// get the Template for the Artifact
node, found := woc.wf.Status.Nodes[artifactSearchResult.NodeID]
if !found {
return fmt.Errorf("can't process Artifact GC Strategy %s: node ID '%s' not found in Status??", strategy, artifactSearchResult.NodeID)
return fmt.Errorf("can't process Artifact GC Strategy %s: node ID %q not found in Status??", strategy, artifactSearchResult.NodeID)
}
templateName := node.TemplateName
if templateName == "" && node.GetTemplateRef() != nil {
Expand All @@ -160,7 +160,7 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate
if !found {
template = woc.wf.GetTemplateByName(templateName)
if template == nil {
return fmt.Errorf("can't process Artifact GC Strategy %s: template name '%s' belonging to node %+v not found??", strategy, node.TemplateName, node)
return fmt.Errorf("can't process Artifact GC Strategy %s: template name %q belonging to node %+v not found??", strategy, node.TemplateName, node)
}
templatesByName[templateName] = template
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (woc *wfOperationCtx) processArtifactGCStrategy(ctx context.Context, strate
// create the pod
podAccessInfo, found := podNames[podName]
if !found {
return fmt.Errorf("can't find podInfo for podName '%s'??", podName)
return fmt.Errorf("can't find podInfo for podName %q??", podName)
}

_, err := woc.createArtifactGCPod(ctx, strategy, tasks, podAccessInfo, podName, templatesToArtList, templatesByName)
Expand Down Expand Up @@ -240,7 +240,7 @@ func (woc *wfOperationCtx) artGCPodName(strategy wfv1.ArtifactGCStrategy, podAcc
case wfv1.ArtifactGCOnWorkflowDeletion:
abbreviatedName = "wfdel"
default:
return "", fmt.Errorf("ArtifactGCStrategy '%s' not valid", strategy)
return "", fmt.Errorf("ArtifactGCStrategy %q not valid", strategy)
}

return fmt.Sprintf("%s-artgc-%s-%v", woc.wf.Name, abbreviatedName, h.Sum32()), nil
Expand Down Expand Up @@ -315,7 +315,7 @@ func (woc *wfOperationCtx) addTemplateArtifactsToTasks(podName string, tasks *[]
artifactNodeSpec.Artifacts[artifactSearchResult.Name] = artifactSearchResult.Artifact

}
woc.log.Debugf("list of artifacts pertaining to template %s to WorkflowArtifactGCTask '%s': %+v", template.Name, currentTask.Name, artifactsByNode)
woc.log.Debugf("list of artifacts pertaining to template %s to WorkflowArtifactGCTask %q: %+v", template.Name, currentTask.Name, artifactsByNode)

}

Expand All @@ -324,7 +324,7 @@ func (woc *wfOperationCtx) getArtifactTask(taskName string) (*wfv1.WorkflowArtif
key := woc.wf.Namespace + "/" + taskName
task, exists, err := woc.controller.artGCTaskInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
return nil, fmt.Errorf("failed to get WorkflowArtifactGCTask by key '%s': %w", key, err)
return nil, fmt.Errorf("failed to get WorkflowArtifactGCTask by key %q: %w", key, err)
}
if !exists {
return nil, nil
Expand All @@ -347,7 +347,7 @@ func (woc *wfOperationCtx) createWorkflowArtifactGCTask(ctx context.Context, tas

task, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace).Create(ctx, task, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to Create WorkflowArtifactGCTask '%s' for Garbage Collection: %w", task.Name, err)
return nil, fmt.Errorf("failed to Create WorkflowArtifactGCTask %q for Garbage Collection: %w", task.Name, err)
}
}
return task, nil
Expand Down Expand Up @@ -493,7 +493,7 @@ func (woc *wfOperationCtx) processArtifactGCCompletion(ctx context.Context) erro

phase := pod.Status.Phase

// if Pod is done process the results
// if Pod is done, process the results
if phase == corev1.PodSucceeded || phase == corev1.PodFailed {
woc.log.WithField("pod", pod.Name).
WithField("phase", phase).
Expand Down Expand Up @@ -539,7 +539,19 @@ func (woc *wfOperationCtx) allArtifactsDeleted() bool {
}

func (woc *wfOperationCtx) processCompletedArtifactGCPod(ctx context.Context, pod *corev1.Pod) error {
woc.log.Infof("processing completed Artifact GC Pod '%s'", pod.Name)
woc.log.Infof("processing completed Artifact GC Pod %q", pod.Name)

strategyStr, found := pod.Annotations[common.AnnotationKeyArtifactGCStrategy]
if !found {
return fmt.Errorf("Artifact GC Pod %q doesn't have annotation %q?", pod.Name, common.AnnotationKeyArtifactGCStrategy)
}
strategy := wfv1.ArtifactGCStrategy(strategyStr)

if pod.Status.Phase == corev1.PodFailed {
	// A failed GC Pod is reported both as a Workflow Condition and as an Event.
	// The arguments must follow the order of the verbs in the message:
	// the strategy first, then the Pod name.
	errMsg := fmt.Sprintf("Artifact Garbage Collection failed for strategy %s, pod %s exited with non-zero exit code", strategy, pod.Name)
	woc.addArtGCCondition(errMsg)
	woc.addArtGCEvent(errMsg)
}

// get associated WorkflowArtifactGCTasks
labelSelector := fmt.Sprintf("%s = %s", common.LabelKeyArtifactGCPodHash, woc.artifactGCPodLabel(pod.Name))
Expand All @@ -548,35 +560,38 @@ func (woc *wfOperationCtx) processCompletedArtifactGCPod(ctx context.Context, po
return fmt.Errorf("failed to List WorkflowArtifactGCTasks: %w", err)
}

strategyStr, found := pod.Annotations[common.AnnotationKeyArtifactGCStrategy]
if !found {
return fmt.Errorf("Artifact GC Pod '%s' doesn't have annotation '%s'?", pod.Name, common.AnnotationKeyArtifactGCStrategy)
}
strategy := wfv1.ArtifactGCStrategy(strategyStr)

for _, task := range taskList.Items {
err = woc.processCompletedWorkflowArtifactGCTask(ctx, &task, strategy)
allArtifactsSucceeded, err := woc.processCompletedWorkflowArtifactGCTask(&task, strategy)
if err != nil {
return err
}
if allArtifactsSucceeded && pod.Status.Phase == corev1.PodSucceeded {
// now we can delete it, if it succeeded (otherwise we leave it up to be inspected)
woc.log.Debugf("deleting WorkflowArtifactGCTask: %s", task.Name)
err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace).Delete(ctx, task.Name, metav1.DeleteOptions{})
if err != nil {
woc.log.Errorf("error deleting WorkflowArtifactGCTask: %s: %v", task.Name, err)
}
}

}
return nil
}

// process the Status in the WorkflowArtifactGCTask which was completed and reflect it in Workflow Status; deleting the Task CRD Object is left to the caller
// return first found error message if GC failed
func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(ctx context.Context, artifactGCTask *wfv1.WorkflowArtifactGCTask, strategy wfv1.ArtifactGCStrategy) error {
// return true if all artifacts succeeded, else false
func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(artifactGCTask *wfv1.WorkflowArtifactGCTask, strategy wfv1.ArtifactGCStrategy) (bool, error) {
woc.log.Debugf("processing WorkflowArtifactGCTask %s", artifactGCTask.Name)

foundGCFailure := false
for nodeName, nodeResult := range artifactGCTask.Status.ArtifactResultsByNode {
// find this node result in the Workflow Status
wfNode, found := woc.wf.Status.Nodes[nodeName]
if !found {
return fmt.Errorf("node named '%s' returned by WorkflowArtifactGCTask '%s' wasn't found in Workflow '%s' Status", nodeName, artifactGCTask.Name, woc.wf.Name)
return false, fmt.Errorf("node named %q returned by WorkflowArtifactGCTask %q wasn't found in Workflow %q Status", nodeName, artifactGCTask.Name, woc.wf.Name)
}
if wfNode.Outputs == nil {
return fmt.Errorf("node named '%s' returned by WorkflowArtifactGCTask '%s' doesn't seem to have Outputs in Workflow Status", nodeName, artifactGCTask.Name)
return false, fmt.Errorf("node named %q returned by WorkflowArtifactGCTask %q doesn't seem to have Outputs in Workflow Status", nodeName, artifactGCTask.Name)
}
for i, wfArtifact := range wfNode.Outputs.Artifacts {
// find artifact in the WorkflowArtifactGCTask Status
Expand All @@ -588,32 +603,31 @@ func (woc *wfOperationCtx) processCompletedWorkflowArtifactGCTask(ctx context.Co
woc.wf.Status.Nodes[nodeName].Outputs.Artifacts[i].Deleted = artifactResult.Success

if artifactResult.Error != nil {
woc.wf.Status.Conditions.UpsertCondition(wfv1.Condition{
Type: wfv1.ConditionTypeArtifactGCError,
Status: metav1.ConditionTrue,
Message: fmt.Sprintf("%s (artifactGCTask: %s)", *artifactResult.Error, artifactGCTask.Name),
})
// issue an Event if there was an error - just do this one to prevent flooding the system with Events
woc.addArtGCCondition(fmt.Sprintf("%s (artifactGCTask: %s)", *artifactResult.Error, artifactGCTask.Name))
// issue an Event if there was an error - just do this once to prevent flooding the system with Events
if !foundGCFailure {
foundGCFailure = true
gcFailureMsg := *artifactResult.Error
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "ArtifactGCFailed",
fmt.Sprintf("Artifact Garbage Collection failed for strategy %s, err:%s", strategy, gcFailureMsg))
woc.addArtGCEvent(fmt.Sprintf("Artifact Garbage Collection failed for strategy %s, err:%s", strategy, gcFailureMsg))
}
}
}

}

// now we can delete it, if it succeeded (otherwise we leave it up to be inspected)
if !foundGCFailure {
woc.log.Debugf("deleting WorkflowArtifactGCTask: %s", artifactGCTask.Name)
err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowArtifactGCTasks(woc.wf.Namespace).Delete(ctx, artifactGCTask.Name, metav1.DeleteOptions{})
if err != nil {
woc.log.Errorf("error deleting WorkflowArtifactGCTask: %s: %v", artifactGCTask.Name, err)
}
}
return nil
return !foundGCFailure, nil
}

// addArtGCCondition upserts an ArtifactGCError Condition with status True and
// the given message into the Workflow's Status Conditions.
func (woc *wfOperationCtx) addArtGCCondition(msg string) {
	gcErrCondition := wfv1.Condition{
		Type:    wfv1.ConditionTypeArtifactGCError,
		Status:  metav1.ConditionTrue,
		Message: msg,
	}
	woc.wf.Status.Conditions.UpsertCondition(gcErrCondition)
}

// addArtGCEvent records a Warning Event with reason "ArtifactGCFailed" and the
// given message against the Workflow.
func (woc *wfOperationCtx) addArtGCEvent(msg string) {
	const reason = "ArtifactGCFailed"
	woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, reason, msg)
}

func (woc *wfOperationCtx) getArtifactGCPodInfo(artifact *wfv1.Artifact) podInfo {
Expand Down
3 changes: 1 addition & 2 deletions workflow/controller/artifact_gc_test.go
Expand Up @@ -555,15 +555,14 @@ func TestProcessCompletedWorkflowArtifactGCTask(t *testing.T) {
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.wf.Status.ArtifactGCStatus = &wfv1.ArtGCStatus{}

// verify that we update these Status fields:
// - Artifact.Deleted
// - Conditions

err := woc.processCompletedWorkflowArtifactGCTask(ctx, wfat, "OnWorkflowCompletion")
_, err := woc.processCompletedWorkflowArtifactGCTask(wfat, "OnWorkflowCompletion")
assert.Nil(t, err)

for _, expectedArtifact := range []struct {
Expand Down

0 comments on commit 55ad680

Please sign in to comment.