Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement garbage collector for IP Pools #3672

Merged
merged 1 commit into from May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 64 additions & 17 deletions pkg/controller/ipam/antrea_ipam_controller.go
Expand Up @@ -18,12 +18,14 @@
package ipam

import (
"context"
"fmt"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -52,13 +54,13 @@ const (

minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second

garbageCollectionInterval = 10 * time.Minute
)

// AntreaIPAMController is responsible for IP address cleanup
// for StatefulSet objects.
// In future, it will also be responsible for pre-allocation of
// continuous IP range for StatefulSets that do not have dedicated
// IP Pool annotation.
// AntreaIPAMController is responsible for:
// * reserving continuous IP address space for StatefulSet (if available)
// * periodic cleanup of IP Pools in case stale addresses are present
tnqn marked this conversation as resolved.
Show resolved Hide resolved
type AntreaIPAMController struct {
// crdClient is the clientset for CRD API group.
crdClient versioned.Interface
Expand All @@ -74,6 +76,10 @@ type AntreaIPAMController struct {
statefulSetInformer appsinformers.StatefulSetInformer
statefulSetListerSynced cache.InformerSynced

// follow changes for Pods
podLister corelisters.PodLister
podInformerSynced cache.InformerSynced

// follow changes for IP Pool objects
ipPoolInformer crdinformers.IPPoolInformer
ipPoolLister crdlisters.IPPoolLister
Expand Down Expand Up @@ -102,6 +108,7 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
ipPoolInformer.Informer().AddIndexers(cache.Indexers{statefulSetIndex: statefulSetIndexFunc})

namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()

statefulSetInformer := informerFactory.Apps().V1().StatefulSets()

Expand All @@ -112,6 +119,8 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
namespaceListerSynced: namespaceInformer.Informer().HasSynced,
statefulSetInformer: statefulSetInformer,
statefulSetListerSynced: statefulSetInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podInformerSynced: podInformer.Informer().HasSynced,
ipPoolInformer: ipPoolInformer,
ipPoolLister: ipPoolInformer.Lister(),
ipPoolListerSynced: ipPoolInformer.Informer().HasSynced,
Expand Down Expand Up @@ -152,31 +161,67 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) {
// Inspect all IPPools for stale IP Address entries.
// This may happen if controller was down during StatefulSet delete event.
// If such entry is found, enqueue cleanup event for this StatefulSet.
func (c *AntreaIPAMController) cleanupStaleStatefulSets() {
func (c *AntreaIPAMController) cleanupStaleAddresses() {
pools, _ := c.ipPoolLister.List(labels.Everything())
lister := c.statefulSetInformer.Lister()
statefulSets, _ := lister.List(labels.Everything())
statefulSetMap := make(map[string]bool)

klog.InfoS("Cleanup job for IP Pools started")

for _, ss := range statefulSets {
// Prepare map of existing StatefulSets for quick reference below
statefulSetMap[k8s.NamespacedName(ss.Namespace, ss.Name)] = true
}

poolsUpdated := 0
for _, ipPool := range pools {
for _, address := range ipPool.Status.IPAddresses {
updateNeeded := false
ipPoolCopy := ipPool.DeepCopy()
var newList []crdv1a2.IPAddressState
for _, address := range ipPoolCopy.Status.IPAddresses {
// Cleanup reserved addresses
if address.Owner.Pod != nil {
_, err := c.podLister.Pods(address.Owner.Pod.Namespace).Get(address.Owner.Pod.Name)
if err != nil && errors.IsNotFound(err) {
klog.InfoS("IPPool contains stale IPAddress for Pod that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.Pod.Namespace, "Pod", address.Owner.Pod.Name)
address.Owner.Pod = nil
if address.Owner.StatefulSet != nil {
address.Phase = crdv1a2.IPAddressPhaseReserved
}
updateNeeded = true
}
}
if address.Owner.StatefulSet != nil {
key := k8s.NamespacedName(address.Owner.StatefulSet.Namespace, address.Owner.StatefulSet.Name)
if _, ok := statefulSetMap[key]; !ok {
// This entry refers to StatefulSet that no longer exists
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key)
c.statefulSetQueue.Add(key)
// Mark this entry in map to ensure cleanup is enqueued only once
statefulSetMap[key] = true
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.StatefulSet.Namespace, "StatefulSet", address.Owner.StatefulSet.Name)
address.Owner.StatefulSet = nil
updateNeeded = true

}
}

if address.Owner.StatefulSet != nil || address.Owner.Pod != nil {
newList = append(newList, address)
}
}

if updateNeeded {
ipPoolCopy.Status.IPAddresses = newList
_, err := c.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), ipPoolCopy, metav1.UpdateOptions{})
if err != nil {
// Next cleanup job will retry
klog.ErrorS(err, "Updating IP Pool status failed", "IPPool", ipPool.Name)
} else {
poolsUpdated += 1
}

}
}

klog.InfoS("Cleanup job for IP Pools finished", "updated", poolsUpdated)
}

// Look for an IP Pool associated with this StatefulSet.
Expand Down Expand Up @@ -263,9 +308,11 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef
// Note that AllocateStatefulSet would not preallocate IPs if this StatefulSet is already present
// in the pool. This safeguards us from double allocation in case agent allocated IP by the time
// controller task is executed. Note also that StatefulSet resize will not be handled.
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
if size > 0 {
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
}
}

return nil
Expand Down Expand Up @@ -323,13 +370,13 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting", "controller", controllerName)
defer klog.InfoS("Shutting down", "controller", controllerName)

cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced}
cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.podInformerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced}
if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}

