-
Notifications
You must be signed in to change notification settings - Fork 39.7k
/
lease.go
296 lines (248 loc) · 9.29 KB
/
lease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
/*
Original Source:
https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
*/
import (
"fmt"
"net"
"path"
"sync"
"time"
"k8s.io/klog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
)
// Leases keeps track of which masters are currently active by holding one
// lease per master IP.
type Leases interface {
	// ListLeases returns the IPs of every master that currently holds a lease.
	ListLeases() ([]string, error)
	// UpdateLease creates the lease for ip, or extends it if it already exists.
	UpdateLease(ip string) error
	// RemoveLease drops the lease held for ip.
	RemoveLease(ip string) error
}
// storageLeases implements Leases on top of a storage.Interface. Each
// master's lease is an Endpoints object kept under baseKey.
type storageLeases struct {
	storage   storage.Interface // backing store that holds the lease records
	baseKey   string            // key prefix under which lease records live
	leaseTime time.Duration     // TTL handed to storage whenever a lease is refreshed
}

// Compile-time check that *storageLeases satisfies Leases.
var _ Leases = (*storageLeases)(nil)
// ListLeases retrieves the master IPs currently recorded in storage.
//
// Each Endpoints object listed under s.baseKey is expected to carry its
// master IP as the first address of its first subset (the shape UpdateLease
// writes). Records without that shape, or with an empty IP, are skipped
// rather than indexed blindly. A single malformed or partially written
// record therefore cannot panic the reconciler or inject an empty address.
//
// The returned slice is never nil on success, but it may be empty.
func (s *storageLeases) ListLeases() ([]string, error) {
	ipInfoList := &corev1.EndpointsList{}
	if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
		return nil, err
	}

	ipList := make([]string, 0, len(ipInfoList.Items))
	// Index instead of ranging by value: Endpoints is a large struct.
	for i := range ipInfoList.Items {
		subsets := ipInfoList.Items[i].Subsets
		if len(subsets) == 0 || len(subsets[0].Addresses) == 0 || subsets[0].Addresses[0].IP == "" {
			continue
		}
		ipList = append(ipList, subsets[0].Addresses[0].IP)
	}

	klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
	return ipList, nil
}
// UpdateLease creates or refreshes the storage record for a master IP.
//
// The record lives at path.Join(s.baseKey, ip). On every call its subsets
// are overwritten so that ip is the only address. GuaranteedUpdate is then
// given a TTL of s.leaseTime, truncated to whole seconds. The update is
// issued with ignoreNotFound set to true, so a missing record is written
// rather than reported as an error.
func (s *storageLeases) UpdateLease(ip string) error {
	refresh := func(input kruntime.Object, _ storage.ResponseMeta) (kruntime.Object, *uint64, error) {
		// Force the stored address set to be exactly this IP.
		lease := input.(*corev1.Endpoints)
		lease.Subsets = []corev1.EndpointSubset{{
			Addresses: []corev1.EndpointAddress{{IP: ip}},
		}}

		// The storage TTL is expressed in seconds.
		ttl := uint64(s.leaseTime / time.Second)

		// GuaranteedUpdate skips the write when nothing other than the
		// resource version changed between load and store. Bumping
		// Generation guarantees a write, and with it the TTL refresh.
		lease.Generation++

		klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, ttl)
		return lease, &ttl, nil
	}

	key := path.Join(s.baseKey, ip)
	return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, refresh)
}
// RemoveLease deletes the storage record for a master IP.
//
// The key is built with path.Join, the same way UpdateLease builds it. If
// baseKey ends in a slash, plain concatenation would produce "base//ip",
// which need not match the "base/ip" record UpdateLease wrote. path.Join
// keeps the two consistent.
func (s *storageLeases) RemoveLease(ip string) error {
	key := path.Join(s.baseKey, ip)
	return s.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil)
}
// NewLeases returns a Leases implementation backed by the given storage.
// Lease records are kept under baseKey, and each refresh gives a record a
// TTL of leaseTime.
func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
	l := &storageLeases{
		leaseTime: leaseTime,
		baseKey:   baseKey,
		storage:   storage,
	}
	return l
}
// leaseEndpointReconciler keeps the endpoints of the master service in line
// with the set of master IPs recorded in masterLeases.
type leaseEndpointReconciler struct {
	endpointClient corev1client.EndpointsGetter
	masterLeases   Leases
	// stopReconcilingCalled is set by StopReconciling. Once true,
	// ReconcileEndpoints becomes a no-op. Guarded by reconcilingLock.
	stopReconcilingCalled bool
	// reconcilingLock serializes ReconcileEndpoints and StopReconciling.
	reconcilingLock sync.Mutex
}

// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler. It
// writes endpoints through endpointClient and reads the set of master IPs
// from masterLeases. The returned reconciler starts out active, meaning
// StopReconciling has not been called.
func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
	r := &leaseEndpointReconciler{}
	r.endpointClient = endpointClient
	r.masterLeases = masterLeases
	return r
}
// ReconcileEndpoints refreshes this master's lease and then syncs the
// endpoints object for serviceName with the master IPs held in storage.
//
// It is meant to be called every R seconds, with each lease stored under a
// TTL of R+n for some small n. A master that goes down stops refreshing its
// lease, so the lease expires. The next reconcile on any surviving master
// then sees that the endpoints object no longer matches the lease listing
// and rewrites it.
//
// Once StopReconciling has been called, this returns nil without doing
// anything.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
	r.reconcilingLock.Lock()
	defer r.reconcilingLock.Unlock()

	if r.stopReconcilingCalled {
		return nil
	}

	// Refresh our own lease before touching the endpoints object. Even if
	// the reconcile below fails or loses an update conflict, other masters
	// will still see this IP and add it.
	leaseErr := r.masterLeases.UpdateLease(ip.String())
	if leaseErr != nil {
		return leaseErr
	}
	return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
