-
Notifications
You must be signed in to change notification settings - Fork 682
/
fake.go
442 lines (378 loc) · 13.8 KB
/
fake.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
package entrypoint
import (
"context"
"encoding/json"
"fmt"
"os/exec"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/datawire/ambassador/v2/cmd/ambex"
amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v2"
"github.com/datawire/ambassador/v2/pkg/consulwatch"
"github.com/datawire/ambassador/v2/pkg/kates"
"github.com/datawire/ambassador/v2/pkg/snapshot/v1"
"github.com/datawire/dlib/dgroup"
v3bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v3"
)
// The Fake struct is a test harness for edgestack. Its goals are to help us fill out our test
// pyramid by making it super easy to create unit-like tests directly from the snapshots, bug
// reports, and other inputs provided by users who find regressions and/or encounter other problems
// in the field. Since we have no shortage of these reports, if we make it easy to create tests from
// them, we will fill out our test pyramid quickly and hopefully reduce our rate of
// regressions. This also means the tests produced this way need to scale well both in terms of
// execution time/parallelism as well as flakiness since we will quickly have a large number of
// these tests.
//
// The way this works is by isolating via dependency injection the key portions of the control plane
// where the bulk of our business logic is implemented. The Fake utilities directly feed this
// lightweight control plane its input as specified by the test code without passing the resources
// all the way through a real kubernetes API server and/or a real consul deployment. This is not
// only significantly more efficient than spinning up real kubernetes and/or consul deployments, but
// it also lets us precisely control the order of events thereby a) removing the nondeterminism that
// leads to flaky tests, and b) also allowing us to deliberately create/recreate the sort of low
// probability sequence of events that are often at the root of heisenbugs.
//
// The key to being able to build tests this way is expressing our business logic as "hermetically
// sealed" libraries, i.e. libraries with no/few hardcoded dependencies. This doesn't have to be
// done in a fancy/elegant way, it is well worth practicing "stupidly mechanical dependency
// injection" in order to quickly excise some business logic of its hardcoded dependencies and
// enable this sort of testing.
//
// See TestFakeHello, TestFakeHelloWithEnvoyConfig, and TestFakeHelloConsul for examples of how to
// get started using this struct to write tests.
type Fake struct {
	// These are all read only fields. They implement the dependencies that get injected into
	// the watcher loop.

	// config holds the options supplied to NewFake, after defaults have been filled in.
	config FakeConfig
	// T is the test that this Fake logs to and reports failures against.
	T *testing.T
	// group runs the goroutines started by Setup; Teardown waits on it.
	group *dgroup.Group
	// cancel cancels the context that group was created with.
	cancel context.CancelFunc
	// k8sSource is the fake kubernetes source handed to the watcher loop; it reads from k8sStore.
	k8sSource *fakeK8sSource
	// watcher is the fake consul watcher handed to the watcher loop; it reads from consulStore.
	watcher *fakeWatcher
	// istioCertSource is the fake Istio certificate source handed to the watcher loop.
	istioCertSource *fakeIstioCertSource

	// This group of fields are used to store kubernetes resources and consul endpoint data and
	// provide explicit control over when changes to that data are sent to the control plane.
	k8sStore       *K8sStore
	consulStore    *ConsulStore
	k8sNotifier    *Notifier
	consulNotifier *Notifier

	// This holds the current snapshot.
	currentSnapshot *atomic.Value

	fastpath     *Queue // All fastpath snapshots that have been produced.
	snapshots    *Queue // All snapshots that have been produced.
	envoyConfigs *Queue // All envoyConfigs that have been produced.

	// This is used to make Teardown idempotent.
	teardownOnce sync.Once

	// ambassadorMeta is set via SetAmbassadorMeta and passed to the watcher loop when it starts.
	ambassadorMeta *snapshot.AmbassadorMetaInfo
}
// FakeConfig provides options for constructing a new Fake.
type FakeConfig struct {
	EnvoyConfig bool // If true then the Fake will produce envoy configs in addition to Snapshots.
	DiagdDebug  bool // If true then diagd will have debugging enabled
	// How long to wait for snapshots and/or envoy configs to become available. A zero value is
	// replaced with a default of 10 seconds when the Fake is constructed.
	Timeout time.Duration
}
// fillDefaults replaces unset (zero-valued) options with their defaults. Currently the only
// option with a default is Timeout, which becomes 10 seconds.
func (fc *FakeConfig) fillDefaults() {
	if fc.Timeout != 0 {
		return
	}
	fc.Timeout = 10 * time.Second
}
// NewFake builds a Fake that is wired up but not yet running: no goroutines are started until
// Setup is called. Unset config options are replaced with their defaults. See RunFake for a
// one-line way to construct, Setup, and register Teardown of a Fake.
func NewFake(t *testing.T, config FakeConfig) *Fake {
	config.fillDefaults()

	ctx, cancel := context.WithCancel(context.Background())
	k8s, consul := NewK8sStore(), NewConsulStore()

	f := &Fake{
		config:          config,
		T:               t,
		cancel:          cancel,
		group:           dgroup.NewGroup(ctx, dgroup.GroupConfig{EnableWithSoftness: true}),
		k8sStore:        k8s,
		consulStore:     consul,
		k8sNotifier:     NewNotifier(),
		consulNotifier:  NewNotifier(),
		currentSnapshot: &atomic.Value{},
		fastpath:        NewQueue(t, config.Timeout),
		snapshots:       NewQueue(t, config.Timeout),
		envoyConfigs:    NewQueue(t, config.Timeout),
	}

	// The injected sources need a pointer back to the Fake, so they are attached after the
	// Fake itself exists.
	f.k8sSource = &fakeK8sSource{fake: f, store: k8s}
	f.watcher = &fakeWatcher{fake: f, store: consul}
	f.istioCertSource = &fakeIstioCertSource{}

	return f
}
// RunFake constructs a Fake with the supplied config, records ambMeta on it, calls Setup, and
// registers Teardown as a Cleanup function on the test, so callers do not need to tear the Fake
// down themselves.
func RunFake(t *testing.T, config FakeConfig, ambMeta *snapshot.AmbassadorMetaInfo) *Fake {
	f := NewFake(t, config)
	f.SetAmbassadorMeta(ambMeta)
	f.Setup()
	f.T.Cleanup(f.Teardown)
	return f
}
// Setup starts all the goroutines needed for this fake edgestack instance. When the FakeConfig
// supplied when constructing the Fake has EnvoyConfig set, this also starts a snapshot server and
// launches an external diagd process (failing the test if diagd is not on the PATH). You should
// therefore ensure that you call Teardown whenever you call Setup.
func (f *Fake) Setup() {
	if f.config.EnvoyConfig {
		if _, lookErr := exec.LookPath("diagd"); lookErr != nil {
			f.T.Fatal("unable to find diagd, cannot run")
		}

		f.group.Go("snapshot_server", func(ctx context.Context) error {
			return snapshotServer(ctx, f.currentSnapshot)
		})

		f.group.Go("diagd", func(ctx context.Context) error {
			args := []string{
				"/tmp", "/tmp/bootstrap-ads.json", "/tmp/envoy.json",
				"--no-envoy", "--host", "127.0.0.1", "--port", GetDiagdBindPort(),
			}
			if f.config.DiagdDebug {
				args = append(args, "--debug")
			}

			diagd := subcommand(ctx, "diagd", args...)
			if envbool("DEV_SHUTUP_DIAGD") {
				diagd.Stdout = nil
				diagd.Stderr = nil
			}

			runErr := diagd.Run()
			// A non-zero exit from diagd is logged but not treated as a failure of the group;
			// any other kind of error (or nil) is returned as is.
			if exitErr, isExit := runErr.(*exec.ExitError); isExit {
				f.T.Logf("diagd exited with error: %+v", exitErr)
				return nil
			}
			return runErr
		})
	}

	f.group.Go("fake-watcher", f.runWatcher)
}
// Teardown cancels the Fake's context and waits for everything Setup started to finish. Any
// error from the goroutine group other than context.Canceled fails the test. It is idempotent:
// only the first call does any work. Note that RunFake calls Setup and automatically registers
// Teardown as a Cleanup function with the supplied testing.T.
func (f *Fake) Teardown() {
	f.teardownOnce.Do(func() {
		f.cancel()

		waitErr := f.group.Wait()
		if waitErr == nil || waitErr == context.Canceled {
			return
		}
		f.T.Fatalf("fake edgestack errored out: %+v", waitErr)
	})
}
// runWatcher is the body of the "fake-watcher" goroutine started by Setup. It runs the watcher
// loop with all of the Fake's injected dependencies and returns when the loop returns.
//
// A panic raised inside the watcher loop is recovered and reported as the returned error, so that
// it surfaces through the goroutine group (and therefore through Teardown) rather than crashing
// the test binary. The result is named so that the deferred recover can actually set it: with an
// unnamed result the function would return nil after a recovered panic. Panic values that are
// not errors (for example a string) are wrapped in an error rather than type-asserted, since a
// failed assertion inside the deferred function would itself panic.
func (f *Fake) runWatcher(ctx context.Context) (err error) {
	interestingTypes := GetInterestingTypes(ctx, nil)
	queries := GetQueries(ctx, interestingTypes)

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		if asErr, ok := r.(error); ok {
			err = asErr
			return
		}
		err = fmt.Errorf("watcher loop panicked: %+v", r)
	}()

	watcherLoop(ctx, f.currentSnapshot, f.k8sSource, queries, f.watcher, f.istioCertSource, f.notifySnapshot, f.notifyFastpath, f.ambassadorMeta)
	return nil
}
// notifyFastpath is passed into the watcher loop (see runWatcher) and records every fastpath
// snapshot it is given in the fastpath queue. The ctx argument is not used.
func (f *Fake) notifyFastpath(ctx context.Context, fastpath *ambex.FastpathSnapshot) {
	f.fastpath.Add(fastpath)
}
// GetEndpoints fetches from the fastpath queue a fastpath snapshot whose Endpoints satisfy the
// supplied predicate, and returns those Endpoints.
func (f *Fake) GetEndpoints(predicate func(*ambex.Endpoints) bool) *ambex.Endpoints {
	f.T.Helper()
	match := f.fastpath.Get(func(item interface{}) bool {
		return predicate(item.(*ambex.FastpathSnapshot).Endpoints)
	})
	return match.(*ambex.FastpathSnapshot).Endpoints
}
// AssertEndpointsEmpty delegates to the fastpath queue's AssertEmpty with the supplied timeout
// and the message "endpoints queue not empty".
// NOTE(review): presumably this fails the test if a fastpath snapshot is (or becomes) available
// within the timeout — confirm against Queue.AssertEmpty.
func (f *Fake) AssertEndpointsEmpty(timeout time.Duration) {
	f.T.Helper()
	f.fastpath.AssertEmpty(timeout, "endpoints queue not empty")
}
// SnapshotEntry is what gets stored in the Fake's snapshots queue: a decoded snapshot paired with
// the disposition that the watcher loop reported for it.
type SnapshotEntry struct {
	// Disposition is the disposition reported alongside the snapshot (e.g. SnapshotReady).
	Disposition SnapshotDisposition
	// Snapshot is the snapshot decoded from the JSON handed to notifySnapshot.
	Snapshot *snapshot.Snapshot
}
// notifySnapshot is passed into the watcher loop to get notified when a snapshot is produced.
//
// For snapshots with the SnapshotReady disposition, and only when the Fake was configured with
// EnvoyConfig, it first triggers the reconfig webhooks and records the resulting envoy config.
// It then decodes snapJSON and adds it, together with its disposition, to the snapshots queue.
//
// This runs on the watcher goroutine rather than the goroutine running the test, so decode
// failures are reported with Errorf (which is safe from any goroutine) instead of Fatalf: the
// testing package requires FailNow/Fatalf to be called only from the test's own goroutine. A
// snapshot that cannot be decoded is not added to the queue.
func (f *Fake) notifySnapshot(ctx context.Context, disp SnapshotDisposition, snapJSON []byte) {
	if disp == SnapshotReady && f.config.EnvoyConfig {
		notifyReconfigWebhooksFunc(ctx, &noopNotable{}, false)
		f.appendEnvoyConfig()
	}

	var snap *snapshot.Snapshot
	if err := json.Unmarshal(snapJSON, &snap); err != nil {
		f.T.Errorf("error decoding snapshot: %+v", err)
		return
	}

	f.snapshots.Add(SnapshotEntry{Disposition: disp, Snapshot: snap})
}
// GetSnapshotEntry returns the next SnapshotEntry from the snapshots queue for which the supplied
// predicate returns true.
func (f *Fake) GetSnapshotEntry(predicate func(SnapshotEntry) bool) SnapshotEntry {
	f.T.Helper()
	found := f.snapshots.Get(func(item interface{}) bool {
		return predicate(item.(SnapshotEntry))
	})
	return found.(SnapshotEntry)
}
// GetSnapshot returns the next snapshot that has the SnapshotReady disposition and satisfies the
// supplied predicate. The predicate is never invoked for entries with any other disposition.
func (f *Fake) GetSnapshot(predicate func(*snapshot.Snapshot) bool) *snapshot.Snapshot {
	f.T.Helper()
	readyAndMatching := func(candidate SnapshotEntry) bool {
		if candidate.Disposition != SnapshotReady {
			return false
		}
		return predicate(candidate.Snapshot)
	}
	return f.GetSnapshotEntry(readyAndMatching).Snapshot
}
// appendEnvoyConfig decodes /tmp/envoy.json and adds the resulting bootstrap config to the
// envoyConfigs queue.
//
// It is called from notifySnapshot, i.e. on the watcher goroutine rather than the goroutine
// running the test, so problems are reported with Errorf instead of Fatalf (the testing package
// only allows FailNow/Fatalf on the test's own goroutine). If the file cannot be decoded, or it
// decodes to something other than a *v3bootstrap.Bootstrap, the test is marked as failed and
// nothing is added to the queue.
func (f *Fake) appendEnvoyConfig() {
	msg, err := ambex.Decode("/tmp/envoy.json")
	if err != nil {
		f.T.Errorf("error decoding envoy.json after sending snapshot to python: %+v", err)
		return
	}

	// Check the assertion instead of letting an unexpected message type panic.
	bs, ok := msg.(*v3bootstrap.Bootstrap)
	if !ok {
		f.T.Errorf("envoy.json decoded to unexpected type %T, expected *v3bootstrap.Bootstrap", msg)
		return
	}

	f.envoyConfigs.Add(bs)
}
// GetEnvoyConfig returns the next envoy bootstrap config from the envoyConfigs queue for which
// the supplied predicate returns true.
func (f *Fake) GetEnvoyConfig(predicate func(*v3bootstrap.Bootstrap) bool) *v3bootstrap.Bootstrap {
	f.T.Helper()
	found := f.envoyConfigs.Get(func(item interface{}) bool {
		bootstrap := item.(*v3bootstrap.Bootstrap)
		return predicate(bootstrap)
	})
	return found.(*v3bootstrap.Bootstrap)
}
// AutoFlush turns automatic flushing on or off for both the kubernetes and the consul inputs.
// When enabled, a flush happens whenever any inputs are modified.
func (f *Fake) AutoFlush(enabled bool) {
	for _, notifier := range []*Notifier{f.k8sNotifier, f.consulNotifier} {
		notifier.AutoNotify(enabled)
	}
}
// Flush will cause inputs from all datasources (kubernetes first, then consul) to be delivered
// to the control plane.
func (f *Fake) Flush() {
	for _, notifier := range []*Notifier{f.k8sNotifier, f.consulNotifier} {
		notifier.Notify()
	}
}
// SetAmbassadorMeta sets the ambassador meta info that should get sent in each snapshot. The
// value is read when the watcher goroutine starts the watcher loop, so it needs to be set
// before Setup is called (RunFake does this for you).
func (f *Fake) SetAmbassadorMeta(ambMeta *snapshot.AmbassadorMetaInfo) {
	f.ambassadorMeta = ambMeta
}
// UpsertFile will parse the contents of the file as yaml and feed them into the control plane,
// creating or updating any overlapping resources that exist. After updating the fake k8s
// datastore it marks the kubernetes inputs as changed.
// NOTE(review): presumably the change only reaches the control plane right away when AutoFlush
// is enabled, and otherwise on the next Flush — confirm against Notifier.
func (f *Fake) UpsertFile(filename string) {
	f.k8sStore.UpsertFile(filename)
	f.k8sNotifier.Changed()
}
// UpsertYAML will parse the provided YAML and feed the resources in it into the control plane,
// creating or updating any overlapping resources that exist. After updating the fake k8s
// datastore it marks the kubernetes inputs as changed.
func (f *Fake) UpsertYAML(yaml string) {
	f.k8sStore.UpsertYAML(yaml)
	f.k8sNotifier.Changed()
}
// Upsert will update (or if necessary create) the supplied resource in the fake k8s datastore,
// and then mark the kubernetes inputs as changed.
func (f *Fake) Upsert(resource kates.Object) {
	f.k8sStore.Upsert(resource)
	f.k8sNotifier.Changed()
}
// Delete will remove the resource identified by kind, namespace, and name from the fake k8s
// datastore, and then mark the kubernetes inputs as changed.
func (f *Fake) Delete(kind, namespace, name string) {
	f.k8sStore.Delete(kind, namespace, name)
	f.k8sNotifier.Changed()
}
// ConsulEndpoint stores the supplied consul endpoint data (datacenter, service, address, port,
// and optional tags) in the fake consul datastore, and then marks the consul inputs as changed.
func (f *Fake) ConsulEndpoint(datacenter, service, address string, port int, tags ...string) {
	f.consulStore.ConsulEndpoint(datacenter, service, address, port, tags...)
	f.consulNotifier.Changed()
}
// SendIstioCertUpdate sends the supplied Istio certificate update on the fake Istio cert
// source's update channel. That channel is created, unbuffered, by fakeIstioCertSource.Watch, so
// this call blocks until the update has been received.
// NOTE(review): until Watch has been called the channel is nil, and a send on a nil channel
// blocks forever — callers presumably need the watcher loop to be running first; confirm.
func (f *Fake) SendIstioCertUpdate(update IstioCertUpdate) {
	f.istioCertSource.updateChannel <- update
}
// fakeK8sSource is the kubernetes source that the Fake injects into the watcher loop. Its
// watchers read resources from store and are woken up via the owning Fake's k8sNotifier.
type fakeK8sSource struct {
	fake  *Fake     // The Fake that owns this source; used to reach its k8sNotifier.
	store *K8sStore // The datastore that watchers created by Watch read from.
}
// Watch returns a K8sWatcher that reads from the fake k8s datastore through a fresh cursor and
// reports only resources matching the supplied queries.
//
// The watcher is subscribed to the Fake's k8sNotifier: every notification is forwarded to the
// watcher's Changed channel. The forward is done on its own goroutine so that the notifier is
// never blocked by a slow consumer, and that goroutine gives up as soon as ctx is done so that
// it is not leaked when nobody is receiving from the channel any more (for example after the
// watcher loop has shut down).
func (fs *fakeK8sSource) Watch(ctx context.Context, queries ...kates.Query) K8sWatcher {
	fw := &fakeK8sWatcher{
		cursor:   fs.store.Cursor(),
		notifyCh: make(chan struct{}),
		queries:  queries,
	}

	fs.fake.k8sNotifier.Listen(func() {
		go func() {
			select {
			case fw.notifyCh <- struct{}{}:
			case <-ctx.Done():
			}
		}()
	})

	return fw
}
// fakeK8sWatcher is the watcher returned by fakeK8sSource.Watch.
type fakeK8sWatcher struct {
	cursor   *K8sStoreCursor // Cursor into the fake k8s datastore; yields resources and deltas.
	notifyCh chan struct{}   // Receives a value for each notification from the k8s notifier.
	queries  []kates.Query   // The queries this watcher was created with.
}
// Changed returns the channel on which this watcher is told that the k8s inputs have been
// flushed. It is unbuffered and gets one value per notification.
func (f *fakeK8sWatcher) Changed() chan struct{} {
	return f.notifyCh
}
// FilteredUpdate fills target with the current contents of the fake k8s datastore and stores the
// deltas obtained from the cursor in *deltas.
//
// target must be a pointer to a struct that has one field per query, named after the query's
// Name. Each such field is set to the resources that match the query's kind and satisfy
// predicate, converted to the field's type. The return value reports whether the cursor yielded
// any deltas.
//
// It panics if a resource or a field value cannot be converted, or if target has no field named
// after one of the queries.
func (f *fakeK8sWatcher) FilteredUpdate(target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) bool {
	objects, freshDeltas := f.cursor.Get()

	// Group the matching resources by the name of the query they satisfy.
	grouped := make(map[string][]kates.Object)
	for _, obj := range objects {
		for _, query := range f.queries {
			var unstructured *kates.Unstructured
			if err := convert(obj, &unstructured); err != nil {
				panic(err)
			}
			if !matches(query, obj) || !predicate(unstructured) {
				continue
			}
			grouped[query.Name] = append(grouped[query.Name], obj)
		}
	}

	// XXX: this stuff is copied from kates/accumulator.go
	targetPtr := reflect.ValueOf(target)
	structType := targetPtr.Type().Elem()
	for _, query := range f.queries {
		field, found := structType.FieldByName(query.Name)
		if !found {
			panic(fmt.Sprintf("no such field: %q", query.Name))
		}

		// Convert the grouped resources into a fresh value of the field's type, then store it.
		holder := reflect.New(field.Type)
		if err := convert(grouped[query.Name], holder.Interface()); err != nil {
			panic(err)
		}
		targetPtr.Elem().FieldByName(query.Name).Set(holder.Elem())
	}

	*deltas = freshDeltas
	return len(freshDeltas) > 0
}
// matches reports whether obj is of the kind that query asks for. Both kinds are passed through
// canon before being compared; nothing other than the kind is considered.
func matches(query kates.Query, obj kates.Object) bool {
	wantKind := canon(query.Kind)
	gotKind := canon(obj.GetObjectKind().GroupVersionKind().Kind)
	return wantKind == gotKind
}
// fakeWatcher is the consul watcher that the Fake injects into the watcher loop. It serves
// endpoint data from store and is driven by the owning Fake's consulNotifier.
type fakeWatcher struct {
	fake  *Fake        // The Fake that owns this watcher; used to reach its consulNotifier.
	store *ConsulStore // The datastore that endpoint data is looked up in.
}
// Watch subscribes to the Fake's consulNotifier on behalf of the given resolver and service. On
// every notification it looks up the endpoints stored for the resolver's datacenter and svc and,
// if there are any and they differ from what was last sent, sends them on the endpoints channel.
// The returned Stopper cancels the subscription.
func (f *fakeWatcher) Watch(resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) Stopper {
	var lastSent consulwatch.Endpoints

	onChange := func() {
		current, found := f.store.Get(resolver.Spec.Datacenter, svc)
		if !found || reflect.DeepEqual(current, lastSent) {
			return
		}
		endpoints <- current
		lastSent = current
	}

	return &fakeStopper{stop: f.fake.consulNotifier.Listen(onChange)}
}
// fakeStopper wraps a StopFunc so that it can be returned as a Stopper (see fakeWatcher.Watch).
type fakeStopper struct {
	stop StopFunc // Invoked by Stop.
}
// Stop invokes the wrapped StopFunc.
func (f *fakeStopper) Stop() {
	f.stop()
}
// fakeIstioCertSource is the Istio certificate source that the Fake injects into the watcher
// loop.
type fakeIstioCertSource struct {
	// updateChannel carries updates from Fake.SendIstioCertUpdate to the watcher returned by
	// Watch. It is nil until Watch has been called.
	updateChannel chan IstioCertUpdate
}
// Watch creates a new unbuffered update channel, remembers it on the source so that
// Fake.SendIstioCertUpdate can send on it, and returns an IstioCertWatcher that is backed by the
// same channel. Each call replaces the previously created channel. The ctx argument is not used.
func (src *fakeIstioCertSource) Watch(ctx context.Context) IstioCertWatcher {
	updates := make(chan IstioCertUpdate)
	src.updateChannel = updates
	return &istioCertWatcher{updateChannel: updates}
}