-
Notifications
You must be signed in to change notification settings - Fork 0
/
ads.go
118 lines (98 loc) · 3.05 KB
/
ads.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package service
import (
"context"
"errors"
"github.com/envoyproxy/go-control-plane/envoy/api/v2"
ads "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
)
// XDS v2 implementation of the common Stream interface.
// DeltaAggregatedResources is the delta (incremental) variant of the ADS v2
// stream. It is not implemented: every call immediately returns a gRPC
// Unimplemented status and the stream is never read.
func (s *AdsService) DeltaAggregatedResources(stream ads.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
	return status.Error(codes.Unimplemented, "not implemented")
}
// StreamAggregatedResources implements the Envoy variant of the ADS stream.
// Can be used directly with EDS.
//
// The gRPC stream is wrapped in an adsStream and handed to s.stream; whatever
// error s.stream returns is returned unchanged.
func (s *AdsService) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
	wrapped := &adsStream{stream: stream}
	return s.stream(wrapped)
}
// adsStream wraps an ADS v2 gRPC server stream. Its methods in this file
// (Send, Recv, Context, Process) operate on the wrapped stream.
type adsStream struct {
// stream is the underlying gRPC stream that messages are sent to and
// received from.
stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer
}
// Send writes p to the wrapped gRPC stream.
//
// p must be a *v2.DiscoveryResponse; any other message type is rejected with
// an "Invalid stream" error and nothing is sent. Otherwise the error returned
// by the underlying stream's Send is passed through.
func (adss *adsStream) Send(p proto.Message) error {
	resp, isResponse := p.(*v2.DiscoveryResponse)
	if !isResponse {
		return errors.New("Invalid stream")
	}
	return adss.stream.Send(resp)
}
// Recv reads the next request from the wrapped gRPC stream.
//
// On failure it returns an untyped nil message together with the stream's
// error, so callers never see a non-nil interface holding a nil request.
func (adss *adsStream) Recv() (proto.Message, error) {
	req, recvErr := adss.stream.Recv()
	if recvErr == nil {
		return req, nil
	}
	return nil, recvErr
}
// Context returns the context of the wrapped gRPC stream.
func (adss *adsStream) Context() context.Context {
	ctx := adss.stream.Context()
	return ctx
}
// Process handles one message received on an ADS stream for connection con.
//
// msg must be a non-nil *v2.DiscoveryRequest; anything else is rejected with
// an error instead of panicking on the type assertion.
//
// The first request seen on a connection (con.active == false) must carry a
// node id. It is used to fill in con.NodeID, con.Metadata and con.ConID, after
// which the connection is registered in s.clients and marked active.
//
// The request is then classified as follows:
//   - ErrorDetail with a non-empty message: counted and logged as a NACK,
//     nothing is pushed.
//   - ErrorDetail with code 0: recorded as an ACK for the request's type,
//     nothing is pushed.
//   - ResponseNonce equal to the last nonce sent for the type: recorded as an
//     ACK, nothing is pushed.
//   - anything else: the watched resource names for the type are replaced
//     with req.ResourceNames and pushed via s.push.
//
// It returns nil when the request was absorbed as an ACK/NACK, and otherwise
// the result of s.push. A non-nil error is expected to make the caller drop
// the connection (per the original "push failed - disconnect" note).
func (adss *adsStream) Process(s *AdsService, con *Connection, msg proto.Message) error {
	req, ok := msg.(*v2.DiscoveryRequest)
	if !ok || req == nil {
		// Mirrors the checked assertion in Send: reject unexpected messages
		// instead of crashing the stream goroutine.
		return errors.New("Invalid request type")
	}

	if !con.active {
		if req.Node == nil || req.Node.Id == "" {
			log.Println("Missing node id ", req.String())
			return errors.New("Missing node id")
		}

		con.mu.Lock()
		con.NodeID = req.Node.Id
		con.Metadata = parseMetadata(req.Node.Metadata)
		con.ConID = s.connectionID(con.NodeID)
		con.mu.Unlock()

		s.mutex.Lock()
		s.clients[con.ConID] = con
		s.mutex.Unlock()

		con.active = true
	}

	rtype := req.TypeUrl

	// NACK: the client reported an error message for a previous response.
	// NOTE(review): an ErrorDetail with a non-zero code but an empty message
	// is not treated as a NACK here and falls through to the nonce check -
	// confirm this is intended.
	if req.ErrorDetail != nil && req.ErrorDetail.Message != "" {
		nacks.With(prometheus.Labels{"node": con.NodeID, "type": rtype}).Add(1)
		log.Println("NACK: ", con.NodeID, rtype, req.ErrorDetail)
		return nil
	}

	// Explicit ACK: ErrorDetail present with code 0.
	if req.ErrorDetail != nil && req.ErrorDetail.Code == 0 {
		con.mu.Lock()
		con.NonceAcked[rtype] = req.ResponseNonce
		con.mu.Unlock()
		acks.With(prometheus.Labels{"type": rtype}).Add(1)
		return nil
	}

	if req.ResponseNonce != "" {
		// The request carries a nonce: compare it with the last nonce sent
		// for this type and record the ACK in the same critical section.
		// (Original note on this branch: "This shouldn't happen".)
		con.mu.Lock()
		lastNonce := con.NonceSent[rtype]
		matched := lastNonce == req.ResponseNonce
		if matched {
			con.NonceAcked[rtype] = req.ResponseNonce
		}
		con.mu.Unlock()

		if matched {
			acks.With(prometheus.Labels{"type": rtype}).Add(1)
			return nil
		}
		// Will resend the resource and set the nonce - next response should
		// be ok.
		log.Println("Unmatching nonce ", req.ResponseNonce, lastNonce)
	}

	con.mu.Lock()
	// TODO: find added/removed resources, push only those.
	con.Watched[rtype] = req.ResourceNames
	con.mu.Unlock()

	// Blocking - read will continue once the push returns. A push failure is
	// returned to the caller, which is expected to disconnect.
	return s.push(con, rtype, req.ResourceNames)
}