create_drain_old_worker_nodes.go
package instance

import (
	"context"
	"fmt"

	corev1alpha1 "github.com/giantswarm/apiextensions/pkg/apis/core/v1alpha1"
	"github.com/giantswarm/microerror"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/giantswarm/azure-operator/v4/pkg/label"
	"github.com/giantswarm/azure-operator/v4/service/controller/internal/state"
	"github.com/giantswarm/azure-operator/v4/service/controller/key"
	"github.com/giantswarm/azure-operator/v4/service/controller/resource/nodes"
)
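
// drainOldWorkerNodesTransition ensures that a DrainerConfig exists for every
// worker VMSS instance still running the previous release, re-creating
// configs that have timed out. While any old node is still pending draining it
// stays in the current state; once all old nodes are drained it deletes the
// DrainerConfigs and moves on to TerminateOldWorkerInstances.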
func (r *Resource) drainOldWorkerNodesTransition(ctx context.Context, obj interface{}, currentState state.State) (state.State, error) {
	cr, err := key.ToCustomResource(obj)
	if err != nil {
		return "", microerror.Mask(err)
	}

	r.Logger.LogCtx(ctx, "level", "debug", "message", "finding all drainerconfigs")

	drainerConfigs := make(map[string]corev1alpha1.DrainerConfig)
	{
		n := metav1.NamespaceAll
		o := metav1.ListOptions{
			LabelSelector: fmt.Sprintf("%s=%s", label.Cluster, key.ClusterID(&cr)),
		}

		list, err := r.G8sClient.CoreV1alpha1().DrainerConfigs(n).List(o)
		if err != nil {
			return "", microerror.Mask(err)
		}

		for _, dc := range list.Items {
			drainerConfigs[dc.Name] = dc
		}
	}

	r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("found %d drainerconfigs", len(drainerConfigs)))

	r.Logger.LogCtx(ctx, "level", "debug", "message", "finding all worker VMSS instances")

	allWorkerInstances, err := r.AllInstances(ctx, cr, key.WorkerVMSSName)
	if nodes.IsScaleSetNotFound(err) {
		r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("did not find the scale set '%s'", key.WorkerVMSSName(cr)))
	} else if err != nil {
		return "", microerror.Mask(err)
	}

	r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("found %d worker VMSS instances", len(allWorkerInstances)))

	r.Logger.LogCtx(ctx, "level", "debug", "message", "ensuring that drainerconfig exists for all old worker nodes")

	var nodesPendingDraining int

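	// Walk all worker instances; only instances left over from the previous
	// release need a DrainerConfig so their nodes get drained before
	// termination.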
	for _, i := range allWorkerInstances {
		old, err := r.isWorkerInstanceFromPreviousRelease(ctx, cr, i)
		if err != nil {
			return DeploymentUninitialized, nil
		}
		if old == nil || !*old {
			// Node is a new one or we weren't able to check its status, don't drain it.
			continue
		}

		n := key.WorkerInstanceName(cr, *i.InstanceID)

		dc, drainerConfigExists := drainerConfigs[n]
		if !drainerConfigExists {
			nodesPendingDraining++
			r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("creating drainerconfig for %s", n))
			err = r.CreateDrainerConfig(ctx, cr, key.WorkerInstanceName(cr, *i.InstanceID))
			if err != nil {
				return "", microerror.Mask(err)
			}
		}

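		// A DrainerConfig that has timed out is deleted and re-created so the
		// node gets another chance to be drained.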
		if drainerConfigExists && dc.Status.HasTimeoutCondition() {
			nodesPendingDraining++
			r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("drainerconfig for %s already exists but has timed out", n))
			r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("deleting drainerconfig for %s", n))

			err = r.G8sClient.CoreV1alpha1().DrainerConfigs(dc.Namespace).Delete(dc.Name, &metav1.DeleteOptions{})
			if errors.IsNotFound(err) {
				r.Logger.LogCtx(ctx, "level", "debug", "message", "did not delete drainer config for tenant cluster node")
				r.Logger.LogCtx(ctx, "level", "debug", "message", "drainer config for tenant cluster node does not exist")
			} else if err != nil {
				return "", microerror.Mask(err)
			}

			r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("creating drainerconfig for %s", n))
			err = r.CreateDrainerConfig(ctx, cr, key.WorkerInstanceName(cr, *i.InstanceID))
			if err != nil {
				return "", microerror.Mask(err)
			}
		}

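		// A DrainerConfig that has neither drained nor timed out is still in
		// progress; count the node as pending.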
		if drainerConfigExists && !dc.Status.HasTimeoutCondition() && !dc.Status.HasDrainedCondition() {
			nodesPendingDraining++
			r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("drainerconfig for %s already exists", n))
		}
	}
	r.Logger.LogCtx(ctx, "level", "debug", "message", "ensured that drainerconfig exists for all old worker nodes")

	r.Logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("%d nodes are pending draining", nodesPendingDraining))

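	// Stay in the current state while nodes are still draining so this
	// transition is evaluated again on the next reconciliation.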
	if nodesPendingDraining > 0 {
		r.Logger.LogCtx(ctx, "level", "debug", "message", "cancelling resource")
		return currentState, nil
	}

	r.Logger.LogCtx(ctx, "level", "debug", "message", "deleting all drainerconfigs")

	// Delete DrainerConfigs now that all nodes have been DRAINED.
	for _, dc := range drainerConfigs {
		err = r.G8sClient.CoreV1alpha1().DrainerConfigs(dc.Namespace).Delete(dc.Name, &metav1.DeleteOptions{})
		if err != nil {
			return "", microerror.Mask(err)
		}
	}

	r.Logger.LogCtx(ctx, "level", "debug", "message", "deleted all drainerconfigs")

	return TerminateOldWorkerInstances, nil
}