-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
k8s_cep_gc.go
131 lines (115 loc) · 4.52 KB
/
k8s_cep_gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2016-2018 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"time"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/sirupsen/logrus"
core_v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
	// ciliumEndpointGCInterval is the interval between attempts of the CEP GC
	// controller.
	// Note that only one node per cluster should run this, and most iterations
	// will simply return.
	// NOTE(review): assigned elsewhere (presumably from a command-line flag at
	// startup) — confirm it is set before enableCiliumEndpointSyncGC runs.
	ciliumEndpointGCInterval time.Duration
)
// enableCiliumEndpointSyncGC starts the node-singleton sweeper for
// CiliumEndpoint objects where the managing node is no longer running. These
// objects are created by the sync-to-k8s-ciliumendpoint controller on each
// Endpoint.
// The general steps are:
//   - get list of nodes
//   - only run with probability 1/nodes
//   - get list of CEPs
//   - for each CEP
//     delete CEP if the corresponding pod does not exist
//
// CiliumEndpoint objects have the same name as the pod they represent.
func enableCiliumEndpointSyncGC() {
	var (
		controllerName = "to-k8s-ciliumendpoint-gc"
		scopedLog      = log.WithField("controller", controllerName)
	)

	log.Info("Starting to garbage collect stale CiliumEndpoint custom resources...")

	ciliumClient := ciliumK8sClient.CiliumV2()

	// this dummy manager is needed only to add this controller to the global list
	controller.NewManager().UpdateController(controllerName,
		controller.ControllerParams{
			RunInterval: ciliumEndpointGCInterval,
			DoFunc: func(ctx context.Context) error {
				var (
					// Page through CEPs in small chunks to bound memory use.
					listOpts = meta_v1.ListOptions{Limit: 10}
					loopStop = time.Now().Add(ciliumEndpointGCInterval)
				)

				pods, err := k8s.Client().CoreV1().Pods("").List(meta_v1.ListOptions{})
				if err != nil {
					return err
				}

				// Index pods by "namespace/name" for O(1) existence checks.
				// Only presence matters, so store an empty struct. (The
				// previous code stored &pod, which aliased the single loop
				// variable for every map entry — a latent bug had the values
				// ever been dereferenced.)
				podsCache := make(map[string]struct{}, len(pods.Items))
				for _, pod := range pods.Items {
					podsCache[pod.Namespace+"/"+pod.Name] = struct{}{}
				}

			perCEPFetch:
				for time.Now().Before(loopStop) { // Guard against no-break bugs
					// Throttle lookups in case of a busy loop, but bail out
					// promptly if the controller context is cancelled.
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-time.After(time.Second):
					}

					ceps, err := ciliumClient.CiliumEndpoints(meta_v1.NamespaceAll).List(listOpts)
					switch {
					case err != nil && k8serrors.IsResourceExpired(err) && ceps.Continue != "":
						// This combination means we saw a 410 ResourceExpired error but we
						// can iterate on the now-current snapshot. We need to refetch,
						// however.
						// See https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/types.go#L350-L381
						// or the docs for k8s.io/apimachinery/pkg/apis/meta/v1.ListOptions
						// vendored into this repo.
						listOpts.Continue = ceps.Continue
						continue perCEPFetch

					case err != nil:
						scopedLog.WithError(err).Debug("Cannot list CEPs")
						return err
					}

					// setup listOpts for the next iteration
					listOpts.Continue = ceps.Continue

					// For each CEP we fetched, check if we know about it
					for _, cep := range ceps.Items {
						cepFullName := cep.Namespace + "/" + cep.Name
						if _, exists := podsCache[cepFullName]; exists {
							continue
						}

						// Use a per-CEP log entry instead of reassigning
						// scopedLog: the old code mutated the captured logger,
						// leaking CEP-specific fields into every later log
						// line of this (and future) controller runs.
						cepLog := scopedLog.WithFields(logrus.Fields{
							logfields.EndpointID: cep.Status.ID,
							logfields.K8sPodName: cepFullName,
						})
						cepLog.Debug("Orphaned CiliumEndpoint is being garbage collected")

						// The API wants a pointer, so the const string needs
						// an addressable copy.
						propagationPolicy := meta_v1.DeletePropagationBackground
						if err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
							cep.Name,
							&meta_v1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
							cepLog.WithError(err).Debug("Unable to delete orphaned CEP")
							return err
						}
					}

					if ceps.Continue != "" {
						// there is more data, continue
						continue perCEPFetch
					}
					break perCEPFetch // break out as a safe default to avoid spammy loops
				}
				return nil
			},
		})
}