Skip to content

Commit

Permalink
[flexible-ipam] Fix IP leak issues
Browse files Browse the repository at this point in the history
This commit fixed 2 IP leak issues:
1. AntreaIPAM enabled and the NodeIPAM Pods were deleted.
2. Incoming CniDel request when agent restart.

Signed-off-by: gran <gran@vmware.com>

[flexible-ipam]

Signed-off-by: gran <gran@vmware.com>
  • Loading branch information
gran-vmv committed Mar 21, 2022
1 parent 821b97b commit 50d5cdc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 40 deletions.
79 changes: 45 additions & 34 deletions pkg/agent/cniserver/ipam/antrea_ipam.go
Expand Up @@ -17,11 +17,13 @@ package ipam
import (
"fmt"
"net"
"sync"
"time"

"github.com/containernetworking/cni/pkg/invoke"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

argtypes "antrea.io/antrea/pkg/agent/cniserver/types"
Expand All @@ -37,14 +39,23 @@ const (
// if present. If annotation is not present, the driver will delegate functionality
// to traditional IPAM driver.
type AntreaIPAM struct {
controller *AntreaIPAMController
controller *AntreaIPAMController
controllerMutex sync.RWMutex
}

// Global variable is needed to work around order of initialization
// Controller will be assigned to the driver after it is initialized
// by agent init.
var antreaIPAMDriver *AntreaIPAM

type mineType uint8

const (
mineUnknown mineType = iota
mineFalse
mineTrue
)

// Resource needs to be unique since it is used as identifier in Del.
// Therefore Container ID is used, while Pod/Namespace are shown for visibility.
// TODO: Consider multi-interface case
Expand Down Expand Up @@ -97,6 +108,8 @@ func generateIPConfig(ip net.IP, prefixLength int, gwIP net.IP) (*current.IPConf
}

func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {
d.controllerMutex.Lock()
defer d.controllerMutex.Unlock()
d.controller = controller
}

Expand All @@ -105,9 +118,9 @@ func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {
func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, *current.Result, error) {
mine, allocator, ips, reservedOwner, err := d.owns(k8sArgs)
if err != nil {
return mine, nil, err
return true, nil, err
}
if !mine {
if mine == mineFalse {
// pass this request to next driver
return false, nil, nil
}
Expand Down Expand Up @@ -141,38 +154,29 @@ func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkCo

// Del deletes IP associated with resource from IP Pool status
func (d *AntreaIPAM) Del(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
swallowNotFoundError := func(err error) error {
if errors.IsNotFound(err) {
// The specified IP Pool does not exist, swallow the error
// because it is not recoverable
klog.Warningf("Antrea IPAM Del failed: %+v", err)
return nil
}

return err
}

// When AntreaIPAM.owns() is called for a NodeIPAM Pod, the IPPool cannot be found for the Pod. We pass
// swallowNotFoundError to the call to sallow the "NotFound" error.
mine, allocator, _, _, err := d.owns(k8sArgs)
if !mine {
if mine == mineFalse || mine == mineUnknown {
// pass this request to next driver
return false, nil
}
if err != nil {
return true, swallowNotFoundError(err)
return true, err
}

owner := getAllocationOwner(args, k8sArgs, nil)
err = allocator.ReleaseContainerIfPresent(owner.Pod.ContainerID)
return true, swallowNotFoundError(err)
return true, err
}

// Check verifues IP associated with resource is tracked in IP Pool status
func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
mine, allocator, _, _, err := d.owns(k8sArgs)
if err != nil {
return mine, err
return true, err
}
if !mine {
if mine == mineFalse {
// pass this request to next driver
return false, nil
}
Expand All @@ -194,10 +198,26 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, network
// Antrea IPAM annotation for the resource (only Namespace annotation is supported as
// of today). If annotation is not present, or annotated IP Pool not found, the driver
// will not own the request and fall back to next IPAM driver.
func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (bool, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
if d.controller == nil {
klog.Warningf("Antrea IPAM driver failed to initialize due to inconsistent configuration. Falling back to default IPAM")
return false, nil, nil, nil, nil
// return types:
// mineUnknown + PodNotFound error
// mineUnknown + InvalidIPAnnotation error
// mineFalse + nil error
// mineTrue + timeout error
// mineTrue + IPPoolNotFound error
// mineTrue + nil error
func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
// Wait controller ready to avoid inappropriate behavior on CNI request
if err := wait.PollImmediate(500*time.Millisecond, 5*time.Second, func() (bool, error) {
d.controllerMutex.RLock()
defer d.controllerMutex.RUnlock()
if d.controller == nil {
klog.Warningf("Antrea IPAM driver is not ready.")
return false, nil
}
return true, nil
}); err != nil {
// return mineTrue to make this request failed and kubelet will retry
return mineTrue, nil, nil, nil, err
}

// As of today, only Namespace annotation is supported
Expand All @@ -206,16 +226,7 @@ func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (bool, *poolallocator.IPPoo
namespace := string(k8sArgs.K8S_POD_NAMESPACE)
podName := string(k8sArgs.K8S_POD_NAME)
klog.V(2).InfoS("Inspecting IPAM annotation", "Namespace", namespace, "Pod", podName)
allocator, ips, reservedOwner, err := d.controller.getPoolAllocatorByPod(namespace, podName)
if err != nil {
// Failed to find pool - error should be returned from this driver
return true, nil, nil, nil, err
}
if allocator == nil {
// No pool annotation for this namespace
return false, nil, nil, nil, nil
}
return true, allocator, ips, reservedOwner, nil
return d.controller.getPoolAllocatorByPod(namespace, podName)
}

func init() {
Expand Down
13 changes: 7 additions & 6 deletions pkg/agent/cniserver/ipam/antrea_ipam_controller.go
Expand Up @@ -105,8 +105,6 @@ func InitializeAntreaIPAMController(kubeClient clientset.Interface, crdClient cl
return nil, fmt.Errorf("Antrea IPAM driver failed to initialize")
}

antreaIPAMDriver.setController(antreaIPAMController)

return antreaIPAMController, nil
}

Expand All @@ -121,6 +119,7 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.namespaceInformer.Informer().HasSynced, c.ipPoolInformer.Informer().HasSynced, c.podInformer.HasSynced) {
return
}
antreaIPAMDriver.setController(c)

<-stopCh
}
Expand Down Expand Up @@ -204,14 +203,16 @@ ownerReferenceLoop:
return strings.Split(annotations, AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr
}

func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) (*poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
poolNames, ips, reservedOwner, err := c.getIPPoolsByPod(namespace, podName)
if err != nil || len(poolNames) < 1 {
return nil, nil, nil, err
if err != nil {
return mineUnknown, nil, nil, nil, err
} else if len(poolNames) < 1 {
return mineFalse, nil, nil, nil, nil
}
// Only one pool is supported as of today
// TODO - support a pool for each IP version
ipPool := poolNames[0]
allocator, err := poolallocator.NewIPPoolAllocator(ipPool, c.crdClient, c.ipPoolLister)
return allocator, ips, reservedOwner, err
return mineTrue, allocator, ips, reservedOwner, err
}

0 comments on commit 50d5cdc

Please sign in to comment.