diff --git a/pkg/controller/ipam/antrea_ipam_controller.go b/pkg/controller/ipam/antrea_ipam_controller.go index a57ad39110c..68bd3483a17 100644 --- a/pkg/controller/ipam/antrea_ipam_controller.go +++ b/pkg/controller/ipam/antrea_ipam_controller.go @@ -20,10 +20,10 @@ package ipam import ( "fmt" "strings" - "sync" "time" appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -47,9 +47,6 @@ import ( const ( controllerName = "AntreaIPAMController" - addEventIndication = "a" - delEventIndication = "d" - // StatefulSet index name for IPPool cache. statefulSetIndex = "statefulSet" @@ -68,8 +65,6 @@ type AntreaIPAMController struct { // Pool cleanup events triggered by StatefulSet add/delete statefulSetQueue workqueue.RateLimitingInterface - // StatefulSet objects would be stored here until add event is processed - statefulSetMap sync.Map // follow changes for Namespace objects namespaceLister corelisters.NamespaceLister @@ -113,7 +108,6 @@ func NewAntreaIPAMController(crdClient versioned.Interface, c := &AntreaIPAMController{ crdClient: crdClient, statefulSetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetPreallocationAndCleanup"), - statefulSetMap: sync.Map{}, namespaceLister: namespaceInformer.Lister(), namespaceListerSynced: namespaceInformer.Informer().HasSynced, statefulSetInformer: statefulSetInformer, @@ -141,9 +135,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetCreateEvent(obj interface{}) { klog.V(2).InfoS("Create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) key := k8s.NamespacedName(ss.Namespace, ss.Name) - - c.statefulSetMap.Store(key, ss) - c.statefulSetQueue.Add(addEventIndication + key) + c.statefulSetQueue.Add(key) } // Enqueue the StatefulSet delete notification to be processed by the worker @@ -152,7 +144,7 @@ func (c 
*AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) { klog.V(2).InfoS("Delete notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name) key := k8s.NamespacedName(ss.Namespace, ss.Name) - c.statefulSetQueue.Add(delEventIndication + key) + c.statefulSetQueue.Add(key) } // Inspect all IPPools for stale IP Address entries. @@ -176,7 +168,7 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() { if _, ok := statefulSetMap[key]; !ok { // This entry refers to StatefulSet that no longer exists klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key) - c.statefulSetQueue.Add(delEventIndication + key) + c.statefulSetQueue.Add(key) // Mark this entry in map to ensure cleanup is enqueued only once statefulSetMap[key] = true } @@ -266,6 +258,9 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef } size := int(*ss.Spec.Replicas) + // Note that AllocateStatefulSet does not preallocate IPs if this StatefulSet is already present + // in the pool. This safeguards us from double allocation in case the agent has already allocated an IP by + // the time the controller task is executed. Note also that StatefulSet resize is not handled. 
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size) if err != nil { return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err) @@ -286,36 +281,31 @@ func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool { defer c.statefulSetQueue.Done(key) - namespacedName := key.(string)[1:] + namespacedName := key.(string) + namespace, name := k8s.SplitNamespacedName(namespacedName) + ss, err := c.statefulSetInformer.Lister().StatefulSets(namespace).Get(name) - if key.(string)[:1] == delEventIndication { - err := c.cleanIPPoolForStatefulSet(namespacedName) - if err != nil { - // We can not put the event back to the queue because of - // potential recreate scenario (delete + create). - // We rely on garbage collector to clear IPs in case of - // error. - c.statefulSetQueue.Forget(key) - klog.ErrorS(err, "failed to clean IP Pool", "StatefulSet", key) - return true + if err != nil { + if errors.IsNotFound(err) { + // StatefulSet no longer present - clean up reserved pool IPs + err = c.cleanIPPoolForStatefulSet(namespacedName) + if err != nil { + // Put the item back on the workqueue to handle any transient errors. 
+ c.statefulSetQueue.AddRateLimited(key) + klog.ErrorS(err, "Failed to clean IP Pool", "StatefulSet", key) + return true + } + } else { + klog.ErrorS(err, "Failed to get", "StatefulSet", key) } } else { - ss, ok := c.statefulSetMap.Load(namespacedName) - if !ok { - // Object not found in map - should never happen - klog.Errorf("failed to locate StatefulSet %s", namespacedName) - c.statefulSetQueue.Forget(key) - return true - } - err := c.preallocateIPPoolForStatefulSet(ss.(*appsv1.StatefulSet)) - c.statefulSetMap.Delete(key) + // StatefulSet was created - preallocate IPs based on replicas with best effort + err := c.preallocateIPPoolForStatefulSet(ss) if err != nil { // Preallocation is best effort - we do not retry even with transient errors, // since we don't want to implement logic that would delay Pods while waiting for // preallocation. - klog.ErrorS(err, "no IPs preallocated") - c.statefulSetQueue.Forget(key) - return true + klog.ErrorS(err, "No IPs preallocated") } }