forked from argoproj/argo-workflows
/
wait.go
111 lines (99 loc) · 2.77 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package commands
import (
"context"
"fmt"
"io"
"os"
"sync"
"github.com/argoproj/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util"
)
// NewWaitCommand builds the cobra command behind `argo wait`, which waits for
// the workflows named on the command line to complete.
func NewWaitCommand() *cobra.Command {
	// Bound to the --ignore-not-found flag registered below.
	var ignoreNotFound bool

	waitCmd := &cobra.Command{
		Use:   "wait [WORKFLOW...]",
		Short: "waits for workflows to complete",
		Example: `# Wait on a workflow:
argo wait my-wf
# Wait on the latest workflow:
argo wait @latest
`,
	}

	// The handler is attached after construction; it closes over ignoreNotFound,
	// which cobra fills in when it parses the flags before invoking Run.
	waitCmd.Run = func(cmd *cobra.Command, args []string) {
		ctx, apiClient := client.NewAPIClient()
		// quiet is always false here, so every finished workflow is reported.
		waitWorkflows(ctx, apiClient.NewWorkflowServiceClient(), client.Namespace(), args, ignoreNotFound, false)
	}

	waitCmd.Flags().BoolVar(&ignoreNotFound, "ignore-not-found", false, "Ignore the wait if the workflow is not found")
	return waitCmd
}
// waitWorkflows waits for the given workflowNames concurrently, one goroutine
// per workflow, and terminates the process with exit code 1 if waitOnOne
// reports failure for any of them. With no workflow names it returns
// immediately.
//
// ctx, serviceClient, namespace, ignoreNotFound and quiet are passed through
// unchanged to waitOnOne for every workflow.
func waitWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, namespace string, workflowNames []string, ignoreNotFound, quiet bool) {
	// Each goroutine records its outcome in its own slot, so the workers never
	// write to a shared variable and no locking is needed. The slots are only
	// read after wg.Wait() has returned.
	succeeded := make([]bool, len(workflowNames))

	var wg sync.WaitGroup
	for i, name := range workflowNames {
		wg.Add(1)
		go func(i int, name string) {
			// Deferred so Done runs on every return path of the goroutine.
			defer wg.Done()
			succeeded[i] = waitOnOne(serviceClient, ctx, name, namespace, ignoreNotFound, quiet)
		}(i, name)
	}
	wg.Wait()

	for _, ok := range succeeded {
		if !ok {
			os.Exit(1)
		}
	}
}
// waitOnOne watches a single workflow until it has finished and reports whether
// the wait should count as a success.
//
// It opens a WatchWorkflows stream for namespace, filtered by a field selector
// generated from wfName, and reads events until one carries a workflow whose
// Status.FinishedAt is non-zero. Unless quiet is set, the workflow name, phase
// and finish time are then printed to stdout.
//
// It returns false when the finished workflow's phase is wfv1.NodeFailed or
// wfv1.NodeError, and true for any other phase. When ignoreNotFound is set, a
// gRPC NotFound status from opening the watch or from receiving on it is also
// treated as success. Every other error is handed to errors.CheckError
// (presumably fatal — confirm against github.com/argoproj/pkg/errors).
func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Context, wfName, namespace string, ignoreNotFound, quiet bool) bool {
	// Derive a cancellable context so the watch is cancelled when we return.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	req := &workflowpkg.WatchWorkflowsRequest{
		Namespace: namespace,
		ListOptions: &metav1.ListOptions{
			FieldSelector: util.GenerateFieldSelectorFromWorkflowName(wfName),
		},
	}
	stream, err := serviceClient.WatchWorkflows(ctx, req)
	if err != nil {
		if ignoreNotFound && status.Code(err) == codes.NotFound {
			return true
		}
		errors.CheckError(err)
		return false
	}

	for {
		event, err := stream.Recv()
		if err == io.EOF {
			// The server closed the stream; open a fresh watch and keep waiting.
			log.Debug("Re-establishing workflow watch")
			stream, err = serviceClient.WatchWorkflows(ctx, req)
			errors.CheckError(err)
			continue
		}
		// On a gRPC stream the server's status is normally delivered by Recv
		// rather than by the call that opened the stream, so the
		// ignore-not-found check has to be applied here as well.
		if err != nil && ignoreNotFound && status.Code(err) == codes.NotFound {
			return true
		}
		errors.CheckError(err)

		// Skip events that carry no workflow instead of dereferencing nil.
		if event == nil || event.Object == nil {
			continue
		}

		wf := event.Object
		if wf.Status.FinishedAt.IsZero() {
			// Still running; wait for the next event.
			continue
		}
		if !quiet {
			fmt.Printf("%s %s at %v\n", wfName, wf.Status.Phase, wf.Status.FinishedAt)
		}
		switch wf.Status.Phase {
		case wfv1.NodeFailed, wfv1.NodeError:
			return false
		}
		return true
	}
}