forked from kubernetes/ingress-gce
/
syncer.go
252 lines (221 loc) · 7.1 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backends
import (
"fmt"
"net/http"
"strings"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/utils"
)
// backendSyncer manages the lifecycle of backends: it creates and updates
// BackendServices (together with their health checks) for the service ports
// it is asked to sync, and deletes the ones that are no longer referenced.
type backendSyncer struct {
// backendPool is used to get, create, update, delete, list and query the
// health of BackendServices.
backendPool Pool
// healthChecker builds, syncs and deletes the health checks that back the
// BackendServices.
healthChecker healthchecks.HealthChecker
// backendConfigEnabled gates the BackendConfig-driven features; they are
// only applied when this is true and the service port carries a BackendConfig.
backendConfigEnabled bool
// prober is set through Init and may be nil; when non-nil it is consulted
// for a readiness probe whose settings are copied onto the health check.
prober ProbeProvider
// namer is passed to ServicePort.BackendName to derive backend names.
namer *utils.Namer
}
// Compile-time check that *backendSyncer satisfies the Syncer interface.
var _ Syncer = (*backendSyncer)(nil)
// NewBackendSyncer returns a Syncer that manages BackendServices through
// backendPool and their health checks through healthChecker. namer is used to
// derive backend names and backendConfigEnabled toggles the BackendConfig
// features. The probe provider is left unset; supply it with Init.
func NewBackendSyncer(
	backendPool Pool,
	healthChecker healthchecks.HealthChecker,
	namer *utils.Namer,
	backendConfigEnabled bool) Syncer {
	syncer := new(backendSyncer)
	syncer.backendPool = backendPool
	syncer.healthChecker = healthChecker
	syncer.namer = namer
	syncer.backendConfigEnabled = backendConfigEnabled
	return syncer
}
// Init implements Syncer. It stores the probe provider that is later used to
// look up readiness probes when health checks are built.
func (s *backendSyncer) Init(pp ProbeProvider) {
	s.prober = pp
}
// Sync implements Syncer. It ensures a BackendService for each service port in
// order and stops at the first port that fails, returning that error.
func (s *backendSyncer) Sync(svcPorts []utils.ServicePort) error {
	for _, port := range svcPorts {
		glog.V(3).Infof("Sync: backend %+v", port)
		err := s.ensureBackendService(port)
		if err != nil {
			return err
		}
	}
	return nil
}
// ensureBackendService will update or create a BackendService for the given port.
//
// The steps are performed in this order:
//  1. Look up the existing BackendService by name and API version.
//  2. Ensure the health check for the port. This happens before the lookup
//     error is inspected, so the health check is synced even if the lookup
//     failed.
//  3. Create the BackendService if (and only if) the lookup returned a
//     not-found error; any other lookup error is returned as is.
//  4. Reconcile protocol, health check link, description and, when
//     BackendConfig is enabled and present on the port, the BackendConfig
//     features; issue a single Update if anything changed.
//  5. When BackendConfig applies, ensure the security policy.
//
// It returns the first error encountered. An error is also returned when the
// security policy must be ensured but the backend pool is not a *Backends,
// because the cloud handle is only available on that concrete type.
func (s *backendSyncer) ensureBackendService(sp utils.ServicePort) error {
	beName := sp.BackendName(s.namer)
	version := features.VersionFromServicePort(&sp)
	be, getErr := s.backendPool.Get(beName, version)

	// If the backend already exists, find out if it is using a legacy health check.
	hasLegacyHC := false
	if be != nil {
		existingHCLink := getHealthCheckLink(be)
		if strings.Contains(existingHCLink, "/httpHealthChecks/") {
			hasLegacyHC = true
		}
	}

	// Ensure health check for backend service exists.
	hcLink, err := s.ensureHealthCheck(sp, hasLegacyHC)
	if err != nil {
		return err
	}

	// Verify existence of a backend service for the proper port
	// but do not specify any backends for it (IG / NEG).
	if getErr != nil {
		if !utils.IsNotFoundError(getErr) {
			return getErr
		}
		// Only create the backend service if the error was 404.
		glog.V(2).Infof("Creating backend service for port %v named %v", sp.NodePort, beName)
		be, err = s.backendPool.Create(sp, hcLink)
		if err != nil {
			return err
		}
	}

	needUpdate := ensureProtocol(be, sp)
	needUpdate = ensureHealthCheckLink(be, hcLink) || needUpdate
	needUpdate = ensureDescription(be, &sp) || needUpdate

	// BackendConfig-driven settings only apply when the feature is enabled and
	// the port actually references a BackendConfig.
	useBackendConfig := s.backendConfigEnabled && sp.BackendConfig != nil
	if useBackendConfig {
		needUpdate = features.EnsureCDN(sp, be) || needUpdate
		needUpdate = features.EnsureIAP(sp, be) || needUpdate
		needUpdate = features.EnsureTimeout(sp, be) || needUpdate
		needUpdate = features.EnsureDraining(sp, be) || needUpdate
		needUpdate = features.EnsureAffinity(sp, be) || needUpdate
	}

	if needUpdate {
		if err := s.backendPool.Update(be); err != nil {
			return err
		}
	}

	if !useBackendConfig {
		return nil
	}

	// The security policy is ensured separately from the Update above and
	// needs the cloud handle held by the concrete pool. Use the checked form
	// of the type assertion so that another Pool implementation yields an
	// error instead of a panic.
	pool, ok := s.backendPool.(*Backends)
	if !ok {
		return fmt.Errorf("cannot ensure security policy for backend service %q: backend pool has unexpected type %T", beName, s.backendPool)
	}
	if err := features.EnsureSecurityPolicy(pool.cloud, sp, be, beName); err != nil {
		return err
	}
	return nil
}
// GC implements Syncer. Every controller-managed backend whose name is not
// produced by one of svcPorts is deleted together with its health check.
// A not-found error from the backend deletion is tolerated; any other error
// aborts the collection and is returned.
func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
	// Names of the backends that are still wanted.
	inUse := sets.NewString()
	for _, sp := range svcPorts {
		inUse.Insert(sp.BackendName(s.namer))
	}

	existing, listErr := s.backendPool.List()
	if listErr != nil {
		return fmt.Errorf("error getting the names of controller-managed backends: %v", listErr)
	}

	for _, beName := range existing {
		if inUse.Has(beName) {
			continue
		}

		glog.V(3).Infof("GCing backendService for port %s", beName)
		delErr := s.backendPool.Delete(beName)
		if delErr != nil && !utils.IsHTTPErrorCode(delErr, http.StatusNotFound) {
			return delErr
		}

		hcErr := s.healthChecker.Delete(beName)
		if hcErr != nil {
			return hcErr
		}
	}
	return nil
}
// Status implements Syncer. It reports the health string the backend pool
// returns for the backend with the given name.
func (s *backendSyncer) Status(name string) string {
	health := s.backendPool.Health(name)
	return health
}
// Shutdown implements Syncer. It runs GC with no known service ports, which
// makes every listed backend eligible for deletion, and returns GC's error.
func (s *backendSyncer) Shutdown() error {
	return s.GC([]utils.ServicePort{})
}
// ensureHealthCheck builds the health check for the given service port,
// applies readiness-probe settings to it when a probe provider is set and
// returns a probe, and then syncs it. It returns the value produced by the
// health checker's Sync (used by the caller as the health check link).
// hasLegacyHC only triggers an error log; it does not alter the health check.
func (s *backendSyncer) ensureHealthCheck(sp utils.ServicePort, hasLegacyHC bool) (string, error) {
	if hasLegacyHC {
		glog.Errorf("Backend %+v has legacy health check", sp.ID)
	}

	hc := s.healthChecker.New(sp)

	// Without a probe provider the default health check is synced unchanged.
	if s.prober == nil {
		return s.healthChecker.Sync(hc)
	}

	probe, probeErr := s.prober.GetProbe(sp)
	if probeErr != nil {
		return "", probeErr
	}
	if probe != nil {
		glog.V(4).Infof("Applying httpGet settings of readinessProbe to health check on port %+v", sp)
		applyProbeSettingsToHC(probe, hc)
	}
	return s.healthChecker.Sync(hc)
}
// getHealthCheckLink returns the single health check link stored on the
// BackendService. If the BackendService does not carry exactly one health
// check, the placeholder "invalid-healthcheck-link" is returned instead.
func getHealthCheckLink(be *composite.BackendService) string {
	if len(be.HealthChecks) != 1 {
		return "invalid-healthcheck-link"
	}
	return be.HealthChecks[0]
}
// ensureProtocol sets the BackendService protocol to the service port's
// protocol when they differ. It reports whether the BackendService was
// modified and therefore needs to be updated.
func ensureProtocol(be *composite.BackendService, p utils.ServicePort) (needsUpdate bool) {
	want := string(p.Protocol)
	if be.Protocol != want {
		be.Protocol = want
		return true
	}
	return false
}
// ensureHealthCheckLink makes hcLink the only health check of the
// BackendService unless the current link already refers to the same resource.
// It reports whether the BackendService was modified.
func ensureHealthCheckLink(be *composite.BackendService, hcLink string) (needsUpdate bool) {
	if current := getHealthCheckLink(be); !utils.EqualResourceIDs(current, hcLink) {
		be.HealthChecks = []string{hcLink}
		return true
	}
	return false
}
// applyProbeSettingsToHC copies the httpGet settings of the given probe onto
// the health check: request path (with a leading "/" enforced), host,
// timeout and check interval. The description is replaced to record that the
// health check was derived from a readiness probe.
//
// The host is taken from the probe's HTTPGet.Host unless the probe carries an
// HTTP header named "Host", in which case the first such header wins.
//
// The function is a no-op when p or hc is nil or when the probe has no
// HTTPGet handler, since there are no httpGet settings to apply in that case.
func applyProbeSettingsToHC(p *v1.Probe, hc *healthchecks.HealthCheck) {
	if p == nil || hc == nil || p.Handler.HTTPGet == nil {
		return
	}
	httpGet := p.Handler.HTTPGet

	healthPath := httpGet.Path
	// GCE requires a leading "/" for health check urls.
	if !strings.HasPrefix(healthPath, "/") {
		healthPath = "/" + healthPath
	}

	// Extract host from HTTP headers
	host := httpGet.Host
	for _, header := range httpGet.HTTPHeaders {
		if header.Name == "Host" {
			host = header.Value
			break
		}
	}

	hc.RequestPath = healthPath
	hc.Host = host
	hc.Description = "Kubernetes L7 health check generated with readiness probe settings."
	hc.TimeoutSec = int64(p.TimeoutSeconds)
	if hc.ForNEG {
		// For NEG mode, we can support more aggressive healthcheck interval.
		hc.CheckIntervalSec = int64(p.PeriodSeconds)
	} else {
		// For IG mode, a short healthcheck interval may cause a health check
		// flooding problem, so the default interval is added on top of the
		// probe period.
		hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
	}
}