-
Notifications
You must be signed in to change notification settings - Fork 8
/
transitions.go
213 lines (186 loc) · 9.8 KB
/
transitions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package transitioner
import (
	"fmt"
	"strings"
	"time"

	v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// transitionUndefined transitions any CycleNodeStatuses in the Undefined phase to the Pending phase
// It checks to ensure that a node name has been provided
// When the CRD validation features are available in the Kubernetes API, we could probably remove these checks
func (t *CycleNodeStatusTransitioner) transitionUndefined() (reconcile.Result, error) {
t.rm.LogEvent(t.cycleNodeStatus, "Initialising", "Initialising CycleNodeStatus")
// Check to ensure method is set
if len(t.cycleNodeStatus.Spec.CycleSettings.Method) == 0 {
return t.transitionToFailed(fmt.Errorf("method cannot be empty"))
}
// This CRD should only be created by the controller, so we don't validate the method any further
// Check to ensure NodeName is set
if t.cycleNodeStatus.Spec.NodeName == "" {
return t.transitionToFailed(fmt.Errorf("nodeName cannot be empty"))
}
// Set the timestamp so we know when we've timed out
currentTime := metav1.Now()
t.cycleNodeStatus.Status.StartedTimestamp = ¤tTime
// Calculate cns timeout timestamp with the default duration if no CyclingTimeout is provided or if CyclingTimeout is negative
timeoutTime := metav1.NewTime(currentTime.Add(t.options.DefaultCNScyclingExpiry))
if t.cycleNodeStatus.Spec.CycleSettings.CyclingTimeout != nil && t.cycleNodeStatus.Spec.CycleSettings.CyclingTimeout.Duration > 0*time.Second {
timeoutTime = metav1.NewTime(currentTime.Add(t.cycleNodeStatus.Spec.CycleSettings.CyclingTimeout.Duration))
}
t.cycleNodeStatus.Status.TimeoutTimestamp = &timeoutTime
// Transition the object to pending
return t.transitionObject(v1.CycleNodeStatusPending)
}
// transitionPending transitions any CycleNodeStatuses in the Pending phase into either the WaitingPods phase, or the
// RemovingLabelsFromPods phase based on the .Spec.Method provided.
// Gets the requested node from the cloud provider and from kube and performs sanity checks. Depending on these checks
// the CycleNodeStatus may go straight to Failed or Successful.
//
// If the node has problems then it will transition straight to Failed.
func (t *CycleNodeStatusTransitioner) transitionPending() (reconcile.Result, error) {
	t.rm.LogEvent(t.cycleNodeStatus, "FetchingNode", "Fetching information about node: %v", t.cycleNodeStatus.Spec.NodeName)
	node, err := t.rm.GetNode(t.cycleNodeStatus.Spec.NodeName)
	if err != nil {
		// If the node doesn't exist in Kube then assume that the node was killed by something else.
		// Don't allow this to fail the CycleNodeStatus.
		// errors.IsNotFound accepts a plain error and unwraps it itself, so the previous
		// *errors.StatusError type assertion was unnecessary.
		if errors.IsNotFound(err) {
			t.rm.LogEvent(t.cycleNodeStatus, "FetchingNode", "Node not found, assuming cycle successful: %v", t.cycleNodeStatus.Spec.NodeName)
			return t.transitionToSuccessful()
		}
		return t.transitionToFailed(err)
	}

	// Set the current node
	t.cycleNodeStatus.Status.CurrentNode.Name = node.Name
	t.cycleNodeStatus.Status.CurrentNode.ProviderID = node.Spec.ProviderID

	// Ensure the node still exists in the cloud provider before attempting anything
	existingProviderIDs, err := t.rm.CloudProvider.InstancesExist([]string{t.cycleNodeStatus.Status.CurrentNode.ProviderID})
	if err != nil {
		// The node existed in Kube if we got this far, so if it doesn't exist in the cloud provider
		// then something funky is happening, so we exit with an error
		return t.transitionToFailed(err)
	}
	// Instance already gone from the cloud provider: nothing left to cycle.
	if len(existingProviderIDs) == 0 {
		return t.transitionToSuccessful()
	}

	// Depending on the Method we transition to a different phase
	if t.cycleNodeStatus.Spec.CycleSettings.Method == v1.CycleNodeRequestMethodWait {
		return t.transitionObject(v1.CycleNodeStatusWaitingPods)
	}
	return t.transitionObject(v1.CycleNodeStatusRemovingLabelsFromPods)
}
// transitionWaitingPods transitions any CycleNodeStatuses in the WaitingPods phase to the
// RemovingLabelsFromPods phase. Waits for any pods not excluded by the WaitRules for this CycleNodeStatus
// to finish then transitions to the next phase.
func (t *CycleNodeStatusTransitioner) transitionWaitingPods() (reconcile.Result, error) {
	t.rm.LogEvent(t.cycleNodeStatus, "WaitingPods", "Waiting for pods to finish")

	finished, err := t.podsFinished()
	switch {
	case err != nil:
		return t.transitionToFailed(err)
	case finished:
		// All watched pods are done; move on to label removal.
		return t.transitionObject(v1.CycleNodeStatusRemovingLabelsFromPods)
	case t.timedOut():
		// Still waiting and past the deadline: give up.
		return t.transitionToFailed(fmt.Errorf("timed out waiting for pods to finish"))
	default:
		// Pods still running and within the deadline; check again in a minute.
		return reconcile.Result{Requeue: true, RequeueAfter: 60 * time.Second}, nil
	}
}
// transitionRemovingLabelsFromPods transitions a CycleNodeStatus in the RemovingLabelsFromPods phase to the Draining phase.
// This removes any of the specified labels from pods that have them.
// This is used to remove the pod from any services/endpoints before pod termination, so that
// it is guaranteed there is no traffic sent to it when draining.
func (t *CycleNodeStatusTransitioner) transitionRemovingLabelsFromPods() (reconcile.Result, error) {
	t.rm.LogEvent(t.cycleNodeStatus, "RemovingLabels", "Removing labels from pods")

	done, err := t.removeLabelsFromPods()
	if err != nil {
		return t.transitionToFailed(err)
	}
	if done {
		// All labels stripped; proceed to draining the node.
		return t.transitionObject(v1.CycleNodeStatusDrainingPods)
	}
	// Some pods still carry labels; retry again shortly.
	return reconcile.Result{Requeue: true, RequeueAfter: 1 * time.Second}, nil
}
// transitionDraining transitions any CycleNodeStatuses in the Draining phase to the Deleting phase.
// It gets all of the drainable pods on the selected node and then drains them.
// It will check that all of the pods on the selected node have been drained before moving it to the
// deleting phase.
func (t *CycleNodeStatusTransitioner) transitionDraining() (reconcile.Result, error) {
	// Drain pods off the node
	t.rm.LogEvent(t.cycleNodeStatus, "DrainingPods", "Draining pods from node: %v", t.cycleNodeStatus.Status.CurrentNode.Name)
	finished, errs := t.rm.DrainPods(t.cycleNodeStatus.Status.CurrentNode.Name, t.options.UnhealthyPodTerminationThreshold)

	// We need to do some fairly complicated error handling here. It is most efficient to drain all pods at once, as
	// this stops us being blocked behind one pod that takes a long time to get evicted. This means we need to handle
	// all the errors at once. One class of error, StatusTooManyRequests, indicates that the evicted pod is
	// "undisruptable" via a pod disruption budget. This error is fine. All others are not, and we have to combine
	// them and fail this CycleNodeStatus if we encounter them.
	var unexpectedErrors []string
	tooManyRequests := false
	for _, err := range errs {
		if err != nil {
			// Custom logic handling, mainly for handling pods that are undisruptable via a PodDisruptionBudget
			if serr, ok := err.(*errors.StatusError); ok && errors.IsTooManyRequests(serr) {
				// API says we should retry, we do the actual retry further down but log the pod here
				t.rm.Logger.Info("waiting to retry after receiving StatusTooManyRequests error",
					"podName", serr.ErrStatus.Details.Name)
				tooManyRequests = true
			} else {
				unexpectedErrors = append(unexpectedErrors, err.Error())
			}
		}
	}
	// Fail with all of the combined encountered errors if we got any. If we failed inside the loop we would
	// potentially miss some important information in the logs.
	// FIX: the joined error text is data, not a format string — passing it directly to fmt.Errorf
	// (as the original did) is a go vet `printf` violation and mangles any message containing '%'.
	if len(unexpectedErrors) > 0 {
		return t.transitionToFailed(fmt.Errorf("%s", strings.Join(unexpectedErrors, "\n")))
	}
	// No serious errors were encountered. If we're done, move on.
	if finished {
		return t.transitionObject(v1.CycleNodeStatusDeletingNode)
	}
	// Fail if we've taken too long in this phase.
	if t.timedOut() {
		return t.transitionToFailed(fmt.Errorf("timed out while draining pods"))
	}
	// The API says we should retry (likely due to currently undisruptable pods)
	if tooManyRequests {
		return reconcile.Result{Requeue: true, RequeueAfter: 15 * time.Second}, nil
	}
	// If all the pods aren't finished draining, try again a while later to avoid spamming the API server.
	return reconcile.Result{Requeue: true, RequeueAfter: 30 * time.Second}, nil
}
// transitionDeleting transitions any CycleNodeStatuses in the Deleting phase to the Terminating phase.
// It will delete the node out of the Kubernetes API and remove the finalizer.
func (t *CycleNodeStatusTransitioner) transitionDeleting() (reconcile.Result, error) {
	nodeName := t.cycleNodeStatus.Status.CurrentNode.Name
	t.rm.LogEvent(t.cycleNodeStatus, "DeletingNode", "Deleting node: %v", nodeName)

	if err := t.rm.DeleteNode(nodeName); err != nil {
		return t.transitionToFailed(err)
	}

	// Remove the finalizer to allow the node to be deleted
	if err := t.rm.RemoveFinalizerFromNode(nodeName); err != nil {
		t.rm.LogEvent(t.cycleNodeStatus, "RemoveFinalizerFromNodeError", err.Error())
		return t.transitionToFailed(err)
	}

	return t.transitionObject(v1.CycleNodeStatusTerminatingNode)
}
// transitionTerminating transitions any CycleNodeStatuses in the Terminating phase to the Successful phase.
// It terminates the node via the cloud provider.
func (t *CycleNodeStatusTransitioner) transitionTerminating() (reconcile.Result, error) {
	providerID := t.cycleNodeStatus.Status.CurrentNode.ProviderID
	t.rm.LogEvent(t.cycleNodeStatus, "TerminatingNode", "Terminating instance: %v", providerID)

	if err := t.rm.CloudProvider.TerminateInstance(providerID); err != nil {
		return t.transitionToFailed(err)
	}

	return t.transitionObject(v1.CycleNodeStatusSuccessful)
}
// transitionFailed handles failed CycleNodeStatuses.
// Failed is a terminal phase, so there is nothing to do: return an empty
// result with no requeue so the reconciler stops processing this object.
func (t *CycleNodeStatusTransitioner) transitionFailed() (reconcile.Result, error) {
	var res reconcile.Result
	return res, nil
}
// transitionSuccessful handles successful CycleNodeStatuses.
// Successful is a terminal phase, so there is nothing to do: return an empty
// result with no requeue so the reconciler stops processing this object.
func (t *CycleNodeStatusTransitioner) transitionSuccessful() (reconcile.Result, error) {
	var res reconcile.Result
	return res, nil
}