create_scale_workers.go
package nodepool

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
	"github.com/coreos/go-semver/semver"
	apiextensionslabels "github.com/giantswarm/apiextensions/v6/pkg/label"
	"github.com/giantswarm/microerror"
	corev1 "k8s.io/api/core/v1"
	capzexp "sigs.k8s.io/cluster-api-provider-azure/exp/api/v1beta1"
	capi "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/giantswarm/azure-operator/v8/pkg/handler/nodes/scalestrategy"
	"github.com/giantswarm/azure-operator/v8/pkg/handler/nodes/state"
	"github.com/giantswarm/azure-operator/v8/pkg/label"
	"github.com/giantswarm/azure-operator/v8/pkg/project"
	"github.com/giantswarm/azure-operator/v8/pkg/tenantcluster"
	"github.com/giantswarm/azure-operator/v8/service/controller/internal/vmsscheck"
	"github.com/giantswarm/azure-operator/v8/service/controller/key"
)

// The goal of scaleUpWorkerVMSSTransition is to double the desired number of
// nodes in the worker VMSS, so that every outdated node has an up-to-date
// replacement available before the old nodes are drained and terminated.
// Scaling happens over subsequent reconciliation loops to avoid hitting the
// VMSS API too hard.
func (r *Resource) scaleUpWorkerVMSSTransition(ctx context.Context, obj interface{}, currentState state.State) (state.State, error) {
	azureMachinePool, err := key.ToAzureMachinePool(obj)
	if err != nil {
		return DeploymentUninitialized, microerror.Mask(err)
	}

	machinePool, err := r.getOwnerMachinePool(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return DeploymentUninitialized, microerror.Mask(err)
	}

	if machinePool == nil {
		return currentState, microerror.Mask(ownerReferenceNotSet)
	}

	if !machinePool.GetDeletionTimestamp().IsZero() {
		r.Logger.Debugf(ctx, "MachinePool is being deleted, skipping reconciling node pool")
		return currentState, nil
	}

	deploymentsClient, err := r.ClientFactory.GetDeploymentsClient(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return currentState, microerror.Mask(err)
	}

	virtualMachineScaleSetVMsClient, err := r.ClientFactory.GetVirtualMachineScaleSetVMsClient(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return currentState, microerror.Mask(err)
	}

	// We don't check the VMs' health state for spot instance node pools, as
	// doing so might deadlock the upgrade process.
	if azureMachinePool.Spec.Template.SpotVMOptions == nil {
		allReady, err := vmsscheck.InstancesAreRunning(ctx, r.Logger, virtualMachineScaleSetVMsClient, key.ClusterID(&azureMachinePool), key.NodePoolVMSSName(&azureMachinePool))
		if err != nil {
			return DeploymentUninitialized, microerror.Mask(err)
		}
		// Not all workers are running in Azure yet, wait for the next
		// reconciliation loop.
		if !allReady {
			return currentState, nil
		}
	}
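	// Judging by its name and by the note in the function comment above
	// (this is a reading of the code, not taken from upstream docs), the
	// Staircase strategy raises the capacity in steps rather than jumping
	// straight to the desired count, spreading the scale-up over several
	// reconciliation loops.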
	strategy := scalestrategy.Staircase{}

	// Ensure the deployment is successful before we move on with scaling.
	currentDeployment, err := deploymentsClient.Get(ctx, key.ClusterID(&azureMachinePool), key.NodePoolDeploymentName(&azureMachinePool))
	if IsDeploymentNotFound(err) {
		// Deployment not found, we need to apply it again.
		return DeploymentUninitialized, microerror.Mask(err)
	} else if err != nil {
		return currentState, microerror.Mask(err)
	}

	switch *currentDeployment.Properties.ProvisioningState {
	case "Failed", "Canceled":
		// The deployment failed or was canceled, so we need to go back and
		// re-apply it.
		r.Logger.Debugf(ctx, "Node Pool deployment is in state %s, we need to reapply it.", *currentDeployment.Properties.ProvisioningState)
		return DeploymentUninitialized, nil
	case "Succeeded":
		// The deployment succeeded, safe to go on.
	default:
		// The deployment is still running, wait for another reconciliation
		// loop.
		r.Logger.Debugf(ctx, "Node Pool deployment is in state %s, waiting for it to succeed.", *currentDeployment.Properties.ProvisioningState)
		return currentState, nil
	}
	virtualMachineScaleSetsClient, err := r.ClientFactory.GetVirtualMachineScaleSetsClient(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return currentState, microerror.Mask(err)
	}

	vmss, err := virtualMachineScaleSetsClient.Get(ctx, key.ClusterID(&azureMachinePool), key.NodePoolVMSSName(&azureMachinePool))
	if IsNotFound(err) {
		// VMSS not found, we need to apply the deployment again.
		r.Logger.Debugf(ctx, "Node Pool VMSS was not found, going back to initial state.")
		return DeploymentUninitialized, nil
	} else if err != nil {
		return currentState, microerror.Mask(err)
	}

	// Check that the azure-operator version tag is up to date.
	if currentVersion, found := vmss.Tags[label.AzureOperatorVersionTag]; !found || *currentVersion != project.Version() {
		r.Logger.Debugf(ctx, "Node Pool VMSS has an outdated %q tag.", label.AzureOperatorVersionTag)
		return DeploymentUninitialized, nil
	}
	oldInstances, newInstances, err := r.splitInstancesByUpdatedStatus(ctx, azureMachinePool)
	if tenantcluster.IsAPINotAvailableError(err) {
		r.Logger.Debugf(ctx, "tenant API not available yet")
		r.Logger.Debugf(ctx, "canceling resource")
		return currentState, nil
	} else if err != nil {
		return currentState, microerror.Mask(err)
	}
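	// Double the number of outdated instances: for example, with 3 old nodes
	// the target becomes 6, so every old node has an up-to-date replacement
	// available before draining starts.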
	desiredWorkerCount := int64(len(oldInstances) * 2)
	r.Logger.Debugf(ctx, "The desired number of workers is: %d", desiredWorkerCount)

	if desiredWorkerCount == 0 {
		// The node pool is empty, so the upgrade process can stop here.
		r.Logger.Debugf(ctx, "No outdated instances found: no need to roll out nodes")
		return DeploymentUninitialized, nil
	}

	if desiredWorkerCount > int64(len(oldInstances)+len(newInstances)) {
		// Disable the cluster autoscaler for this node pool.
		err = r.disableClusterAutoscaler(ctx, azureMachinePool)
		if err != nil {
			return DeploymentUninitialized, microerror.Mask(err)
		}

		newCount, err := r.scaleVMSS(ctx, azureMachinePool, desiredWorkerCount, strategy)
		if err != nil {
			return DeploymentUninitialized, microerror.Mask(err)
		}

		r.Logger.Debugf(ctx, "scaled worker VMSS to %d nodes (desired count is %d)", newCount, desiredWorkerCount)

		// Stay in the current state until the desired count is reached.
		return currentState, nil
	}

	// We didn't need to scale up the VMSS, ready to move to the next step.
	return WaitForWorkersToBecomeReady, nil
}

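// splitInstancesByUpdatedStatus partitions the node pool's VMSS instances
// into outdated ones (from a previous release, or with a changed VM size or
// OS image version) and up-to-date ones.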
func (r *Resource) splitInstancesByUpdatedStatus(ctx context.Context, azureMachinePool capzexp.AzureMachinePool) ([]compute.VirtualMachineScaleSetVM, []compute.VirtualMachineScaleSetVM, error) {
	cluster, err := util.GetClusterFromMetadata(ctx, r.CtrlClient, azureMachinePool.ObjectMeta)
	if err != nil {
		return nil, nil, microerror.Mask(err)
	}

	if !cluster.GetDeletionTimestamp().IsZero() {
		r.Logger.Debugf(ctx, "Cluster is being deleted, skipping reconciling node pool")
		return nil, nil, nil
	}

	var allWorkerInstances []compute.VirtualMachineScaleSetVM
	{
		r.Logger.Debugf(ctx, "finding all worker VMSS instances")
		allWorkerInstances, err = r.GetVMSSInstances(ctx, azureMachinePool)
		if err != nil {
			return nil, nil, microerror.Mask(err)
		}
		r.Logger.Debugf(ctx, "found %d worker VMSS instances", len(allWorkerInstances))
	}

	resourceGroup := key.ClusterID(&azureMachinePool)
	vmssName := key.NodePoolVMSSName(&azureMachinePool)

	virtualMachineScaleSetsClient, err := r.ClientFactory.GetVirtualMachineScaleSetsClient(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return nil, nil, microerror.Mask(err)
	}

	vmss, err := virtualMachineScaleSetsClient.Get(ctx, resourceGroup, vmssName)
	if err != nil {
		return nil, nil, microerror.Mask(err)
	}

	var oldInstances []compute.VirtualMachineScaleSetVM
	var newInstances []compute.VirtualMachineScaleSetVM
	{
		for _, i := range allWorkerInstances {
			old, err := r.isWorkerInstanceFromPreviousRelease(ctx, cluster, azureMachinePool, i)
			if err != nil {
				return nil, nil, microerror.Mask(err)
			}
			sizeChanged := *i.Sku.Name != *vmss.Sku.Name
			flatcarChanged := *i.StorageProfile.ImageReference.Version != *vmss.VirtualMachineProfile.StorageProfile.ImageReference.Version
			if old || sizeChanged || flatcarChanged {
				oldInstances = append(oldInstances, i)
			} else {
				newInstances = append(newInstances, i)
			}
		}
	}

	return oldInstances, newInstances, nil
}

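// getK8sWorkerNodeForInstance looks up the Kubernetes Node backing the given
// VMSS instance, by listing the tenant cluster's nodes labeled with the node
// pool ID and matching on the expected instance name. It returns nil if no
// matching node exists.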
func (r *Resource) getK8sWorkerNodeForInstance(ctx context.Context, tenantClusterK8sClient ctrlclient.Client, nodePoolId string, instance compute.VirtualMachineScaleSetVM) (*corev1.Node, error) {
	name := key.NodePoolInstanceName(nodePoolId, *instance.InstanceID)

	nodeList := &corev1.NodeList{}
	labelSelector := ctrlclient.MatchingLabels{apiextensionslabels.MachinePool: nodePoolId}
	err := tenantClusterK8sClient.List(ctx, nodeList, labelSelector)
	if err != nil {
		return nil, microerror.Mask(err)
	}

	// Index into the slice rather than taking the address of the loop
	// variable.
	for i := range nodeList.Items {
		if nodeList.Items[i].GetName() == name {
			return &nodeList.Items[i], nil
		}
	}

	// The node related to this instance was not found.
	return nil, nil
}

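// isWorkerInstanceFromPreviousRelease reports whether the given VMSS instance
// is outdated, i.e. whether its Kubernetes node runs an older azure-operator
// version, a different cgroup version, or a different Kubernetes version than
// the node pool currently specifies.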
func (r *Resource) isWorkerInstanceFromPreviousRelease(ctx context.Context, cluster *capi.Cluster, azureMachinePool capzexp.AzureMachinePool, instance compute.VirtualMachineScaleSetVM) (bool, error) {
	tenantClusterK8sClient, err := r.tenantClientFactory.GetClient(ctx, cluster)
	if err != nil {
		return false, microerror.Mask(err)
	}

	nodePoolId := azureMachinePool.Name
	n, err := r.getK8sWorkerNodeForInstance(ctx, tenantClusterK8sClient, nodePoolId, instance)
	if err != nil {
		return false, microerror.Mask(err)
	}

	if n == nil {
		// The Kubernetes node related to this instance was not found, so we
		// can't tell whether it is new or old. We consider it "new" to avoid
		// deleting nodes that are being created right now (this might happen
		// when upgrading spot instance node pools).
		return false, nil
	}

	machinePool, err := r.getOwnerMachinePool(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return false, microerror.Mask(err)
	}

	// The azure-operator version has changed, so the node is outdated.
	myVersion := semver.New(project.Version())
	v, exists := n.GetLabels()[label.OperatorVersion]
	if exists {
		nodeVersion := semver.New(v)
		if nodeVersion.LessThan(*myVersion) {
			return true, nil
		}
	}

	// The cgroup version has changed in the node pool, so the node is
	// outdated.
	nodeCgroupVersion := n.GetLabels()[label.CGroupVersion]
	vmssCgroupVersion := key.CGroupVersion(machinePool)
	if nodeCgroupVersion != vmssCgroupVersion {
		return true, nil
	}

	// The Kubernetes version has changed, so the node is outdated.
	nodeK8sVersion := n.Status.NodeInfo.KubeletVersion   // e.g. "v1.20.6"
	vmssK8sVersion := instance.Tags["kubernetes-version"] // e.g. "1.20.6"
	if nodeK8sVersion != "" && vmssK8sVersion != nil && nodeK8sVersion != fmt.Sprintf("v%s", *vmssK8sVersion) {
		return true, nil
	}

	// We don't have enough data to say whether the node is outdated.
	// Default to false for safety.
	return false, nil
}

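// scaleVMSS sets the VMSS capacity to the count computed by the given scale
// strategy from the current capacity and the desired node count, and returns
// the new capacity.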
func (r *Resource) scaleVMSS(ctx context.Context, azureMachinePool capzexp.AzureMachinePool, desiredNodeCount int64, scaleStrategy scalestrategy.Interface) (int64, error) {
	resourceGroup := key.ClusterID(&azureMachinePool)
	vmssName := key.NodePoolVMSSName(&azureMachinePool)

	virtualMachineScaleSetsClient, err := r.ClientFactory.GetVirtualMachineScaleSetsClient(ctx, azureMachinePool.ObjectMeta)
	if err != nil {
		return 0, microerror.Mask(err)
	}

	vmss, err := virtualMachineScaleSetsClient.Get(ctx, resourceGroup, vmssName)
	if err != nil {
		return 0, microerror.Mask(err)
	}

	computedCount := scaleStrategy.GetNodeCount(*vmss.Sku.Capacity, desiredNodeCount)
	*vmss.Sku.Capacity = computedCount
	res, err := virtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroup, vmssName, vmss)
	if err != nil {
		return 0, microerror.Mask(err)
	}

	_, err = virtualMachineScaleSetsClient.CreateOrUpdateResponder(res.Response())
	if err != nil {
		return 0, microerror.Mask(err)
	}

	return computedCount, nil
}