forked from cisco-app-networking/nsm-nse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vl3_connect.go
556 lines (511 loc) · 19.2 KB
/
vl3_connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
package main
import (
"context"
"os"
"sync"
"github.com/golang/protobuf/ptypes/empty"
"cisco-app-networking.github.io/networkservicemesh/controlplane/api/connection"
"cisco-app-networking.github.io/networkservicemesh/controlplane/api/connection/mechanisms/memif"
"cisco-app-networking.github.io/networkservicemesh/controlplane/api/connectioncontext"
"cisco-app-networking.github.io/networkservicemesh/controlplane/api/networkservice"
"cisco-app-networking.github.io/networkservicemesh/controlplane/api/registry"
"cisco-app-networking.github.io/networkservicemesh/pkg/tools"
"cisco-app-networking.github.io/networkservicemesh/sdk/client"
"cisco-app-networking.github.io/networkservicemesh/sdk/common"
"cisco-app-networking.github.io/networkservicemesh/sdk/endpoint"
"github.com/sirupsen/logrus"
"go.ligato.io/vpp-agent/v3/proto/ligato/vpp"
"google.golang.org/grpc"
"github.com/cisco-app-networking/nsm-nse/pkg/metrics"
"github.com/cisco-app-networking/nsm-nse/pkg/universal-cnf/config"
)
const (
NSREGISTRY_ADDR = "nsmgr.nsm-system"
NSREGISTRY_PORT = "5000"
NSCLIENT_PORT = "5001"
LABEL_NSESOURCE = "vl3Nse/nseSource/endpointName"
)
type vL3PeerState int
const (
PEER_STATE_NOTCONN vL3PeerState = iota
PEER_STATE_CONN
PEER_STATE_CONNERR
PEER_STATE_CONN_INPROG
PEER_STATE_CONN_RX
)
type vL3NsePeer struct {
sync.RWMutex
endpointName string
networkServiceManagerName string
state vL3PeerState
connHdl *connection.Connection
connErr error
excludedPrefixes []string
remoteIp string
}
type vL3ConnectComposite struct {
sync.RWMutex
//endpoint.BaseCompositeEndpoint
myEndpointName string
nsConfig *common.NSConfiguration
defaultRouteIpCidr string
remoteNsIpList []string
vL3NetCidr string
vl3NsePeers map[string]*vL3NsePeer
nsRegGrpcClient *grpc.ClientConn
nsDiscoveryClient registry.NetworkServiceDiscoveryClient
//nsClient networkservice.NetworkServiceClient
nsmClient *client.NsmClient
backend config.UniversalCNFBackend
myNseNameFunc fnGetNseName
connDomain string
nseControlAddr string
}
func (peer *vL3NsePeer) setPeerState(state vL3PeerState) {
peer.Lock()
defer peer.Unlock()
peer.state = state
}
func (peer *vL3NsePeer) getPeerState() vL3PeerState {
peer.Lock()
defer peer.Unlock()
return peer.state
}
func (peer *vL3NsePeer) setPeerConnHdl(connHdl *connection.Connection, connErr error) {
peer.Lock()
defer peer.Unlock()
peer.connHdl = connHdl
peer.connErr = connErr
}
func (vxc *vL3ConnectComposite) getPeer(endpointName string) *vL3NsePeer {
vxc.Lock()
defer vxc.Unlock()
peer, ok := vxc.vl3NsePeers[endpointName]
if !ok {
return nil
}
return peer
}
func (vxc *vL3ConnectComposite) addPeer(endpointName, networkServiceManagerName, remoteIp string) *vL3NsePeer {
vxc.Lock()
defer vxc.Unlock()
_, ok := vxc.vl3NsePeers[endpointName]
if !ok {
vxc.vl3NsePeers[endpointName] = &vL3NsePeer{
endpointName: endpointName,
networkServiceManagerName: networkServiceManagerName,
state: PEER_STATE_NOTCONN,
remoteIp: remoteIp,
}
}
return vxc.vl3NsePeers[endpointName]
}
func (vxc *vL3ConnectComposite) SetMyNseName(request *networkservice.NetworkServiceRequest) {
vxc.Lock()
defer vxc.Unlock()
if vxc.myEndpointName == "" {
nseName := vxc.myNseNameFunc()
logrus.Infof("Setting vL3connect composite endpoint name to \"%s\"--req contains \"%s\"", nseName, request.GetConnection().GetNetworkServiceEndpointName())
if request.GetConnection().GetNetworkServiceEndpointName() != "" {
vxc.myEndpointName = request.GetConnection().GetNetworkServiceEndpointName()
} else {
vxc.myEndpointName = nseName
}
}
}
func (vxc *vL3ConnectComposite) GetMyNseName() string {
vxc.Lock()
defer vxc.Unlock()
return vxc.myEndpointName
}
func (vxc *vL3ConnectComposite) processPeerRequest(vl3SrcEndpointName string, request *networkservice.NetworkServiceRequest, incoming *connection.Connection) error {
logrus.Infof("vL3ConnectComposite received connection request from vL3 NSE %s", vl3SrcEndpointName)
go func() {
metrics.ReceivedConnRequests.Inc()
}()
peer := vxc.addPeer(vl3SrcEndpointName, request.GetConnection().GetSourceNetworkServiceManagerName(), "")
peer.Lock()
defer peer.Unlock()
logrus.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
"prior_state": peer.state,
"new_state": PEER_STATE_CONN_RX,
}).Infof("vL3ConnectComposite vl3 NSE peer %s added", vl3SrcEndpointName)
peer.excludedPrefixes = removeDuplicates(append(peer.excludedPrefixes, incoming.Context.IpContext.ExcludedPrefixes...))
incoming.Context.IpContext.ExcludedPrefixes = peer.excludedPrefixes
peer.connHdl = request.GetConnection()
/* tell my peer to route to me for my vL3NetCidr */
mySubnetRoute := connectioncontext.Route{
Prefix: vxc.vL3NetCidr,
}
incoming.Context.IpContext.DstRoutes = append(incoming.Context.IpContext.DstRoutes, &mySubnetRoute)
peer.state = PEER_STATE_CONN_RX
return nil
}
func (vxc *vL3ConnectComposite) Request(ctx context.Context,
request *networkservice.NetworkServiceRequest) (*connection.Connection, error) {
logger := logrus.New() // endpoint.Log(ctx)
conn := request.GetConnection()
logger.WithFields(logrus.Fields{
"endpointName": conn.GetNetworkServiceEndpointName(),
"networkServiceManagerName": conn.GetSourceNetworkServiceManagerName(),
}).Infof("vL3ConnectComposite Request handler")
//var err error
/* NOTE: for IPAM we assume there's no IPAM endpoint in the composite endpoint list */
/* -we are taking care of that here in this handler */
/*incoming, err := vxc.GetNext().Request(ctx, request)
if err != nil {
logrus.Error(err)
return nil, err
}*/
if vl3SrcEndpointName, ok := conn.GetLabels()[LABEL_NSESOURCE]; ok {
// request is from another vl3 NSE
conn.Labels[config.PEER_NAME] = vl3SrcEndpointName
_ = vxc.processPeerRequest(vl3SrcEndpointName, request, request.Connection)
} else {
/* set NSC route to this NSE for full vL3 CIDR */
nscVL3Route := connectioncontext.Route{
Prefix: vxc.defaultRouteIpCidr,
}
request.Connection.Context.IpContext.DstRoutes = append(request.Connection.Context.IpContext.DstRoutes, &nscVL3Route)
vxc.SetMyNseName(request)
logger.Infof("vL3ConnectComposite serviceRegistry.DiscoveryClient")
if vxc.nsDiscoveryClient == nil {
logger.Error("nsDiscoveryClient is nil")
} else {
/* Find all NSEs registered as the same type as this one */
req := ®istry.FindNetworkServiceRequest{
NetworkServiceName: conn.GetNetworkService(),
}
logger.Infof("vL3ConnectComposite FindNetworkService for NS=%s", conn.GetNetworkService())
response, err := vxc.nsDiscoveryClient.FindNetworkService(context.Background(), req)
if err != nil {
logger.Error(err)
go func() {
metrics.FailedFindNetworkService.Inc()
}()
} else {
logger.Infof("vL3ConnectComposite found network service; processing endpoints")
go vxc.processNsEndpoints(context.TODO(), response, "")
}
vxc.nsmClient.Configuration.ClientNetworkService = req.NetworkServiceName
logger.Infof("vL3ConnectComposite check remotes for endpoints")
for _, remoteIp := range vxc.remoteNsIpList {
req.NetworkServiceName = req.NetworkServiceName + "@" + remoteIp
logger.Infof("vL3ConnectComposite querying remote NS %s", req.NetworkServiceName)
response, err := vxc.nsDiscoveryClient.FindNetworkService(context.Background(), req)
if err != nil {
logger.Error(err)
go func() {
metrics.FailedFindNetworkService.Inc()
}()
} else {
logger.Infof("vL3ConnectComposite found network service; processing endpoints from remote %s", remoteIp)
go vxc.processNsEndpoints(context.TODO(), response, remoteIp)
}
}
}
}
err := ValidateInLabels(conn.Labels)
if err != nil {
logger.Errorf("vL3 workload params not in labels: %v", err)
} else {
serviceRegistry, registryClient, err := NewServiceRegistry(vxc.nseControlAddr, ctx)
if err != nil {
logger.Error(err)
} else {
err = serviceRegistry.RegisterWorkload(ctx, conn.Labels, vxc.connDomain,
processWorkloadIps(conn.Context.IpContext.SrcIpAddr, ";"), config.GetEndpointName())
if err != nil {
logger.Error(err)
}
registryClient.Stop()
}
}
logger.Infof("vL3ConnectComposite request done")
//return incoming, nil
if endpoint.Next(ctx) != nil {
return endpoint.Next(ctx).Request(ctx, request)
}
return conn, nil
}
func (vxc *vL3ConnectComposite) Close(ctx context.Context, conn *connection.Connection) (*empty.Empty, error) {
// remove from connections
logrus.Infof("vL3 DeleteConnection: %v", conn)
err := ValidateInLabels(conn.Labels)
if err != nil {
logrus.Errorf("vL3 workload params not in labels: %v", err)
} else {
logrus.WithFields(logrus.Fields{
"SrcIP": processWorkloadIps(conn.Context.IpContext.SrcIpAddr, ";"),
}).Infof("vL3 Removing workload instance")
serviceRegistry, registryClient, err := NewServiceRegistry(vxc.nseControlAddr, ctx)
if err != nil {
logrus.Error(err)
} else {
err = serviceRegistry.RemoveWorkload(ctx, conn.Labels, vxc.connDomain,
processWorkloadIps(conn.Context.IpContext.SrcIpAddr, ";"), config.GetEndpointName())
if err != nil {
logrus.Error(err)
}
registryClient.Stop()
}
}
if endpoint.Next(ctx) != nil {
return endpoint.Next(ctx).Close(ctx, conn)
}
return &empty.Empty{}, nil
}
// Name returns the composite name
func (vxc *vL3ConnectComposite) Name() string {
return "vL3 NSE"
}
func (vxc *vL3ConnectComposite) processNsEndpoints(ctx context.Context, response *registry.FindNetworkServiceResponse, remoteIp string) error {
/* TODO: For NSs with multiple endpoint types how do we know their type?
- do we need to match the name portion? labels?
*/
// just create a new logger for this go thread
logger := logrus.New()
for _, vl3endpoint := range response.GetNetworkServiceEndpoints() {
if vl3endpoint.GetName() != vxc.GetMyNseName() {
logger.Infof("Found vL3 service %s peer %s", vl3endpoint.NetworkServiceName,
vl3endpoint.GetName())
peer := vxc.addPeer(vl3endpoint.GetName(), vl3endpoint.NetworkServiceManagerName, remoteIp)
peer.Lock()
//peer.excludedPrefixes = removeDuplicates(append(peer.excludedPrefixes, incoming.Context.IpContext.ExcludedPrefixes...))
err := vxc.ConnectPeerEndpoint(ctx, peer, logger)
if err != nil {
logger.WithFields(logrus.Fields{
"peerEndpoint": vl3endpoint.GetName(),
}).Errorf("Failed to connect to vL3 Peer")
} else {
if peer.connHdl != nil {
logger.WithFields(logrus.Fields{
"peerEndpoint": vl3endpoint.GetName(),
"srcIP": peer.connHdl.Context.IpContext.SrcIpAddr,
"ConnExcludedPrefixes": peer.connHdl.Context.IpContext.ExcludedPrefixes,
"peerExcludedPrefixes": peer.excludedPrefixes,
"peer.DstRoutes": peer.connHdl.Context.IpContext.DstRoutes,
}).Infof("Connected to vL3 Peer")
} else {
logger.WithFields(logrus.Fields{
"peerEndpoint": vl3endpoint.GetName(),
"peerExcludedPrefixes": peer.excludedPrefixes,
}).Infof("Connected to vL3 Peer but connhdl == nil")
}
}
peer.Unlock()
} else {
logger.Infof("Found my vL3 service %s instance endpoint name: %s", vl3endpoint.NetworkServiceName,
vl3endpoint.GetName())
}
}
return nil
}
func (vxc *vL3ConnectComposite) createPeerConnectionRequest(ctx context.Context, peer *vL3NsePeer, routes []string, logger logrus.FieldLogger) error {
/* expected to be called with peer.Lock() */
if peer.state == PEER_STATE_CONN || peer.state == PEER_STATE_CONN_INPROG {
logger.WithFields(logrus.Fields{
"peer.Endpoint": peer.endpointName,
}).Infof("Already connected to peer")
return peer.connErr
}
peer.state = PEER_STATE_CONN_INPROG
logger.WithFields(logrus.Fields{
"peer.Endpoint": peer.endpointName,
}).Infof("Performing connect to peer")
dpconfig := &vpp.ConfigData{}
peer.connHdl, peer.connErr = vxc.performPeerConnectRequest(ctx, peer, routes, dpconfig, logger)
if peer.connErr != nil {
logger.WithFields(logrus.Fields{
"peer.Endpoint": peer.endpointName,
}).Errorf("NSE peer connection failed - %v", peer.connErr)
peer.state = PEER_STATE_CONNERR
return peer.connErr
}
if peer.connErr = vxc.backend.ProcessDPConfig(dpconfig, true); peer.connErr != nil {
logger.Errorf("endpoint %s Error processing dpconfig: %+v -- %v", peer.endpointName, dpconfig, peer.connErr)
peer.state = PEER_STATE_CONNERR
return peer.connErr
}
peer.state = PEER_STATE_CONN
logger.WithFields(logrus.Fields{
"peer.Endpoint": peer.endpointName,
}).Infof("Done with connect to peer")
return nil
}
func (vxc *vL3ConnectComposite) performPeerConnectRequest(ctx context.Context, peer *vL3NsePeer, routes []string, dpconfig interface{}, logger logrus.FieldLogger) (*connection.Connection, error) {
/* expected to be called with peer.Lock() */
go func() {
metrics.PerormedConnRequests.Inc()
}()
ifName := peer.endpointName
vxc.nsmClient.ClientLabels[LABEL_NSESOURCE] = vxc.GetMyNseName()
conn, err := vxc.nsmClient.ConnectToEndpoint(ctx, peer.remoteIp, peer.endpointName, peer.networkServiceManagerName, ifName, memif.MECHANISM, "VPP interface "+ifName, routes)
if err != nil {
logger.Errorf("Error creating %s: %v", ifName, err)
return nil, err
}
err = vxc.backend.ProcessClient(dpconfig, ifName, conn)
return conn, nil
}
func (vxc *vL3ConnectComposite) ConnectPeerEndpoint(ctx context.Context, peer *vL3NsePeer, logger logrus.FieldLogger) error {
/* expected to be called with peer.Lock() */
// build connection object
// perform remote networkservice request
state := peer.state
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
"state": state,
}).Info("newVL3Connect ConnectPeerEndpoint")
switch state {
case PEER_STATE_NOTCONN:
// TODO do connection request
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("request remote connection")
routes := []string{vxc.vL3NetCidr}
return vxc.createPeerConnectionRequest(ctx, peer, routes, logger)
case PEER_STATE_CONN:
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("remote connection already established")
case PEER_STATE_CONNERR:
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("remote connection attempted prior and errored")
case PEER_STATE_CONN_INPROG:
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("remote connection in progress")
case PEER_STATE_CONN_RX:
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("remote connection already established--rx from peer")
default:
logger.WithFields(logrus.Fields{
"endpointName": peer.endpointName,
"networkServiceManagerName": peer.networkServiceManagerName,
}).Info("remote connection state unknown")
}
return nil
}
func removeDuplicates(elements []string) []string {
encountered := map[string]bool{}
result := []string{}
for v := range elements {
if !encountered[elements[v]] {
encountered[elements[v]] = true
result = append(result, elements[v])
}
}
return result
}
// newVL3ConnectComposite creates a new VL3 composite
func newVL3ConnectComposite(configuration *common.NSConfiguration, vL3NetCidr string, backend config.UniversalCNFBackend, remoteIpList []string, getNseName fnGetNseName, defaultCdPrefix, nseControlAddr, connDomain string) *vL3ConnectComposite {
nsRegAddr, ok := os.LookupEnv("NSREGISTRY_ADDR")
if !ok {
nsRegAddr = NSREGISTRY_ADDR
}
nsRegPort, ok := os.LookupEnv("NSREGISTRY_PORT")
if !ok {
nsRegPort = NSREGISTRY_PORT
}
// ensure the env variables are processed
if configuration == nil {
configuration = &common.NSConfiguration{}
configuration.FromEnv()
}
logrus.Infof("newVL3ConnectComposite")
var nsDiscoveryClient registry.NetworkServiceDiscoveryClient
/*
regAddr := net.ParseIP(nsRegAddr)
if regAddr == nil {
regAddrList, err := net.LookupHost(nsRegAddr)
if err != nil {
logrus.Errorf("nsmConnection registry address resolution Error: %v", err)
} else {
logrus.Infof("newVL3ConnectComposite: resolved %s to %v", nsRegAddr, regAddrList)
for _, regAddrVal := range regAddrList {
if regAddr = net.ParseIP(regAddrVal); regAddr != nil {
logrus.Infof("newVL3ConnectComposite: NSregistry using IP %s", regAddrVal)
break
}
}
}
}
regPort, _ := strconv.Atoi(nsRegPort)
nsRegGrpcClient, err := tools.SocketOperationCheck(&net.TCPAddr{IP: regAddr, Port: regPort})
*/
nsRegGrpcClient, err := tools.DialTCP(nsRegAddr + ":" + nsRegPort)
if err != nil {
logrus.Errorf("nsmRegistryConnection GRPC Client Socket Error: %v", err)
//return nil
} else {
logrus.Infof("newVL3ConnectComposite socket operation ok... create networkDiscoveryClient")
nsDiscoveryClient = registry.NewNetworkServiceDiscoveryClient(nsRegGrpcClient)
if nsDiscoveryClient == nil {
logrus.Errorf("newVL3ConnectComposite networkDiscoveryClient nil")
} else {
logrus.Infof("newVL3ConnectComposite networkDiscoveryClient ok")
}
}
// create remote_networkservice API connection
//var nsClient networkservice.NetworkServiceClient
/*
nsGrpcClient, err := tools.DialTCP(nsRegAddr + ":" + nsPort)
if err != nil {
logrus.Errorf("nsmConnection GRPC Client Socket Error: %v", err)
//return nil
} else {
logrus.Infof("newVL3ConnectComposite socket operation ok... create network-service client")
nsClient = networkservice.NewNetworkServiceClient(nsGrpcClient)
logrus.Infof("newVL3ConnectComposite network-service client ok")
}
*/
// Call the NS Client initiation
/* nsConfig := &common.NSConfiguration{
ClientNetworkService: configuration.EndpointNetworkService,
ClientLabels: "",
Routes: configuration.Routes,
} */
nsConfig := configuration
nsConfig.ClientLabels = ""
var nsmClient *client.NsmClient
nsmClient, err = client.NewNSMClient(context.TODO(), nsConfig)
if err != nil {
logrus.Errorf("Unable to create the NSM client %v", err)
}
/*
nsmConn, err := common.NewNSMConnection(context.TODO(), configuration)
if err != nil {
logrus.Errorf("nsmConnection Client Connection Error: %v", err)
} else {
nsClient = nsmConn.NsClient
}
*/
newVL3ConnectComposite := &vL3ConnectComposite{
nsConfig: configuration,
remoteNsIpList: remoteIpList,
vL3NetCidr: vL3NetCidr,
myEndpointName: "",
vl3NsePeers: make(map[string]*vL3NsePeer),
nsRegGrpcClient: nsRegGrpcClient,
nsDiscoveryClient: nsDiscoveryClient,
nsmClient: nsmClient,
backend: backend,
myNseNameFunc: getNseName,
defaultRouteIpCidr: defaultCdPrefix,
nseControlAddr: nseControlAddr,
connDomain: connDomain,
}
logrus.Infof("newVL3ConnectComposite returning")
return newVL3ConnectComposite
}