-
Notifications
You must be signed in to change notification settings - Fork 684
/
endpoints.go
274 lines (234 loc) · 8.62 KB
/
endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package entrypoint
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"strings"
amb "github.com/datawire/ambassador/pkg/api/getambassador.io/v2"
"github.com/datawire/ambassador/pkg/kates"
snapshotTypes "github.com/datawire/ambassador/pkg/snapshot/v1"
"github.com/datawire/dlib/dlog"
)
// endpointRoutingInfo keeps track of everything we need to know to figure out if
// endpoint routing is active.
type endpointRoutingInfo struct {
// Map from resolver name to resolver type.
resolverTypes map[string]ResolverType
module moduleResolver
endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
previousWatches map[string]bool
}
type ResolverType int
const (
KubernetesServiceResolver ResolverType = iota
KubernetesEndpointResolver
ConsulResolver
)
func (rt ResolverType) String() string {
switch rt {
case KubernetesServiceResolver:
return "KubernetesServiceResolver"
case KubernetesEndpointResolver:
return "KubernetesEndpointResolver"
case ConsulResolver:
return "ConsulResolver"
}
panic("unknown resolver type")
}
// newEndpointRoutingInfo creates a shiny new struct to hold information about
// resolvers in use and such.
func newEndpointRoutingInfo() endpointRoutingInfo {
return endpointRoutingInfo{
// resolverTypes keeps track of the type of every resolver in the system.
// It starts out empty.
//
// Why do we need to look at all the resolvers? Because, unless the user
// overrides them, resolvers "endpoint" and "kubernetes-endpoint" are
// implicitly endpoint resolvers -- but they won't show up in the snapshot.
// So we need to track whether they've been redefined. Sigh.
resolverTypes: make(map[string]ResolverType),
// Track which endpoints we actually want to watch.
endpointWatches: make(map[string]bool),
}
}
func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s *snapshotTypes.KubernetesSnapshot) {
// Reset our state except for the previous endpoint watches. We keep them so we can detect if
// the set of things we are interested in has changed.
eri.resolverTypes = map[string]ResolverType{}
eri.module = moduleResolver{}
eri.previousWatches = eri.endpointWatches
eri.endpointWatches = map[string]bool{}
// Phase one processes all the configuration stuff that Mappings depend on. Right now this
// includes Modules and Resolvers. When we are done with Phase one we have processed enough
// resources to correctly interpret Mappings.
for _, a := range s.Annotations {
if include(GetAmbId(a)) {
eri.checkResourcePhase1(ctx, a, "annotation")
}
}
// After that, walk all the other resources. We do this with separate loops
// for each type -- since we know a priori what type they are, there's no
// need to test every resource, and no need to walk over things we're not
// interested in.
for _, m := range s.Modules {
if include(m.Spec.AmbassadorID) {
eri.checkModule(ctx, m, "CRD")
}
}
for _, r := range s.KubernetesServiceResolvers {
if include(r.Spec.AmbassadorID) {
eri.saveResolver(ctx, r.GetName(), KubernetesServiceResolver, "CRD")
}
}
for _, r := range s.KubernetesEndpointResolvers {
if include(r.Spec.AmbassadorID) {
eri.saveResolver(ctx, r.GetName(), KubernetesEndpointResolver, "CRD")
}
}
for _, r := range s.ConsulResolvers {
if include(r.Spec.AmbassadorID) {
eri.saveResolver(ctx, r.GetName(), ConsulResolver, "CRD")
}
}
// Once all THAT is done, make sure to define the default "endpoint" and
// "kubernetes-endpoint" resolvers if they don't exist.
for _, rName := range []string{"endpoint", "kubernetes-endpoint"} {
_, found := eri.resolverTypes[rName]
if !found {
dlog.Debugf(ctx, "WATCHER: endpoint resolver %s exists by default", rName)
eri.resolverTypes[rName] = KubernetesEndpointResolver
}
}
for _, a := range s.Annotations {
if include(GetAmbId(a)) {
eri.checkResourcePhase2(ctx, a, "annotation")
}
}
for _, m := range s.Mappings {
if include(m.Spec.AmbassadorID) {
eri.checkMapping(ctx, m, "CRD")
}
}
for _, t := range s.TCPMappings {
if include(t.Spec.AmbassadorID) {
eri.checkTCPMapping(ctx, t, "CRD")
}
}
}
func (eri *endpointRoutingInfo) watchesChanged() bool {
return !reflect.DeepEqual(eri.endpointWatches, eri.previousWatches)
}
// checkResourcePhase1 processes Modules and Resolvers and calls the correct type specific handler.
func (eri *endpointRoutingInfo) checkResourcePhase1(ctx context.Context, obj kates.Object, source string) {
switch v := obj.(type) {
case *amb.Module:
eri.checkModule(ctx, v, source)
case *amb.KubernetesServiceResolver:
eri.saveResolver(ctx, v.GetName(), KubernetesServiceResolver, "CRD")
case *amb.KubernetesEndpointResolver:
eri.saveResolver(ctx, v.GetName(), KubernetesEndpointResolver, "CRD")
case *amb.ConsulResolver:
eri.saveResolver(ctx, v.GetName(), ConsulResolver, "CRD")
}
}
// checkResourcePhase2 processes both regular and tcp Mappings and calls the correct type specific handler.
func (eri *endpointRoutingInfo) checkResourcePhase2(ctx context.Context, obj kates.Object, source string) {
switch v := obj.(type) {
case *amb.Mapping:
eri.checkMapping(ctx, v, source)
case *amb.TCPMapping:
eri.checkTCPMapping(ctx, v, source)
}
}
type moduleResolver struct {
Resolver string `json:"resolver"`
UseAmbassadorNamespaceForServiceResolution bool `json:"use_ambassador_namespace_for_service_resolution"`
}
// checkModule parses the stuff we care about out of the ambassador Module.
func (eri *endpointRoutingInfo) checkModule(ctx context.Context, mod *amb.Module, source string) {
if mod.GetName() != "ambassador" {
return
}
mr := moduleResolver{}
err := convert(mod.Spec.Config, &mr)
if err != nil {
dlog.Errorf(ctx, "error parsing ambassador module: %v", err)
return
}
// The default resolver is the kubernetes service resolver.
if mr.Resolver == "" {
mr.Resolver = "kubernetes-service"
}
eri.module = mr
}
// saveResolver saves an active resolver in our resolver-type map. This is used for
// all kinds of resolvers, hence the resType parameter.
func (eri *endpointRoutingInfo) saveResolver(ctx context.Context, name string, resType ResolverType, source string) {
// No magic here, just save the silly thing.
eri.resolverTypes[name] = resType
dlog.Debugf(ctx, "WATCHER: %s resolver %s is active (%s)", resType.String(), name, source)
}
// checkMapping figures out what resolver is in use for a given Mapping.
func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.Mapping, source string) {
// Grab the name and the (possibly-empty) resolver.
name := mapping.GetName()
resolver := mapping.Spec.Resolver
service := mapping.Spec.Service
if resolver == "" {
// No specified resolver means "use the default resolver".
resolver = eri.module.Resolver
dlog.Debugf(ctx, "WATCHER: Mapping %s uses the default resolver (%s)", name, source)
}
if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(service, mapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
}
}
// checkTCPMapping figures out what resolver is in use for a given TCPMapping.
func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping *amb.TCPMapping, source string) {
// Grab the name and the (possibly-empty) resolver.
name := tcpmapping.GetName()
resolver := tcpmapping.Spec.Resolver
service := tcpmapping.Spec.Service
if resolver == "" {
// No specified resolver means "use the default resolver".
dlog.Debugf(ctx, "WATCHER: TCPMapping %s uses the default resolver (%s)", name, source)
resolver = eri.module.Resolver
}
if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(service, tcpmapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
}
}
func (m *moduleResolver) parseService(svcName, svcNamespace string) (name string, namespace string, port string) {
// First split off the port if it exists.
parts := strings.SplitN(svcName, ":", 2)
if len(parts) > 1 {
_, err := strconv.Atoi(parts[1])
if err == nil {
port = parts[1]
svcName = parts[0]
}
}
// Next check to see if it is an IP address.
ip := net.ParseIP(svcName)
if ip != nil {
name = svcName
} else if strings.Contains(svcName, ".") {
// If it's not an ip address but does have a dot then we split it up to find the namespace.
parts := strings.SplitN(svcName, ".", 2)
name = parts[0]
namespace = parts[1]
return
} else {
name = svcName
}
if m.UseAmbassadorNamespaceForServiceResolution || svcNamespace == "" {
namespace = GetAmbassadorNamespace()
} else {
namespace = svcNamespace
}
return
}