-
Notifications
You must be signed in to change notification settings - Fork 681
/
fake_k8s_store.go
298 lines (269 loc) · 8.33 KB
/
fake_k8s_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package entrypoint
import (
"encoding/json"
"fmt"
"io/ioutil"
"sort"
"strings"
"sync"
"github.com/datawire/ambassador/pkg/kates"
)
// A K8sStore is implement just enough data structures to mock the watch aspect of kubernetes for
// testing purposes. It holds a map of kubernetes resources. Whenever any of these resources change
// it computes a delta and adds it to the list of deltas. The store is also capable of creating
// cursors that can be used to track multiple watches independently consuming the deltas at
// different rates.
type K8sStore struct {
// The mutex protects the entire struct, including any cursors that may have been created.
mutex sync.Mutex
resources map[K8sKey]kates.Object
// This tracks every delta forever. That's ok because we only use this for tests, so we want to
// favor simplicity over efficiency. Also tests don't run that long, so it's not a big deal.
deltas []*kates.Delta
}
type K8sKey struct {
Kind string
Namespace string
Name string
}
func (k K8sKey) sortKey() string {
return fmt.Sprintf("%s:%s:%s", k.Kind, k.Namespace, k.Name)
}
// NewK8sStore creates a new and empty store.
func NewK8sStore() *K8sStore {
return &K8sStore{resources: map[K8sKey]kates.Object{}}
}
// Upsert will either update or insert the given object depending on whether or not an object with
// that key already exists. Note that this is currently done based solely on the key (namespace,
// name) of the resource. Theoretically resources are assigned UUIDs and so in theory we could
// detect changes to the name and namespace, however I'm not even sure how kubernetes handles this
// or if it even permits that, so I am not going to attempt to consider those cases, and that may
// well result in some very obscure edgecases around changing names/namespaces that behave
// differently different from kubernetes.
func (k *K8sStore) Upsert(resource kates.Object) {
var un *kates.Unstructured
bytes, err := json.Marshal(resource)
if err != nil {
panic(err)
}
err = json.Unmarshal(bytes, &un)
if err != nil {
panic(err)
}
kind, apiVersion := canonGVK(un.GetKind())
un.SetKind(kind)
un.SetAPIVersion(apiVersion)
if un.GetNamespace() == "" {
un.SetNamespace("default")
}
k.mutex.Lock()
defer k.mutex.Unlock()
key := K8sKey{un.GetKind(), un.GetNamespace(), un.GetName()}
_, ok := k.resources[key]
if ok {
k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectUpdate, un))
} else {
k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectAdd, un))
}
k.resources[key] = un
}
// Delete will remove the identified resource from the store.
func (k *K8sStore) Delete(kind, namespace, name string) {
k.mutex.Lock()
defer k.mutex.Unlock()
key := K8sKey{canon(kind), namespace, name}
old, ok := k.resources[key]
if ok {
k.deltas = append(k.deltas, kates.NewDeltaFromObject(kates.ObjectDelete, old))
}
delete(k.resources, key)
}
// UpsertFile will parse the yaml manifests in the referenced file and Upsert each resource from the
// file.
func (k *K8sStore) UpsertFile(filename string) {
content, err := ioutil.ReadFile(filename)
if err != nil {
panic(err)
}
k.UpsertYAML(string(content))
}
// UpsertYAML will parse the provided YAML and feed the resources in it into the control plane,
// creating or updating any overlapping resources that exist.
func (k *K8sStore) UpsertYAML(yaml string) {
objs, err := kates.ParseManifests(yaml)
if err != nil {
panic(err)
}
for _, obj := range objs {
k.Upsert(obj)
}
}
// A Cursor allows multiple views of the same stream of deltas. The cursors implement a bootstrap
// semantic where they will generate synthetic Add deltas for every resource that currently exists,
// and from that point on report the real deltas that actually occur on the store.
func (k *K8sStore) Cursor() *K8sStoreCursor {
k.mutex.Lock()
defer k.mutex.Unlock()
return &K8sStoreCursor{store: k, offset: -1}
}
type K8sStoreCursor struct {
store *K8sStore
// Offset into the deltas slice, or negative one if the cursor is brand new.
offset int
}
// Get returns a map of resources plus all the deltas that lead to the map being in its current
// state.
func (kc *K8sStoreCursor) Get() (map[K8sKey]kates.Object, []*kates.Delta) {
kc.store.mutex.Lock()
defer kc.store.mutex.Unlock()
var deltas []*kates.Delta
resources := map[K8sKey]kates.Object{}
for _, key := range sortedKeys(kc.store.resources) {
resource := kc.store.resources[key]
resources[key] = resource
// This is the first time Get() has been called, so we shall create a synthetic ADD delta
// for every resource that currently exists.
if kc.offset < 0 {
deltas = append(deltas, kates.NewDeltaFromObject(kates.ObjectAdd, resource))
}
}
if kc.offset >= 0 {
deltas = append(deltas, kc.store.deltas[kc.offset:len(kc.store.deltas)]...)
}
kc.offset = len(kc.store.deltas)
return resources, deltas
}
func sortedKeys(resources map[K8sKey]kates.Object) []K8sKey {
var keys []K8sKey
for k := range resources {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i].sortKey() < keys[j].sortKey()
})
return keys
}
func canonGVK(kind string) (canonKind string, canonGroupVersion string) {
// XXX: there is probably a better way to do this, but this is good enough for now, we just need
// this to work well for ambassador and core types.
switch strings.ToLower(kind) {
case "service":
fallthrough
case "services":
fallthrough
case "services.":
return "Service", "v1"
case "secret":
fallthrough
case "secrets":
fallthrough
case "secrets.":
return "Secret", "v1"
case "endpoints":
fallthrough
case "endpoints.":
return "Endpoints", "v1"
case "ingress":
fallthrough
case "ingresses":
fallthrough
case "ingresses.extensions":
return "Ingress", "v1"
case "ingressclass":
fallthrough
case "ingressclasses":
fallthrough
case "ingressclasses.networking.k8s.io":
return "IngressClass", "v1"
case "authservice":
fallthrough
case "authservices":
fallthrough
case "authservices.getambassador.io":
return "AuthService", "getambassador.io/v2"
case "consulresolver":
fallthrough
case "consulresolvers":
fallthrough
case "consulresolvers.getambassador.io":
return "ConsulResolver", "getambassador.io/v2"
case "devportal":
fallthrough
case "devportals":
fallthrough
case "devportals.getambassador.io":
return "DevPortal", "getambassador.io/v2"
case "host":
fallthrough
case "hosts":
fallthrough
case "hosts.getambassador.io":
return "Host", "getambassador.io/v2"
case "kubernetesendpointresolver":
fallthrough
case "kubernetesendpointresolvers":
fallthrough
case "kubernetesendpointresolvers.getambassador.io":
return "KubernetesEndpointResolver", "getambassador.io/v2"
case "kubernetesserviceresolver":
fallthrough
case "kubernetesserviceresolvers":
fallthrough
case "kubernetesserviceresolvers.getambassador.io":
return "KubernetesServiceResolver", "getambassador.io/v2"
case "logservice":
fallthrough
case "logservices":
fallthrough
case "logservices.getambassador.io":
return "LogService", "getambassador.io/v2"
case "mapping":
fallthrough
case "mappings":
fallthrough
case "mappings.getambassador.io":
return "Mapping", "getambassador.io/v2"
case "module":
fallthrough
case "modules":
fallthrough
case "modules.getambassador.io":
return "Module", "getambassador.io/v2"
case "ratelimitservice":
fallthrough
case "ratelimitservices":
fallthrough
case "ratelimitservices.getambassador.io":
return "RateLimitServices", "getambassador.io/v2"
case "tcpmapping":
fallthrough
case "tcpmappings":
fallthrough
case "tcpmappings.getambassador.io":
return "TCPMapping", "getambassador.io/v2"
case "tlscontext":
fallthrough
case "tlscontexts":
fallthrough
case "tlscontexts.getambassador.io":
return "TLSContext", "getambassador.io/v2"
case "tracingservice":
fallthrough
case "tracingservices":
fallthrough
case "tracingservices.getambassador.io":
return "TracingService", "getambassador.io/v2"
case "gatewayclasses.networking.x-k8s.io":
return "GatewayClass", "networking.x-k8s.io/v1alpha1"
case "gateways.networking.x-k8s.io":
return "Gateway", "networking.x-k8s.io/v1alpha1"
case "httproutes.networking.x-k8s.io":
return "HTTPRoute", "networking.x-k8s.io/v1alpha1"
default:
panic(fmt.Sprintf("I don't know how to canonicalize kind: %q", kind))
}
}
func canon(kind string) string {
canonKind, _ := canonGVK(kind)
return canonKind
}