forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pod_workers.go
212 lines (185 loc) · 6.94 KB
/
pod_workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// PodWorkers abstracts the per-pod worker pool behind an interface so that
// it can be swapped out in tests.
type PodWorkers interface {
	// UpdatePod submits a new desired state for pod (together with its
	// mirror pod, which may be nil). updateComplete is the callback
	// associated with this update.
	UpdatePod(pod, mirrorPod *api.Pod, updateComplete func())
	// ForgetWorker drops the worker registered for the pod with the given UID.
	ForgetWorker(uid types.UID)
	// ForgetNonExistingPodWorkers drops every worker whose pod UID is not a
	// key of desiredPods.
	ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
}
// syncPodFnType is the signature of the callback that performs the actual
// sync of a single pod. The worker loop calls it with the pod, its mirror
// pod, the matching pod looked up in the runtime cache, and the update type.
type syncPodFnType func(pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error
// podWorkers runs one worker goroutine per pod and feeds it the updates
// submitted through UpdatePod.
type podWorkers struct {
// Protects all per worker fields.
podLock sync.Mutex
// Tracks all running per-pod goroutines - each per-pod goroutine processes
// the updates received through its corresponding channel.
podUpdates map[types.UID]chan workUpdate
// Tracks whether the per-pod goroutine is currently processing an update.
// An update that arrives for a pod while its worker is busy is not sent to
// the channel; it is parked in lastUndeliveredWorkUpdate instead.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working. Only the most
// recent item is kept; a newer one overwrites an older one.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
// runtimeCache is refreshed and queried for the runtime's pod list before
// each sync.
runtimeCache kubecontainer.RuntimeCache
// This function is run to sync the desired state of a pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFn syncPodFnType
// The EventRecorder used to report failed syncs.
recorder record.EventRecorder
}
// workUpdate is a single unit of work handed to a per-pod worker.
type workUpdate struct {
// The pod state to reflect.
pod *api.Pod
// The mirror pod of pod; nil if it does not exist.
mirrorPod *api.Pod
// Function to call when the update is complete. The worker loop invokes it
// only after the sync function returned without an error.
updateCompleteFn func()
// The type of this update. UpdatePod sets it to SyncPodCreate for the update
// that creates the pod's worker and to SyncPodUpdate otherwise.
updateType SyncPodType
}
// newPodWorkers returns a podWorkers that uses runtimeCache to look up the
// runtime's pods, syncPodFn to sync individual pods and recorder to emit
// events. All bookkeeping maps start out empty, i.e. no worker is running.
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
	recorder record.EventRecorder) *podWorkers {
	workers := new(podWorkers)

	// Collaborators supplied by the caller.
	workers.runtimeCache = runtimeCache
	workers.syncPodFn = syncPodFn
	workers.recorder = recorder

	// Per-pod bookkeeping, all keyed by pod UID.
	workers.podUpdates = make(map[types.UID]chan workUpdate)
	workers.isWorking = make(map[types.UID]bool)
	workers.lastUndeliveredWorkUpdate = make(map[types.UID]workUpdate)

	return workers
}
// managePodLoop is the body of a per-pod worker goroutine. It syncs the pod
// once for every work item received on podUpdates and returns once the
// channel has been closed and drained.
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
	// Time at which the previous successful sync finished. The runtime cache
	// has to be at least this fresh before the next sync is attempted, so
	// that we see the container state left behind by that previous sync.
	var lastSyncTime time.Time

	// syncOnce processes exactly one work item.
	syncOnce := func(work workUpdate) {
		podUID := work.pod.UID

		// Runs on every exit path: either forwards the parked update to this
		// worker or marks the worker as idle.
		defer p.checkForUpdates(podUID, work.updateCompleteFn)

		if err := p.runtimeCache.ForceUpdateIfOlder(lastSyncTime); err != nil {
			glog.Errorf("Error updating the container runtime cache: %v", err)
			return
		}

		runningPods, err := p.runtimeCache.GetPods()
		if err != nil {
			glog.Errorf("Error getting pods while syncing pod: %v", err)
			return
		}

		runningPod := kubecontainer.Pods(runningPods).FindPodByID(podUID)
		if err := p.syncPodFn(work.pod, work.mirrorPod, runningPod, work.updateType); err != nil {
			glog.Errorf("Error syncing pod %s, skipping: %v", podUID, err)
			p.recorder.Eventf(work.pod, "FailedSync", "Error syncing pod, skipping: %v", err)
			return
		}

		// Only a successful sync advances the freshness requirement and
		// triggers the completion callback.
		lastSyncTime = time.Now()
		work.updateCompleteFn()
	}

	for work := range podUpdates {
		syncOnce(work)
	}
}
// UpdatePod applies the new setting to the specified pod. If the pod has no
// worker yet, one is started. The update is delivered to the worker right
// away when the worker is idle; otherwise it is parked as the pod's last
// undelivered update, replacing any update parked earlier. updateComplete is
// the callback attached to the update.
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
	uid := pod.UID

	// TODO: Pipe this through from the kubelet. Currently kubelets operating with
	// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates
	// and deletes as SET operations, which makes updates indistinguishable from
	// creates. The intent here is to communicate to the pod worker that it can take
	// certain liberties, like skipping status generation, when it receives a create
	// event for a pod.
	work := workUpdate{
		pod:              pod,
		mirrorPod:        mirrorPod,
		updateCompleteFn: updateComplete,
		updateType:       SyncPodUpdate,
	}

	p.podLock.Lock()
	defer p.podLock.Unlock()

	updates, found := p.podUpdates[uid]
	if !found {
		// The channel is buffered because checkForUpdates() pushes the parked
		// update into it from the very goroutine that consumes the channel.
		// The channel is guaranteed to be empty at that point, so a buffer of
		// size 1 suffices.
		updates = make(chan workUpdate, 1)
		p.podUpdates[uid] = updates

		// A freshly created worker means either a new pod or a kubelet
		// restart. In both cases the kubelet is willing to believe the status
		// of the pod for the first pod worker sync. See corresponding comment
		// in syncPod.
		work.updateType = SyncPodCreate

		go func() {
			defer util.HandleCrash()
			p.managePodLoop(updates)
		}()
	}

	if p.isWorking[uid] {
		// Worker is busy: remember only the most recent update.
		p.lastUndeliveredWorkUpdate[uid] = work
		return
	}

	p.isWorking[uid] = true
	updates <- work
}
// removeWorker closes and unregisters the update channel of the pod with the
// given UID, if there is one. Both callers in this file hold podLock while
// calling it.
func (p *podWorkers) removeWorker(uid types.UID) {
	updates, found := p.podUpdates[uid]
	if !found {
		return
	}

	close(updates)
	delete(p.podUpdates, uid)

	// Drop any parked update as well: the per-pod goroutine would not be able
	// to push it into the already closed channel once it finishes processing
	// its current work item. delete is a no-op when nothing is parked.
	delete(p.lastUndeliveredWorkUpdate, uid)
}
// ForgetWorker unregisters the worker of the pod with the given UID while
// holding podLock. It does nothing if the pod has no worker.
func (p *podWorkers) ForgetWorker(uid types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	p.removeWorker(uid)
}
// ForgetNonExistingPodWorkers unregisters, under podLock, every worker whose
// pod UID is not a key of desiredPods.
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	for uid := range p.podUpdates {
		if _, desired := desiredPods[uid]; desired {
			continue
		}
		// Removing entries from the map being ranged over is permitted.
		p.removeWorker(uid)
	}
}
// checkForUpdates is called by a per-pod worker after it has finished
// processing a work item. If an update for the pod was parked while the
// worker was busy, that update is pushed into the pod's channel and the
// worker stays marked as working; otherwise the worker is marked as idle.
//
// updateComplete is accepted for signature compatibility and is not used.
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	pending, parked := p.lastUndeliveredWorkUpdate[uid]
	if !parked {
		// Remove the entry instead of storing false: a missing key reads as
		// false as well, and this keeps isWorking from accumulating one entry
		// for every pod UID that was ever processed.
		delete(p.isWorking, uid)
		return
	}

	// A parked update implies the pod still has a registered channel, since
	// removeWorker discards the parked update together with the channel.
	p.podUpdates[uid] <- pending
	delete(p.lastUndeliveredWorkUpdate, uid)
}