// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

package kubernetesupgrade

import (
	"context"
	"fmt"
	"math/rand"
	"strings"
	"time"

	"github.com/pkg/errors"

	"github.com/Azure/aks-engine/pkg/api"
	"github.com/Azure/aks-engine/pkg/armhelpers"
	"github.com/Azure/aks-engine/pkg/i18n"
	"github.com/Azure/aks-engine/pkg/kubernetes"
	"github.com/Azure/aks-engine/pkg/operations"
	"github.com/sirupsen/logrus"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
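
// interval is the polling period passed to the Kubernetes client helper;
// retry is the wait between node-status checks in Validate below.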
const (
	interval = time.Second * 1
	retry    = time.Second * 5
)

// Compile-time check that UpgradeAgentNode implements UpgradeNode
var _ UpgradeNode = &UpgradeAgentNode{}

// UpgradeAgentNode upgrades an agent node to a newer version of Kubernetes
type UpgradeAgentNode struct {
	Translator              *i18n.Translator
	logger                  *logrus.Entry
	TemplateMap             map[string]interface{}
	ParametersMap           map[string]interface{}
	UpgradeContainerService *api.ContainerService
	SubscriptionID          string
	ResourceGroup           string
	Client                  armhelpers.AKSEngineClient
	kubeConfig              string
	timeout                 time.Duration
	cordonDrainTimeout      time.Duration
}

// DeleteNode takes state/resources of the master/agent node from ListNodeResources,
// backs up/preserves state as needed by a specific version of Kubernetes, and then
// deletes the node.
// The 'drain' flag is used to invoke the 'cordon and drain' flow.
func (kan *UpgradeAgentNode) DeleteNode(vmName *string, drain bool) error {
	kubeAPIServerURL := kan.UpgradeContainerService.Properties.MasterProfile.FQDN

	if vmName == nil || *vmName == "" {
		return errors.Errorf("Error deleting VM: VM name was empty")
	}

	nodeName := strings.ToLower(*vmName)

	client, err := kan.Client.GetKubernetesClient(kubeAPIServerURL, kan.kubeConfig, interval, kan.timeout)
	if err != nil {
		return err
	}

	// Cordon and drain the node
	if drain {
		err = operations.SafelyDrainNodeWithClient(client, kan.logger, nodeName, kan.cordonDrainTimeout)
		if err != nil {
			kan.logger.Warningf("Error draining agent VM %s. Proceeding with deletion. Error: %v", *vmName, err)
			// Proceed with deletion anyway
		}
	}

	// Delete VM in ARM
	if err = operations.CleanDeleteVirtualMachine(kan.Client, kan.logger, kan.SubscriptionID, kan.ResourceGroup, *vmName); err != nil {
		return err
	}

	// Delete VM in api server; a NotFound status means the node is already
	// gone, and any other error is logged but not treated as fatal
	if err = client.DeleteNode(nodeName); err != nil {
		statusErr, ok := err.(*apierrors.StatusError)
		if ok && statusErr.ErrStatus.Reason != v1.StatusReasonNotFound {
			kan.logger.Warnf("Node %s got an error while deregistering: %#v", *vmName, err)
		}
	}

	return nil
}

// CreateNode creates a new master/agent node with the targeted version of Kubernetes
func (kan *UpgradeAgentNode) CreateNode(ctx context.Context, poolName string, agentNo int) error {
	poolCountParameter := kan.ParametersMap[poolName+"Count"].(map[string]interface{})
	poolCountParameter["value"] = agentNo + 1
	agentCount := poolCountParameter["value"]
	kan.logger.Infof("Agent pool: %s, set count to: %d temporarily during upgrade. Upgrading agent: %d",
		poolName, agentCount, agentNo)
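
	// Bumping the count to agentNo+1 above and setting the pool's offset
	// variable to agentNo below should make the generated ARM template
	// deploy only the VM at index agentNo; indices below the offset are
	// treated as already existing. (Inferred from how count and offset are
	// set here; the template contract itself lives outside this file.)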
	poolOffsetVarName := poolName + "Offset"
	templateVariables := kan.TemplateMap["variables"].(map[string]interface{})
	templateVariables[poolOffsetVarName] = agentNo

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	deploymentSuffix := random.Int31()
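	// A timestamp plus a random suffix keeps this deployment name unique, so
	// repeated upgrade deployments for the same pool and index do not collide
	// in ARM.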
	deploymentName := fmt.Sprintf("k8s-upgrade-%s-%d-%s-%d", poolName, agentNo, time.Now().Format("06-01-02T15.04.05"), deploymentSuffix)

	return armhelpers.DeployTemplateSync(kan.Client, kan.logger, kan.ResourceGroup, deploymentName, kan.TemplateMap, kan.ParametersMap)
}

// Validate will verify that the agent node has been upgraded as expected.
func (kan *UpgradeAgentNode) Validate(vmName *string) error {
	if vmName == nil || *vmName == "" {
		kan.logger.Warningf("VM name was empty. Skipping node condition check")
		return nil
	}
	nodeName := strings.ToLower(*vmName)
	kan.logger.Infof("Validating %s", nodeName)

	apiserverURL := kan.UpgradeContainerService.Properties.MasterProfile.FQDN
	client, err := kan.Client.GetKubernetesClient(apiserverURL, kan.kubeConfig, interval, kan.timeout)
	if err != nil {
		return &armhelpers.DeploymentValidationError{Err: err}
	}

	retryTimer := time.NewTimer(time.Millisecond)
	timeoutTimer := time.NewTimer(kan.timeout)
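	// Poll the node's status every `retry` until it reports Ready or the
	// overall timeout elapses; on timeout the VM is deleted and a validation
	// error returned. The first check fires almost immediately via the
	// millisecond timer above.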
	for {
		select {
		case <-timeoutTimer.C:
			retryTimer.Stop()
			err := kan.DeleteNode(vmName, false)
			if err != nil {
				kan.logger.Errorf("Error deleting agent VM %s: %v", *vmName, err)
				return &armhelpers.DeploymentValidationError{Err: kan.Translator.Errorf("Node was not ready within %v. Failed to delete node %s, error: %v", kan.timeout, *vmName, err)}
			}
			return &armhelpers.DeploymentValidationError{Err: kan.Translator.Errorf("Node was not ready within %v. Successfully deleted node %s.", kan.timeout, *vmName)}
		case <-retryTimer.C:
			agentNode, err := client.GetNode(nodeName)
			if err != nil {
				kan.logger.Infof("Agent node: %s status error: %v", nodeName, err)
				retryTimer.Reset(retry)
			} else if kubernetes.IsNodeReady(agentNode) {
				kan.logger.Infof("Agent node: %s is ready", nodeName)
				timeoutTimer.Stop()
				return nil
			} else {
				kan.logger.Infof("Agent node: %s not ready yet...", nodeName)
				retryTimer.Reset(retry)
			}
		}
	}
}
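
// Illustrative call sequence for the three UpgradeNode methods above. This is
// a sketch only: vmName and "agentpool1" are hypothetical values, and the real
// orchestration (ordering, error handling, pool iteration) lives in this
// package's upgrader, not in this file.
//
//	node := &UpgradeAgentNode{ /* fields populated by the upgrader */ }
//	vmName := "k8s-agentpool1-12345678-0"
//	if err := node.DeleteNode(&vmName, true); err != nil { // cordon/drain, then delete the old VM
//		return err
//	}
//	if err := node.CreateNode(ctx, "agentpool1", 0); err != nil { // redeploy index 0 at the target version
//		return err
//	}
//	if err := node.Validate(&vmName); err != nil { // wait for the node to report Ready
//		return err
//	}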