refactor: change the pod deletion logic during retry. Fixes: #12538 #12734

Open · wants to merge 26 commits into base: main
@@ -28,7 +28,6 @@ rules:
   - get
   - list
   - watch
-  - delete
 - apiGroups:
   - ""
   resources:
@@ -28,7 +28,6 @@ rules:
   - get
   - list
   - watch
-  - delete
 - apiGroups:
   - ""
   resources:
1 change: 0 additions & 1 deletion manifests/quick-start-minimal.yaml


1 change: 0 additions & 1 deletion manifests/quick-start-mysql.yaml


1 change: 0 additions & 1 deletion manifests/quick-start-postgres.yaml


44 changes: 1 addition & 43 deletions server/workflow/workflow_server.go
@@ -6,7 +6,6 @@ import (
    "fmt"
    "io"
    "sort"
-   "sync"

    log "github.com/sirupsen/logrus"
    "google.golang.org/grpc/codes"
@@ -371,18 +370,8 @@ func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.Wo
    return &workflowpkg.WorkflowDeleteResponse{}, nil
}

-func errorFromChannel(errCh <-chan error) error {
-   select {
-   case err := <-errCh:
-       return err
-   default:
-   }
-   return nil
-}
-
func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*wfv1.Workflow, error) {
    wfClient := auth.GetWfClient(ctx)
-   kubeClient := auth.GetKubeClient(ctx)

    wf, err := s.getWorkflow(ctx, wfClient, req.Namespace, req.Name, metav1.GetOptions{})
    if err != nil {
@@ -394,38 +383,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
        return nil, sutils.ToStatusError(err, codes.InvalidArgument)
    }

-   err = s.hydrator.Hydrate(wf)
-   if err != nil {
-       return nil, sutils.ToStatusError(err, codes.Internal)
-   }
-
-   wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
-   if err != nil {
-       return nil, sutils.ToStatusError(err, codes.Internal)
-   }
-
-   errCh := make(chan error, len(podsToDelete))
-   var wg sync.WaitGroup
-   wg.Add(len(podsToDelete))
-   for _, podName := range podsToDelete {
-       log.WithFields(log.Fields{"podDeleted": podName}).Info("Deleting pod")
-       go func(podName string) {
-           defer wg.Done()
-           err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
-           if err != nil && !apierr.IsNotFound(err) {
-               errCh <- err
-               return
-           }
-       }(podName)
-   }
-   wg.Wait()
-
-   err = errorFromChannel(errCh)
-   if err != nil {
-       return nil, sutils.ToStatusError(err, codes.Internal)
-   }
-
-   err = s.hydrator.Dehydrate(wf)
+   wf, err = util.MarkWorkflowForRetry(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
    if err != nil {
        return nil, sutils.ToStatusError(err, codes.Internal)
    }
19 changes: 7 additions & 12 deletions server/workflowarchive/archived_workflow_server.go
@@ -276,7 +276,6 @@ func (w *archivedWorkflowServer) ResubmitArchivedWorkflow(ctx context.Context, r

func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req *workflowarchivepkg.RetryArchivedWorkflowRequest) (*wfv1.Workflow, error) {
    wfClient := auth.GetWfClient(ctx)
-   kubeClient := auth.GetKubeClient(ctx)

    wf, err := w.GetArchivedWorkflow(ctx, &workflowarchivepkg.GetArchivedWorkflowRequest{Uid: req.Uid})
    if err != nil {
@@ -285,23 +284,19 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req

    _, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{})
    if apierr.IsNotFound(err) {
-
-       wf, podsToDelete, err := util.FormulateRetryWorkflow(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
+       wf.ObjectMeta.ResourceVersion = ""
+       wf.ObjectMeta.UID = ""
+       wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
        if err != nil {
            return nil, sutils.ToStatusError(err, codes.Internal)
        }

-       for _, podName := range podsToDelete {
-           log.WithFields(log.Fields{"podDeleted": podName}).Info("Deleting pod")
-           err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
-           if err != nil && !apierr.IsNotFound(err) {
-               return nil, sutils.ToStatusError(err, codes.Internal)
-           }
-       }
-
-       wf.ObjectMeta.ResourceVersion = ""
-       wf.ObjectMeta.UID = ""
-       result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
+       wf, err = util.MarkWorkflowForRetry(ctx, wf, req.RestartSuccessful, req.NodeFieldSelector, req.Parameters)
+       if err != nil {
+           return nil, sutils.ToStatusError(err, codes.Internal)
+       }
+
+       result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Update(ctx, wf, metav1.UpdateOptions{})
        if err != nil {
            return nil, sutils.ToStatusError(err, codes.Internal)
        }
48 changes: 46 additions & 2 deletions test/e2e/cli_test.go
@@ -20,6 +20,7 @@ import (

    wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
    "github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
+   "github.com/argoproj/argo-workflows/v3/workflow/common"
)

const (
@@ -857,12 +858,51 @@ func (s *CLISuite) TestWorkflowRetry() {
            return wf.Status.AnyActiveSuspendNode(), "suspended node"
        }), time.Second*90).
        Then().
-       ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
+       ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
            outerStepsPodNode := status.Nodes.FindByDisplayName("steps-outer-step1")
            innerStepsPodNode := status.Nodes.FindByDisplayName("steps-inner-step1")

            assert.True(t, outerStepsPodNode.FinishedAt.Before(&retryTime))
            assert.True(t, retryTime.Before(&innerStepsPodNode.FinishedAt))
+
+           assert.Equal(t, "Retried", metadata.GetLabels()[common.LabelKeyWorkflowRetryingStatus])
+           assert.Equal(t, "true", metadata.GetAnnotations()[common.AnnotationKeyRetryRestartSuccessful])
+           assert.Equal(t, "templateName=steps-inner", metadata.GetAnnotations()[common.AnnotationKeyRetryNodeFieldSelector])
+           assert.Equal(t, "null", metadata.GetAnnotations()[common.AnnotationKeyRetryParameters])
        })
}

func (s *CLISuite) TestWorkflowRetryWithParameters() {
    s.Given().
        Workflow("@testdata/retry-parameters.yaml").
        When().
        SubmitWorkflow().
        WaitForWorkflow(fixtures.ToBeFailed).
        RunCli([]string{"logs", "@latest", "--follow"}, func(t *testing.T, output string, err error) {
            if assert.NoError(t, err) {
                assert.Contains(t, output, "hello world")
            }
        }).
        Wait(3*time.Second).
Review comment (Member): what would be the potential issue without wait here?

        RunCli([]string{"retry", "@latest", "--node-field-selector", "templateName=main", "-p", "message1=hi", "-p", "message2=argo"}, func(t *testing.T, output string, err error) {
            if assert.NoError(t, err, output) {
                assert.Contains(t, output, "Name:")
                assert.Contains(t, output, "Namespace:")
            }
        }).
        Wait(3*time.Second).
        WaitForWorkflow(fixtures.ToBeFailed).
        RunCli([]string{"logs", "@latest", "--follow"}, func(t *testing.T, output string, err error) {
            if assert.NoError(t, err) {
                assert.Contains(t, output, "hi argo")
            }
        }).
        Then().
        ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
            assert.Equal(t, "Retried", metadata.GetLabels()[common.LabelKeyWorkflowRetryingStatus])
            assert.Equal(t, "false", metadata.GetAnnotations()[common.AnnotationKeyRetryRestartSuccessful])
            assert.Equal(t, "templateName=main", metadata.GetAnnotations()[common.AnnotationKeyRetryNodeFieldSelector])
            assert.Equal(t, "[\"message1=hi\",\"message2=argo\"]", metadata.GetAnnotations()[common.AnnotationKeyRetryParameters])
        })
}

@@ -877,7 +917,11 @@ func (s *CLISuite) TestWorkflowRetryFailedWorkflow() {
        RunCli([]string{"retry", "-l", "workflows.argoproj.io/workflow=fail-first-pass-second-workflow", "--namespace=argo"}, func(t *testing.T, output string, err error) {
            assert.NoError(t, err, output)
        }).
-       WaitForWorkflow(fixtures.ToBeSucceeded)
+       WaitForWorkflow(fixtures.ToBeSucceeded).
+       Then().
+       ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
+           assert.Equal(t, "Retried", metadata.GetLabels()[common.LabelKeyWorkflowRetryingStatus])
+       })
}

func (s *CLISuite) TestWorkflowRetryNestedDag() {
18 changes: 18 additions & 0 deletions test/e2e/testdata/retry-parameters.yaml
@@ -0,0 +1,18 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-parameters-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: message1
        value: "hello"
      - name: message2
        value: "world"
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
        command: [sh, -c]
        args: ["echo {{workflow.parameters.message1}} {{workflow.parameters.message2}}; exit 1"]
15 changes: 15 additions & 0 deletions workflow/common/common.go
@@ -59,6 +59,15 @@
    // the strategy whose artifacts are being deleted
    AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"

    // AnnotationKeyRetryNodeFieldSelector is the annotation that specifies the node field selector to use when retrying a workflow
    AnnotationKeyRetryNodeFieldSelector = workflow.WorkflowFullName + "/retry-node-field-selector"

    // AnnotationKeyRetryParameters is the annotation that specifies the parameters to apply when retrying a workflow
    AnnotationKeyRetryParameters = workflow.WorkflowFullName + "/retry-parameters"

    // AnnotationKeyRetryRestartSuccessful is the annotation that specifies whether successful nodes should also be restarted when retrying
    AnnotationKeyRetryRestartSuccessful = workflow.WorkflowFullName + "/retry-restart-successful"

    // LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels
    // for the purposes of workflow segregation
    LabelKeyControllerInstanceID = workflow.WorkflowFullName + "/controller-instanceid"
@@ -103,6 +112,12 @@

    // LabelKeyCronWorkflowCompleted is a label applied to the cron workflow when the configured stopping condition is achieved
    LabelKeyCronWorkflowCompleted = workflow.CronWorkflowFullName + "/completed"
    // LabelKeyWorkflowRetryingStatus indicates whether a workflow needs retrying and tracks its progress:
    // * `` - does not need retrying ... yet
    // * `Pending` - pending retrying
    // * `Retrying` - retrying in progress
    // * `Retried` - has been retried
    LabelKeyWorkflowRetryingStatus = workflow.WorkflowFullName + "/workflow-retrying-status"

    // ExecutorArtifactBaseDir is the base directory in the init container in which artifacts will be copied to.
    // Each artifact will be named according to its input name (e.g: /argo/inputs/artifacts/CODE)
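For context, this PR routes retries through a new helper, util.MarkWorkflowForRetry, whose implementation is not part of this diff. Below is a minimal sketch of what such a helper could look like, assuming it only records the retry request on the workflow object using the keys defined above. The function body, the "Pending" initial status, and the JSON/boolean encodings are assumptions inferred from the controller and e2e test code in this PR, not taken from the actual implementation; it also assumes the util package's usual imports (encoding/json, strconv, wfv1, common).

// MarkWorkflowForRetry (sketch): stamp the retry request onto the workflow so the
// controller can pick it up on a later reconciliation.
func MarkWorkflowForRetry(ctx context.Context, wf *wfv1.Workflow, restartSuccessful bool, nodeFieldSelector string, parameters []string) (*wfv1.Workflow, error) {
    if wf.Labels == nil {
        wf.Labels = map[string]string{}
    }
    if wf.Annotations == nil {
        wf.Annotations = map[string]string{}
    }
    // json.Marshal(nil) yields "null", which matches the e2e assertion for a retry without parameters.
    params, err := json.Marshal(parameters)
    if err != nil {
        return nil, err
    }
    wf.Labels[common.LabelKeyWorkflowRetryingStatus] = "Pending"
    wf.Annotations[common.AnnotationKeyRetryRestartSuccessful] = strconv.FormatBool(restartSuccessful)
    wf.Annotations[common.AnnotationKeyRetryNodeFieldSelector] = nodeFieldSelector
    wf.Annotations[common.AnnotationKeyRetryParameters] = string(params)
    return wf, nil
}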
38 changes: 38 additions & 0 deletions workflow/controller/controller.go
@@ -590,6 +590,13 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
            if err != nil && !apierr.IsNotFound(err) {
                return err
            }
        case labelBatchDeletePodsCompleted:
            // Reaching this case means that all pods that needed to be deleted for the retry operation have been processed.
            workflowName := podName
            err := wfc.labelWorkflowRetried(ctx, namespace, workflowName)
            if err != nil {
                return err
            }
Review comment (Member) on lines +593 to +599:

I think there's a problem here: the podCleanupQueue is handled by multiple workers, so there's no guarantee that all pods have been cleaned up by the time a worker sees labelBatchDeletePodsCompleted.

We may end up in a situation where a retried workflow starts while it still has pods that are yet to be cleaned up.

}
return nil
}()
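One possible way to address the concern above (purely illustrative, not part of this PR): before labelling the workflow as Retried, the labelBatchDeletePodsCompleted handler could verify that no pods belonging to the workflow remain and requeue the item otherwise. The helper below is a sketch under the assumption that the retry pods still carry the standard workflows.argoproj.io/workflow label (common.LabelKeyWorkflow) until they are fully deleted.

// allRetryPodsDeleted reports whether any pods labelled with the workflow name remain
// in the namespace; a cleanup worker could requeue labelBatchDeletePodsCompleted until
// this returns true. Hypothetical helper, not part of this PR.
func (wfc *WorkflowController) allRetryPodsDeleted(ctx context.Context, namespace, workflowName string) (bool, error) {
    pods, err := wfc.kubeclientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
        LabelSelector: common.LabelKeyWorkflow + "=" + workflowName,
    })
    if err != nil {
        return false, err
    }
    return len(pods.Items) == 0, nil
}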
@@ -602,6 +609,37 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return true
}

func (wfc *WorkflowController) labelWorkflowRetried(ctx context.Context, namespace string, workflowName string) error {
    err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
        err := wfc.patchWorkflowLabels(ctx, namespace, workflowName, map[string]string{
            common.LabelKeyWorkflowRetryingStatus: "Retried",
        })
        return err
    })
    if err != nil {
        return err
    }
    return nil
}

func (wfc *WorkflowController) patchWorkflowLabels(ctx context.Context, namespace string, workflowName string, labels map[string]string) error {
    data, err := json.Marshal(&wfv1.WorkflowTaskResult{
        ObjectMeta: metav1.ObjectMeta{
            Labels: labels,
        },
    })
    if err != nil {
        return err
    }
    _, err = wfc.wfclientset.ArgoprojV1alpha1().Workflows(namespace).Patch(ctx,
        workflowName,
        types.MergePatchType,
        data,
        metav1.PatchOptions{},
    )
    return err
}

func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) {
pod, err := wfc.kubeclientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
54 changes: 54 additions & 0 deletions workflow/controller/operator.go
@@ -214,6 +214,16 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
        return
    }

    // Check whether it is a workflow that requires retry
    if woc.shouldRetry() {
        woc.log.Info("Retrying workflow")
        err := woc.retryWorkflow(ctx)
        if err != nil {
            woc.log.WithError(err).Error("Failed to retry workflow")
        }
        return
    }

    if woc.wf.Status.ArtifactRepositoryRef == nil {
        ref, err := woc.controller.artifactRepositories.Resolve(ctx, woc.execWf.Spec.ArtifactRepositoryRef, woc.wf.Namespace)
        if err != nil {
@@ -3851,6 +3861,50 @@ func (woc *wfOperationCtx) retryStrategy(tmpl *wfv1.Template) *wfv1.RetryStrateg
    return woc.execWf.Spec.RetryStrategy
}

func (woc *wfOperationCtx) shouldRetry() bool {
    retryingStatus, ok := woc.wf.Labels[common.LabelKeyWorkflowRetryingStatus]
    if !ok || retryingStatus == "Retried" {
        return false
    }
    return true
}

func (woc *wfOperationCtx) IsRetried() bool {
    return woc.wf.GetLabels()[common.LabelKeyWorkflowRetryingStatus] != "Pending"
}

func (woc *wfOperationCtx) retryWorkflow(ctx context.Context) error {
    if woc.IsRetried() {
        return nil
    }
    // Parse the retry parameters from the annotations.
    nodeFieldSelector := woc.wf.GetAnnotations()[common.AnnotationKeyRetryNodeFieldSelector]
    parametersStr := woc.wf.GetAnnotations()[common.AnnotationKeyRetryParameters]
    var parameters []string
    err := json.Unmarshal([]byte(parametersStr), &parameters)
    if err != nil {
        return fmt.Errorf("failed to unmarshal retry parameters: %v", err)
    }
    restartSuccessful := woc.wf.GetAnnotations()[common.AnnotationKeyRetryRestartSuccessful] == "true"

    // Clean up remaining pods in the workflow
    wf, podsToDelete, err := wfutil.FormulateRetryWorkflow(ctx, woc.wf, restartSuccessful, nodeFieldSelector, parameters)
    if err != nil {
        return fmt.Errorf("failed to formulate retry workflow: %v", err)
    }
    for _, podName := range podsToDelete {
        woc.controller.queuePodForCleanup(wf.Namespace, podName, deletePod)
    }

    // Add labelBatchDeletePodsCompleted to the queue to help determine whether the pods have been cleaned up.
    woc.controller.queuePodForCleanup(wf.Namespace, wf.Name, labelBatchDeletePodsCompleted)

    woc.wf = wf
    woc.wf.ObjectMeta.Labels[common.LabelKeyWorkflowRetryingStatus] = "Retrying"
    woc.updated = true
    return nil
}

func (woc *wfOperationCtx) setExecWorkflow(ctx context.Context) error {
    if woc.wf.Spec.WorkflowTemplateRef != nil { // not-woc-misuse
        err := woc.setStoredWfSpec()