drain_manager.go
/*
Copyright 2022 NVIDIA CORPORATION & AFFILIATES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrade

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubectl/pkg/drain"

	"github.com/Mellanox/network-operator/api/v1alpha1"
	"github.com/Mellanox/network-operator/pkg/consts"
)

// DrainConfiguration contains the drain specification and the list of nodes to schedule drain on
type DrainConfiguration struct {
	Spec  *v1alpha1.DrainSpec
	Nodes []*corev1.Node
}
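
// A populated DrainConfiguration might look roughly like the following (a minimal sketch;
// the DrainSpec field names are taken from their usage further down in this file, and the
// node list would normally come from the upgrade state machine rather than be built by hand):
//
//	cfg := &DrainConfiguration{
//		Spec: &v1alpha1.DrainSpec{
//			Enable:         true,
//			Force:          false,
//			PodSelector:    "",
//			TimeoutSecond:  300,
//			DeleteEmptyDir: false,
//		},
//		Nodes: []*corev1.Node{node},
//	}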

// DrainManagerImpl implements the DrainManager interface and can drain nodes based on a received DrainConfiguration
type DrainManagerImpl struct {
	k8sInterface             kubernetes.Interface
	drainingNodes            *StringSet
	nodeUpgradeStateProvider NodeUpgradeStateProvider
	log                      logr.Logger
}

// DrainManager is an interface that allows scheduling node drains based on a DrainSpec
type DrainManager interface {
	ScheduleNodesDrain(ctx context.Context, drainConfig *DrainConfiguration) error
}
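
// Because callers can depend on the DrainManager interface rather than on DrainManagerImpl,
// tests can substitute a fake. A minimal sketch (the fake type and its bookkeeping field are
// hypothetical, not part of this package):
//
//	type fakeDrainManager struct {
//		scheduled []*DrainConfiguration
//	}
//
//	func (f *fakeDrainManager) ScheduleNodesDrain(_ context.Context, cfg *DrainConfiguration) error {
//		f.scheduled = append(f.scheduled, cfg)
//		return nil
//	}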

// ScheduleNodesDrain receives a DrainConfiguration and schedules a drain for each node in the list.
// Once a node is scheduled, it is marked as being drained and will not be scheduled for drain again
// while the initial drain is still in progress.
// During the drain the node is cordoned first, and then pods on the node are evicted.
// If the drain succeeds, the node moves to the UpgradeStatePodRestart state,
// otherwise it moves to the UpgradeStateDrainFailed state.
func (m *DrainManagerImpl) ScheduleNodesDrain(ctx context.Context, drainConfig *DrainConfiguration) error {
	m.log.V(consts.LogLevelInfo).Info("Drain Manager, starting Node Drain")

	if len(drainConfig.Nodes) == 0 {
		m.log.V(consts.LogLevelInfo).Info("Drain Manager, no nodes scheduled to drain")
		return nil
	}

	drainSpec := drainConfig.Spec
	if drainSpec == nil {
		return fmt.Errorf("drain spec should not be empty")
	}

	if !drainSpec.Enable {
		m.log.V(consts.LogLevelInfo).Info("Drain Manager, drain is disabled")
		return nil
	}

	drainHelper := &drain.Helper{
		Ctx:    ctx,
		Client: m.k8sInterface,
		Force:  drainSpec.Force,
		// OFED driver pods are part of a DaemonSet, so this option needs to be set to true
		IgnoreAllDaemonSets: true,
		DeleteEmptyDirData:  drainSpec.DeleteEmptyDir,
		GracePeriodSeconds:  -1,
		Timeout:             time.Duration(drainSpec.TimeoutSecond) * time.Second,
		PodSelector:         drainSpec.PodSelector,
		OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
			verbStr := "Deleted"
			if usingEviction {
				verbStr = "Evicted"
			}
			m.log.V(consts.LogLevelInfo).Info(
				fmt.Sprintf("%s pod %s/%s from node %s", verbStr, pod.Namespace, pod.Name, pod.Spec.NodeName))
		},
		Out:    os.Stdout,
		ErrOut: os.Stdout,
	}

	for _, node := range drainConfig.Nodes {
		// Shadow the loop variable (or copy its value into a new one) to avoid concurrency issues
		// when launching goroutines: if the loop variable were captured directly, all or most of the
		// goroutines spawned inside this loop would see the 'node' value of the last item in drainConfig.Nodes.
		node := node
		if !m.drainingNodes.Has(node.Name) {
			m.log.V(consts.LogLevelInfo).Info("Schedule drain for node", "node", node.Name)
			m.drainingNodes.Add(node.Name)
			go func() {
				defer m.drainingNodes.Remove(node.Name)
				err := drain.RunCordonOrUncordon(drainHelper, node, true)
				if err != nil {
					m.log.V(consts.LogLevelError).Error(err, "Failed to cordon node", "node", node.Name)
					_ = m.nodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, UpgradeStateDrainFailed)
					return
				}
				m.log.V(consts.LogLevelInfo).Info("Cordoned the node", "node", node.Name)

				err = drain.RunNodeDrain(drainHelper, node.Name)
				if err != nil {
					m.log.V(consts.LogLevelError).Error(err, "Failed to drain node", "node", node.Name)
					_ = m.nodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, UpgradeStateDrainFailed)
					return
				}
				m.log.V(consts.LogLevelInfo).Info("Drained the node", "node", node.Name)
				_ = m.nodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, UpgradeStatePodRestart)
			}()
		} else {
			m.log.V(consts.LogLevelInfo).Info("Node is already being drained, skipping", "node", node.Name)
		}
	}

	return nil
}

// NewDrainManager creates a new DrainManagerImpl
func NewDrainManager(
	k8sInterface kubernetes.Interface,
	nodeUpgradeStateProvider NodeUpgradeStateProvider,
	log logr.Logger) *DrainManagerImpl {
	mgr := &DrainManagerImpl{
		k8sInterface:             k8sInterface,
		log:                      log,
		drainingNodes:            NewStringSet(),
		nodeUpgradeStateProvider: nodeUpgradeStateProvider,
	}
	return mgr
}
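
// The sketch below shows how a caller might wire the drain manager together and schedule a drain.
// This is a minimal sketch rather than the operator's actual reconcile code: k8sClient, stateProvider,
// log, drainSpec and nodesToDrain are assumed to be held by the caller already (a kubernetes.Interface,
// a NodeUpgradeStateProvider, a logr.Logger, a *v1alpha1.DrainSpec and the nodes selected for upgrade).
//
//	drainMgr := NewDrainManager(k8sClient, stateProvider, log)
//	if err := drainMgr.ScheduleNodesDrain(ctx, &DrainConfiguration{
//		Spec:  drainSpec,
//		Nodes: nodesToDrain,
//	}); err != nil {
//		return err
//	}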