// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package server

import (
	"fmt"
	"path"
	"time"

	"github.com/cilium/cilium/api/v1/client/daemon"
	healthModels "github.com/cilium/cilium/api/v1/health/models"
	healthApi "github.com/cilium/cilium/api/v1/health/server"
	"github.com/cilium/cilium/api/v1/health/server/restapi"
	"github.com/cilium/cilium/api/v1/models"
	"github.com/cilium/cilium/pkg/api"
	ciliumPkg "github.com/cilium/cilium/pkg/client"
	ciliumDefaults "github.com/cilium/cilium/pkg/defaults"
	healthClientPkg "github.com/cilium/cilium/pkg/health/client"
	"github.com/cilium/cilium/pkg/health/defaults"
	"github.com/cilium/cilium/pkg/health/probe/responder"
	"github.com/cilium/cilium/pkg/lock"
	"github.com/cilium/cilium/pkg/logging"
	"github.com/cilium/cilium/pkg/logging/logfields"
	"github.com/cilium/cilium/pkg/metrics"
	"github.com/cilium/cilium/pkg/node"
	"github.com/cilium/cilium/pkg/option"
)

var (
	log = logging.DefaultLogger.WithField(logfields.LogSubsys, "health-server")
)

// Config stores the configuration data for a cilium-health server.
// (Field comments below are inferred from how each field is used in this
// file and in the prober.)
type Config struct {
	Debug         bool            // enable debug output
	CiliumURI     string          // URI for reaching the cilium-agent API
	ProbeInterval time.Duration   // interval between connectivity probe rounds
	ProbeDeadline time.Duration   // deadline for a single probe round
	HTTPPathPort  int             // port for the HTTP probe responder
	HealthAPISpec *healthApi.Spec // spec for the health API server
}

// ipString is an IP address used as a more descriptive type name in maps.
type ipString string

// nodeMap maps IP addresses to healthNode objects for convenient access to
// node information.
type nodeMap map[ipString]healthNode

// Server is the cilium-health daemon that is in charge of performing health
// and connectivity checks periodically, and serving the cilium-health API.
type Server struct {
	healthApi.Server  // Server to provide cilium-health API
	*ciliumPkg.Client // Client to "GET /healthz" on cilium daemon
	Config

	// clientID is the client ID returned by the cilium-agent that should
	// be used when making frequent requests. The server will return
	// a diff of the nodes added and removed based on this clientID.
	clientID int64

	httpPathServer *responder.Server // HTTP server for external pings
	startTime      time.Time

	// The lock protects read and write access to the IP->node map, the
	// most recently seen list of statuses, and the last time a probe was
	// conducted.
	lock.RWMutex
	connectivity *healthReport
	localStatus  *healthModels.SelfStatus
}

// DumpUptime returns the time that this server has been running.
func (s *Server) DumpUptime() string {
	return time.Since(s.startTime).String()
}

// getNodes fetches the nodes added and removed since the last time the
// server made a request to the daemon.
func (s *Server) getNodes() (nodeMap, nodeMap, error) {
	scopedLog := log
	if s.CiliumURI != "" {
		scopedLog = log.WithField("URI", s.CiliumURI)
	}
	scopedLog.Debug("Sending request for /cluster/nodes ...")

	clusterNodesParam := daemon.NewGetClusterNodesParams()
	s.RWMutex.RLock()
	cID := s.clientID
	s.RWMutex.RUnlock()
	clusterNodesParam.SetClientID(&cID)
	resp, err := s.Daemon.GetClusterNodes(clusterNodesParam)
	if err != nil {
		return nil, nil, fmt.Errorf("unable to get nodes' cluster: %w", err)
	}
	log.Debug("Got cilium /cluster/nodes")

	if resp == nil || resp.Payload == nil {
		return nil, nil, fmt.Errorf("received nil health response")
	}

	s.RWMutex.Lock()
	s.clientID = resp.Payload.ClientID
	if resp.Payload.Self != "" {
		s.localStatus = &healthModels.SelfStatus{
			Name: resp.Payload.Self,
		}
	}
	s.RWMutex.Unlock()

	nodesAdded := nodeElementSliceToNodeMap(resp.Payload.NodesAdded)
	nodesRemoved := nodeElementSliceToNodeMap(resp.Payload.NodesRemoved)

	return nodesAdded, nodesRemoved, nil
}

// getAllNodes fetches all nodes the daemon is aware of.
func (s *Server) getAllNodes() (nodeMap, error) {
	scopedLog := log
	if s.CiliumURI != "" {
		scopedLog = log.WithField("URI", s.CiliumURI)
	}
	scopedLog.Debug("Sending request for /cluster/nodes ...")

	resp, err := s.Daemon.GetClusterNodes(nil)
	if err != nil {
		return nil, fmt.Errorf("unable to get nodes' cluster: %w", err)
	}
	log.Debug("Got cilium /cluster/nodes")

	if resp == nil || resp.Payload == nil {
		return nil, fmt.Errorf("received nil health response")
	}

	nodesAdded := nodeElementSliceToNodeMap(resp.Payload.NodesAdded)

	return nodesAdded, nil
}

// nodeElementSliceToNodeMap converts a slice of models.NodeElement into a
// nodeMap.
func nodeElementSliceToNodeMap(nodeElements []*models.NodeElement) nodeMap {
	nodes := make(nodeMap)
	for _, n := range nodeElements {
		if n.PrimaryAddress != nil {
			if n.PrimaryAddress.IPV4 != nil {
				nodes[ipString(n.PrimaryAddress.IPV4.IP)] = NewHealthNode(n)
			}
			if n.PrimaryAddress.IPV6 != nil {
				nodes[ipString(n.PrimaryAddress.IPV6.IP)] = NewHealthNode(n)
			}
		}
		for _, addr := range n.SecondaryAddresses {
			nodes[ipString(addr.IP)] = NewHealthNode(n)
		}
		if n.HealthEndpointAddress != nil {
			if n.HealthEndpointAddress.IPV4 != nil {
				nodes[ipString(n.HealthEndpointAddress.IPV4.IP)] = NewHealthNode(n)
			}
			if n.HealthEndpointAddress.IPV6 != nil {
				nodes[ipString(n.HealthEndpointAddress.IPV6.IP)] = NewHealthNode(n)
			}
		}
	}
	return nodes
}

// updateCluster makes the specified health report visible to the API.
//
// It only updates the server's API-visible health report if the provided
// report started after the current report.
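// This guards against out-of-order updates: a probe that started earlier but
// finished later must not overwrite the results of a more recent probe.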
func (s *Server) updateCluster(report *healthReport) {
	s.Lock()
	defer s.Unlock()

	if s.connectivity.startTime.Before(report.startTime) {
		s.connectivity = report
		s.collectNodeConnectivityMetrics()
	}
}

// collectNodeConnectivityMetrics updates the per-node connectivity and
// latency gauges from the most recent health report. Callers must hold the
// server lock.
func (s *Server) collectNodeConnectivityMetrics() {
	if s.localStatus == nil || s.connectivity == nil {
		return
	}
	localClusterName, localNodeName := getClusterNodeName(s.localStatus.Name)

	for _, n := range s.connectivity.nodes {
		if n == nil || n.Host == nil || n.Host.PrimaryAddress == nil || n.HealthEndpoint == nil || n.HealthEndpoint.PrimaryAddress == nil {
			continue
		}

		targetClusterName, targetNodeName := getClusterNodeName(n.Name)
		nodePathPrimaryAddress := healthClientPkg.GetHostPrimaryAddress(n)
		nodePathSecondaryAddress := healthClientPkg.GetHostSecondaryAddresses(n)

		endpointPathStatus := n.HealthEndpoint
		isEndpointReachable := healthClientPkg.SummarizePathConnectivityStatusType(healthClientPkg.GetAllEndpointAddresses(n)) == healthClientPkg.ConnStatusReachable
		isNodeReachable := healthClientPkg.SummarizePathConnectivityStatusType(healthClientPkg.GetAllHostAddresses(n)) == healthClientPkg.ConnStatusReachable

		location := metrics.LabelLocationLocalNode
		if targetClusterName != localClusterName {
			location = metrics.LabelLocationRemoteInterCluster
		} else if targetNodeName != localNodeName {
			location = metrics.LabelLocationRemoteIntraCluster
		}

		// Aggregated status for endpoint connectivity
		metrics.NodeConnectivityStatus.WithLabelValues(
			localClusterName, localNodeName, targetClusterName, targetNodeName, location, metrics.LabelPeerEndpoint).
			Set(metrics.BoolToFloat64(isEndpointReachable))

		// Aggregated status for node connectivity
		metrics.NodeConnectivityStatus.WithLabelValues(
			localClusterName, localNodeName, targetClusterName, targetNodeName, location, metrics.LabelPeerNode).
			Set(metrics.BoolToFloat64(isNodeReachable))

		// HTTP endpoint primary
		collectConnectivityMetric(endpointPathStatus.PrimaryAddress.HTTP, localClusterName, localNodeName,
			targetClusterName, targetNodeName, endpointPathStatus.PrimaryAddress.IP,
			location, metrics.LabelPeerEndpoint, metrics.LabelTrafficHTTP, metrics.LabelAddressTypePrimary)

		// HTTP endpoint secondary
		for _, secondary := range endpointPathStatus.SecondaryAddresses {
			collectConnectivityMetric(secondary.HTTP, localClusterName, localNodeName,
				targetClusterName, targetNodeName, secondary.IP,
				location, metrics.LabelPeerEndpoint, metrics.LabelTrafficHTTP, metrics.LabelAddressTypeSecondary)
		}

		// HTTP node primary
		collectConnectivityMetric(nodePathPrimaryAddress.HTTP, localClusterName, localNodeName,
			targetClusterName, targetNodeName, nodePathPrimaryAddress.IP,
			location, metrics.LabelPeerNode, metrics.LabelTrafficHTTP, metrics.LabelAddressTypePrimary)

		// HTTP node secondary
		for _, secondary := range nodePathSecondaryAddress {
			collectConnectivityMetric(secondary.HTTP, localClusterName, localNodeName,
				targetClusterName, targetNodeName, secondary.IP,
				location, metrics.LabelPeerNode, metrics.LabelTrafficHTTP, metrics.LabelAddressTypeSecondary)
		}

		// ICMP endpoint primary
		collectConnectivityMetric(endpointPathStatus.PrimaryAddress.Icmp, localClusterName, localNodeName,
			targetClusterName, targetNodeName, endpointPathStatus.PrimaryAddress.IP,
			location, metrics.LabelPeerEndpoint, metrics.LabelTrafficICMP, metrics.LabelAddressTypePrimary)

		// ICMP endpoint secondary
		for _, secondary := range endpointPathStatus.SecondaryAddresses {
			collectConnectivityMetric(secondary.Icmp, localClusterName, localNodeName,
				targetClusterName, targetNodeName, secondary.IP,
				location, metrics.LabelPeerEndpoint, metrics.LabelTrafficICMP, metrics.LabelAddressTypeSecondary)
		}

		// ICMP node primary
		collectConnectivityMetric(nodePathPrimaryAddress.Icmp, localClusterName, localNodeName,
			targetClusterName, targetNodeName, nodePathPrimaryAddress.IP,
			location, metrics.LabelPeerNode, metrics.LabelTrafficICMP, metrics.LabelAddressTypePrimary)

		// ICMP node secondary
		for _, secondary := range nodePathSecondaryAddress {
			collectConnectivityMetric(secondary.Icmp, localClusterName, localNodeName,
				targetClusterName, targetNodeName, secondary.IP,
				location, metrics.LabelPeerNode, metrics.LabelTrafficICMP, metrics.LabelAddressTypeSecondary)
		}
	}
}

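// collectConnectivityMetric publishes the latency of a single probed path as
// a gauge under the given metric labels, converted from nanoseconds to
// seconds; -1 is reported when no connectivity status is available.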
func collectConnectivityMetric(status *healthModels.ConnectivityStatus, labels ...string) {
	var metricValue float64 = -1
	if status != nil {
		metricValue = float64(status.Latency) / float64(time.Second)
	}
	metrics.NodeConnectivityLatency.WithLabelValues(labels...).Set(metricValue)
}

// getClusterNodeName returns the cluster name and node name if possible.
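// For example, "default/node-1" yields ("default", "node-1"); a name without
// a cluster prefix, such as "node-1", falls back to the default cluster name.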
func getClusterNodeName(str string) (string, string) {
	clusterName, nodeName := path.Split(str)
	if len(clusterName) == 0 {
		return ciliumDefaults.ClusterName, nodeName
	}
	// Strip the trailing slash, if any, from the cluster name.
	return path.Dir(clusterName), nodeName
}

// GetStatusResponse returns the most recent cluster connectivity status.
func (s *Server) GetStatusResponse() *healthModels.HealthStatusResponse {
	s.RLock()
	defer s.RUnlock()

	var name string
	// If localStatus has not been populated yet, leave the name empty.
	if s.localStatus != nil {
		name = s.localStatus.Name
	}
	return &healthModels.HealthStatusResponse{
		Local: &healthModels.SelfStatus{
			Name: name,
		},
		Nodes:     s.connectivity.nodes,
		Timestamp: s.connectivity.startTime.Format(time.RFC3339),
	}
}

// FetchStatusResponse updates the cluster with the latest set of nodes,
// runs a synchronous probe across the cluster, updates the connectivity
// cache, and returns the results.
func (s *Server) FetchStatusResponse() (*healthModels.HealthStatusResponse, error) {
	nodes, err := s.getAllNodes()
	if err != nil {
		return nil, err
	}

	prober := newProber(s, nodes)
	if err := prober.Run(); err != nil {
		log.WithError(err).Info("Failed to run ping")
		return nil, err
	}
	log.Debug("Run complete")
	s.updateCluster(prober.getResults())

	return s.GetStatusResponse(), nil
}

// runActiveServices runs the services that actively probe other hosts and
// endpoints over ICMP and HTTP, and hosts the health admin API on a local
// Unix socket. It blocks indefinitely, or returns any error that occurs
// while hosting the Unix socket API server.
func (s *Server) runActiveServices() error {
	// Run it once at the start so we get some initial status.
	s.FetchStatusResponse()

	// We can safely ignore nodesRemoved since it's the first time we are
	// fetching the nodes from the server.
	nodesAdded, _, _ := s.getNodes()
	prober := newProber(s, nodesAdded)
	prober.MaxRTT = s.ProbeInterval
	prober.OnIdle = func() {
		// OnIdle is called every ProbeInterval after sending out all ICMP pings.
		// There are a few important considerations here:
		// (1) The ICMP prober doesn't report failed probes.
		// (2) We can receive the same nodes multiple times in nodesAdded in case of updates.
		// (3) We need to clean up the ICMP status so we don't retain stale probe results.
		// (4) We don't want to report stale nodes in metrics.
		if nodesAdded, nodesRemoved, err := s.getNodes(); err != nil {
			// Reset the cache by setting clientID to 0 and removing all current nodes.
			s.clientID = 0
			prober.setNodes(nil, prober.nodes)
			log.WithError(err).Error("unable to get cluster nodes")
			return
		} else {
			// (1) Mark IPs that did not receive an ICMP reply as unreachable.
			prober.updateIcmpStatus()
			// (2) The setNodes implementation doesn't override results for existing nodes.
			// (4) Remove stale nodes so we don't report them in metrics before updating results.
			prober.setNodes(nodesAdded, nodesRemoved)
			// (4) Update results without stale nodes.
			s.updateCluster(prober.getResults())
			// (3) Clean up ICMP results for the next iteration of probing.
			prober.clearIcmpStatus()
		}
	}
	prober.RunLoop()
	defer prober.Stop()

	return s.Server.Serve()
}

// Serve spins up the following goroutines:
//   - HTTP API Server: Responder to the health API "/hello" message
//   - Prober: Periodically run pings across the cluster at a configured
//     interval and update the server's connectivity status cache.
//   - Unix API Server: Handle all health API requests over a Unix socket.
//
// Callers should first defer the Server.Shutdown(), then call Serve().
func (s *Server) Serve() (err error) {
	errors := make(chan error)

	go func() {
		errors <- s.httpPathServer.Serve()
	}()

	go func() {
		errors <- s.runActiveServices()
	}()

	// Block for the first error, then return.
	err = <-errors
	return err
}

// Shutdown the server and clean up resources.
func (s *Server) Shutdown() {
	s.httpPathServer.Shutdown()
	s.Server.Shutdown()
}

// newServer instantiates a new instance of the health API server on the
// default Unix socket.
func (s *Server) newServer(spec *healthApi.Spec) *healthApi.Server {
	restAPI := restapi.NewCiliumHealthAPIAPI(spec.Document)
	restAPI.Logger = log.Printf

	// Admin API
	restAPI.GetHealthzHandler = NewGetHealthzHandler(s)
	restAPI.ConnectivityGetStatusHandler = NewGetStatusHandler(s)
	restAPI.ConnectivityPutStatusProbeHandler = NewPutStatusProbeHandler(s)

	api.DisableAPIs(spec.DeniedAPIs, restAPI.AddMiddlewareFor)
	srv := healthApi.NewServer(restAPI)
	srv.EnabledListeners = []string{"unix"}
	srv.SocketPath = defaults.SockPath

	srv.ConfigureAPI()

	return srv
}

// NewServer creates a server to handle health requests.
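//
// A minimal usage sketch (illustrative only; the socket URI below is an
// assumption, and an empty CiliumURI typically falls back to the client's
// default socket). Per Serve's contract, defer Shutdown before serving:
//
//	srv, err := NewServer(Config{CiliumURI: "unix:///var/run/cilium/cilium.sock"})
//	if err != nil {
//		return err
//	}
//	defer srv.Shutdown()
//	return srv.Serve()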
func NewServer(config Config) (*Server, error) {
	server := &Server{
		startTime:    time.Now(),
		Config:       config,
		connectivity: &healthReport{},
	}

	cl, err := ciliumPkg.NewClient(config.CiliumURI)
	if err != nil {
		return nil, err
	}

	server.Client = cl
	server.Server = *server.newServer(config.HealthAPISpec)
	server.httpPathServer = responder.NewServers(getAddresses(), config.HTTPPathPort)

	return server, nil
}

// getAddresses returns the internal node IPv4/IPv6 addresses to listen on,
// depending on which address families are enabled. If an internal node
// address cannot be determined, it falls back to the wildcard address:
// "0.0.0.0" for IPv4 or "::" for IPv6.
func getAddresses() []string {
	addresses := make([]string, 0, 2)

	// Listen on all interfaces and address families in the case of
	// external workloads.
	if option.Config.JoinCluster {
		return []string{""}
	}

	if option.Config.EnableIPv4 {
		if ipv4 := node.GetInternalIPv4(); ipv4 != nil {
			addresses = append(addresses, ipv4.String())
		} else {
			// If fetching the IPv4 address fails, listen on all IPv4 addresses.
			addresses = append(addresses, "0.0.0.0")
		}
	}

	if option.Config.EnableIPv6 {
		if ipv6 := node.GetInternalIPv6(); ipv6 != nil {
			addresses = append(addresses, ipv6.String())
		} else {
			// If fetching the IPv6 address fails, listen on all IPv6 addresses.
			addresses = append(addresses, "::")
		}
	}

	return addresses
}