Skip to content

Commit

Permalink
Implement garbage collector for IP Pools (#3672)
Browse files Browse the repository at this point in the history
A periodic task would go over all IP Pools and clean up
all allocations and reservations for which owner no longer
exists in k8s.

Signed-off-by: Anna Khmelnitsky <akhmelnitsky@vmware.com>
  • Loading branch information
annakhm committed May 5, 2022
1 parent 8f451f7 commit 09d11e9
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 22 deletions.
81 changes: 64 additions & 17 deletions pkg/controller/ipam/antrea_ipam_controller.go
Expand Up @@ -18,12 +18,14 @@
package ipam

import (
"context"
"fmt"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -52,13 +54,13 @@ const (

minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second

garbageCollectionInterval = 10 * time.Minute
)

// AntreaIPAMController is responsible for IP address cleanup
// for StatefulSet objects.
// In future, it will also be responsible for pre-allocation of
// continuous IP range for StatefulSets that do not have dedicated
// IP Pool annotation.
// AntreaIPAMController is responsible for:
// * reserving continuous IP address space for StatefulSet (if available)
// * periodical cleanup of IP Pools in case stale addresses are present
type AntreaIPAMController struct {
// crdClient is the clientset for CRD API group.
crdClient versioned.Interface
Expand All @@ -74,6 +76,10 @@ type AntreaIPAMController struct {
statefulSetInformer appsinformers.StatefulSetInformer
statefulSetListerSynced cache.InformerSynced

// follow changes for Pods
podLister corelisters.PodLister
podInformerSynced cache.InformerSynced

// follow changes for IP Pool objects
ipPoolInformer crdinformers.IPPoolInformer
ipPoolLister crdlisters.IPPoolLister
Expand Down Expand Up @@ -102,6 +108,7 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
ipPoolInformer.Informer().AddIndexers(cache.Indexers{statefulSetIndex: statefulSetIndexFunc})

namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()

statefulSetInformer := informerFactory.Apps().V1().StatefulSets()

Expand All @@ -112,6 +119,8 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
namespaceListerSynced: namespaceInformer.Informer().HasSynced,
statefulSetInformer: statefulSetInformer,
statefulSetListerSynced: statefulSetInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podInformerSynced: podInformer.Informer().HasSynced,
ipPoolInformer: ipPoolInformer,
ipPoolLister: ipPoolInformer.Lister(),
ipPoolListerSynced: ipPoolInformer.Informer().HasSynced,
Expand Down Expand Up @@ -152,31 +161,67 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) {
// Inspect all IPPools for stale IP Address entries.
// This may happen if controller was down during StatefulSet delete event.
// If such entry is found, enqueue cleanup event for this StatefulSet.
func (c *AntreaIPAMController) cleanupStaleStatefulSets() {
func (c *AntreaIPAMController) cleanupStaleAddresses() {
pools, _ := c.ipPoolLister.List(labels.Everything())
lister := c.statefulSetInformer.Lister()
statefulSets, _ := lister.List(labels.Everything())
statefulSetMap := make(map[string]bool)

klog.InfoS("Cleanup job for IP Pools started")

for _, ss := range statefulSets {
// Prepare map of existing StatefulSets for quick reference below
statefulSetMap[k8s.NamespacedName(ss.Namespace, ss.Name)] = true
}

poolsUpdated := 0
for _, ipPool := range pools {
for _, address := range ipPool.Status.IPAddresses {
updateNeeded := false
ipPoolCopy := ipPool.DeepCopy()
var newList []crdv1a2.IPAddressState
for _, address := range ipPoolCopy.Status.IPAddresses {
// Cleanup reserved addresses
if address.Owner.Pod != nil {
_, err := c.podLister.Pods(address.Owner.Pod.Namespace).Get(address.Owner.Pod.Name)
if err != nil && errors.IsNotFound(err) {
klog.InfoS("IPPool contains stale IPAddress for Pod that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.Pod.Namespace, "Pod", address.Owner.Pod.Name)
address.Owner.Pod = nil
if address.Owner.StatefulSet != nil {
address.Phase = crdv1a2.IPAddressPhaseReserved
}
updateNeeded = true
}
}
if address.Owner.StatefulSet != nil {
key := k8s.NamespacedName(address.Owner.StatefulSet.Namespace, address.Owner.StatefulSet.Name)
if _, ok := statefulSetMap[key]; !ok {
// This entry refers to StatefulSet that no longer exists
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key)
c.statefulSetQueue.Add(key)
// Mark this entry in map to ensure cleanup is enqueued only once
statefulSetMap[key] = true
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.StatefulSet.Namespace, "StatefulSet", address.Owner.StatefulSet.Name)
address.Owner.StatefulSet = nil
updateNeeded = true

}
}

if address.Owner.StatefulSet != nil || address.Owner.Pod != nil {
newList = append(newList, address)
}
}

if updateNeeded {
ipPoolCopy.Status.IPAddresses = newList
_, err := c.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), ipPoolCopy, metav1.UpdateOptions{})
if err != nil {
// Next cleanup job will retry
klog.ErrorS(err, "Updating IP Pool status failed", "IPPool", ipPool.Name)
} else {
poolsUpdated += 1
}

}
}

klog.InfoS("Cleanup job for IP Pools finished", "updated", poolsUpdated)
}

// Look for an IP Pool associated with this StatefulSet.
Expand Down Expand Up @@ -263,9 +308,11 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef
// Note that AllocateStatefulSet would not preallocate IPs if this StatefulSet is already present
// in the pool. This safeguards us from double allocation in case agent allocated IP by the time
// controller task is executed. Note also that StatefulSet resize will not be handled.
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
if size > 0 {
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
}
}

return nil
Expand Down Expand Up @@ -323,13 +370,13 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting", "controller", controllerName)
defer klog.InfoS("Shutting down", "controller", controllerName)

cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced}
cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.podInformerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced}
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}

