forked from argoproj/argo-workflows
/
util.go
128 lines (114 loc) · 3.49 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package util
import (
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/fields"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/util/retry"
)
// Closer is implemented by any value that has a Close method returning an error.
// It is the parameter type accepted by the Close helper in this package.
type Closer interface {
Close() error
}
// Close is a convenience function to close an object that has a Close() method,
// ignoring any error it returns. It exists mainly to satisfy the errcheck lint
// in call sites such as `defer util.Close(f)`.
//
// A nil Closer is tolerated and treated as a no-op, so the helper never panics
// on a nil interface value. Note that an interface wrapping a typed nil pointer
// is not nil; in that case the call is still forwarded to the value's own
// Close method.
func Close(c Closer) {
	if c == nil {
		return
	}
	// The error is deliberately discarded: callers use this helper precisely
	// when they have nothing useful to do with a close failure.
	_ = c.Close()
}
// GetSecrets fetches the secret called name from namespace and returns the
// bytes stored under key.
//
// The lookup is retried through wait.ExponentialBackoff with retry.DefaultRetry:
// every failed attempt is logged as a warning, errors that
// errorsutil.IsTransientErr reports as transient trigger another attempt, and
// any other error stops the retries immediately. If the final attempt failed,
// the error is returned wrapped by errors.InternalWrapError. If the secret was
// fetched but does not contain key, an errors.CodeBadRequest error is returned.
// In both error cases the returned slice is empty.
func GetSecrets(clientSet kubernetes.Interface, namespace, name, key string) ([]byte, error) {
	var (
		fetched *apiv1.Secret
		lastErr error
	)
	client := clientSet.CoreV1().Secrets(namespace)

	attempt := func() (bool, error) {
		fetched, lastErr = client.Get(name, metav1.GetOptions{})
		if lastErr == nil {
			return true, nil
		}
		log.Warnf("Failed to get secret '%s': %v", name, lastErr)
		if errorsutil.IsTransientErr(lastErr) {
			// Report "not done, no error" so the backoff schedules another try.
			return false, nil
		}
		return false, lastErr
	}

	// The backoff's own result is ignored on purpose: lastErr already holds
	// the outcome of the most recent attempt, including the timed-out case.
	_ = wait.ExponentialBackoff(retry.DefaultRetry, attempt)
	if lastErr != nil {
		return []byte{}, errors.InternalWrapError(lastErr)
	}

	if value, present := fetched.Data[key]; present {
		return value, nil
	}
	return []byte{}, errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key)
}
// WriteTeriminateMessage writes message to the file /dev/termination-log,
// creating or truncating it with mode 0644. It panics if the write fails.
//
// NOTE(review): /dev/termination-log looks like the default Kubernetes
// terminationMessagePath for a container — confirm against the pod spec in use.
func WriteTeriminateMessage(message string) {
	const terminationLog = "/dev/termination-log"
	if writeErr := ioutil.WriteFile(terminationLog, []byte(message), 0644); writeErr != nil {
		panic(writeErr)
	}
}
// MergeParameters flattens any number of parameter slices into one slice,
// keeping only the first occurrence of each parameter name. Slices are
// processed in argument order, so earlier arguments take priority over later
// ones. The result is nil when no parameters are supplied.
func MergeParameters(params ...[]wfv1.Parameter) []wfv1.Parameter {
	seen := map[string]struct{}{}
	var merged []wfv1.Parameter
	for i := range params {
		for j := range params[i] {
			candidate := params[i][j]
			if _, duplicate := seen[candidate.Name]; !duplicate {
				seen[candidate.Name] = struct{}{}
				merged = append(merged, candidate)
			}
		}
	}
	return merged
}
// RecoverIndexFromNodeName extracts the integer that sits between the first
// "(" in name and the first ":" that follows it, e.g. "step(3:foo)" yields 3.
//
// It returns -1 when name has no "(", when no ":" follows that "(", or when
// the text between them is not a valid integer.
func RecoverIndexFromNodeName(name string) int {
	startIndex := strings.Index(name, "(")
	if startIndex < 0 {
		return -1
	}
	// Look for the separator only after the opening parenthesis. Searching the
	// whole string could find a ":" located before the "(", which would produce
	// inverted slice bounds and a runtime panic.
	rest := name[startIndex+1:]
	endIndex := strings.Index(rest, ":")
	if endIndex < 0 {
		return -1
	}
	out, err := strconv.Atoi(rest[:endIndex])
	if err != nil {
		return -1
	}
	return out
}
// GenerateFieldSelectorFromWorkflowName returns the string form of a
// "metadata.name=<wfName>" field selector, as rendered by the selector parsed
// with fields.ParseSelectorOrDie.
//
// As a sanity check the rendered string is fed back through
// RecoverWorkflowNameFromSelectorStringIfAny; the function panics if the
// recovered name differs from wfName.
// NOTE(review): ParseSelectorOrDie presumably panics on a malformed selector
// as well — confirm against the apimachinery docs.
func GenerateFieldSelectorFromWorkflowName(wfName string) string {
	selector := fields.ParseSelectorOrDie("metadata.name=" + wfName)
	rendered := selector.String()
	if recovered := RecoverWorkflowNameFromSelectorStringIfAny(rendered); recovered != wfName {
		panic(fmt.Sprintf("Could not recover field selector from workflow name. Expected '%s' but got '%s'\n", wfName, recovered))
	}
	return rendered
}
// RecoverWorkflowNameFromSelectorStringIfAny returns the value of the first
// "metadata.name=" term found in selector, or "" when there is none.
//
// The value runs up to the next "," (or the end of the string) and is
// returned with surrounding whitespace trimmed. Both the "=" and "=="
// spellings of the equality operator are accepted, so "metadata.name==foo"
// yields "foo" rather than "=foo".
func RecoverWorkflowNameFromSelectorStringIfAny(selector string) string {
	const tag = "metadata.name="
	starts := strings.Index(selector, tag)
	if starts < 0 {
		return ""
	}
	// Field selectors allow "==" as a synonym for "="; drop the second "=" so
	// it does not leak into the recovered name.
	value := strings.TrimPrefix(selector[starts+len(tag):], "=")
	if ends := strings.Index(value, ","); ends > -1 {
		value = value[:ends]
	}
	return strings.TrimSpace(value)
}
// GetDeletePropagation returns a pointer to the deletion propagation policy to
// use. The policy defaults to metav1.DeletePropagationBackground; when the
// WF_DEL_PROPAGATION_POLICY environment variable is set to a non-empty value,
// that value is converted to a metav1.DeletionPropagation and used instead.
// The value is not validated here.
func GetDeletePropagation() *metav1.DeletionPropagation {
	policy := metav1.DeletePropagationBackground
	// os.Getenv yields "" for both unset and empty variables, which are the
	// two cases that keep the default.
	if override := os.Getenv("WF_DEL_PROPAGATION_POLICY"); override != "" {
		policy = metav1.DeletionPropagation(override)
	}
	return &policy
}