-
Notifications
You must be signed in to change notification settings - Fork 3.2k
/
retry.go
150 lines (124 loc) · 5.01 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package commands
import (
"context"
"fmt"
"os"
"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
type retryOps struct {
nodeFieldSelector string // --node-field-selector
restartSuccessful bool // --restart-successful
namespace string // --namespace
labelSelector string // --selector
fieldSelector string // --field-selector
}
// hasSelector returns true if the CLI arguments selects multiple workflows
func (o *retryOps) hasSelector() bool {
if o.labelSelector != "" || o.fieldSelector != "" {
return true
}
return false
}
func NewRetryCommand() *cobra.Command {
var (
cliSubmitOpts cliSubmitOpts
retryOpts retryOps
)
command := &cobra.Command{
Use: "retry [WORKFLOW...]",
Short: "retry zero or more workflows",
Example: `# Retry a workflow:
argo retry my-wf
# Retry multiple workflows:
argo retry my-wf my-other-wf my-third-wf
# Retry multiple workflows by label selector:
argo retry -l workflows.argoproj.io/test=true
# Retry multiple workflows by field selector:
argo retry --field-selector metadata.namespace=argo
# Retry and wait for completion:
argo retry --wait my-wf.yaml
# Retry and watch until completion:
argo retry --watch my-wf.yaml
# Retry and tail logs until completion:
argo retry --log my-wf.yaml
# Retry the latest workflow:
argo retry @latest
`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 && !retryOpts.hasSelector() {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
ctx, apiClient := client.NewAPIClient(cmd.Context())
serviceClient := apiClient.NewWorkflowServiceClient()
retryOpts.namespace = client.Namespace()
err := retryWorkflows(ctx, serviceClient, retryOpts, cliSubmitOpts, args)
errors.CheckError(err)
},
}
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete, only works when a single workflow is retried")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes, only works when a single workflow is retried")
command.Flags().BoolVar(&cliSubmitOpts.log, "log", false, "log the workflow until it completes")
command.Flags().BoolVar(&retryOpts.restartSuccessful, "restart-successful", false, "indicates to restart successful nodes matching the --node-field-selector")
command.Flags().StringVar(&retryOpts.nodeFieldSelector, "node-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
command.Flags().StringVarP(&retryOpts.labelSelector, "selector", "l", "", "Selector (label query) to filter on, not including uninitialized ones, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
command.Flags().StringVar(&retryOpts.fieldSelector, "field-selector", "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The server only supports a limited number of field queries per type.")
return command
}
// retryWorkflows retries workflows by given retryArgs or workflow names
func retryWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, retryOpts retryOps, cliSubmitOpts cliSubmitOpts, args []string) error {
selector, err := fields.ParseSelector(retryOpts.nodeFieldSelector)
if err != nil {
return fmt.Errorf("unable to parse node field selector '%s': %s", retryOpts.nodeFieldSelector, err)
}
var wfs wfv1.Workflows
if retryOpts.hasSelector() {
wfs, err = listWorkflows(ctx, serviceClient, listFlags{
namespace: retryOpts.namespace,
fields: retryOpts.fieldSelector,
labels: retryOpts.labelSelector,
})
if err != nil {
return err
}
}
for _, n := range args {
wfs = append(wfs, wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: retryOpts.namespace,
},
})
}
var lastRetried *wfv1.Workflow
retriedNames := make(map[string]bool)
for _, wf := range wfs {
if _, ok := retriedNames[wf.Name]; ok {
// de-duplication in case there is an overlap between the selector and given workflow names
continue
}
retriedNames[wf.Name] = true
lastRetried, err = serviceClient.RetryWorkflow(ctx, &workflowpkg.WorkflowRetryRequest{
Name: wf.Name,
Namespace: wf.Namespace,
RestartSuccessful: retryOpts.restartSuccessful,
NodeFieldSelector: selector.String(),
})
if err != nil {
return err
}
printWorkflow(lastRetried, getFlags{output: cliSubmitOpts.output})
}
if len(retriedNames) == 1 {
// watch or wait when there is only one workflow retried
waitWatchOrLog(ctx, serviceClient, lastRetried.Namespace, []string{lastRetried.Name}, cliSubmitOpts)
}
return nil
}