Skip to content

Commit

Permalink
Refactor preallocation
Browse files Browse the repository at this point in the history
Signed-off-by: Anna Khmelnitsky <akhmelnitsky@vmware.com>
  • Loading branch information
annakhm committed Mar 24, 2022
1 parent 1f7c8a5 commit e2989d0
Showing 1 changed file with 22 additions and 35 deletions.
57 changes: 22 additions & 35 deletions pkg/controller/ipam/antrea_ipam_controller.go
Expand Up @@ -20,10 +20,10 @@ package ipam
import (
"fmt"
"strings"
"sync"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -47,9 +47,6 @@ import (
const (
controllerName = "AntreaIPAMController"

addEventIndication = "a"
delEventIndication = "d"

// StatefulSet index name for IPPool cache.
statefulSetIndex = "statefulSet"

Expand All @@ -68,8 +65,6 @@ type AntreaIPAMController struct {

// Pool cleanup events triggered by StatefulSet add/delete
statefulSetQueue workqueue.RateLimitingInterface
// StatefulSet objects would be stored here until add event is processed
statefulSetMap sync.Map

// follow changes for Namespace objects
namespaceLister corelisters.NamespaceLister
Expand Down Expand Up @@ -113,7 +108,6 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
c := &AntreaIPAMController{
crdClient: crdClient,
statefulSetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetPreallocationAndCleanup"),
statefulSetMap: sync.Map{},
namespaceLister: namespaceInformer.Lister(),
namespaceListerSynced: namespaceInformer.Informer().HasSynced,
statefulSetInformer: statefulSetInformer,
Expand Down Expand Up @@ -141,9 +135,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetCreateEvent(obj interface{}) {
klog.V(2).InfoS("Create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name)

key := k8s.NamespacedName(ss.Namespace, ss.Name)

c.statefulSetMap.Store(key, ss)
c.statefulSetQueue.Add(addEventIndication + key)
c.statefulSetQueue.Add(key)
}

// Enqueue the StatefulSet delete notification to be processed by the worker
Expand All @@ -152,7 +144,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) {
klog.V(2).InfoS("Delete notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name)

key := k8s.NamespacedName(ss.Namespace, ss.Name)
c.statefulSetQueue.Add(delEventIndication + key)
c.statefulSetQueue.Add(key)
}

// Inspect all IPPools for stale IP Address entries.
Expand All @@ -176,7 +168,7 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() {
if _, ok := statefulSetMap[key]; !ok {
// This entry refers to StatefulSet that no longer exists
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key)
c.statefulSetQueue.Add(delEventIndication + key)
c.statefulSetQueue.Add(key)
// Mark this entry in map to ensure cleanup is enqueued only once
statefulSetMap[key] = true
}
Expand Down Expand Up @@ -286,36 +278,31 @@ func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool {

defer c.statefulSetQueue.Done(key)

namespacedName := key.(string)[1:]
namespacedName := key.(string)
namespace, name := k8s.SplitNamespacedName(namespacedName)
ss, err := c.statefulSetInformer.Lister().StatefulSets(namespace).Get(name)

if key.(string)[:1] == delEventIndication {
err := c.cleanIPPoolForStatefulSet(namespacedName)
if err != nil {
// We can not put the event back to the queue because of
// potential recreate scenario (delete + create).
// We rely on garbage collector to clear IPs in case of
// error.
c.statefulSetQueue.Forget(key)
klog.ErrorS(err, "failed to clean IP Pool", "StatefulSet", key)
return true
if err != nil {
if errors.IsNotFound(err) {
// StatefulSet no longer present - clean up reserved pool IPs
err = c.cleanIPPoolForStatefulSet(namespacedName)
if err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.statefulSetQueue.AddRateLimited(key)
klog.ErrorS(err, "Failed to clean IP Pool", "StatefulSet", key)
return true
}
} else {
klog.ErrorS(err, "Failed to get", "StatefulSet", key)
}
} else {
ss, ok := c.statefulSetMap.Load(namespacedName)
if !ok {
// Object not found in map - should never happen
klog.Errorf("failed to locate StatefulSet %s", namespacedName)
c.statefulSetQueue.Forget(key)
return true
}
err := c.preallocateIPPoolForStatefulSet(ss.(*appsv1.StatefulSet))
c.statefulSetMap.Delete(key)
// StatefulSet was created - preallocate IPs based on replicas with best effort
err := c.preallocateIPPoolForStatefulSet(ss)
if err != nil {
// Preallocation is best effort - we do not retry even with transient errors,
// since we don't want to implement logic that would delay Pods while waiting for
// preallocation.
klog.ErrorS(err, "no IPs preallocated")
c.statefulSetQueue.Forget(key)
return true
klog.ErrorS(err, "No IPs preallocated")
}
}

Expand Down

0 comments on commit e2989d0

Please sign in to comment.