/*
Copyright 2020 The Kruise Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemonset

import (
	"context"
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	apps "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"sigs.k8s.io/controller-runtime/pkg/client"
	kubeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

// nodeInSameCondition returns true if the two condition lists contain the same set of
// effective conditions (those whose Status is True); otherwise it returns false.
func nodeInSameCondition(old []corev1.NodeCondition, cur []corev1.NodeCondition) bool {
	if len(old) == 0 && len(cur) == 0 {
		return true
	}
	c1map := map[corev1.NodeConditionType]corev1.ConditionStatus{}
	for _, c := range old {
		if c.Status == corev1.ConditionTrue {
			c1map[c.Type] = c.Status
		}
	}
	for _, c := range cur {
		if c.Status != corev1.ConditionTrue {
			continue
		}
		if _, found := c1map[c.Type]; !found {
			return false
		}
		delete(c1map, c.Type)
	}
	return len(c1map) == 0
}
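
// Example (illustrative condition values): only conditions whose Status is True take part
// in the comparison, so the two lists below are considered to be "in the same condition":
//
//	old := []corev1.NodeCondition{
//		{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
//		{Type: corev1.NodeDiskPressure, Status: corev1.ConditionFalse},
//	}
//	cur := []corev1.NodeCondition{
//		{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
//		{Type: corev1.NodeMemoryPressure, Status: corev1.ConditionFalse},
//	}
//	// nodeInSameCondition(old, cur) == true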

// NodeShouldRunDaemonPod checks a set of preconditions against a (node, daemonset) pair and
// returns a summary. The returned booleans are:
// * wantToRun:
//     Returns true when a user would expect a pod to run on this node, ignoring conditions
//     such as DiskPressure or insufficient resources that would cause a daemonset pod not to schedule.
//     This is primarily used to populate daemonset status.
// * shouldSchedule:
//     Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
//     running on that node.
// * shouldContinueRunning:
//     Returns true when a daemonset should continue running on a node if a daemonset pod is already
//     running on that node.
func NodeShouldRunDaemonPod(client client.Client, node *corev1.Node, ds *appsv1alpha1.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
	newPod := NewPod(ds, node.Name)
	// Because these bools require an && of all their required conditions, we start
	// with all bools set to true and set a bool to false if a condition is not met.
	// A bool should probably not be set to true after this line.
	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
	// If the daemon set specifies a node name, check that it matches with node.Name.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false, false, nil
	}
	reasons, nodeInfo, err := Simulate(client, newPod, node, ds)
	if err != nil {
		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
		return false, false, false, err
	}
	var insufficientResourceErr error
	for _, r := range reasons {
		klog.V(6).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
		switch reason := r.(type) {
		case *predicates.InsufficientResourceError:
			insufficientResourceErr = reason
		case *predicates.PredicateFailureError:
			var emitEvent bool
			// We partition predicate failures into two groups here: those that are intentional
			// on the part of the operator and those that are not.
			switch reason {
			// intentional
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrNodeLabelPresenceViolated,
				// this one is probably intentional since it's a workaround for not having
				// pod hard anti affinity.
				predicates.ErrPodNotFitsHostPorts:
				return false, false, false, nil
			case predicates.ErrTaintsTolerationsNotMatch:
				// DaemonSet is expected to respect taints and tolerations
				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
				if err != nil {
					return false, false, false, err
				}
				if !fitsNoExecute {
					return false, false, false, nil
				}
				wantToRun, shouldSchedule = false, false
			// unintentional
			case
				predicates.ErrDiskConflict,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrMaxVolumeCountExceeded,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeUnderDiskPressure:
				// wantToRun and shouldContinueRunning are likely true here. They are
				// absolutely true at the time of writing the comment. See first comment
				// of this method.
				shouldSchedule = false
				emitEvent = true
			// unexpected
			case
				predicates.ErrPodAffinityNotMatch,
				predicates.ErrServiceAffinityViolated:
				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
			default:
				klog.V(6).Infof("unknown predicate failure reason: %s", reason.GetReason())
				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
				emitEvent = true
			}
			if emitEvent {
				klog.Errorf("failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
			}
		}
	}
	// only emit this event if insufficient resource is the only thing
	// preventing the daemon pod from scheduling
	if shouldSchedule && insufficientResourceErr != nil {
		klog.Errorf("failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
		shouldSchedule = false
	}
	return
}
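
// Illustrative caller sketch (hedged; hasDaemonPod, podsOnNode, and the slice names below
// are hypothetical) showing how the three booleans are typically consumed by a reconciler:
//
//	wantToRun, shouldSchedule, shouldContinueRunning, err := NodeShouldRunDaemonPod(c, node, ds)
//	if err != nil {
//		return err
//	}
//	switch {
//	case shouldSchedule && !hasDaemonPod(node, ds):
//		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
//	case !shouldContinueRunning && hasDaemonPod(node, ds):
//		podsToDelete = append(podsToDelete, podsOnNode(node, ds)...)
//	}
//	// wantToRun feeds status counters such as DesiredNumberScheduled.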

// Simulate returns the predicate failure reasons (if any) and the NodeInfo that would result
// from scheduling newPod onto node, ignoring pods already controlled by this DaemonSet.
func Simulate(kubeclient client.Client, newPod *corev1.Pod, node *corev1.Node, ds *appsv1alpha1.DaemonSet) ([]predicates.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
	podList := corev1.PodList{}
	err := kubeclient.List(context.TODO(), &podList, client.MatchingFields{"spec.nodeName": node.Name})
	if err != nil {
		return nil, nil, err
	}
	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(node)
	for index := range podList.Items {
		if isControlledByDaemonSet(&podList.Items[index], ds.GetUID()) {
			continue
		}
		nodeInfo.AddPod(&podList.Items[index])
	}
	_, reasons, err := Predicates(newPod, nodeInfo)
	return reasons, nodeInfo, err
}

// ShouldIgnoreNodeUpdate returns true if a node update can be ignored by the DaemonSet
// controller, i.e. nothing relevant changed besides ResourceVersion and conditions whose
// effective (Status=True) set is unchanged.
func ShouldIgnoreNodeUpdate(oldNode, curNode corev1.Node) bool {
	if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
		return false
	}
	oldNode.ResourceVersion = curNode.ResourceVersion
	oldNode.Status.Conditions = curNode.Status.Conditions
	return apiequality.Semantic.DeepEqual(oldNode, curNode)
}

// getBurstReplicas resolves the DaemonSet's BurstReplicas field, which may be an absolute
// number or a percentage of DesiredNumberScheduled, into an absolute replica count (default 0).
func getBurstReplicas(ds *appsv1alpha1.DaemonSet) int {
	// Error caught by validation
	burstReplicas, _ := intstrutil.GetValueFromIntOrPercent(
		intstrutil.ValueOrDefault(ds.Spec.BurstReplicas, intstrutil.FromInt(0)),
		int(ds.Status.DesiredNumberScheduled),
		false)
	return burstReplicas
}
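
// Worked example (illustrative values): with
//
//	ds.Spec.BurstReplicas = &intstrutil.IntOrString{Type: intstrutil.String, StrVal: "10%"}
//	ds.Status.DesiredNumberScheduled = 35
//
// GetValueFromIntOrPercent rounds down (roundUp=false), so 10% of 35 -> 3 and
// getBurstReplicas returns 3; when BurstReplicas is nil it defaults to 0.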

// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod.
// Only the one specified in the Pod's ControllerRef will actually manage it.
// Returns an error if listing fails or if no matching DaemonSets are found.
func (dsc *ReconcileDaemonSet) GetPodDaemonSets(pod *corev1.Pod) ([]*appsv1alpha1.DaemonSet, error) {
	var selector labels.Selector
	var daemonSet *appsv1alpha1.DaemonSet
	if len(pod.Labels) == 0 {
		return nil, fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name)
	}
	list := &appsv1alpha1.DaemonSetList{}
	err := dsc.client.List(context.TODO(), list)
	if err != nil {
		return nil, err
	}
	var daemonSets []*appsv1alpha1.DaemonSet
	for i := range list.Items {
		daemonSet = &list.Items[i]
		if daemonSet.Namespace != pod.Namespace {
			continue
		}
		selector, err = metav1.LabelSelectorAsSelector(daemonSet.Spec.Selector)
		if err != nil {
			// this should not happen if the DaemonSet passed validation
			return nil, err
		}
		// If a daemonSet with a nil or empty selector creeps in, it should match nothing, not everything.
		if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
			continue
		}
		daemonSets = append(daemonSets, daemonSet)
	}
	if len(daemonSets) == 0 {
		return nil, fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
	}
	return daemonSets, nil
}
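
// Illustrative caller sketch (hypothetical variable names): the candidates are usually
// narrowed to the single DaemonSet referenced by the pod's ControllerRef:
//
//	dsList, err := dsc.GetPodDaemonSets(pod)
//	if err != nil {
//		return nil // no DaemonSet matches this pod
//	}
//	controllerRef := metav1.GetControllerOf(pod)
//	for _, ds := range dsList {
//		if controllerRef != nil && ds.UID == controllerRef.UID {
//			return ds // the DaemonSet that actually manages the pod
//		}
//	}
//	return nil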

// storeDaemonSetStatus writes the given counters into the DaemonSet's status subresource,
// retrying up to StatusUpdateRetries times, and skips the update when nothing has changed.
func storeDaemonSetStatus(dsClient kubeClient.Client, ds *appsv1alpha1.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool, hash string) error {
	key := types.NamespacedName{
		Namespace: ds.Namespace,
		Name:      ds.Name,
	}
	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
		int(ds.Status.NumberReady) == numberReady &&
		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
		int(ds.Status.NumberAvailable) == numberAvailable &&
		int(ds.Status.NumberUnavailable) == numberUnavailable &&
		ds.Status.ObservedGeneration >= ds.Generation && ds.Status.DaemonSetHash == hash {
		klog.V(6).Info("storeDaemonSetStatus: status is unchanged, skipping update.")
		return nil
	}
	klog.V(6).Infof("DaemonSet to update is %v", ds)
	var updateErr, getErr error
	for i := 0; i < StatusUpdateRetries; i++ {
		if updateObservedGen {
			ds.Status.ObservedGeneration = ds.Generation
		}
		ds.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
		ds.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
		ds.Status.NumberMisscheduled = int32(numberMisscheduled)
		ds.Status.NumberReady = int32(numberReady)
		ds.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
		ds.Status.NumberAvailable = int32(numberAvailable)
		ds.Status.NumberUnavailable = int32(numberUnavailable)
		ds.Status.DaemonSetHash = hash
		if updateErr = dsClient.Status().Update(context.TODO(), ds); updateErr == nil {
			klog.V(6).Infof("update DaemonSet status succeeded. new status is %v", ds.Status)
			return nil
		}
		klog.Errorf("update DaemonSet status %v failed: %v", ds.Status, updateErr)
		// Update the set with the latest resource version for the next poll
		newDs := &appsv1alpha1.DaemonSet{}
		if getErr = dsClient.Get(context.TODO(), key, newDs); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			klog.Errorf("get DaemonSet %v failed: %v", ds.Name, getErr)
			return getErr
		}
		// Retry against the freshly fetched object so the next update uses its resource version.
		ds = newDs
	}
	return updateErr
}

// GetPodRevision returns the revision hash of the given pod, read from its
// controller-revision-hash label.
func GetPodRevision(pod metav1.Object) string {
	return pod.GetLabels()[apps.ControllerRevisionHashLabelKey]
}
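
// Example (illustrative hash value): for a pod labeled
// "controller-revision-hash": "5f67bc8d7", GetPodRevision returns "5f67bc8d7";
// for a pod without that label it returns the empty string.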

// NodeShouldUpdateBySelector checks whether the node is selected for update by the
// DaemonSet's rolling update (gray release) node selector. It does not check
// NodeShouldRunDaemonPod.
func NodeShouldUpdateBySelector(node *corev1.Node, ds *appsv1alpha1.DaemonSet) bool {
	switch ds.Spec.UpdateStrategy.Type {
	case appsv1alpha1.OnDeleteDaemonSetStrategyType:
		return false
	case appsv1alpha1.RollingUpdateDaemonSetStrategyType:
		if ds.Spec.UpdateStrategy.RollingUpdate.Selector == nil {
			return false
		}
		selector, err := metav1.LabelSelectorAsSelector(ds.Spec.UpdateStrategy.RollingUpdate.Selector)
		if err != nil {
			// this should not happen if the DaemonSet passed validation
			klog.Errorf("unexpected rolling update selector for ds %s, err %s", ds.Name, err.Error())
			return false
		}
		if selector.Empty() || !selector.Matches(labels.Set(node.Labels)) {
			return false
		}
		return true
	default:
		klog.Warningf("unknown update strategy type %s for daemonset %s", ds.Spec.UpdateStrategy.Type, ds.Name)
		return false
	}
}
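
// Illustrative sketch of a gray (selector-based) rolling update configuration. The struct
// names are assumed to follow the kruise v1alpha1 API, and the label values are made up:
//
//	ds.Spec.UpdateStrategy = appsv1alpha1.DaemonSetUpdateStrategy{
//		Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
//		RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
//			Selector: &metav1.LabelSelector{
//				MatchLabels: map[string]string{"upgrade-batch": "canary"},
//			},
//		},
//	}
//	// NodeShouldUpdateBySelector then returns true only for nodes labeled upgrade-batch=canary.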