// Make sure any stale StatefulSets that might be present in pools are cleaned up
c.cleanupStaleStatefulSets()
// Periodic cleanup IP Pools of stale IP addresses
go wait.NonSlidingUntil(c.cleanupStaleAddresses, garbageCollectionInterval, stopCh)

go wait.Until(c.statefulSetWorker, time.Second, stopCh)

Expand Down
41 changes: 36 additions & 5 deletions pkg/controller/ipam/antrea_ipam_controller_test.go
Expand Up @@ -19,6 +19,7 @@ package ipam

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -195,7 +196,7 @@ func TestReleaseStaleAddresses(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

namespace, pool, statefulSet := initTestObjects(true, false, 7)
namespace, pool, statefulSet := initTestObjects(true, false, 0)

activeSetOwner := crdv1a2.StatefulSetOwner{
Name: statefulSet.Name,
Expand All @@ -207,16 +208,26 @@ func TestReleaseStaleAddresses(t *testing.T) {
Namespace: namespace.Name,
}

stalePodOwner := crdv1a2.PodOwner{
Name: uuid.New().String(),
Namespace: namespace.Name,
}

addresses := []crdv1a2.IPAddressState{
{IPAddress: "10.2.2.12",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner}},
{IPAddress: "20.1.1.100",
{IPAddress: "20.2.2.13",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}},
{IPAddress: "20.1.1.200",
{IPAddress: "20.2.2.14",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}},
{IPAddress: "20.2.2.15",
Phase: crdv1a2.IPAddressPhaseAllocated,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner,
Pod: &stalePodOwner},
},
}

pool.Status = crdv1a2.IPPoolStatus{
Expand All @@ -239,6 +250,26 @@ func TestReleaseStaleAddresses(t *testing.T) {

go controller.Run(stopCh)

// after cleanup pool should have single entry
verifyPoolAllocatedSize(t, pool.Name, poolLister, 1)
// verify two stale entries were deleted, one updated to Reserved status
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
pool, err := poolLister.Get(pool.Name)
if err != nil {
return false, nil
}

if len(pool.Status.IPAddresses) != 2 {
t.Logf("IP Pool status: %v", pool.Status.IPAddresses)
return false, nil
}

for _, addr := range pool.Status.IPAddresses {
if addr.Phase != crdv1a2.IPAddressPhaseReserved {
return true, fmt.Errorf("Incorrect phase %s after cleanup", addr.Phase)
}
}

return true, nil
})

require.NoError(t, err)
}

0 comments on commit 09d11e9

Please sign in to comment.