// doReconcile makes the endpoints object named serviceName, in the default
// namespace, match the master IPs currently listed by r.masterLeases.
//
// Steps:
//  1. Fetch the endpoints object. If it is not found, build an empty one
//     locally to be created at the end.
//  2. List the master leases. An empty listing is rejected with an error so
//     the endpoints are never wiped out.
//  3. Compare object and leases with checkEndpointSubsetFormatWithLease.
//     Return nil without writing if everything already matches.
//  4. Rewrite whatever is wrong: the whole subset when the format is wrong,
//     the addresses when the IPs differ, and the ports when they differ.
//     Ports are only compared when reconcilePorts is true, but a malformed
//     object is rebuilt with endpointPorts regardless.
//  5. Create or update the object.
//
// Both callers in this file (ReconcileEndpoints and StopReconciling) hold
// r.reconcilingLock while calling it.
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
	// Load the current endpoints object...
	e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
	shouldCreate := false
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		// Not found: start from an empty object. It is created (not updated)
		// at the end of this function.
		shouldCreate = true
		e = &corev1.Endpoints{
			ObjectMeta: metav1.ObjectMeta{
				Name:      serviceName,
				Namespace: corev1.NamespaceDefault,
			},
		}
	}
	// ... and the list of master IP keys from etcd
	masterIPs, err := r.masterLeases.ListLeases()
	if err != nil {
		return err
	}
	// Since we just refreshed our own key, assume that zero endpoints
	// returned from storage indicates an issue or invalid state, and thus do
	// not update the endpoints list based on the result.
	// NOTE(review): on the StopReconciling path our key was just removed
	// rather than refreshed, so this also stops the last master from
	// emptying the endpoints and makes StopReconciling return this error.
	if len(masterIPs) == 0 {
		return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
	}
	// Next, we compare the current list of endpoints with the list of master IP keys
	formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
	if formatCorrect && ipCorrect && portsCorrect {
		// Already in sync: skip the write entirely.
		return nil
	}
	if !formatCorrect {
		// Something is egregiously wrong, just re-make the endpoints record.
		e.Subsets = []corev1.EndpointSubset{{
			Addresses: []corev1.EndpointAddress{},
			Ports:     endpointPorts,
		}}
	}
	if !formatCorrect || !ipCorrect {
		// repopulate the addresses according to the expected IPs from etcd
		e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
		for ind, ip := range masterIPs {
			e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
		}
		// Lexicographic order is retained by this step.
		e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
	}
	if !portsCorrect {
		// Reset ports.
		// masterIPs is non-empty (checked above), so the subset built above
		// still has an address and e.Subsets[0] should exist here.
		// NOTE(review): that relies on RepackSubsets keeping a subset for
		// every address; confirm against its implementation.
		e.Subsets[0].Ports = endpointPorts
	}
	klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
	if shouldCreate {
		// AlreadyExists presumably means another master created the object
		// after our Get. Treat it as success and leave its contents to a
		// later reconcile pass.
		if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
			err = nil
		}
	} else {
		_, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
	}
	return err
}
// checkEndpointSubsetFormatWithLease reports how far e is from the shape
// ReconcileEndpoints maintains when master IPs come from leases.
//
// Return values:
//   - formatCorrect is true only when e has exactly one subset. When it is
//     false, the other two results are false as well.
//   - ipsCorrect is true when that subset's addresses are exactly
//     expectedIPs, in any order. A duplicate in either list makes it false.
//   - portsCorrect is true when reconcilePorts is false. Otherwise it is
//     true when the subset's ports equal ports element by element, in the
//     same order.
func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
	if len(e.Subsets) != 1 {
		return false, false, false
	}
	subset := &e.Subsets[0]

	// Ports: same length, then stop at the first mismatching position.
	portsCorrect = true
	if reconcilePorts {
		portsCorrect = len(subset.Ports) == len(ports)
		for i := 0; portsCorrect && i < len(ports); i++ {
			portsCorrect = ports[i] == subset.Ports[i]
		}
	}

	// Addresses: same length, then every address must be an expected IP
	// that has not already been matched.
	ipsCorrect = len(subset.Addresses) == len(expectedIPs)
	if ipsCorrect {
		// The keys are the expected IPs. A value records whether that IP
		// has been matched yet.
		matched := make(map[string]bool, len(expectedIPs))
		for _, want := range expectedIPs {
			matched[want] = false
		}
		for i := 0; ipsCorrect && i < len(subset.Addresses); i++ {
			got := subset.Addresses[i].IP
			seenBefore, isExpected := matched[got]
			ipsCorrect = isExpected && !seenBefore
			matched[got] = true
		}
	}

	return true, ipsCorrect, portsCorrect
}
// StopReconciling withdraws this master from the master service.
//
// It marks the reconciler as stopped, so later ReconcileEndpoints calls do
// nothing. It then removes this master's lease for ip and runs one final
// reconcile of serviceName, with port reconciliation enabled, against the
// remaining leases. If no leases remain, that final reconcile returns an
// error instead of emptying the endpoints.
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
	r.reconcilingLock.Lock()
	defer r.reconcilingLock.Unlock()

	r.stopReconcilingCalled = true

	if removeErr := r.masterLeases.RemoveLease(ip.String()); removeErr != nil {
		return removeErr
	}

	const alwaysReconcilePorts = true
	return r.doReconcile(serviceName, endpointPorts, alwaysReconcilePorts)
}