// Make sure any stale StatefulSets that might be present in pools are cleaned up
c.cleanupStaleStatefulSets()
// Periodically clean up stale IP addresses in IP Pools
go wait.NonSlidingUntil(c.cleanupStaleAddresses, garbageCollectionInterval, stopCh)

go wait.Until(c.statefulSetWorker, time.Second, stopCh)

Expand Down
41 changes: 36 additions & 5 deletions pkg/controller/ipam/antrea_ipam_controller_test.go
Expand Up @@ -19,6 +19,7 @@ package ipam

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -195,7 +196,7 @@ func TestReleaseStaleAddresses(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

namespace, pool, statefulSet := initTestObjects(true, false, 7)
namespace, pool, statefulSet := initTestObjects(true, false, 0)

activeSetOwner := crdv1a2.StatefulSetOwner{
Name: statefulSet.Name,
Expand All @@ -207,16 +208,26 @@ func TestReleaseStaleAddresses(t *testing.T) {
Namespace: namespace.Name,
}

stalePodOwner := crdv1a2.PodOwner{
Name: uuid.New().String(),
Namespace: namespace.Name,
}

addresses := []crdv1a2.IPAddressState{
{IPAddress: "10.2.2.12",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner}},
{IPAddress: "20.1.1.100",
{IPAddress: "20.2.2.13",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}},
{IPAddress: "20.1.1.200",
{IPAddress: "20.2.2.14",
Phase: crdv1a2.IPAddressPhaseReserved,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}},
{IPAddress: "20.2.2.15",
Phase: crdv1a2.IPAddressPhaseAllocated,
Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner,
Pod: &stalePodOwner},
},
}

pool.Status = crdv1a2.IPPoolStatus{
Expand All @@ -239,6 +250,26 @@ func TestReleaseStaleAddresses(t *testing.T) {

go controller.Run(stopCh)

// after cleanup pool should have single entry
verifyPoolAllocatedSize(t, pool.Name, poolLister, 1)
// verify that two stale entries were deleted and one was updated to Reserved status
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
pool, err := poolLister.Get(pool.Name)
if err != nil {
return false, nil
}

if len(pool.Status.IPAddresses) != 2 {
t.Logf("IP Pool status: %v", pool.Status.IPAddresses)
return false, nil
}

for _, addr := range pool.Status.IPAddresses {
if addr.Phase != crdv1a2.IPAddressPhaseReserved {
return true, fmt.Errorf("Incorrect phase %s after cleanup", addr.Phase)
}
}

return true, nil
})

require.NoError(t, err)
}