forked from istio/istio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ads.go
437 lines (388 loc) · 13 KB
/
ads.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
// Copyright 2017 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v2
import (
"io"
"os"
"sync"
"time"
xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
ads "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/log"
)
var (
	// adsDebug enables verbose ADS logging. It defaults to true and is only
	// disabled when PILOT_DEBUG_ADS is explicitly set to "0".
	adsDebug = os.Getenv("PILOT_DEBUG_ADS") != "0"

	// adsClients reflect active gRPC channels, for both ADS and EDS.
	// Guarded by adsClientsMutex.
	adsClients      = map[string]*XdsConnection{}
	adsClientsMutex sync.RWMutex

	// Map of sidecar IDs to XdsConnections, first key is sidecarID, second key is connID.
	// This is a map due to an edge case during envoy restart whereby the 'old' envoy
	// reconnects after the 'new/restarted' envoy. Guarded by adsClientsMutex.
	adsSidecarIDConnectionsMap = map[string]map[string]*XdsConnection{}
)
// XdsConnection is a listener connection type. It tracks the per-envoy state
// for one ADS/EDS gRPC stream: what resource types the client watches and the
// last configuration pushed to it.
type XdsConnection struct {
	// PeerAddr is the address of the client envoy, from network layer
	PeerAddr string

	// Connect records the time of connection, for debugging.
	Connect time.Time

	// ConID is the connection identifier, used as a key in the connection table.
	// Currently based on the node name and a counter.
	ConID string

	// modelNode is the proxy identity parsed from the discovery request's node ID.
	modelNode *model.Proxy

	// Sending on this channel results in push. We may also make it a channel of objects so
	// same info can be sent to all clients, without recomputing.
	pushChannel chan *XdsEvent

	// TODO: migrate other fields as needed from model.Proxy and replace it
	//HttpConnectionManagers map[string]*http_conn.HttpConnectionManager

	HTTPListeners []*xdsapi.Listener `json:"-"`
	RouteConfigs  map[string]*xdsapi.RouteConfiguration
	HTTPClusters  []*xdsapi.Cluster

	// Clusters is the current list of clusters monitored by the client.
	Clusters []string

	// TODO: TcpListeners (may combine mongo/etc)

	// stream is the underlying gRPC bidirectional stream for this connection.
	stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer

	// Routes is the list of watched Routes.
	Routes []string

	// LDSWatch is set if the remote envoy is watching Listeners.
	LDSWatch bool
	// CDSWatch is set if the remote envoy is watching Clusters.
	CDSWatch bool

	// added will be true if at least one discovery request was received, and the connection
	// is added to the map of active connections.
	added bool
}
// XdsEvent represents a config or registry event that results in a push.
type XdsEvent struct {
	// clusters, if not empty, indicates the event was caused by a change in the
	// listed clusters; only EDS for those clusters will be sent.
	clusters []string
}
// StreamAggregatedResources implements the ADS interface.
// Each invocation serves one envoy connection: a reader goroutine pumps
// incoming DiscoveryRequests into reqChannel, while the main loop multiplexes
// between those requests and push events triggered by config changes.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
	peerInfo, ok := peer.FromContext(stream.Context())
	peerAddr := "0.0.0.0"
	if ok {
		peerAddr = peerInfo.Addr.String()
	}
	var discReq *xdsapi.DiscoveryRequest
	var receiveError error
	reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
	if s.services == nil {
		// first call - lazy loading.
		s.updateModel()
	}
	con := &XdsConnection{
		pushChannel:   make(chan *XdsEvent, 1),
		PeerAddr:      peerAddr,
		Connect:       time.Now(),
		HTTPListeners: []*xdsapi.Listener{},
		RouteConfigs:  map[string]*xdsapi.RouteConfiguration{},
		Clusters:      []string{},
		stream:        stream,
	}
	// Do not call: defer close(con.pushChannel) !
	// the push channel will be garbage collected when the connection is no longer used.
	// Closing the channel can cause subtle race conditions with push. According to the spec:
	// "It's only necessary to close a channel when it is important to tell the receiving goroutines that all data
	// have been sent."

	// Reading from a stream is a blocking operation. Each connection needs to read
	// discovery requests and wait for push commands on config change, so we add a
	// go routine. If go grpc adds gochannel support for streams this will not be needed.
	// This also detects close.
	go func() {
		defer close(reqChannel) // indicates close of the remote side.
		for {
			req, err := stream.Recv()
			if err != nil {
				if status.Code(err) == codes.Canceled || err == io.EOF {
					log.Infof("ADS: %q %s terminated %v", peerAddr, con.ConID, err)
					return
				}
				// receiveError is only read by the main loop after it observes
				// reqChannel closed; the close in the defer orders this write.
				receiveError = err
				log.Errorf("ADS: %q %s terminated with errors %v", peerAddr, con.ConID, err)
				return
			}
			reqChannel <- req
		}
	}()
	for {
		// Block until either a request is received or a push is triggered.
		select {
		case discReq, ok = <-reqChannel:
			if !ok {
				// Remote side closed connection.
				return receiveError
			}
			// NOTE(review): discReq.Node is dereferenced without a nil check; a
			// request carrying no Node would panic here — confirm the transport
			// layer guarantees Node is populated.
			if discReq.Node.Id == "" {
				log.Infof("Missing node id %s", discReq.String())
				continue
			}
			nt, err := model.ParseServiceNode(discReq.Node.Id)
			if err != nil {
				return err
			}
			nt.Metadata = model.ParseMetadata(discReq.Node.Metadata)
			con.modelNode = &nt
			if con.ConID == "" {
				// first request
				con.ConID = connectionID(discReq.Node.Id)
			}
			// Dispatch on the watched resource type. For each type, the first
			// request establishes the watch and triggers an immediate push;
			// subsequent requests with no change are treated as ACK/NACKs.
			switch discReq.TypeUrl {
			case ClusterType:
				if con.CDSWatch {
					// Already received a cluster watch request, this is an ACK
					if discReq.ErrorDetail != nil {
						log.Warnf("ADS:CDS: ACK ERROR %v %s %v", peerAddr, con.ConID, discReq.String())
					}
					if adsDebug {
						log.Infof("ADS:CDS: ACK %v %v", peerAddr, discReq.String())
					}
					continue
				}
				if adsDebug {
					log.Infof("ADS:CDS: REQ %s %v raw: %s ", con.ConID, peerAddr, discReq.String())
				}
				con.CDSWatch = true
				err := s.pushCds(*con.modelNode, con)
				if err != nil {
					return err
				}
			case ListenerType:
				if con.LDSWatch {
					// Already received a listener watch request, this is an ACK
					if discReq.ErrorDetail != nil {
						log.Warnf("ADS:LDS: ACK ERROR %v %s %v", peerAddr, con.modelNode.ID, discReq.String())
					}
					if adsDebug {
						log.Infof("ADS:LDS: ACK %v", discReq.String())
					}
					continue
				}
				if adsDebug {
					log.Infof("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
				}
				con.LDSWatch = true
				err := s.pushLds(*con.modelNode, con)
				if err != nil {
					return err
				}
			case RouteType:
				routes := discReq.GetResourceNames()
				if len(routes) == len(con.Routes) || len(routes) == 0 {
					if discReq.ErrorDetail != nil {
						log.Warnf("ADS:RDS: ACK ERROR %v %s %v", peerAddr, con.ConID, discReq.String())
					}
					if adsDebug {
						// Not logging full request, can be very long.
						log.Infof("ADS:RDS: ACK %s %s %s %s", peerAddr, con.ConID, discReq.VersionInfo, discReq.ResponseNonce)
					}
					if len(con.Routes) > 0 {
						// Already got a list of routes to watch and has same length as the request, this is an ack
						continue
					}
				}
				con.Routes = routes
				if adsDebug {
					log.Infof("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
				}
				err := s.pushRoute(con)
				if err != nil {
					return err
				}
			case EndpointType:
				clusters := discReq.GetResourceNames()
				if len(clusters) == len(con.Clusters) || len(clusters) == 0 {
					if discReq.ErrorDetail != nil {
						log.Warnf("ADS:EDS: ACK ERROR %v %s %v", peerAddr, con.ConID, discReq.String())
					}
					// NOTE(review): this branch checks edsDebug while the other
					// cases check adsDebug — possibly an EDS-specific flag from
					// the EDS code path; verify it is intentional.
					if edsDebug {
						// Not logging full request, can be very long.
						log.Infof("ADS:EDS: ACK %s %s %s %s", peerAddr, con.ConID, discReq.VersionInfo, discReq.ResponseNonce)
					}
					if len(con.Clusters) > 0 {
						// Already got a list of clusters to watch and has same length as the request, this is an ack
						continue
					}
				}
				con.Clusters = clusters
				// Register this connection in the per-cluster EDS tables so
				// endpoint changes for these clusters reach it.
				for _, c := range con.Clusters {
					s.addEdsCon(c, con.ConID, con)
				}
				if adsDebug {
					log.Infof("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
				}
				err := s.pushEds(con)
				if err != nil {
					return err
				}
			default:
				log.Warnf("ADS: Unknown watched resources %s", discReq.String())
			}
			// Register the connection in the active table only after the first
			// valid discovery request; the deferred removal runs on stream exit.
			if !con.added {
				con.added = true
				s.addCon(con.ConID, con)
				defer s.removeCon(con.ConID, con)
			}
		case <-con.pushChannel:
			// It is called when config changes.
			// This is not optimized yet - we should detect what changed based on event and only
			// push resources that need to be pushed.
			if con.CDSWatch {
				err := s.pushCds(*con.modelNode, con)
				if err != nil {
					return err
				}
			}
			if len(con.Routes) > 0 {
				err := s.pushRoute(con)
				if err != nil {
					return err
				}
			}
			if len(con.Clusters) > 0 {
				err := s.pushEds(con)
				if err != nil {
					return err
				}
			}
			if con.LDSWatch {
				err := s.pushLds(*con.modelNode, con)
				if err != nil {
					return err
				}
			}
		}
	}
}
func edsClientCount() int {
var n int
edsClusterMutex.Lock()
n = len(adsClients)
edsClusterMutex.Unlock()
return n
}
// adsPushAll implements old style invalidation, generated when any rule or
// endpoint changes. Primary code path is from v1 discoveryService.clearCache(),
// which is added as a handler to the model ConfigStorageCache and Controller.
func adsPushAll() {
	// Recompute each cluster's load assignment once per config change instead
	// of once per endpoint. Snapshot the cluster table under the lock so the
	// (possibly slow) update runs without blocking add/remove.
	edsClusterMutex.Lock()
	clusterSnapshot := make(map[string]*EdsCluster, len(edsClusters))
	for name, cluster := range edsClusters {
		clusterSnapshot[name] = cluster
	}
	edsClusterMutex.Unlock()

	// UpdateCluster updates the cluster with a mutex, so this is safe (though
	// the update may be computed redundantly if multiple goroutines race).
	// In general this code is called from the 'event' callback that is throttled.
	for name, cluster := range clusterSnapshot {
		updateCluster(name, cluster)
	}

	// Push config changes, iterating over connected envoys. This covers ADS and
	// EDS(0.7); both share the same connection table. Snapshot it so we do not
	// hold the lock while signaling.
	adsClientsMutex.RLock()
	connSnapshot := make(map[string]*XdsConnection, len(adsClients))
	for id, conn := range adsClients {
		connSnapshot[id] = conn
	}
	adsClientsMutex.RUnlock()

	// Trigger recomputing the config for each connected Envoy, including every
	// resource type it is watching (EDS included).
	// TODO: get service, serviceinstances, configs once, to avoid repeated redundant calls.
	// TODO: indicate the specific events, to only push what changed.
	for _, conn := range connSnapshot {
		conn.pushChannel <- &XdsEvent{}
	}
}
// addCon registers the connection in the global tables, keyed by connection ID
// and (when the node identity is known) by sidecar ID.
func (s *DiscoveryServer) addCon(conID string, con *XdsConnection) {
	adsClientsMutex.Lock()
	defer adsClientsMutex.Unlock()

	adsClients[conID] = con
	if con.modelNode == nil {
		return
	}
	// Index by sidecar ID as well; multiple connections per sidecar can exist
	// transiently during envoy restart.
	sidecarConns, exists := adsSidecarIDConnectionsMap[con.modelNode.ID]
	if !exists {
		sidecarConns = map[string]*XdsConnection{}
		adsSidecarIDConnectionsMap[con.modelNode.ID] = sidecarConns
	}
	sidecarConns[conID] = con
}
// removeCon unregisters the connection from the global tables and drops its
// per-cluster EDS registrations. Invoked (deferred) when the stream ends.
func (s *DiscoveryServer) removeCon(conID string, con *XdsConnection) {
	adsClientsMutex.Lock()
	defer adsClientsMutex.Unlock()

	// Detach this connection from every cluster it was watching.
	for _, clusterName := range con.Clusters {
		s.removeEdsCon(clusterName, conID, con)
	}
	if adsClients[conID] == nil {
		log.Errorf("ADS: Removing connection for non-existing node %v.", s)
	}
	delete(adsClients, conID)
	if con.modelNode != nil {
		// delete on a possibly-nil inner map is a safe no-op.
		delete(adsSidecarIDConnectionsMap[con.modelNode.ID], conID)
	}
}
// pushRoute builds the route configurations for every route the connection
// watches and sends them as a single RDS discovery response.
func (s *DiscoveryServer) pushRoute(con *XdsConnection) error {
	s.modelMutex.RLock()
	services := s.services
	s.modelMutex.RUnlock()

	proxyInstances, err := s.env.GetProxyServiceInstances(con.modelNode)
	if err != nil {
		log.Warnf("ADS: RDS: Failed to retrieve proxy service instances %v", err)
		return err
	}

	// TODO: once per config update
	routeConfigs := []*xdsapi.RouteConfiguration{}
	for _, routeName := range con.Routes {
		// TODO: for ingress/gateway use the other method
		cfg := s.ConfigGenerator.BuildSidecarOutboundHTTPRouteConfig(s.env, *con.modelNode, proxyInstances,
			services, routeName)
		routeConfigs = append(routeConfigs, cfg)
		// Remember what was pushed, for debugging/inspection.
		con.RouteConfigs[routeName] = cfg
	}

	response, err := routeDiscoveryResponse(routeConfigs, *con.modelNode)
	if err != nil {
		log.Warnf("ADS: RDS: config failure, closing grpc %v", err)
		return err
	}
	if err = con.stream.Send(response); err != nil {
		log.Warnf("ADS: RDS: Send failure, closing grpc %v", err)
		return err
	}
	if adsDebug {
		log.Infof("ADS: RDS: PUSH for addr:%s routes:%d", con.PeerAddr, len(routeConfigs))
	}
	return nil
}
// routeDiscoveryResponse wraps the given route configurations in an RDS
// DiscoveryResponse. The node parameter is currently unused but kept for
// signature parity with the other *DiscoveryResponse builders.
func routeDiscoveryResponse(ls []*xdsapi.RouteConfiguration, node model.Proxy) (*xdsapi.DiscoveryResponse, error) {
	resp := &xdsapi.DiscoveryResponse{
		TypeUrl:     RouteType,
		VersionInfo: versionInfo(),
		Nonce:       nonce(),
	}
	for _, rcfg := range ls {
		res, err := types.MarshalAny(rcfg)
		if err != nil {
			// BUGFIX: the marshal error was previously discarded, which could
			// silently send a zero-valued Any (a corrupt resource) to envoy.
			// Propagate it so the caller logs and closes the stream instead.
			return nil, err
		}
		resp.Resources = append(resp.Resources, *res)
	}
	return resp, nil
}