// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package endpoint
import (
"fmt"
"strconv"
"github.com/sirupsen/logrus"
"github.com/cilium/cilium/pkg/bandwidth"
"github.com/cilium/cilium/pkg/eventqueue"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/bwmap"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/policy"
)
// EndpointRegenerationEvent contains all fields necessary to regenerate an endpoint.
type EndpointRegenerationEvent struct {
regenContext *regenerationContext
ep *Endpoint
}
// Handle handles the regeneration event for the endpoint.
func (ev *EndpointRegenerationEvent) Handle(res chan interface{}) {
e := ev.ep
regenContext := ev.regenContext
err := e.rlockAlive()
if err != nil {
e.logDisconnectedMutexAction(err, "before regeneration")
res <- &EndpointRegenerationResult{
err: err,
}
return
}
e.runlock()
	// Only queue the request once all of the endpoint's lock/unlock
	// operations above have completed. Otherwise this can deadlock if the
	// endpoint is being deleted at the same time. More info PR-1777.
doneFunc, err := e.owner.QueueEndpointBuild(regenContext.parentContext, uint64(e.ID))
if err != nil {
e.getLogger().WithError(err).Warning("unable to queue endpoint build")
} else if doneFunc != nil {
e.getLogger().Debug("Dequeued endpoint from build queue")
regenContext.DoneFunc = doneFunc
err = ev.ep.regenerate(ev.regenContext)
doneFunc()
e.notifyEndpointRegeneration(err)
} else {
		// If another build has been queued for the endpoint, that build will
		// take care of all the work needed to regenerate the endpoint at this
		// point in time; queueing another build would be a waste of resources.
e.getLogger().Debug("build not queued for endpoint because another build has already been queued")
}
res <- &EndpointRegenerationResult{
err: err,
}
return
}
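// Usage sketch (illustrative, not part of the API above): callers in this
// package typically wrap the event with eventqueue.NewEvent, enqueue it on
// the endpoint's queue, and wait for the *EndpointRegenerationResult sent on
// the result channel. regenCtx is assumed to be built by the caller, and
// Enqueue is assumed to return a result channel alongside the error, as in
// PolicyRevisionBumpEvent below.
//
//	ev := eventqueue.NewEvent(&EndpointRegenerationEvent{
//		regenContext: regenCtx,
//		ep:           e,
//	})
//	resCh, err := e.eventQueue.Enqueue(ev)
//	if err == nil {
//		if r, ok := (<-resCh).(*EndpointRegenerationResult); ok && r.err != nil {
//			// regeneration failed; handle r.err
//		}
//	}
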
// EndpointRegenerationResult contains the results of an endpoint regeneration.
type EndpointRegenerationResult struct {
err error
}
// EndpointRevisionBumpEvent contains all fields necessary to bump the policy
// revision of a given endpoint.
type EndpointRevisionBumpEvent struct {
Rev uint64
ep *Endpoint
}
// Handle handles the revision bump event for the Endpoint.
func (ev *EndpointRevisionBumpEvent) Handle(res chan interface{}) {
	// TODO: if the endpoint is not in a 'ready' state, we cannot set the
	// policy revision, because either something else has changed endpoint
	// state in a way that necessitates regeneration, *or* a prior
	// regeneration failed (so there is no way we can realize the policy
	// revision yet). Should this be signaled to the routine waiting for the
	// result of this event?
ev.ep.SetPolicyRevision(ev.Rev)
res <- struct{}{}
}
// PolicyRevisionBumpEvent queues an event for the given endpoint to set its
// realized policy revision to rev. This may block depending on if events have
// been queued up for the given endpoint. It blocks until the event has
// succeeded, or if the event has been cancelled.
func (e *Endpoint) PolicyRevisionBumpEvent(rev uint64) {
epBumpEvent := eventqueue.NewEvent(&EndpointRevisionBumpEvent{Rev: rev, ep: e})
// Don't check policy revision event results - it is best effort.
_, err := e.eventQueue.Enqueue(epBumpEvent)
if err != nil {
log.WithFields(logrus.Fields{
logfields.PolicyRevision: rev,
logfields.EndpointID: e.ID,
}).Errorf("enqueue of EndpointRevisionBumpEvent failed: %s", err)
}
}
// EndpointNoTrackEvent contains all fields necessary to update the NOTRACK rules.
type EndpointNoTrackEvent struct {
ep *Endpoint
annoCB AnnotationsResolverCB
}
// Handle handles the NOTRACK rule update.
func (ev *EndpointNoTrackEvent) Handle(res chan interface{}) {
var port uint16
e := ev.ep
// If this endpoint is going away, nothing to do.
if err := e.lockAlive(); err != nil {
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
defer e.unlock()
portStr, err := ev.annoCB(e.K8sNamespace, e.K8sPodName)
if err != nil {
res <- &EndpointRegenerationResult{
err: err,
}
return
}
if portStr == "" {
port = 0
} else {
		// Validate the annotation before we make any actual alteration to
		// the endpoint. The port must be within [1, 65535].
		p64, err := strconv.ParseUint(portStr, 10, 16)
		if err == nil && p64 == 0 {
			err = fmt.Errorf("invalid NOTRACK port %q: port must be in [1, 65535]", portStr)
		}
		if err != nil {
			res <- &EndpointRegenerationResult{
				err: err,
			}
			return
		}
		port = uint16(p64)
}
	if port != e.noTrackPort {
		log.Debug("Updating NOTRACK rules")
		// Install/remove failures are logged but otherwise treated as best
		// effort; the event result below still reports success.
		if e.IPv4.IsSet() {
			if port > 0 {
				err = e.owner.Datapath().InstallNoTrackRules(e.IPv4.String(), port, false)
				if err != nil {
					log.WithError(err).Warn("Error installing iptables NOTRACK rules")
				}
			}
			if e.noTrackPort > 0 {
				err = e.owner.Datapath().RemoveNoTrackRules(e.IPv4.String(), e.noTrackPort, false)
				if err != nil {
					log.WithError(err).Warn("Error removing iptables NOTRACK rules")
				}
			}
		}
		if e.IPv6.IsSet() {
			if port > 0 {
				err = e.owner.Datapath().InstallNoTrackRules(e.IPv6.String(), port, true)
				if err != nil {
					log.WithError(err).Warn("Error installing iptables NOTRACK rules")
				}
			}
			if e.noTrackPort > 0 {
				err = e.owner.Datapath().RemoveNoTrackRules(e.IPv6.String(), e.noTrackPort, true)
				if err != nil {
					log.WithError(err).Warn("Error removing iptables NOTRACK rules")
				}
			}
		}
		e.noTrackPort = port
	}
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
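// Sketch of an annotation resolver callback as consumed by Handle above
// (hypothetical: the annotation key and the pod lookup helper are
// placeholders, not the actual Cilium wiring). The callback returns the raw
// annotation value, or "" when the annotation is not set.
//
//	var annoCB AnnotationsResolverCB = func(ns, podName string) (string, error) {
//		pod, err := getPod(ns, podName) // placeholder pod lookup
//		if err != nil {
//			return "", err
//		}
//		return pod.Annotations["example.io/no-track-port"], nil // placeholder key
//	}
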
// EndpointPolicyVisibilityEvent contains all fields necessary to update the
// visibility policy.
type EndpointPolicyVisibilityEvent struct {
ep *Endpoint
annoCB AnnotationsResolverCB
}
// Handle handles the policy visibility update.
func (ev *EndpointPolicyVisibilityEvent) Handle(res chan interface{}) {
e := ev.ep
if err := e.lockAlive(); err != nil {
// If the endpoint is being deleted, we don't need to update its
// visibility policy.
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
defer func() {
// Ensure that policy computation is performed so that endpoint
// desiredPolicy and realizedPolicy pointers are different. This state
// is needed to update endpoint policy maps with the policy map state
		// generated from the visibility policy. This can, and should, be
		// made more elegant in the future.
e.forcePolicyComputation()
e.unlock()
}()
var (
nvp *policy.VisibilityPolicy
err error
)
proxyVisibility, err := ev.annoCB(e.K8sNamespace, e.K8sPodName)
if err != nil {
res <- &EndpointRegenerationResult{
err: err,
}
return
}
if proxyVisibility != "" {
e.getLogger().Debug("creating visibility policy")
nvp, err = policy.NewVisibilityPolicy(proxyVisibility)
if err != nil {
e.getLogger().WithError(err).Warning("unable to parse annotations into visibility policy; disabling visibility policy for endpoint")
e.visibilityPolicy = &policy.VisibilityPolicy{
Ingress: make(policy.DirectionalVisibilityPolicy),
Egress: make(policy.DirectionalVisibilityPolicy),
Error: err,
}
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
}
e.visibilityPolicy = nvp
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
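// Illustration (hedged): policy.NewVisibilityPolicy parses the pod's
// proxy-visibility annotation into per-direction visibility rules. The value
// below follows the commonly documented "<Direction/Port/Protocol/L7>" form;
// treat it as an example, not a normative specification.
//
//	nvp, err := policy.NewVisibilityPolicy("<Ingress/80/TCP/HTTP>,<Egress/53/UDP/DNS>")
//	if err != nil {
//		// the annotation could not be parsed; Handle above falls back to an
//		// empty visibility policy carrying the parse error
//	}
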
// EndpointPolicyBandwidthEvent contains all fields necessary to update
// the Pod's bandwidth policy.
type EndpointPolicyBandwidthEvent struct {
ep *Endpoint
annoCB AnnotationsResolverCB
}
// Handle handles the policy bandwidth update.
func (ev *EndpointPolicyBandwidthEvent) Handle(res chan interface{}) {
var bps uint64
e := ev.ep
if err := e.lockAlive(); err != nil {
// If the endpoint is being deleted, we don't need to
// update its bandwidth policy.
res <- &EndpointRegenerationResult{
err: nil,
}
return
}
defer func() {
e.unlock()
}()
bandwidthEgress, err := ev.annoCB(e.K8sNamespace, e.K8sPodName)
if err != nil || !option.Config.EnableBandwidthManager {
res <- &EndpointRegenerationResult{
err: err,
}
return
}
if bandwidthEgress != "" {
bps, err = bandwidth.GetBytesPerSec(bandwidthEgress)
if err == nil {
err = bwmap.Update(e.ID, bps)
}
} else {
err = bwmap.Delete(e.ID)
}
if err != nil {
res <- &EndpointRegenerationResult{
err: err,
}
return
}
bpsOld := "inf"
bpsNew := "inf"
if e.bps != 0 {
bpsOld = strconv.FormatUint(e.bps, 10)
}
if bps != 0 {
bpsNew = strconv.FormatUint(bps, 10)
}
e.getLogger().Debugf("Updating %s from %s to %s bytes/sec", bandwidth.EgressBandwidth,
bpsOld, bpsNew)
e.bps = bps
res <- &EndpointRegenerationResult{
err: nil,
}
}
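// Illustration (hedged): bandwidth.GetBytesPerSec converts the pod's
// egress-bandwidth annotation value (a Kubernetes-style resource quantity,
// e.g. "10M") into bytes per second, which Handle above then writes to the
// endpoint's entry in the bandwidth BPF map via bwmap.Update. "10M" is only
// an example value; the exact quantity semantics are those of the bandwidth
// package.
//
//	bps, err := bandwidth.GetBytesPerSec("10M")
//	if err == nil {
//		err = bwmap.Update(e.ID, bps)
//	}
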
// InitEventQueue initializes the endpoint's event queue. Note that this
// function does not begin processing events off the queue; that is left to
// the caller, which must call Expose to allow other subsystems to access the
// endpoint. This function assumes that the endpoint ID has already been
// allocated!
//
// Having this be a separate function allows us to prepare
// the event queue while the endpoint is being validated (during restoration)
// so that when its metadata is resolved, events can be enqueued (such as
// visibility policy and bandwidth policy).
func (e *Endpoint) InitEventQueue() {
e.eventQueue = eventqueue.NewEventQueueBuffered(fmt.Sprintf("endpoint-%d", e.ID), option.Config.EndpointQueueSize)
}
// Start assigns a Cilium Endpoint ID to the endpoint and prepares it to
// receive events from other subsystems.
//
// The endpoint must not already be exposed via the endpointmanager prior to
// calling Start(), as it assumes unconditional access over the Endpoint
// object.
func (e *Endpoint) Start(id uint16) {
// No need to check liveness as an endpoint can only be deleted via the
// API after it has been inserted into the manager.
	// 'e.ID' is written below, so a read lock is not enough.
e.unconditionalLock()
defer e.unlock()
e.ID = id
e.UpdateLogger(map[string]interface{}{
logfields.EndpointID: e.ID,
})
// Start goroutines that are responsible for handling events.
e.startRegenerationFailureHandler()
if e.eventQueue == nil {
e.InitEventQueue()
}
e.eventQueue.Run()
e.getLogger().Info("New endpoint")
}
// Stop cleans up all goroutines managed by this endpoint (EventQueue,
// Controllers).
// This function should be used directly in cleanup functions which aim to stop
// goroutines managed by this endpoint, but without removing BPF maps and
// datapath state (for instance, because the daemon is shutting down but the
// endpoint should remain operational while the daemon is not running).
func (e *Endpoint) Stop() {
// Since the endpoint is being deleted, we no longer need to run events
// in its event queue. This is a no-op if the queue has already been
// closed elsewhere.
e.eventQueue.Stop()
// Wait for the queue to be drained in case an event which is currently
// running for the endpoint tries to acquire the lock - we cannot be sure
// what types of events will be pushed onto the EventQueue for an endpoint
// and when they will happen. After this point, no events for the endpoint
// will be processed on its EventQueue, specifically regenerations.
e.eventQueue.WaitToBeDrained()
	// Given that we are deleting the endpoint and that no more builds are
	// going to occur for this endpoint, close the channel which signals
	// whether the endpoint's BPF program has been compiled, so that nothing
	// remains blocked on it. If a delete request has already been enqueued
	// for this endpoint, this is a no-op.
e.closeBPFProgramChannel()
// Cancel active controllers for the endpoint tied to e.aliveCtx.
e.aliveCancel()
}
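// Lifecycle sketch (hypothetical caller, e.g. endpoint creation or restore):
// the event queue is prepared before the endpoint is exposed, Start assigns
// the ID and begins draining events, and Stop tears the queue and controllers
// down without touching BPF maps or datapath state.
//
//	ep.InitEventQueue() // optional; Start initializes the queue if it is nil
//	ep.Start(id)
//	// ... endpoint handles regeneration / annotation events ...
//	ep.Stop()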