From e2989d0bb4f14f5af3fa674ac22fda0ec4669eff Mon Sep 17 00:00:00 2001 From: Anna Khmelnitsky Date: Thu, 24 Mar 2022 21:23:55 +0000 Subject: [PATCH] Refactor preallocation Signed-off-by: Anna Khmelnitsky --- pkg/controller/ipam/antrea_ipam_controller.go | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/pkg/controller/ipam/antrea_ipam_controller.go b/pkg/controller/ipam/antrea_ipam_controller.go index a57ad39110c..7ebff23a777 100644 --- a/pkg/controller/ipam/antrea_ipam_controller.go +++ b/pkg/controller/ipam/antrea_ipam_controller.go @@ -20,10 +20,10 @@ package ipam import ( "fmt" "strings" - "sync" "time" appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -47,9 +47,6 @@ import ( const ( controllerName = "AntreaIPAMController" - addEventIndication = "a" - delEventIndication = "d" - // StatefulSet index name for IPPool cache. statefulSetIndex = "statefulSet" @@ -68,8 +65,6 @@ type AntreaIPAMController struct { // Pool cleanup events triggered by StatefulSet add/delete statefulSetQueue workqueue.RateLimitingInterface - // StatefulSet objects would be stored here until add event is processed - statefulSetMap sync.Map // follow changes for Namespace objects namespaceLister corelisters.NamespaceLister @@ -113,7 +108,6 @@ func NewAntreaIPAMController(crdClient versioned.Interface, c := &AntreaIPAMController{ crdClient: crdClient, statefulSetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetPreallocationAndCleanup"), - statefulSetMap: sync.Map{}, namespaceLister: namespaceInformer.Lister(), namespaceListerSynced: namespaceInformer.Informer().HasSynced, statefulSetInformer: statefulSetInformer, @@ -141,9 +135,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetCreateEvent(obj interface{}) { klog.V(2).InfoS("Create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) key := k8s.NamespacedName(ss.Namespace, ss.Name) - - c.statefulSetMap.Store(key, ss) - c.statefulSetQueue.Add(addEventIndication + key) + c.statefulSetQueue.Add(key) } // Enqueue the StatefulSet delete notification to be processed by the worker @@ -152,7 +144,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) { klog.V(2).InfoS("Delete notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) key := k8s.NamespacedName(ss.Namespace, ss.Name) - c.statefulSetQueue.Add(delEventIndication + key) + c.statefulSetQueue.Add(key) } // Inspect all IPPools for stale IP Address entries. @@ -176,7 +168,7 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() { if _, ok := statefulSetMap[key]; !ok { // This entry refers to StatefulSet that no longer exists klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key) - c.statefulSetQueue.Add(delEventIndication + key) + c.statefulSetQueue.Add(key) // Mark this entry in map to ensure cleanup is enqueued only once statefulSetMap[key] = true } @@ -286,36 +278,31 @@ func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool { defer c.statefulSetQueue.Done(key) - namespacedName := key.(string)[1:] + namespacedName := key.(string) + namespace, name := k8s.SplitNamespacedName(namespacedName) + ss, err := c.statefulSetInformer.Lister().StatefulSets(namespace).Get(name) - if key.(string)[:1] == delEventIndication { - err := c.cleanIPPoolForStatefulSet(namespacedName) - if err != nil { - // We can not put the event back to the queue because of - // potential recreate scenario (delete + create). - // We rely on garbage collector to clear IPs in case of - // error. - c.statefulSetQueue.Forget(key) - klog.ErrorS(err, "failed to clean IP Pool", "StatefulSet", key) - return true + if err != nil { + if errors.IsNotFound(err) { + // StatefulSet no longer present - clean up reserved pool IPs + err = c.cleanIPPoolForStatefulSet(namespacedName) + if err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.statefulSetQueue.AddRateLimited(key) + klog.ErrorS(err, "Failed to clean IP Pool", "StatefulSet", key) + return true + } + } else { + klog.ErrorS(err, "Failed to get", "StatefulSet", key) } } else { - ss, ok := c.statefulSetMap.Load(namespacedName) - if !ok { - // Object not found in map - should never happen - klog.Errorf("failed to locate StatefulSet %s", namespacedName) - c.statefulSetQueue.Forget(key) - return true - } - err := c.preallocateIPPoolForStatefulSet(ss.(*appsv1.StatefulSet)) - c.statefulSetMap.Delete(key) + // StatefulSet was created - preallocate IPs based on replicas with best effort + err := c.preallocateIPPoolForStatefulSet(ss) if err != nil { // Preallocation is best effort - we do not retry even with transient errors, // since we don't want to implement logic that would delay Pods while waiting for // preallocation. - klog.ErrorS(err, "no IPs preallocated") - c.statefulSetQueue.Forget(key) - return true + klog.ErrorS(err, "No IPs preallocated") } }