/
rollouts.go
145 lines (129 loc) · 4.18 KB
/
rollouts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package agent
import (
"context"
"fmt"
argov1alpha1 "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/datawire/dlib/dlog"
)
// rolloutAction names the operation a rolloutCommand performs on a Rollout object.
type rolloutAction string

// Actions understood by rolloutCommand.patchRollout.
const (
	// rolloutActionPause requests that the Rollout be paused.
	rolloutActionPause rolloutAction = "PAUSE"
	// rolloutActionResume requests that the Rollout be resumed.
	rolloutActionResume rolloutAction = "RESUME"
	// rolloutActionAbort requests that the Rollout be aborted.
	rolloutActionAbort rolloutAction = "ABORT"
)
// rolloutsGetterFactory is a factory for creating RolloutsGetter.
// It takes no arguments and returns either an argov1alpha1.RolloutsGetter or the
// error that prevented one from being created. rolloutCommand.RunWithClientFactory
// calls it to obtain the client it patches with, and NewArgoRolloutsGetter has a
// matching signature.
type rolloutsGetterFactory func() (argov1alpha1.RolloutsGetter, error)
// rolloutCommand describes one action to be run against a single Rollout.
type rolloutCommand struct {
	// namespace is passed to the client to select the Rollouts to operate on;
	// rolloutName is the name of the Rollout that gets patched.
	namespace, rolloutName string
	// action selects which patch(es) are applied to the Rollout.
	action rolloutAction
}
// String renders the command as "<rollout=NAME namespace=NS action=ACTION>",
// a compact form suitable for log lines.
func (r *rolloutCommand) String() string {
	const layout = "<rollout=%s namespace=%s action=%s>"
	name, ns, action := r.rolloutName, r.namespace, r.action
	return fmt.Sprintf(layout, name, ns, action)
}
// RunWithClientFactory executes the command. It asks rolloutsClientFactory for a
// RolloutsGetter and, when one is obtained, uses it to patch the Rollout.
// An error from the factory is returned as-is and no patch is attempted.
func (r *rolloutCommand) RunWithClientFactory(ctx context.Context, rolloutsClientFactory rolloutsGetterFactory) error {
	getter, err := rolloutsClientFactory()
	if err != nil {
		// Without a client there is nothing to patch with.
		return err
	}

	return r.patchRollout(ctx, getter)
}
// JSON documents sent as merge patches by applyPatch and applyStatusPatch.
const (
	// pausePatch and unpausePatch toggle spec.paused on the Rollout.
	pausePatch   = `{"spec":{"paused":true}}`
	unpausePatch = `{"spec":{"paused":false}}`

	// abortPatch and retryPatch toggle status.abort on the Rollout.
	abortPatch = `{"status":{"abort":true}}`
	retryPatch = `{"status":{"abort":false}}`
)
// patchRollout applies the patch(es) that correspond to r.action to the Rollout
// named r.rolloutName in r.namespace, using client to reach the API.
//
//   - RESUME sends unpausePatch and, only if that succeeds, retryPatch against the
//     status, so that a Rollout that is either paused or aborted can be recovered.
//   - ABORT sends abortPatch against the status.
//   - PAUSE sends pausePatch.
//
// Any other action is rejected without calling the client. Every failure is
// logged here and returned; patch failures are wrapped with the action, rollout
// name and namespace, and the underlying error remains reachable through
// errors.Is / errors.As.
func (r *rolloutCommand) patchRollout(ctx context.Context, client argov1alpha1.RolloutsGetter) error {
	var err error
	switch r.action {
	// The "Resume" action in the DCP should be able to recover from a Rollout that is either paused or aborted.
	// For more information about why the "retry" patch goes through rolloutCommand.applyStatusPatch, see its godoc.
	case rolloutActionResume:
		err = r.applyPatch(ctx, client, unpausePatch)
		if err == nil {
			err = r.applyStatusPatch(ctx, client, retryPatch)
		}
	case rolloutActionAbort:
		err = r.applyStatusPatch(ctx, client, abortPatch)
	case rolloutActionPause:
		err = r.applyPatch(ctx, client, pausePatch)
	default:
		// Distinct name on purpose: avoids shadowing the outer err.
		unknownErr := fmt.Errorf(
			"tried to perform unknown action '%s' on rollout %s (%s)",
			r.action,
			r.rolloutName,
			r.namespace,
		)
		dlog.Errorln(ctx, unknownErr)
		return unknownErr
	}
	if err == nil {
		return nil
	}

	// Return the same contextualised error that is logged, so callers can tell
	// which action on which rollout failed instead of seeing only the raw API error.
	wrapped := fmt.Errorf(
		"failed to %s rollout %s (%s): %w",
		r.action,
		r.rolloutName,
		r.namespace,
		err,
	)
	dlog.Errorln(ctx, wrapped)
	return wrapped
}
// applyPatch sends patch as a merge patch to the Rollout named r.rolloutName in
// r.namespace, with default patch options and no subresource. The patched object
// returned by the client is discarded; only the error is reported.
func (r *rolloutCommand) applyPatch(ctx context.Context, client argov1alpha1.RolloutsGetter, patch string) error {
	payload := []byte(patch)
	_, err := client.Rollouts(r.namespace).Patch(ctx, r.rolloutName, types.MergePatchType, payload, metav1.PatchOptions{})
	return err
}
// applyStatusPatch exists because any change to a Rollout status (Rollout Abort or Retry)
// requires a patch to the rollouts/status subresource. If that patch fails with a
// "not found" error, the same patch is sent to the base "rollouts" resource instead;
// any other outcome of the first attempt is returned unchanged.
// This is based on the logic of the Argo Rollouts CLI, as seen at https://github.com/argoproj/argo-rollouts/blob/v1.1.1/pkg/kubectl-argo-rollouts/cmd/retry/retry.go#L84.
func (r *rolloutCommand) applyStatusPatch(ctx context.Context, client argov1alpha1.RolloutsGetter, patch string) error {
	rollouts := client.Rollouts(r.namespace)
	payload := []byte(patch)
	opts := metav1.PatchOptions{}

	// First attempt: the "status" subresource.
	_, err := rollouts.Patch(ctx, r.rolloutName, types.MergePatchType, payload, opts, "status")
	if err == nil || !k8serrors.IsNotFound(err) {
		return err
	}

	// The subresource patch reported "not found": fall back to the base resource.
	_, err = rollouts.Patch(ctx, r.rolloutName, types.MergePatchType, payload, opts)
	return err
}
// NewArgoRolloutsGetter creates a RolloutsGetter from Argo's v1alpha1 API.
// It obtains a configuration from newK8sRestClient and passes it to
// argov1alpha1.NewForConfig; an error from either step is returned together
// with a nil getter.
func NewArgoRolloutsGetter() (argov1alpha1.RolloutsGetter, error) {
	restConfig, err := newK8sRestClient()
	if err != nil {
		return nil, err
	}

	rolloutsClient, err := argov1alpha1.NewForConfig(restConfig)
	if err != nil {
		// Return a literal nil rather than the client value on failure.
		return nil, err
	}

	return rolloutsClient, nil
}