forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
lease_endpoint_reconciler.go
236 lines (199 loc) · 7.41 KB
/
lease_endpoint_reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package election
import (
"fmt"
"net"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api/endpoints"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
)
// Leases is an interface which assists in managing the set of active masters
type Leases interface {
// ListLeases retrieves a list of the current master IPs
ListLeases() ([]string, error)
// UpdateLease adds or refreshes a master's lease
UpdateLease(ip string) error
}
type storageLeases struct {
storage storage.Interface
baseKey string
leaseTime uint64
}
var _ Leases = &storageLeases{}
// ListLeases retrieves a list of the current master IPs from storage
func (s *storageLeases) ListLeases() ([]string, error) {
ipInfoList := &api.EndpointsList{}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
return nil, err
}
ipList := make([]string, len(ipInfoList.Items))
for i, ip := range ipInfoList.Items {
ipList[i] = ip.Subsets[0].Addresses[0].IP
}
glog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
return ipList, nil
}
// UpdateLease resets the TTL on a master IP in storage
func (s *storageLeases) UpdateLease(ip string) error {
return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
// just make sure we've got the right IP set, and then refresh the TTL
existing := input.(*api.Endpoints)
existing.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: ip}},
},
}
leaseTime := s.leaseTime
// NB: GuaranteedUpdate does not perform the store operation unless
// something changed between load and store (not including resource
// version), meaning we can't refresh the TTL without actually
// changing a field.
existing.Generation += 1
glog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
return existing, &leaseTime, nil
})
}
// NewLeases creates a new etcd-based Leases implementation.
func NewLeases(storage storage.Interface, baseKey string, leaseTime uint64) Leases {
return &storageLeases{
storage: storage,
baseKey: baseKey,
leaseTime: leaseTime,
}
}
type leaseEndpointReconciler struct {
endpointRegistry endpoint.Registry
masterLeases Leases
}
var _ reconcilers.EndpointReconciler = &leaseEndpointReconciler{}
func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) *leaseEndpointReconciler {
return &leaseEndpointReconciler{
endpointRegistry: endpointRegistry,
masterLeases: masterLeases,
}
}
// ReconcileEndpoints lists keys in a special etcd directory.
// Each key is expected to have a TTL of R+n, where R is the refresh interval
// at which this function is called, and n is some small value. If an
// apiserver goes down, it will fail to refresh its key's TTL and the key will
// expire. ReconcileEndpoints will notice that the endpoints object is
// different from the directory listing, and update the endpoints object
// accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := apirequest.NewDefaultContext()
// Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of
// the masters will add our endpoint.
if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
return err
}
// Retrieve the current list of endpoints...
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return err
}
e = &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: api.NamespaceDefault,
},
}
}
// ... and the list of master IP keys from etcd
masterIPs, err := r.masterLeases.ListLeases()
if err != nil {
return err
}
// Since we just refreshed our own key, assume that zero endpoints
// returned from storage indicates an issue or invalid state, and thus do
// not update the endpoints list based on the result.
if len(masterIPs) == 0 {
return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
}
// Next, we compare the current list of endpoints with the list of master IP keys
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
if formatCorrect && ipCorrect && portsCorrect {
return nil
}
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{},
Ports: endpointPorts,
}}
}
if !formatCorrect || !ipCorrect {
// repopulate the addresses according to the expected IPs from etcd
e.Subsets[0].Addresses = make([]api.EndpointAddress, len(masterIPs))
for ind, ip := range masterIPs {
e.Subsets[0].Addresses[ind] = api.EndpointAddress{IP: ip}
}
// Lexicographic order is retained by this step.
e.Subsets = endpoints.RepackSubsets(e.Subsets)
}
if !portsCorrect {
// Reset ports.
e.Subsets[0].Ports = endpointPorts
}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
return r.endpointRegistry.UpdateEndpoints(ctx, e, nil, nil)
}
// TODO collapse onto upstream
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
return nil
}
// checkEndpointSubsetFormatWithLease determines if the endpoint is in the
// format ReconcileEndpoints expects when the controller is using leases.
//
// Return values:
// * formatCorrect is true if exactly one subset is found.
// * ipsCorrect when the addresses in the endpoints match the expected addresses list
// * portsCorrect is true when endpoint ports exactly match provided ports.
// portsCorrect is only evaluated when reconcilePorts is set to true.
func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, ports []api.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false, false
}
sub := &e.Subsets[0]
portsCorrect = true
if reconcilePorts {
if len(sub.Ports) != len(ports) {
portsCorrect = false
} else {
for i, port := range ports {
if port != sub.Ports[i] {
portsCorrect = false
break
}
}
}
}
ipsCorrect = true
if len(sub.Addresses) != len(expectedIPs) {
ipsCorrect = false
} else {
// check the actual content of the addresses
// present addrs is used as a set (the keys) and to indicate if a
// value was already found (the values)
presentAddrs := make(map[string]bool, len(expectedIPs))
for _, ip := range expectedIPs {
presentAddrs[ip] = false
}
// uniqueness is assumed amongst all Addresses.
for _, addr := range sub.Addresses {
if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
ipsCorrect = false
break
}
presentAddrs[addr.IP] = true
}
}
return true, ipsCorrect, portsCorrect
}