controller.go
package podmonitor

import (
	"context"
	errs "errors"
	"time"

	"go.aporeto.io/trireme-lib/common"
	"go.aporeto.io/trireme-lib/monitor/config"
	"go.aporeto.io/trireme-lib/monitor/extractors"
	"go.aporeto.io/trireme-lib/policy"
	"go.uber.org/zap"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)
var (
	// ErrHandlePUStartEventFailed is the error sent back if a start event fails
	ErrHandlePUStartEventFailed = errs.New("Aporeto Enforcer start event failed")
	// ErrNetnsExtractionMissing is the error when we are missing a PID or netns path after successful metadata extraction
	ErrNetnsExtractionMissing = errs.New("Aporeto Enforcer failed to extract PID or netns path")
	// ErrHandlePUStopEventFailed is the error sent back if a stop event fails
	ErrHandlePUStopEventFailed = errs.New("Aporeto Enforcer stop event failed")
	// ErrHandlePUDestroyEventFailed is the error sent back if a destroy event fails
	ErrHandlePUDestroyEventFailed = errs.New("Aporeto Enforcer destroy event failed")
)
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, handler *config.ProcessorConfig, metadataExtractor extractors.PodMetadataExtractor, netclsProgrammer extractors.PodNetclsProgrammer, nodeName string, enableHostPods bool) *ReconcilePod {
	return &ReconcilePod{
		client:            mgr.GetClient(),
		scheme:            mgr.GetScheme(),
		recorder:          mgr.GetRecorder("trireme-pod-controller"),
		handler:           handler,
		metadataExtractor: metadataExtractor,
		netclsProgrammer:  netclsProgrammer,
		nodeName:          nodeName,
		enableHostPods:    enableHostPods,

		// TODO: might move into configuration
		handlePUEventTimeout:   5 * time.Second,
		getterTimeout:          2 * time.Second,
		metadataExtractTimeout: 3 * time.Second,
		netclsProgramTimeout:   2 * time.Second,
	}
}
// addController adds a new Controller to mgr with r as the reconcile.Reconciler
func addController(mgr manager.Manager, r *ReconcilePod, eventsCh <-chan event.GenericEvent) error {
	// Create a new controller
	c, err := controller.New("trireme-pod-controller", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}

	// we use this mapper in both of our event sources
	mapper := &WatchPodMapper{
		client:         mgr.GetClient(),
		nodeName:       r.nodeName,
		enableHostPods: r.enableHostPods,
	}

	// use our WatchPodMapper, which filters pods before we reconcile
	if err := c.Watch(
		&source.Kind{Type: &corev1.Pod{}},
		&handler.EnqueueRequestsFromMapFunc{ToRequests: mapper},
	); err != nil {
		return err
	}

	// we pass in a custom channel for events generated by resync
	return c.Watch(
		&source.Channel{Source: eventsCh},
		&handler.EnqueueRequestsFromMapFunc{ToRequests: mapper},
	)
}
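// Usage sketch (illustration only, not part of the original file): an exported
// AddToManager-style helper would typically wire newReconciler and addController
// together when the pod monitor is set up. The helper name, parameter names, and
// the origin of eventsCh below are assumptions for illustration.
//
//	func AddToManager(mgr manager.Manager, pc *config.ProcessorConfig, mex extractors.PodMetadataExtractor, ncp extractors.PodNetclsProgrammer, nodeName string, enableHostPods bool, eventsCh <-chan event.GenericEvent) error {
//		r := newReconciler(mgr, pc, mex, ncp, nodeName, enableHostPods)
//		return addController(mgr, r, eventsCh)
//	}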
var _ reconcile.Reconciler = &ReconcilePod{}

// ReconcilePod reconciles a Pod object
type ReconcilePod struct {
	// This client, initialized using mgr.GetClient() above, is a split client
	// that reads objects from the cache and writes to the apiserver
	client            client.Client
	scheme            *runtime.Scheme
	recorder          record.EventRecorder
	handler           *config.ProcessorConfig
	metadataExtractor extractors.PodMetadataExtractor
	netclsProgrammer  extractors.PodNetclsProgrammer
	nodeName          string
	enableHostPods    bool

	// timeouts for the individual operations performed during a reconcile
	getterTimeout          time.Duration
	metadataExtractTimeout time.Duration
	handlePUEventTimeout   time.Duration
	netclsProgramTimeout   time.Duration
}
// Reconcile reads the state of the cluster for a pod object
func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	ctx := context.Background()
	puID := request.NamespacedName.String()

	// Fetch the corresponding pod object.
	pod := &corev1.Pod{}
	if err := r.client.Get(ctx, request.NamespacedName, pod); err != nil {
		if errors.IsNotFound(err) {
			zap.L().Debug("Pod IsNotFound", zap.String("puID", puID))
			handlePUCtx, handlePUCancel := context.WithTimeout(ctx, r.handlePUEventTimeout)
			defer handlePUCancel()

			// NOTE: We should not need to call Stop first; a Destroy should also do a Stop if necessary.
			// This should be fixed in the policy engine.
			// try to call stop, but don't fail if that errors
			err := r.handler.Policy.HandlePUEvent(
				handlePUCtx,
				puID,
				common.EventStop,
				policy.NewPURuntimeWithDefaults(),
			)
			if err != nil {
				zap.L().Warn("failed to handle stop event during destroy", zap.String("puID", puID), zap.Error(err))
			}

			// call destroy regardless of whether stop succeeded
			if err := r.handler.Policy.HandlePUEvent(
				handlePUCtx,
				puID,
				common.EventDestroy,
				policy.NewPURuntimeWithDefaults(),
			); err != nil {
				zap.L().Error("failed to handle destroy event", zap.String("puID", puID), zap.Error(err))
				return reconcile.Result{}, ErrHandlePUDestroyEventFailed
			}
			return reconcile.Result{}, nil
		}

		// Otherwise, we retry.
		return reconcile.Result{}, err
	}
	// abort immediately if this is a HostNetwork pod but we don't want to activate those
	// NOTE: this is already done in the mapper; however, this additional check does not hurt
	if pod.Spec.HostNetwork && !r.enableHostPods {
		zap.L().Debug("Pod is a HostNetwork pod, but enableHostPods is false", zap.String("puID", puID))
		return reconcile.Result{}, nil
	}

	// abort if this pod is terminating / shutting down
	//
	// NOTE: a pod that is terminating still reconciles in the PodRunning phase; however, it will
	// have the deletion timestamp set, which is an indicator for us that it is shutting down.
	// That means we don't have to do anything yet. We can safely stop the PU when the phase is
	// PodSucceeded/PodFailed, or delete it once we reconcile on a pod that can no longer be found.
	if pod.DeletionTimestamp != nil {
		zap.L().Debug("Pod is terminating, deletion timestamp found", zap.String("puID", puID), zap.String("deletionTimestamp", pod.DeletionTimestamp.String()))
		return reconcile.Result{}, nil
	}

	// every HandlePUEvent call gets done in this context
	handlePUCtx, handlePUCancel := context.WithTimeout(ctx, r.handlePUEventTimeout)
	defer handlePUCancel()
	switch pod.Status.Phase {
	case corev1.PodPending:
		fallthrough
	case corev1.PodRunning:
		// we will already create the PU here, but not start it
		zap.L().Debug("PodPending/PodRunning", zap.String("puID", puID))

		// try to find out if any of the containers have been started yet
		// NOTE: This is important because InitContainers are started during the PodPending phase,
		// which is what we need to rely on for activating as early as possible
		var started bool
		for _, status := range pod.Status.InitContainerStatuses {
			if status.State.Running != nil {
				started = true
				break
			}
		}
		if !started {
			for _, status := range pod.Status.ContainerStatuses {
				if status.State.Running != nil {
					started = true
					break
				}
			}
		}
		zap.L().Debug("PodPending / PodRunning", zap.String("puID", puID), zap.Bool("anyContainerStarted", started))

		// now try to do the metadata extraction
		extractCtx, extractCancel := context.WithTimeout(ctx, r.metadataExtractTimeout)
		defer extractCancel()
		puRuntime, err := r.metadataExtractor(extractCtx, r.client, r.scheme, pod, started)
		if err != nil {
			zap.L().Error("failed to extract metadata", zap.String("puID", puID), zap.Error(err))
			r.recorder.Eventf(pod, "Warning", "PUExtractMetadata", "PU '%s' failed to extract metadata: %s", puID, err.Error())
			return reconcile.Result{}, err
		}

		// now create/update the PU
		// try to update the PU first
		if err := r.handler.Policy.HandlePUEvent(
			handlePUCtx,
			puID,
			common.EventUpdate,
			puRuntime,
		); err != nil {
			// if it does not exist, we create it
			if policy.IsErrPUNotFound(err) {
				// create it - metadata might have changed
				if err := r.handler.Policy.HandlePUEvent(
					handlePUCtx,
					puID,
					common.EventCreate,
					puRuntime,
				); err != nil {
					zap.L().Error("failed to handle create event", zap.String("puID", puID), zap.Error(err))
					r.recorder.Eventf(pod, "Warning", "PUCreate", "failed to handle create event for PU '%s': %s", puID, err.Error())
					return reconcile.Result{}, err
				}
				r.recorder.Eventf(pod, "Normal", "PUCreate", "PU '%s' created successfully", puID)
			} else {
				zap.L().Error("failed to handle update event", zap.String("puID", puID), zap.Error(err))
				r.recorder.Eventf(pod, "Warning", "PUUpdate", "failed to handle update event for PU '%s': %s", puID, err.Error())
				return reconcile.Result{}, err
			}
		} else {
			r.recorder.Eventf(pod, "Normal", "PUUpdate", "PU '%s' updated successfully", puID)
		}
		if started {
			// if the metadata extractor is missing the PID or NSPath, we need to try again,
			// because we need one of them for starting the PU. However, we only require this
			// if we are not in host network mode.
			// NOTE: this can happen for example if the containers are not in a running state on their own
			if !pod.Spec.HostNetwork && len(puRuntime.NSPath()) == 0 && puRuntime.Pid() == 0 {
				zap.L().Error("Kubernetes thinks a container is running, however, we failed to extract a PID or NSPath with the metadata extractor. Requeueing...", zap.String("puID", puID))
				r.recorder.Eventf(pod, "Warning", "PUStart", "PU '%s' failed to extract netns", puID)
				return reconcile.Result{}, ErrNetnsExtractionMissing
			}

			// now start the PU
			if err := r.handler.Policy.HandlePUEvent(
				handlePUCtx,
				puID,
				common.EventStart,
				puRuntime,
			); err != nil {
				if policy.IsErrPUAlreadyActivated(err) {
					// abort early if this PU has already been activated before
					zap.L().Debug("PU has already been activated", zap.String("puID", puID), zap.Error(err))
				} else {
					zap.L().Error("failed to handle start event", zap.String("puID", puID), zap.Error(err))
					r.recorder.Eventf(pod, "Warning", "PUStart", "PU '%s' failed to start: %s", puID, err.Error())
					return reconcile.Result{}, ErrHandlePUStartEventFailed
				}
			} else {
				r.recorder.Eventf(pod, "Normal", "PUStart", "PU '%s' started successfully", puID)
			}

			// if this is a host network pod, we need to program the net_cls cgroup
			if pod.Spec.HostNetwork {
				netclsProgramCtx, netclsProgramCancel := context.WithTimeout(ctx, r.netclsProgramTimeout)
				defer netclsProgramCancel()
				if err := r.netclsProgrammer(netclsProgramCtx, pod, puRuntime); err != nil {
					if extractors.IsErrNetclsAlreadyProgrammed(err) {
						zap.L().Warn("net_cls cgroup has already been programmed previously", zap.String("puID", puID), zap.Error(err))
					} else if extractors.IsErrNoHostNetworkPod(err) {
						zap.L().Error("net_cls cgroup programmer told us that this is not a host network pod. Aborting.", zap.String("puID", puID), zap.Error(err))
					} else {
						zap.L().Error("failed to program net_cls cgroup of pod", zap.String("puID", puID), zap.Error(err))
						r.recorder.Eventf(pod, "Warning", "PUStart", "Host Network PU '%s' failed to program its net_cls cgroups: %s", puID, err.Error())
						return reconcile.Result{}, err
					}
				} else {
					zap.L().Debug("net_cls cgroup has been successfully programmed for trireme", zap.String("puID", puID))
					r.recorder.Eventf(pod, "Normal", "PUStart", "Host Network PU '%s' has successfully programmed its net_cls cgroups", puID)
				}
			}
		}
		return reconcile.Result{}, nil
	case corev1.PodSucceeded:
		fallthrough
	case corev1.PodFailed:
		zap.L().Debug("PodSucceeded / PodFailed", zap.String("puID", puID))
		err := r.handler.Policy.HandlePUEvent(
			handlePUCtx,
			puID,
			common.EventStop,
			policy.NewPURuntimeWithDefaults(),
		)
		if err != nil {
			if policy.IsErrPUNotFound(err) || policy.IsErrPUCreateFailed(err) {
				// not found (or never successfully created) means there is nothing to stop;
				// just return
				zap.L().Debug("failed to handle stop event (IsErrPUNotFound==true)", zap.String("puID", puID), zap.Error(err))
				return reconcile.Result{}, nil
			}
			zap.L().Error("failed to handle stop event", zap.String("puID", puID), zap.Error(err))
			r.recorder.Eventf(pod, "Warning", "PUStop", "PU '%s' failed to stop: %s", puID, err.Error())
			return reconcile.Result{}, ErrHandlePUStopEventFailed
		}
		r.recorder.Eventf(pod, "Normal", "PUStop", "PU '%s' has been successfully stopped", puID)
		return reconcile.Result{}, nil

	case corev1.PodUnknown:
		zap.L().Error("pod is in unknown state", zap.String("puID", puID))
		// we don't need to retry; there is nothing *we* can do to fix this
		return reconcile.Result{}, nil

	default:
		zap.L().Error("unknown pod phase", zap.String("puID", puID), zap.String("podPhase", string(pod.Status.Phase)))
		// we don't need to retry; there is nothing *we* can do to fix this
		return reconcile.Result{}, nil
	}
}
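// Resync sketch (illustration only, not part of the original file): the eventsCh
// channel handed to addController is watched through source.Channel above, so any
// event.GenericEvent sent on it is mapped by WatchPodMapper and re-enqueued for
// Reconcile. The loop below is an assumption of how a periodic resync elsewhere in
// this package could push pods on this node back into the queue; podsOnThisNode is
// a hypothetical slice of *corev1.Pod.
//
//	for _, pod := range podsOnThisNode {
//		eventsCh <- event.GenericEvent{
//			Meta:   pod, // *corev1.Pod satisfies metav1.Object
//			Object: pod, // and runtime.Object
//		}
//	}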