Skip to content

Commit

Permalink
Refactor preallocation
Browse files Browse the repository at this point in the history
Signed-off-by: Anna Khmelnitsky <akhmelnitsky@vmware.com>
  • Loading branch information
annakhm committed Mar 25, 2022
1 parent 1f7c8a5 commit 00dfb7e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 88 deletions.
62 changes: 27 additions & 35 deletions pkg/controller/ipam/antrea_ipam_controller.go
Expand Up @@ -20,10 +20,10 @@ package ipam
import (
"fmt"
"strings"
"sync"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -47,9 +47,6 @@ import (
const (
controllerName = "AntreaIPAMController"

addEventIndication = "a"
delEventIndication = "d"

// StatefulSet index name for IPPool cache.
statefulSetIndex = "statefulSet"

Expand All @@ -68,8 +65,6 @@ type AntreaIPAMController struct {

// Pool cleanup events triggered by StatefulSet add/delete
statefulSetQueue workqueue.RateLimitingInterface
// StatefulSet objects would be stored here until add event is processed
statefulSetMap sync.Map

// follow changes for Namespace objects
namespaceLister corelisters.NamespaceLister
Expand Down Expand Up @@ -113,7 +108,6 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
c := &AntreaIPAMController{
crdClient: crdClient,
statefulSetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "statefulSetPreallocationAndCleanup"),
statefulSetMap: sync.Map{},
namespaceLister: namespaceInformer.Lister(),
namespaceListerSynced: namespaceInformer.Informer().HasSynced,
statefulSetInformer: statefulSetInformer,
Expand All @@ -124,6 +118,8 @@ func NewAntreaIPAMController(crdClient versioned.Interface,
}

// Add handlers for Stateful Set events.
// Note that update is not handled here: IP Pool annotation should not be
// updated without recreating the resource
klog.V(2).InfoS("Subscribing for StatefulSet notifications", "controller", controllerName)
statefulSetInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
Expand All @@ -141,9 +137,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetCreateEvent(obj interface{}) {
klog.V(2).InfoS("Create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name)

key := k8s.NamespacedName(ss.Namespace, ss.Name)

c.statefulSetMap.Store(key, ss)
c.statefulSetQueue.Add(addEventIndication + key)
c.statefulSetQueue.Add(key)
}

// Enqueue the StatefulSet delete notification to be processed by the worker
Expand All @@ -152,7 +146,7 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) {
klog.V(2).InfoS("Delete notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name)

key := k8s.NamespacedName(ss.Namespace, ss.Name)
c.statefulSetQueue.Add(delEventIndication + key)
c.statefulSetQueue.Add(key)
}

// Inspect all IPPools for stale IP Address entries.
Expand All @@ -176,7 +170,7 @@ func (c *AntreaIPAMController) cleanupStaleStatefulSets() {
if _, ok := statefulSetMap[key]; !ok {
// This entry refers to StatefulSet that no longer exists
klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key)
c.statefulSetQueue.Add(delEventIndication + key)
c.statefulSetQueue.Add(key)
// Mark this entry in map to ensure cleanup is enqueued only once
statefulSetMap[key] = true
}
Expand Down Expand Up @@ -266,6 +260,9 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef
}

size := int(*ss.Spec.Replicas)
// Note that AllocateStatefulSet would not preallocate IPs if this StatefulSet is already present
// in the pool. This safeguards us from double allocation in case agent allocated IP by the time
// controller task is executed. Note also that StatefulSet resize will not be handled.
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
Expand All @@ -286,36 +283,31 @@ func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool {

defer c.statefulSetQueue.Done(key)

namespacedName := key.(string)[1:]
namespacedName := key.(string)
namespace, name := k8s.SplitNamespacedName(namespacedName)
ss, err := c.statefulSetInformer.Lister().StatefulSets(namespace).Get(name)

if key.(string)[:1] == delEventIndication {
err := c.cleanIPPoolForStatefulSet(namespacedName)
if err != nil {
// We can not put the event back to the queue because of
// potential recreate scenario (delete + create).
// We rely on garbage collector to clear IPs in case of
// error.
c.statefulSetQueue.Forget(key)
klog.ErrorS(err, "failed to clean IP Pool", "StatefulSet", key)
return true
if err != nil {
if errors.IsNotFound(err) {
// StatefulSet no longer present - clean up reserved pool IPs
err = c.cleanIPPoolForStatefulSet(namespacedName)
if err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.statefulSetQueue.AddRateLimited(key)
klog.ErrorS(err, "Failed to clean IP addresses reserved for StatefulSet from IP Pool", "StatefulSet", key)
return true
}
} else {
klog.ErrorS(err, "Failed to get StatefulSet", "StatefulSet", key)
}
} else {
ss, ok := c.statefulSetMap.Load(namespacedName)
if !ok {
// Object not found in map - should never happen
klog.Errorf("failed to locate StatefulSet %s", namespacedName)
c.statefulSetQueue.Forget(key)
return true
}
err := c.preallocateIPPoolForStatefulSet(ss.(*appsv1.StatefulSet))
c.statefulSetMap.Delete(key)
// StatefulSet was created - preallocate IPs based on replicas with best effort
err := c.preallocateIPPoolForStatefulSet(ss)
if err != nil {
// Preallocation is best effort - we do not retry even with transient errors,
// since we don't want to implement logic that would delay Pods while waiting for
// preallocation.
klog.ErrorS(err, "no IPs preallocated")
c.statefulSetQueue.Forget(key)
return true
klog.ErrorS(err, "No IPs reserved for StatefulSet", "StatefulSet", key)
}
}

Expand Down
122 changes: 69 additions & 53 deletions pkg/controller/ipam/antrea_ipam_controller_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2"

crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
fakecrd "antrea.io/antrea/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -108,69 +109,84 @@ func verifyPoolAllocatedSize(t *testing.T, poolName string, poolLister listers.I
require.NoError(t, err)
}

func testStatefulSetLifecycle(t *testing.T, dedicatedPool bool, replicas int32) {
stopCh := make(chan struct{})
defer close(stopCh)
func TestStatefulSetLifecycle(t *testing.T) {

tests := []struct {
name string
dedicatedPool bool
replicas int32
expectAllocatedSize int
}{
{
name: "Dedicated pool",
dedicatedPool: true,
replicas: 5,
expectAllocatedSize: 5,
},
{
name: "Full reservation of dedicated pool",
dedicatedPool: true,
replicas: 11,
expectAllocatedSize: 11,
},
{
name: "Namespace pool",
dedicatedPool: false,
replicas: 7,
expectAllocatedSize: 7,
},
{
name: "No enough IPs",
dedicatedPool: false,
replicas: 20,
expectAllocatedSize: 0,
},
}

namespace, pool, statefulSet := initTestObjects(!dedicatedPool, dedicatedPool, replicas)
crdClient := fakecrd.NewSimpleClientset(pool)
k8sClient := fake.NewSimpleClientset(namespace, statefulSet)
for _, tt := range tests {
stopCh := make(chan struct{})
defer close(stopCh)

informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
klog.InfoS("Running", "test", tt.name)
namespace, pool, statefulSet := initTestObjects(!tt.dedicatedPool, tt.dedicatedPool, tt.replicas)
crdClient := fakecrd.NewSimpleClientset(pool)
k8sClient := fake.NewSimpleClientset(namespace, statefulSet)

crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
poolInformer := crdInformerFactory.Crd().V1alpha2().IPPools()
poolLister := poolInformer.Lister()
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)

controller := NewAntreaIPAMController(crdClient, informerFactory, crdInformerFactory)
require.NotNil(t, controller)
informerFactory.Start(stopCh)
crdInformerFactory.Start(stopCh)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
poolInformer := crdInformerFactory.Crd().V1alpha2().IPPools()
poolLister := poolInformer.Lister()

go controller.Run(stopCh)
controller := NewAntreaIPAMController(crdClient, informerFactory, crdInformerFactory)
require.NotNil(t, controller)
informerFactory.Start(stopCh)
crdInformerFactory.Start(stopCh)

var allocator *poolallocator.IPPoolAllocator
var err error
// Wait until pool propagates to the informer
pollErr := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) {
allocator, err = poolallocator.NewIPPoolAllocator(pool.Name, crdClient, poolLister)
if err != nil {
return false, nil
}
return true, nil
})
require.NoError(t, pollErr)
defer allocator.ReleaseStatefulSet(statefulSet.Namespace, statefulSet.Name)

if int(replicas) < allocator.Total() {
// Verify create event was handled by the controller and preallocation was succesfull
verifyPoolAllocatedSize(t, pool.Name, poolLister, int(replicas))
} else {
// Not enough IPs in the pool - preallocation should fail
verifyPoolAllocatedSize(t, pool.Name, poolLister, 0)
}
go controller.Run(stopCh)

// Delete StatefulSet
k8sClient.AppsV1().StatefulSets(namespace.Name).Delete(context.TODO(), statefulSet.Name, metav1.DeleteOptions{})
var allocator *poolallocator.IPPoolAllocator
var err error
// Wait until pool propagates to the informer
pollErr := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) {
allocator, err = poolallocator.NewIPPoolAllocator(pool.Name, crdClient, poolLister)
if err != nil {
return false, nil
}
return true, nil
})
require.NoError(t, pollErr)
defer allocator.ReleaseStatefulSet(statefulSet.Namespace, statefulSet.Name)

// Verify Delete event was processed
verifyPoolAllocatedSize(t, pool.Name, poolLister, 0)
}
// Verify create event was handled by the controller
verifyPoolAllocatedSize(t, pool.Name, poolLister, tt.expectAllocatedSize)

// This test verifies preallocation of IPs for dedicated IP pool annotation.
func TestStatefulSetLifecycle_DedicatedPool(t *testing.T) {
testStatefulSetLifecycle(t, true, 5)
}
// Delete StatefulSet
k8sClient.AppsV1().StatefulSets(namespace.Name).Delete(context.TODO(), statefulSet.Name, metav1.DeleteOptions{})

// This test verifies preallocation of IPs for IP pool annotation based on StatefulSet Namespace.
func TestStatefulSetLifecycle_NamespacePool(t *testing.T) {
testStatefulSetLifecycle(t, false, 7)
}

// This test verifies use case when continuous IP range can not be preallocated.
// However we don't expect error since preallocation is best-effort feature.
func TestStatefulSetLifecycle_NoPreallocation(t *testing.T) {
testStatefulSetLifecycle(t, false, 20)
// Verify Delete event was processed
verifyPoolAllocatedSize(t, pool.Name, poolLister, 0)
}
}

// Test for cleanup on controller startup: stale addresses that belong no StatefulSet objects
Expand Down

0 comments on commit 00dfb7e

Please sign in to comment.