k8s: delete IPs from ipcache for non-running Pods
In Kubernetes, a Job creates a Pod that completes with either the
"Succeeded" or "Failed" PodPhase. Kubernetes leaves these Pods around
until the Job is deleted by the operator. As soon as the Pod enters one
of these PodPhases, the kubelet sends a CNI delete event to the Cilium
agent, which then releases the IP addresses allocated to that Pod,
making them available again.

Unless disabled, Cilium creates a CiliumEndpoint for each Pod in the
cluster whose network is managed by Cilium.

The Cilium agent populates the ipcache with information retrieved from
Pod and CiliumEndpoint events; when both sources carry information for
the same IP, the state from the CiliumEndpoint takes precedence.

In the unlikely case that the Cilium agent is not running when a Pod
enters the "Succeeded" (or "Failed") state, the agent will not be
available to delete the CiliumEndpoint created for that Pod, so its IP
address stays in the ipcache. To fix this, the operator's
CiliumEndpoint garbage collector now also deletes CEPs whose Pods are
no longer running, and it runs at least once even when the periodic GC
is disabled.

To complement this fix, the Cilium agent also prunes the
CiliumEndpoints of non-running Pods on startup.

Signed-off-by: André Martins <andre@cilium.io>
aanm committed Sep 25, 2020
1 parent 2988760 commit b3adc4d
Showing 13 changed files with 197 additions and 56 deletions.
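
To make the reasoning in the commit message concrete, the check that decides whether a Pod's IPs are still in use boils down to its PodPhase. The following is a minimal, self-contained sketch of that check; it mirrors the IsPodRunning helper added in pkg/k8s/utils but uses the upstream k8s.io/api/core/v1 types instead of Cilium's slim types, purely for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isPodRunning treats a Pod as running unless it reports a terminal phase.
// Once a Pod is Succeeded or Failed, its IPs are no longer in use and the
// corresponding CiliumEndpoint (and ipcache entry) can be garbage collected.
func isPodRunning(status corev1.PodStatus) bool {
	switch status.Phase {
	case corev1.PodFailed, corev1.PodSucceeded:
		return false
	}
	return true
}

func main() {
	for _, phase := range []corev1.PodPhase{corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed} {
		fmt.Printf("phase=%-9s running=%v\n", phase, isPodRunning(corev1.PodStatus{Phase: phase}))
	}
}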
3 changes: 3 additions & 0 deletions daemon/cmd/daemon_test.go
@@ -118,6 +118,9 @@ type dummyEpSyncher struct{}
func (epSync *dummyEpSyncher) RunK8sCiliumEndpointSync(e *endpoint.Endpoint, conf endpoint.EndpointStatusConfiguration) {
}

func (epSync *dummyEpSyncher) DeleteK8sCiliumEndpointSync(e *endpoint.Endpoint) {
}

func (ds *DaemonSuite) SetUpSuite(c *C) {
// Register metrics once before running the suite
_, ds.collectors = metrics.CreateConfiguration([]string{"cilium_endpoint_state"})
3 changes: 3 additions & 0 deletions daemon/cmd/policy_test.go
@@ -793,6 +793,9 @@ func (d *dummyManager) AllocateID(id uint16) (uint16, error) {
func (d *dummyManager) RunK8sCiliumEndpointSync(*endpoint.Endpoint, endpoint.EndpointStatusConfiguration) {
}

func (d *dummyManager) DeleteK8sCiliumEndpointSync(e *endpoint.Endpoint) {
}

func (d *dummyManager) UpdateReferences(map[id.PrefixType]string, *endpoint.Endpoint) {
}

1 change: 1 addition & 0 deletions daemon/cmd/state.go
@@ -186,6 +186,7 @@ func (d *Daemon) restoreOldEndpoints(state *endpointRestoreState, clean bool) er
if err != nil {
// Disconnected EPs are not failures, clean them silently below
if !ep.IsDisconnecting() {
d.endpointManager.DeleteK8sCiliumEndpointSync(ep)
scopedLog.WithError(err).Warningf("Unable to restore endpoint, ignoring")
failed++
}
100 changes: 72 additions & 28 deletions operator/k8s_cep_gc.go
@@ -16,15 +16,19 @@ package main

import (
"context"
"time"

operatorOption "github.com/cilium/cilium/operator/option"
"github.com/cilium/cilium/operator/watchers"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/logging/logfields"

"github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -37,24 +41,37 @@ import (
// - for each CEP
// delete CEP if the corresponding pod does not exist
// CiliumEndpoint objects have the same name as the pod they represent
func enableCiliumEndpointSyncGC() {
func enableCiliumEndpointSyncGC(once bool) {
var (
controllerName = "to-k8s-ciliumendpoint-gc"
scopedLog = log.WithField("controller", controllerName)
gcInterval time.Duration
stopCh = make(chan struct{})
)

log.Info("Starting to garbage collect stale CiliumEndpoint custom resources...")

ciliumClient := ciliumK8sClient.CiliumV2()

if once {
log.Info("Running the garbage collector only once to clean up leftover CiliumEndpoint custom resources...")
gcInterval = 0
} else {
log.Info("Starting to garbage collect stale CiliumEndpoint custom resources...")
gcInterval = operatorOption.Config.EndpointGCInterval
}

// This functions will block until the resources are synced with k8s.
watchers.CiliumEndpointsInit(ciliumClient)
watchers.PodsInit(k8s.WatcherCli())
watchers.CiliumEndpointsInit(ciliumClient, stopCh)
if !once {
// If we are running this function "once" it means that we
// will delete all CEPs in the cluster regardless of the pod
// state.
watchers.PodsInit(k8s.WatcherCli(), stopCh)
}

// this dummy manager is needed only to add this controller to the global list
controller.NewManager().UpdateController(controllerName,
controller.ControllerParams{
RunInterval: operatorOption.Config.EndpointGCInterval,
RunInterval: gcInterval,
DoFunc: func(ctx context.Context) error {
// For each CEP we fetched, check if we know about it
for _, cepObj := range watchers.CiliumEndpointStore.List() {
@@ -64,34 +81,61 @@ func enableCiliumEndpointSyncGC() {
Errorf("Saw %T object while expecting *cilium_v2.CiliumEndpoint", cepObj)
continue
}

cepFullName := cep.Namespace + "/" + cep.Name
_, exists, err := watchers.PodStore.GetByKey(cepFullName)
if err != nil {
scopedLog.WithError(err).Warn("Unable to get pod from store")
continue
}
if !exists {
// FIXME: this is fragile as we might have received the
// CEP notification first but not the pod notification
// so we need to have a similar mechanism that we have
// for the keep alive of security identities.
scopedLog = scopedLog.WithFields(logrus.Fields{
logfields.EndpointID: cep.Status.ID,
logfields.K8sPodName: cepFullName,
})
scopedLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
PropagationPolicy := meta_v1.DeletePropagationBackground // because these are const strings but the API wants pointers
err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
ctx,
cep.Name,
meta_v1.DeleteOptions{PropagationPolicy: &PropagationPolicy})
scopedLog = scopedLog.WithFields(logrus.Fields{
logfields.K8sPodName: cepFullName,
})

// If we are running this function "once" it means that we
// will delete all CEPs in the cluster regardless of the pod
// state therefore we won't even watch for the pod store.
if !once {
podObj, exists, err := watchers.PodStore.GetByKey(cepFullName)
if err != nil {
scopedLog.WithError(err).Debug("Unable to delete orphaned CEP")
scopedLog.WithError(err).Warn("Unable to get pod from store")
continue
}
if exists {
pod, ok := podObj.(*slim_corev1.Pod)
if !ok {
log.WithField(logfields.Object, podObj).
Errorf("Saw %T object while expecting *slim_corev1.Pod", podObj)
continue
}
// In Kubernetes Jobs, Pods can be left in Kubernetes until the Job
// is deleted. If the Job is never deleted, Cilium will never receive a Pod
// delete event, causing the IP to be left in the ipcache.
// For this reason we should delete the ipcache entries whenever the pod
// status is either PodFailed or PodSucceeded as it means the IP address
// is no longer in use.
if k8sUtils.IsPodRunning(pod.Status) {
continue
}
}
}
// FIXME: this is fragile as we might have received the
// CEP notification first but not the pod notification
// so we need to have a similar mechanism that we have
// for the keep alive of security identities.
scopedLog = scopedLog.WithFields(logrus.Fields{
logfields.EndpointID: cep.Status.ID,
})
scopedLog.Debug("Orphaned CiliumEndpoint is being garbage collected")
PropagationPolicy := meta_v1.DeletePropagationBackground // because these are const strings but the API wants pointers
err := ciliumClient.CiliumEndpoints(cep.Namespace).Delete(
ctx,
cep.Name,
meta_v1.DeleteOptions{PropagationPolicy: &PropagationPolicy})
if err != nil && !k8serrors.IsNotFound(err) {
scopedLog.WithError(err).Warning("Unable to delete orphaned CEP")
return err
}
}
// We have cleaned up all CEPs from Kubernetes so we can stop
// the k8s watchers.
if once {
close(stopCh)
}
return nil
},
})
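
For readers skimming the diff, the decision logic added above reduces to: in one-shot mode delete every CEP; otherwise delete a CEP only if no Pod with the same namespace/name exists, or that Pod is no longer running. Below is a simplified, self-contained sketch of that decision; cep, pod, and podStore are stand-ins invented for illustration, not the operator's actual types.

package main

import "fmt"

// cep, pod, and podStore are simplified stand-ins for *cilium_v2.CiliumEndpoint,
// *slim_corev1.Pod, and the watcher's pod store; they are not Cilium types.
type cep struct{ namespace, name string }

type pod struct{ running bool }

type podStore map[string]pod

// shouldDeleteCEP reproduces the decision made by the GC loop: in "once" mode
// every CEP is deleted regardless of pod state; otherwise a CEP is deleted only
// when its pod is missing or has reached a terminal phase (Succeeded/Failed).
func shouldDeleteCEP(c cep, pods podStore, once bool) bool {
	if once {
		return true
	}
	p, exists := pods[c.namespace+"/"+c.name] // CEPs share the name of their pod
	if !exists {
		return true // orphaned CEP: no pod with the same namespace/name
	}
	return !p.running // completed pod: its IPs are no longer in use
}

func main() {
	pods := podStore{
		"default/web": {running: true},
		"default/job": {running: false}, // e.g. a Job pod that Succeeded
	}
	for _, c := range []cep{{"default", "web"}, {"default", "job"}, {"default", "gone"}} {
		fmt.Printf("%s/%s delete=%v\n", c.namespace, c.name, shouldDeleteCEP(c, pods, false))
	}
}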
3 changes: 2 additions & 1 deletion operator/k8s_pod_controller.go
@@ -26,6 +26,7 @@ import (
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
@@ -39,7 +40,7 @@ var (

func enableUnmanagedKubeDNSController() {
// These functions will block until the resources are synced with k8s.
watchers.CiliumEndpointsInit(k8s.CiliumClient().CiliumV2())
watchers.CiliumEndpointsInit(k8s.CiliumClient().CiliumV2(), wait.NeverStop)
watchers.UnmanagedPodsInit(k8s.WatcherCli())

controller.NewManager().UpdateController("restart-unmanaged-kube-dns",
7 changes: 6 additions & 1 deletion operator/main.go
@@ -455,7 +455,12 @@ func onOperatorStartLeading(ctx context.Context) {
}

if operatorOption.Config.EndpointGCInterval != 0 {
enableCiliumEndpointSyncGC()
enableCiliumEndpointSyncGC(false)
} else {
// Even if the EndpointGC is disabled we still want it to run at least
// once. This is to prevent leftover CEPs from populating ipcache with
// stale entries.
enableCiliumEndpointSyncGC(true)
}

err = enableCNPWatcher(apiextensionsK8sClient)
7 changes: 3 additions & 4 deletions operator/watchers/cilium_endpoint.go
@@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
)

@@ -67,7 +66,7 @@ func identityIndexFunc(obj interface{}) ([]string, error) {
}

// CiliumEndpointsInit starts a CiliumEndpointWatcher
func CiliumEndpointsInit(ciliumNPClient cilium_cli.CiliumV2Interface) {
func CiliumEndpointsInit(ciliumNPClient cilium_cli.CiliumV2Interface, stopCh <-chan struct{}) {
once.Do(func() {
CiliumEndpointStore = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers)

@@ -80,9 +79,9 @@ func CiliumEndpointsInit(ciliumNPClient cilium_cli.CiliumV2Interface) {
convertToCiliumEndpoint,
CiliumEndpointStore,
)
go ciliumEndpointInformer.Run(wait.NeverStop)
go ciliumEndpointInformer.Run(stopCh)

cache.WaitForCacheSync(wait.NeverStop, ciliumEndpointInformer.HasSynced)
cache.WaitForCacheSync(stopCh, ciliumEndpointInformer.HasSynced)
close(CiliumEndpointsSynced)
})
}
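
The signature change above replaces the hard-coded wait.NeverStop with a caller-provided stop channel, so the one-shot GC can shut the informer down once cleanup is finished. A generic sketch of that pattern follows; runUntilStopped is a stand-in for an informer's Run method, not client-go code.

package main

import (
	"fmt"
	"time"
)

// runUntilStopped is a stand-in for an informer's Run(stopCh) loop: it does
// periodic work until the stop channel is closed.
func runUntilStopped(name string, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			fmt.Println(name, "stopped")
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println(name, "syncing")
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	go runUntilStopped("cep-informer", stopCh)

	// ... the one-shot garbage collection would run here ...
	time.Sleep(350 * time.Millisecond)

	// Closing the channel tells the watcher goroutine to exit, which is what
	// the GC now does once all leftover CEPs have been cleaned up.
	close(stopCh)
	time.Sleep(50 * time.Millisecond) // give the goroutine time to print "stopped"
}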
12 changes: 9 additions & 3 deletions operator/watchers/pod.go
@@ -47,7 +47,7 @@ var (
UnmanagedPodStoreSynced = make(chan struct{})
)

func PodsInit(k8sClient kubernetes.Interface) {
func PodsInit(k8sClient kubernetes.Interface, stopCh <-chan struct{}) {
var podInformer cache.Controller
PodStore, podInformer = informer.NewInformer(
cache.NewListWatchFromClient(k8sClient.CoreV1().RESTClient(),
@@ -57,9 +57,9 @@
cache.ResourceEventHandlerFuncs{},
convertToPod,
)
go podInformer.Run(wait.NeverStop)
go podInformer.Run(stopCh)

cache.WaitForCacheSync(wait.NeverStop, podInformer.HasSynced)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
close(PodStoreSynced)
}

@@ -76,6 +76,9 @@ func convertToPod(obj interface{}) interface{} {
Namespace: concreteObj.Namespace,
ResourceVersion: concreteObj.ResourceVersion,
},
Status: slim_corev1.PodStatus{
Phase: concreteObj.Status.Phase,
},
}
*concreteObj = slim_corev1.Pod{}
return p
@@ -93,6 +96,9 @@
Namespace: pod.Namespace,
ResourceVersion: pod.ResourceVersion,
},
Status: slim_corev1.PodStatus{
Phase: pod.Status.Phase,
},
},
}
// Small GC optimization
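
The converter above now also carries Status.Phase into the stripped-down Pod stored by the operator, since the GC needs the phase to tell completed Pods from running ones. A rough standalone sketch of that idea, using the upstream corev1 types and a hypothetical minimalPod struct in place of Cilium's slim Pod:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// minimalPod is a hypothetical stand-in for Cilium's slim Pod: it keeps only
// the fields the operator's GC needs, now including the phase.
type minimalPod struct {
	Name, Namespace, ResourceVersion string
	Phase                            corev1.PodPhase
}

// stripPod mirrors the intent of convertToPod above: retain identity fields
// plus Status.Phase and drop everything else to keep the store small.
func stripPod(p *corev1.Pod) minimalPod {
	return minimalPod{
		Name:            p.Name,
		Namespace:       p.Namespace,
		ResourceVersion: p.ResourceVersion,
		Phase:           p.Status.Phase,
	}
}

func main() {
	p := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "job-xyz", Namespace: "default", ResourceVersion: "42"},
		Status:     corev1.PodStatus{Phase: corev1.PodSucceeded},
	}
	fmt.Printf("%+v\n", stripPod(p))
}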
1 change: 1 addition & 0 deletions pkg/endpointmanager/manager.go
@@ -76,6 +76,7 @@ type EndpointManager struct {
// resources with Kubernetes.
type EndpointResourceSynchronizer interface {
RunK8sCiliumEndpointSync(ep *endpoint.Endpoint, conf endpoint.EndpointStatusConfiguration)
DeleteK8sCiliumEndpointSync(e *endpoint.Endpoint)
}

// NewEndpointManager creates a new EndpointManager.
3 changes: 3 additions & 0 deletions pkg/endpointmanager/manager_test.go
@@ -110,6 +110,9 @@ type dummyEpSyncher struct{}
func (epSync *dummyEpSyncher) RunK8sCiliumEndpointSync(e *endpoint.Endpoint, conf endpoint.EndpointStatusConfiguration) {
}

func (epSync *dummyEpSyncher) DeleteK8sCiliumEndpointSync(e *endpoint.Endpoint) {
}

func (s *EndpointManagerSuite) TestLookup(c *C) {
ep := endpoint.NewEndpointWithState(s, &endpoint.FakeEndpointProxy{}, &allocator.FakeIdentityAllocator{}, 10, endpoint.StateReady)
mgr := NewEndpointManager(&dummyEpSyncher{})
15 changes: 13 additions & 2 deletions pkg/k8s/utils/utils.go
@@ -18,7 +18,7 @@ import (
"fmt"
"sort"

slimcorev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/labels"
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/selection"
"github.com/cilium/cilium/pkg/option"
@@ -111,7 +111,7 @@ func GetServiceListOptionsModifier() (func(options *v1meta.ListOptions), error)

// ValidIPs return a sorted slice of unique IP addresses retrieved from the given PodStatus.
// Returns an error when no IPs are found.
func ValidIPs(podStatus slimcorev1.PodStatus) ([]string, error) {
func ValidIPs(podStatus slim_corev1.PodStatus) ([]string, error) {
if len(podStatus.PodIPs) == 0 && len(podStatus.PodIP) == 0 {
return nil, fmt.Errorf("empty PodIPs")
}
@@ -134,3 +134,14 @@ func ValidIPs(podStatus slimcorev1.PodStatus) ([]string, error) {
sort.Strings(ips)
return ips, nil
}

// IsPodRunning returns true if the pod is considered to be in running state.
// We consider a Running pod a pod that does not report a Failed nor a Succeeded
// pod Phase.
func IsPodRunning(status slim_corev1.PodStatus) bool {
switch status.Phase {
case slim_corev1.PodFailed, slim_corev1.PodSucceeded:
return false
}
return true
}
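
A hypothetical table-driven test for the new helper could look like the sketch below; it assumes the slim package exposes a string-based PodPhase type like the upstream API, which lets untyped string constants stand in for the non-terminal phases. Only Succeeded and Failed are treated as not running.

package utils_test

import (
	"testing"

	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
	k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
)

// TestIsPodRunning checks that only the terminal phases are reported as not
// running. The "Running"/"Pending" cases rely on PodPhase being a string type,
// as in the upstream API; adjust if the slim types differ.
func TestIsPodRunning(t *testing.T) {
	tests := []struct {
		phase slim_corev1.PodPhase
		want  bool
	}{
		{slim_corev1.PodSucceeded, false},
		{slim_corev1.PodFailed, false},
		{"Running", true},
		{"Pending", true},
	}
	for _, tc := range tests {
		if got := k8sUtils.IsPodRunning(slim_corev1.PodStatus{Phase: tc.phase}); got != tc.want {
			t.Errorf("IsPodRunning(phase=%s) = %v, want %v", tc.phase, got, tc.want)
		}
	}
}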