forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
observer.go
171 lines (152 loc) · 6.53 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
)
// taskUpdateTx executes a task update transaction f for the task identified by
// taskId. If no such task exists then f is not invoked and an error is
// returned. If f is invoked then taskUpdateTx returns the bool result of f
// (as changed).
type taskUpdateTx func(taskId string, f func(*kuberTask, *api.Pod) bool) (changed bool, err error)
// podObserver receives callbacks for every pod state change on the apiserver and
// for each decides whether to execute a task update transaction.
type podObserver struct {
// podController is the controller returned by framework.NewInformer in
// newPodObserver; run starts it.
podController *framework.Controller
// terminate is the channel passed to podController.Run.
terminate <-chan struct{}
// taskUpdateTx is invoked by handleChangedApiserverPod to update the pod
// stored for a task.
taskUpdateTx taskUpdateTx
}
// newPodObserver builds a podObserver that watches pods through podLW and feeds
// every add/update notification into handleChangedApiserverPod. Delete
// notifications are only logged.
//
// podLW must not be nil; the func panics otherwise. taskUpdateTx is the
// transaction func used to apply pod changes, and terminate is the channel later
// handed to the pod controller by run.
func newPodObserver(podLW cache.ListerWatcher, taskUpdateTx taskUpdateTx, terminate <-chan struct{}) *podObserver {
	// watch pods from the given pod ListWatch
	if podLW == nil {
		// fail early to make debugging easier
		panic("cannot create executor with nil PodLW")
	}

	p := &podObserver{
		terminate:    terminate,
		taskUpdateTx: taskUpdateTx,
	}
	_, p.podController = framework.NewInformer(podLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*api.Pod)
			log.V(4).Infof("pod %s/%s created on apiserver", pod.Namespace, pod.Name)
			p.handleChangedApiserverPod(pod)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod := newObj.(*api.Pod)
			log.V(4).Infof("pod %s/%s updated on apiserver", pod.Namespace, pod.Name)
			p.handleChangedApiserverPod(pod)
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*api.Pod)
			if !ok {
				// When the final state of a deleted object is unknown the informer
				// delivers a cache.DeletedFinalStateUnknown tombstone instead of the
				// pod itself; a bare type assertion would panic on it.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					log.Errorf("unexpected object of type %T in pod delete notification", obj)
					return
				}
				pod, ok = tombstone.Obj.(*api.Pod)
				if !ok {
					log.Errorf("unexpected tombstone object of type %T in pod delete notification", tombstone.Obj)
					return
				}
			}
			log.V(4).Infof("pod %s/%s deleted on apiserver", pod.Namespace, pod.Name)
		},
	})
	return p
}
// run begins observing pod state changes by running the pod controller with the
// observer's terminate channel. Per the original author's note this call blocks
// until the terminate chan closes.
func (p *podObserver) run() {
p.podController.Run(p.terminate)
}
// handleChangedApiserverPod is invoked for pod add/update state changes and decides whether
// task updates are necessary. If so, a task update is executed via taskUpdateTx.
//
// Pods without the meta.TaskIdKey annotation are ignored. For all other pods the labels,
// annotations and (for running pods) the deletion timestamp and grace period of the pod
// stored for the task are brought in line with the given apiserver pod. The transaction
// func reports true if any of them changed. Errors returned by taskUpdateTx are logged.
func (p *podObserver) handleChangedApiserverPod(pod *api.Pod) {
	// Don't do anything for pods without task annotation which means:
	// - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already.
	// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
	// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
	taskId := pod.Annotations[meta.TaskIdKey]
	if taskId == "" {
		// There also could be a race between the overall launch-task process and this update, but here we
		// will never be able to process such a stale update because the "update pod" that we're receiving
		// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
		// update on the floor because we'll get another update later that, in addition to the changes that
		// we're dropping now, will also include the changes from the binding process.
		log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
		return
	}

	_, err := p.taskUpdateTx(taskId, func(_ *kuberTask, relatedPod *api.Pod) (sendSnapshot bool) {
		if relatedPod == nil {
			// should never happen because:
			// (a) the update pod record has already passed through the binding phase in launchTasks()
			// (b) all remaining updates to executor.{pods,tasks} are sync'd in unison
			log.Errorf("internal state error: pod not found for task %s", taskId)
			return false
		}

		// TODO(sttts): check whether we can and should send all "semantic" changes down to the kubelet
		// see kubelet/config/config.go for semantic change detection

		// check for updated labels/annotations: need to forward these for the downward API.
		// Both maps are synced unconditionally: chaining the calls with || would skip the
		// annotation sync (updateMetaMap mutates relatedPod) whenever the labels changed.
		labelsChanged := updateMetaMap(&relatedPod.Labels, pod.Labels)
		annotationsChanged := updateMetaMap(&relatedPod.Annotations, pod.Annotations)
		sendSnapshot = labelsChanged || annotationsChanged

		// terminating pod?
		if pod.Status.Phase == api.PodRunning {
			timeModified := differentTime(relatedPod.DeletionTimestamp, pod.DeletionTimestamp)
			graceModified := differentPeriod(relatedPod.DeletionGracePeriodSeconds, pod.DeletionGracePeriodSeconds)
			if timeModified || graceModified {
				// Either pointer may be nil here (a nil/non-nil mismatch counts as a
				// modification), so guard the dereferences used for logging.
				var deletionTime interface{} = "<nil>"
				if pod.DeletionTimestamp != nil {
					deletionTime = *pod.DeletionTimestamp
				}
				var gracePeriod interface{} = "<nil>"
				if pod.DeletionGracePeriodSeconds != nil {
					gracePeriod = *pod.DeletionGracePeriodSeconds
				}
				log.Infof("pod %s/%s is terminating at %v with %vs grace period, telling kubelet",
					pod.Namespace, pod.Name, deletionTime, gracePeriod)

				// modify the pod in our registry instead of sending the new pod. The latter
				// would allow that other changes bleed into the kubelet. For now we are
				// very conservative changing this behaviour.
				relatedPod.DeletionTimestamp = pod.DeletionTimestamp
				relatedPod.DeletionGracePeriodSeconds = pod.DeletionGracePeriodSeconds
				sendSnapshot = true
			}
		}

		return sendSnapshot
	})
	if err != nil {
		log.Errorf("failed to update pod %s/%s: %+v", pod.Namespace, pod.Name, err)
	}
}
// updateMetaMap compares the map that dest points at with src and reports
// whether their entries differ. If both are empty nothing is touched and false
// is returned. In every other case dest is repointed at src (no copy is made),
// even when the entries turn out to be identical.
func updateMetaMap(dest *map[string]string, src map[string]string) (changed bool) {
	current := *dest

	// an empty destination differs exactly when the source has entries
	if len(current) == 0 {
		if len(src) == 0 {
			return false
		}
		*dest = src
		return true
	}

	// a size mismatch implies a missing or an extra key; with equal sizes it is
	// enough to verify that every source entry is present with the same value
	changed = len(current) != len(src)
	if !changed {
		for key, want := range src {
			if have, found := current[key]; !found || have != want {
				changed = true
				break
			}
		}
	}

	*dest = src
	return changed
}
// differentTime reports whether a and b denote different points in time. Two
// nil pointers are considered equal; a nil and a non-nil pointer are different.
// Non-nil values are compared as time instants via Equal rather than with ==,
// because == also compares the location and monotonic clock data of the
// embedded time.Time, so the same instant could be reported as a change.
func differentTime(a, b *unversioned.Time) bool {
	if a == nil || b == nil {
		// different only if exactly one of them is nil
		return a != b
	}
	return !a.Time.Equal(b.Time)
}
// differentPeriod reports whether a and b denote different periods. Two nil
// pointers are considered equal, a nil and a non-nil pointer are different, and
// two non-nil pointers are compared by the values they point at.
func differentPeriod(a, b *int64) bool {
	if a == nil || b == nil {
		// different only if exactly one of them is nil
		return a != b
	}
	return *a != *b
}