forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
desired_state_of_world_populator.go
170 lines (148 loc) · 6.03 KB
/
desired_state_of_world_populator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package populator implements interfaces that monitor and keep the states of the
// desired_state_of_word in sync with the "ground truth" from informer.
package populator
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corelisters "k8s.io/client-go/listers/core/v1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// DesiredStateOfWorldPopulator periodically verifies that the pods in the
// desired state of the world still exist, if not, it removes them.
// It also loops through the list of active pods and ensures that
// each one exists in the desired state of the world cache
// if it has volumes.
type DesiredStateOfWorldPopulator interface {
Run(stopCh <-chan struct{})
}
// NewDesiredStateOfWorldPopulator returns a new instance of DesiredStateOfWorldPopulator.
// loopSleepDuration - the amount of time the populator loop sleeps between
// successive executions
// podManager - the kubelet podManager that is the source of truth for the pods
// that exist on this host
// desiredStateOfWorld - the cache to populate
func NewDesiredStateOfWorldPopulator(
loopSleepDuration time.Duration,
listPodsRetryDuration time.Duration,
podLister corelisters.PodLister,
desiredStateOfWorld cache.DesiredStateOfWorld,
volumePluginMgr *volume.VolumePluginMgr,
pvcLister corelisters.PersistentVolumeClaimLister,
pvLister corelisters.PersistentVolumeLister) DesiredStateOfWorldPopulator {
return &desiredStateOfWorldPopulator{
loopSleepDuration: loopSleepDuration,
listPodsRetryDuration: listPodsRetryDuration,
podLister: podLister,
desiredStateOfWorld: desiredStateOfWorld,
volumePluginMgr: volumePluginMgr,
pvcLister: pvcLister,
pvLister: pvLister,
}
}
type desiredStateOfWorldPopulator struct {
loopSleepDuration time.Duration
podLister corelisters.PodLister
desiredStateOfWorld cache.DesiredStateOfWorld
volumePluginMgr *volume.VolumePluginMgr
pvcLister corelisters.PersistentVolumeClaimLister
pvLister corelisters.PersistentVolumeLister
listPodsRetryDuration time.Duration
timeOfLastListPods time.Time
}
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
return func() {
dswp.findAndRemoveDeletedPods()
// findAndAddActivePods is called periodically, independently of the main
// populator loop.
if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration {
glog.V(5).Infof(
"Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).",
dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration),
dswp.listPodsRetryDuration)
return
}
dswp.findAndAddActivePods()
}
}
// Iterate through all pods in desired state of world, and remove if they no
// longer exist in the informer
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
for dswPodUID, dswPodToAdd := range dswp.desiredStateOfWorld.GetPodToAdd() {
dswPodKey, err := kcache.MetaNamespaceKeyFunc(dswPodToAdd.Pod)
if err != nil {
glog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err)
continue
}
// Retrieve the pod object from pod informer with the namespace key
namespace, name, err := kcache.SplitMetaNamespaceKey(dswPodKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error splitting dswPodKey %q: %v", dswPodKey, err))
continue
}
informerPod, err := dswp.podLister.Pods(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// if we can't find the pod, we need to delete it below
case err != nil:
glog.Errorf("podLister Get failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
continue
default:
volumeActionFlag := util.DetermineVolumeAction(
informerPod,
dswp.desiredStateOfWorld,
true /* default volume action */)
if volumeActionFlag {
informerPodUID := volumehelper.GetUniquePodName(informerPod)
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
if informerPodUID == dswPodUID {
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
continue
}
}
}
// the pod from dsw does not exist in pod informer, or it does not match the unique identifer retrieved
// from the informer, delete it from dsw
glog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
}
}
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
pods, err := dswp.podLister.List(labels.Everything())
if err != nil {
glog.Errorf("podLister List failed: %v", err)
return
}
dswp.timeOfLastListPods = time.Now()
for _, pod := range pods {
if volumehelper.IsPodTerminated(pod, pod.Status) {
// Do not add volumes for terminated pods
continue
}
util.ProcessPodVolumes(pod, true,
dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister)
}
}