forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stateful_pod_control.go
238 lines (214 loc) · 9.96 KB
/
stateful_pod_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"fmt"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/client/retry"
)
// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
// Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
type StatefulPodControlInterface interface {
// CreateStatefulPod creates a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
// the Pod. If the returned error is nil the Pod and its PVCs have been created.
CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulPod updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
// storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
// pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
// the update is successful, the returned error is nil.
UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// DeleteStatefulPod deletes a Pod in a StatefulSet. The Pod's PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
// returned error is nil, and set has its status updated.
UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
}
// NewRealStatefulPodControl returns a StatefulPodControlInterface backed by a realStatefulPodControl that is wired to
// the supplied client, listers, and event recorder.
func NewRealStatefulPodControl(
	client clientset.Interface,
	setLister appslisters.StatefulSetLister,
	podLister corelisters.PodLister,
	pvcLister corelisters.PersistentVolumeClaimLister,
	recorder record.EventRecorder,
) StatefulPodControlInterface {
	// Keyed fields keep the wiring correct even if the struct's field order changes.
	return &realStatefulPodControl{
		client:    client,
		setLister: setLister,
		podLister: podLister,
		pvcLister: pvcLister,
		recorder:  recorder,
	}
}
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
// API server. The struct is package private as the internal details are irrelevant to importing packages.
type realStatefulPodControl struct {
// client issues the Pod create/update/delete, PersistentVolumeClaim create, and StatefulSet status update calls.
client clientset.Interface
// setLister is read after a failed status update to fetch the StatefulSet to use on the next attempt.
setLister appslisters.StatefulSetLister
// podLister is read after a failed Pod update to fetch the Pod to use on the next attempt.
podLister corelisters.PodLister
// pvcLister is consulted to decide whether a PersistentVolumeClaim still needs to be created.
pvcLister corelisters.PersistentVolumeClaimLister
// recorder emits the events describing the outcome of Pod and PersistentVolumeClaim operations.
recorder record.EventRecorder
}
// CreateStatefulPod creates the PersistentVolumeClaims required by pod and then creates pod itself in set's
// namespace. The first error encountered is returned. An event is recorded for every outcome except a Pod create
// that fails with an AlreadyExists error; that error is still returned to the caller, only the event is suppressed.
func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
	// The claims must be in place before the Pod; a claim failure is reported as a failed Pod create.
	if claimErr := spc.createPersistentVolumeClaims(set, pod); claimErr != nil {
		spc.recordPodEvent("create", set, pod, claimErr)
		return claimErr
	}
	_, createErr := spc.client.Core().Pods(set.Namespace).Create(pod)
	// No event is emitted when the Pod already exists; success and every other failure are recorded.
	if !apierrors.IsAlreadyExists(createErr) {
		spc.recordPodEvent("create", set, pod, createErr)
	}
	return createErr
}
// UpdateStatefulPod makes pod conform to the identity and storage requirements of set. Each attempt is driven by
// retry.RetryOnConflict with retry.DefaultBackoff. If pod already conforms, nothing is sent to the API server and no
// event is recorded. Otherwise the Pod is updated, and a single "update" event reflecting the final outcome is
// recorded once an Update call has been attempted. A failure to create the Pod's PersistentVolumeClaims is recorded
// immediately and returned.
//
// NOTE(review): after a failed Update the local pod variable is re-pointed at a copy of the lister's Pod. That
// rebinding is not visible through the caller's pointer - confirm this matches the documented in-out contract.
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
	// triedUpdate is flipped once any attempt reaches the API server, so the final event is only emitted
	// when there was actually something to update.
	triedUpdate := false
	syncOnce := func() error {
		dirty := false
		// Repair the Pod's identity if it does not match the one derived from set.
		if !identityMatches(set, pod) {
			updateIdentity(set, pod)
			dirty = true
		}
		// Repair the Pod's storage and make sure every claim it needs exists.
		if !storageMatches(set, pod) {
			updateStorage(set, pod)
			dirty = true
			if claimErr := spc.createPersistentVolumeClaims(set, pod); claimErr != nil {
				spc.recordPodEvent("update", set, pod, claimErr)
				return claimErr
			}
		}
		if !dirty {
			return nil
		}
		triedUpdate = true
		_, updateErr := spc.client.Core().Pods(set.Namespace).Update(pod)
		if updateErr == nil {
			return nil
		}
		// The update failed: continue from the lister's view of the Pod on the next attempt. Whatever happens
		// while refreshing, the original update error is what gets returned.
		fresh, getErr := spc.podLister.Pods(set.Namespace).Get(pod.Name)
		if getErr != nil {
			utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, getErr))
			return updateErr
		}
		// Work on a copy so the lister's shared cache is never mutated.
		cloned, copyErr := api.Scheme.DeepCopy(fresh)
		if copyErr != nil {
			utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", copyErr))
			return updateErr
		}
		pod = cloned.(*v1.Pod)
		return updateErr
	}
	err := retry.RetryOnConflict(retry.DefaultBackoff, syncOnce)
	if triedUpdate {
		spc.recordPodEvent("update", set, pod, err)
	}
	return err
}
// DeleteStatefulPod deletes pod by name from set's namespace, passing nil delete options, and records a "delete"
// event on set describing the outcome. The error returned by the delete call is passed back unchanged.
func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
	pods := spc.client.Core().Pods(set.Namespace)
	deleteErr := pods.Delete(pod.Name, nil)
	spc.recordPodEvent("delete", set, pod, deleteErr)
	return deleteErr
}
// UpdateStatefulSetStatus writes replicas and generation into set's Status (Replicas and ObservedGeneration) and
// sends the result through the UpdateStatus call. Attempts are driven by retry.RetryOnConflict with
// retry.DefaultBackoff. The returned error is nil when an UpdateStatus call succeeds.
//
// NOTE(review): after a failed UpdateStatus the local set variable is re-pointed at a copy of the lister's
// StatefulSet. That rebinding is not visible through the caller's pointer - confirm this matches the documented
// in-out contract.
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
	attempt := func() error {
		set.Status.Replicas = replicas
		set.Status.ObservedGeneration = &generation
		_, updateErr := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
		if updateErr == nil {
			return nil
		}
		// The update failed: continue from the lister's view of the StatefulSet on the next attempt. Whatever
		// happens while refreshing, the original update error is what gets returned.
		fresh, getErr := spc.setLister.StatefulSets(set.Namespace).Get(set.Name)
		if getErr != nil {
			utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, getErr))
			return updateErr
		}
		// Work on a copy so the lister's shared cache is never mutated.
		cloned, copyErr := api.Scheme.DeepCopy(fresh)
		if copyErr != nil {
			utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", copyErr))
			return updateErr
		}
		set = cloned.(*apps.StatefulSet)
		return updateErr
	}
	return retry.RetryOnConflict(retry.DefaultBackoff, attempt)
}
// recordPodEvent emits an event on set describing the outcome of applying verb to pod. When err is nil the event
// has type v1.EventTypeNormal and reason "Successful<Verb>"; otherwise it has type v1.EventTypeWarning, reason
// "Failed<Verb>", and a message that includes err.
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
	if err != nil {
		failReason := fmt.Sprintf("Failed%s", strings.Title(verb))
		failMessage := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
			strings.ToLower(verb), pod.Name, set.Name, err)
		spc.recorder.Event(set, v1.EventTypeWarning, failReason, failMessage)
		return
	}
	okReason := fmt.Sprintf("Successful%s", strings.Title(verb))
	okMessage := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
		strings.ToLower(verb), pod.Name, set.Name)
	spc.recorder.Event(set, v1.EventTypeNormal, okReason, okMessage)
}
// recordClaimEvent emits an event on set describing the outcome of applying verb to claim, a PersistentVolumeClaim
// belonging to pod. When err is nil the event has type v1.EventTypeNormal and reason "Successful<Verb>"; otherwise
// it has type v1.EventTypeWarning, reason "Failed<Verb>", and a message that includes err.
func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
	if err != nil {
		failReason := fmt.Sprintf("Failed%s", strings.Title(verb))
		failMessage := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
			strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
		spc.recorder.Event(set, v1.EventTypeWarning, failReason, failMessage)
		return
	}
	okReason := fmt.Sprintf("Successful%s", strings.Title(verb))
	okMessage := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
		strings.ToLower(verb), claim.Name, pod.Name, set.Name)
	spc.recorder.Event(set, v1.EventTypeNormal, okReason, okMessage)
}
// createPersistentVolumeClaims creates every PersistentVolumeClaim that getPersistentVolumeClaims reports for pod,
// which must be a member of set. A claim is created only when the lister reports it as not found. All lookup and
// creation failures are collected and returned as a single aggregate error; the result is nil when none occurred.
// The method may be called again after a failure until it returns nil. A creation that fails with AlreadyExists
// still counts as a failure in the returned error, but no event is recorded for it.
func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
	var failures []error
	for _, claim := range getPersistentVolumeClaims(set, pod) {
		_, getErr := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
		if getErr == nil {
			// The claim is already present.
			// TODO: Check resource requirements and accessmodes, update if necessary
			continue
		}
		if !apierrors.IsNotFound(getErr) {
			// The lookup itself failed, so it is unknown whether the claim exists.
			failures = append(failures, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, getErr))
			spc.recordClaimEvent("create", set, pod, &claim, getErr)
			continue
		}
		// The claim is missing: create it.
		_, createErr := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Create(&claim)
		if createErr != nil {
			failures = append(failures, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, createErr))
		}
		// Record success and every failure except AlreadyExists.
		if !(createErr != nil && apierrors.IsAlreadyExists(createErr)) {
			spc.recordClaimEvent("create", set, pod, &claim, createErr)
		}
	}
	return errorutils.NewAggregate(failures)
}
// Compile-time check that *realStatefulPodControl satisfies StatefulPodControlInterface.
var _ StatefulPodControlInterface = (*realStatefulPodControl)(nil)