forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stateful_pod_control.go
206 lines (187 loc) · 8.72 KB
/
stateful_pod_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"fmt"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/client/retry"
)
// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
// Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
type StatefulPodControlInterface interface {
// CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
// the Pod. If the returned error is nil the Pod and its PVCs have been created.
CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
// storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
// pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
// the create is successful, the returned error is nil.
UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
}
func NewRealStatefulPodControl(
client clientset.Interface,
setLister appslisters.StatefulSetLister,
podLister corelisters.PodLister,
pvcLister corelisters.PersistentVolumeClaimLister,
recorder record.EventRecorder,
) StatefulPodControlInterface {
return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
}
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
// API server. The struct is package private as the internal details are irrelevant to importing packages.
type realStatefulPodControl struct {
client clientset.Interface
setLister appslisters.StatefulSetLister
podLister corelisters.PodLister
pvcLister corelisters.PersistentVolumeClaimLister
recorder record.EventRecorder
}
func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
// Create the Pod's PVCs prior to creating the Pod
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("create", set, pod, err)
return err
}
// If we created the PVCs attempt to create the Pod
_, err := spc.client.Core().Pods(set.Namespace).Create(pod)
// sink already exists errors
if apierrors.IsAlreadyExists(err) {
return err
}
spc.recordPodEvent("create", set, pod, err)
return err
}
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
attemptedUpdate := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
consistent := true
// if the Pod does not conform to its identity, update the identity and dirty the Pod
if !identityMatches(set, pod) {
updateIdentity(set, pod)
consistent = false
}
// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
// dirty the Pod, and create any missing PVCs
if !storageMatches(set, pod) {
updateStorage(set, pod)
consistent = false
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
}
// if the Pod is not dirty, do nothing
if consistent {
return nil
}
attemptedUpdate = true
// commit the update, retrying on conflicts
_, updateErr := spc.client.Core().Pods(set.Namespace).Update(pod)
if updateErr == nil {
return nil
}
if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := scheme.Scheme.DeepCopy(updated); err == nil {
pod = copy.(*v1.Pod)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
}
return updateErr
})
if attemptedUpdate {
spc.recordPodEvent("update", set, pod, err)
}
return err
}
func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
err := spc.client.Core().Pods(set.Namespace).Delete(pod.Name, nil)
spc.recordPodEvent("delete", set, pod, err)
return err
}
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
strings.ToLower(verb), pod.Name, set.Name)
spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
strings.ToLower(verb), pod.Name, set.Name, err)
spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
}
}
// recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
// nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
// reason of v1.EventTypeWarning.
func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
strings.ToLower(verb), claim.Name, pod.Name, set.Name)
spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
}
}
// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which mush be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
// set's Spec.
func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
var errs []error
for _, claim := range getPersistentVolumeClaims(set, pod) {
_, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
switch {
case apierrors.IsNotFound(err):
_, err := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Create(&claim)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, err))
}
if err == nil || !apierrors.IsAlreadyExists(err) {
spc.recordClaimEvent("create", set, pod, &claim, err)
}
case err != nil:
errs = append(errs, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, err))
spc.recordClaimEvent("create", set, pod, &claim, err)
}
// TODO: Check resource requirements and accessmodes, update if necessary
}
return errorutils.NewAggregate(errs)
}
var _ StatefulPodControlInterface = &realStatefulPodControl{}