accumulator.go
package kates

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"sync"

	"k8s.io/apimachinery/pkg/api/meta"
)

// The Accumulator struct is used to efficiently maintain an in-memory copy of kubernetes resources
// present in a cluster in a form that is easy for business logic to process. It functions as a
// bridge between delta-based kubernetes watches on individual Kinds and the complete/consistent set
// of objects on which business logic needs to operate. In that sense it both accumulates multiple
// kinds of kubernetes resources into a single snapshot and accumulates deltas on individual
// objects into relevant sets of objects.
//
// The Goals/Requirements below are based heavily on the needs of Ambassador as they have evolved
// over the years. A lot of this comes down to the fact that unlike the textbook
// deployment/replicaset controllers, which typically operate on a single resource and render it
// into another (deployment -> N replicasets, replicaset -> N pods), Ambassador's controller
// logic has some additional requirements:
//
// 1. Complete knowledge of resources in a cluster. Because many thousands of Mappings are
// ultimately assembled into a single envoy configuration responsible for ingress into the
//    cluster, the consequences of producing an envoy configuration when you know about, e.g., only
//    half of those Mappings are catastrophic (you are black-holing half your traffic).
//
// 2. Complete knowledge of multiple resources. Instead of having one self-contained input like a
//    deployment or a replicaset, Ambassador's business logic has many inputs, and the consequence
//    of producing an envoy configuration without knowledge of *all* of those inputs is equally
//    catastrophic, e.g. it's no use knowing about all the Mappings if you don't know about any of
//    the Hosts yet.
//
// Goals/Requirements:
//
// 1. Bootstrap of a single Kind: the Accumulator will ensure that all pre-existing resources of
// that Kind have been loaded into memory prior to triggering any notifications. This guarantees
// we will never trigger business logic on an egregiously incomplete view of the cluster
// (e.g. when 500 out of 1000 Mappings have been loaded) and makes it safe for the business
// logic to assume complete knowledge of the cluster.
//
// 2. When multiple Kinds are needed by a controller, the Accumulator will not notify the
// controller until all the Kinds have been fully bootstrapped.
//
// 3. Graceful load shedding: When the rate of change of resources is very fast, the API and
// implementation are structured so that individual object deltas get coalesced into a single
// snapshot update. This prevents excessively triggering business logic to process an entire
// snapshot for each individual object change that occurs.
type Accumulator struct {
client *Client
fields map[string]*field
	// excluded is keyed by unKey(*Unstructured) and tracks resources ruled out by the predicate
	// passed to FilteredUpdate.
excluded map[string]bool
synced int
changed chan struct{}
mutex sync.Mutex
}
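
// field tracks the per-Kind state of the Accumulator: the query that selects
// the Kind, the live set of objects, and the changes that have not yet been
// folded into a snapshot.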
type field struct {
query Query
selector Selector
mapping *meta.RESTMapping
	// The values and deltas maps are keyed by unKey(*Unstructured).
	values map[string]*Unstructured
	// The deltas map holds the most recent undelivered change (add, update, or delete) for each
	// object; it is drained each time the snapshot is updated.
	deltas map[string]*Delta
synced bool
firstUpdate bool
}
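
// DeltaType classifies a change to an object. It marshals to and from the JSON
// strings "add", "update", and "delete".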
type DeltaType int
const (
ObjectAdd DeltaType = iota
ObjectUpdate
ObjectDelete
)

func (dt DeltaType) MarshalJSON() ([]byte, error) {
switch dt {
case ObjectAdd:
return []byte(`"add"`), nil
case ObjectUpdate:
return []byte(`"update"`), nil
case ObjectDelete:
return []byte(`"delete"`), nil
default:
panic("missing case")
}
}

func (dt *DeltaType) UnmarshalJSON(b []byte) error {
var str string
err := json.Unmarshal(b, &str)
if err != nil {
return err
}
switch str {
case "add":
*dt = ObjectAdd
case "update":
*dt = ObjectUpdate
case "delete":
*dt = ObjectDelete
default:
return fmt.Errorf("unrecognized delta type: %s", str)
}
return nil
}
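
// A Delta records the type of a change together with just enough identifying
// metadata (apiVersion, kind, name, namespace) to locate the affected object.
// As an illustrative (hypothetical) example, an add of a Mapping named "foo"
// marshals roughly as:
//
//	{
//	    "apiVersion": "getambassador.io/v2",
//	    "kind": "Mapping",
//	    "metadata": {"name": "foo", "namespace": "default", ...},
//	    "deltaType": "add"
//	}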
type Delta struct {
TypeMeta `json:""`
ObjectMeta `json:"metadata,omitempty"`
DeltaType DeltaType `json:"deltaType"`
}
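
// NewDelta constructs a Delta of the given type from an unstructured object,
// copying only the identifying metadata rather than the entire object.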
func NewDelta(deltaType DeltaType, obj *Unstructured) *Delta {
return newDelta(deltaType, obj)
}
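
// NewDeltaFromObject is like NewDelta, but first converts an arbitrary typed
// Object to its unstructured form. It panics if the conversion fails.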
func NewDeltaFromObject(deltaType DeltaType, obj Object) *Delta {
var un *Unstructured
err := convert(obj, &un)
if err != nil {
panic(err)
}
return NewDelta(deltaType, un)
}

func newDelta(deltaType DeltaType, obj *Unstructured) *Delta {
// We don't want all of the object, just a subset.
return &Delta{
TypeMeta: TypeMeta{
APIVersion: obj.GetAPIVersion(),
Kind: obj.GetKind(),
},
ObjectMeta: ObjectMeta{
Name: obj.GetName(),
Namespace: obj.GetNamespace(),
// Not sure we need this, but it marshals as null if we don't provide it.
CreationTimestamp: obj.GetCreationTimestamp(),
},
DeltaType: deltaType,
}
}
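
// newAccumulator constructs an Accumulator for the given queries, starts the
// underlying raw watches, and launches the goroutine that coalesces raw
// updates into change notifications. It panics if a query's Kind cannot be
// mapped or its label selector does not parse.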
func newAccumulator(ctx context.Context, client *Client, queries ...Query) *Accumulator {
changed := make(chan struct{})
fields := make(map[string]*field)
rawUpdateCh := make(chan rawUpdate)
for _, q := range queries {
mapping, err := client.mappingFor(q.Kind)
if err != nil {
panic(err)
}
sel, err := ParseSelector(q.LabelSelector)
if err != nil {
panic(err)
}
fields[q.Name] = &field{
query: q,
mapping: mapping,
selector: sel,
values: make(map[string]*Unstructured),
deltas: make(map[string]*Delta),
}
client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(mapping, q.Namespace))
}
acc := &Accumulator{client, fields, map[string]bool{}, 0, changed, sync.Mutex{}}
// This coalesces reads from rawUpdateCh to notifications that changes are available to be
	// processed. This loop along with the logic in storeUpdate guarantees the 3
// Goals/Requirements listed in the documentation for the Accumulator struct, i.e. Ensuring
// all Kinds are bootstrapped before any notification occurs, as well as ensuring that we
// continue to coalesce updates in the background while business logic is executing in order
// to ensure graceful load shedding.
go func() {
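		// canSend becomes true once storeUpdate reports that every Kind has been
		// bootstrapped and there is undelivered data. While it is true, the loop
		// offers a notification on the changed channel while continuing to fold
		// any further raw updates into the accumulator.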
canSend := false
for {
var rawUp rawUpdate
if canSend {
select {
case changed <- struct{}{}:
canSend = false
continue
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
} else {
select {
case rawUp = <-rawUpdateCh:
case <-ctx.Done():
return
}
}
			// Don't overwrite canSend if storeUpdate returns false. We may not yet have
// had a chance to send a notification down the changed channel.
if acc.storeUpdate(rawUp) {
canSend = true
}
}
}()
return acc
}
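
// Changed returns the channel on which the Accumulator signals that a new,
// coalesced batch of updates is ready to be collected via Update,
// UpdateWithDeltas, or FilteredUpdate.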
func (a *Accumulator) Changed() chan struct{} {
return a.changed
}
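
// Update refreshes the target snapshot with all changes accumulated since the
// previous call. It returns true if anything new was applied.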
func (a *Accumulator) Update(target interface{}) bool {
return a.UpdateWithDeltas(target, nil)
}
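
// UpdateWithDeltas is like Update, but also appends the individual object
// deltas that were folded into the snapshot to *deltas.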
func (a *Accumulator) UpdateWithDeltas(target interface{}, deltas *[]*Delta) bool {
return a.FilteredUpdate(target, deltas, nil)
}

// The FilteredUpdate method updates the target snapshot with only those resources for which
// "predicate" returns true. The predicate is only called when objects are added or updated; it is
// not repeatedly called on objects that have not changed. The predicate must not modify its
// argument.
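//
// A hedged sketch of a filtered update (the predicate and the snapshot's shape
// are illustrative):
//
//	var snap struct{ Services []*Unstructured }
//	if acc.FilteredUpdate(&snap, nil, func(un *Unstructured) bool {
//		return un.GetNamespace() == "prod" // keep only the "prod" namespace
//	}) {
//		// ... the snapshot changed ...
//	}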
func (a *Accumulator) FilteredUpdate(target interface{}, deltas *[]*Delta, predicate func(*Unstructured) bool) bool {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.update(reflect.ValueOf(target), deltas, predicate)
}
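
// storeUpdate folds a single raw watch event into the relevant field's values
// and deltas maps. It returns true once every watched Kind has completed its
// initial bootstrap, i.e. once it is safe to notify the consumer.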
func (a *Accumulator) storeUpdate(update rawUpdate) bool {
a.mutex.Lock()
defer a.mutex.Unlock()
field := a.fields[update.name]
if update.new != nil {
key := unKey(update.new)
oldValue, oldExists := field.values[key]
field.values[key] = update.new
if oldExists && oldValue.GetResourceVersion() == update.new.GetResourceVersion() {
// no delta in this case, we have already delivered the new value and the delta via a
// patch
} else {
if update.old == nil {
field.deltas[key] = newDelta(ObjectAdd, update.new)
} else {
field.deltas[key] = newDelta(ObjectUpdate, update.new)
}
}
} else if update.old != nil {
key := unKey(update.old)
_, oldExists := field.values[key]
delete(field.values, key)
if !oldExists {
// no delta in this case, we have already delivered the deletion and the delta via a
// patch
} else {
field.deltas[key] = newDelta(ObjectDelete, update.old)
}
}
if update.synced && !field.synced {
field.synced = true
a.synced += 1
}
return a.synced >= len(a.fields)
}
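
// updateField drains the pending deltas for a single Kind, applies the
// optional predicate to maintain the excluded set, and rewrites the
// corresponding field of the target snapshot. It returns false when there is
// nothing new to apply, except on the very first update, which always proceeds
// so that a complete initial snapshot is delivered.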
func (a *Accumulator) updateField(target reflect.Value, name string, field *field, deltas *[]*Delta,
predicate func(*Unstructured) bool) bool {
a.client.patchWatch(field)
if field.firstUpdate && len(field.deltas) == 0 {
return false
}
field.firstUpdate = true
for key, delta := range field.deltas {
delete(field.deltas, key)
if deltas != nil {
*deltas = append(*deltas, delta)
}
if predicate != nil {
if delta.DeltaType == ObjectDelete {
delete(a.excluded, key)
} else {
un := field.values[key]
if predicate(un) {
delete(a.excluded, key)
} else {
a.excluded[key] = true
}
}
}
}
var items []*Unstructured
for key, un := range field.values {
if a.excluded[key] {
continue
}
items = append(items, un)
}
jsonBytes, err := json.Marshal(items)
if err != nil {
panic(err)
}
fieldEntry, ok := target.Type().Elem().FieldByName(name)
if !ok {
panic(fmt.Sprintf("no such field: %q", name))
}
var val reflect.Value
if fieldEntry.Type.Kind() == reflect.Slice {
val = reflect.New(fieldEntry.Type)
err := json.Unmarshal(jsonBytes, val.Interface())
if err != nil {
panic(err)
}
} else if fieldEntry.Type.Kind() == reflect.Map {
val = reflect.MakeMap(fieldEntry.Type)
for _, item := range items {
innerVal := reflect.New(fieldEntry.Type.Elem())
err := convert(item, innerVal.Interface())
if err != nil {
panic(err)
}
val.SetMapIndex(reflect.ValueOf(item.GetName()), reflect.Indirect(innerVal))
}
} else {
panic(fmt.Sprintf("don't know how to unmarshal to: %v", fieldEntry.Type))
}
target.Elem().FieldByName(name).Set(reflect.Indirect(val))
return true
}
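
// update rebuilds the target snapshot from the current accumulated state. The
// target must be a pointer to a struct whose exported field names match the
// Query names passed to newAccumulator; slice fields are filled with a single
// json.Unmarshal, while map fields are keyed by object name. An illustrative
// (hypothetical) shape:
//
//	type Snapshot struct {
//		Mappings []*Unstructured          // slice field: unmarshaled in one shot
//		Hosts    map[string]*Unstructured // map field: keyed by GetName()
//	}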
func (a *Accumulator) update(target reflect.Value, deltas *[]*Delta, predicate func(*Unstructured) bool) bool {
if deltas != nil {
*deltas = nil
}
updated := false
for name, field := range a.fields {
if a.updateField(target, name, field, deltas, predicate) {
updated = true
}
}
return updated
}