Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flexible-ipam] Fix IP leak issues #3314

Merged
merged 1 commit into from Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 43 additions & 34 deletions pkg/agent/cniserver/ipam/antrea_ipam.go
Expand Up @@ -17,11 +17,13 @@ package ipam
import (
"fmt"
"net"
"sync"
"time"

"github.com/containernetworking/cni/pkg/invoke"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

argtypes "antrea.io/antrea/pkg/agent/cniserver/types"
Expand All @@ -37,14 +39,23 @@ const (
// if present. If annotation is not present, the driver will delegate functionality
// to traditional IPAM driver.
type AntreaIPAM struct {
controller *AntreaIPAMController
controller *AntreaIPAMController
controllerMutex sync.RWMutex
}

// A global variable is needed to work around the order of initialization.
// The controller will be assigned to the driver after it is initialized
// by agent init.
var antreaIPAMDriver *AntreaIPAM

// mineType is the three-state answer to whether a CNI request belongs to the
// Antrea IPAM driver. It is returned by owns and getPoolAllocatorByPod.
type mineType uint8

const (
// mineUnknown is the zero value. getPoolAllocatorByPod returns it together
// with a non-nil error when looking up the Pod's IP Pools fails; Del treats
// it like mineFalse and passes the request to the next driver.
mineUnknown mineType = iota
// mineFalse is returned with a nil error when no IP Pool name is found for
// the Pod; Add, Del and Check then pass the request to the next driver.
mineFalse
// mineTrue is returned when an IP Pool name was found for the Pod. owns also
// returns it with an error when waiting for the controller times out, so
// that the request fails in this driver instead of being passed on.
mineTrue
)

// Resource needs to be unique since it is used as identifier in Del.
// Therefore Container ID is used, while Pod/Namespace are shown for visibility.
// TODO: Consider multi-interface case
Expand Down Expand Up @@ -97,6 +108,8 @@ func generateIPConfig(ip net.IP, prefixLength int, gwIP net.IP) (*current.IPConf
}

// setController assigns controller to the driver while holding the write lock
// of controllerMutex. owns polls the same field under the read lock until it
// is non-nil.
func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {
d.controllerMutex.Lock()
defer d.controllerMutex.Unlock()
d.controller = controller
}

Expand All @@ -105,9 +118,9 @@ func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {
func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, *current.Result, error) {
mine, allocator, ips, reservedOwner, err := d.owns(k8sArgs)
if err != nil {
return mine, nil, err
return true, nil, err
}
if !mine {
if mine == mineFalse {
// pass this request to next driver
return false, nil, nil
}
Expand Down Expand Up @@ -141,38 +154,27 @@ func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkCo

// Del deletes IP associated with resource from IP Pool status
func (d *AntreaIPAM) Del(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
swallowNotFoundError := func(err error) error {
if errors.IsNotFound(err) {
// The specified IP Pool does not exist, swallow the error
// because it is not recoverable
klog.Warningf("Antrea IPAM Del failed: %+v", err)
return nil
}

return err
}

mine, allocator, _, _, err := d.owns(k8sArgs)
if !mine {
if mine == mineFalse || mine == mineUnknown {
// pass this request to next driver
return false, nil
}
if err != nil {
return true, swallowNotFoundError(err)
return true, err
}

owner := getAllocationOwner(args, k8sArgs, nil)
err = allocator.ReleaseContainerIfPresent(owner.Pod.ContainerID)
return true, swallowNotFoundError(err)
return true, err
}

// Check verifies that the IP associated with the resource is tracked in the IP Pool status
func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
mine, allocator, _, _, err := d.owns(k8sArgs)
if err != nil {
return mine, err
return true, err
}
if !mine {
if mine == mineFalse {
// pass this request to next driver
return false, nil
}
Expand All @@ -194,10 +196,26 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, network
// Antrea IPAM annotation for the resource (only Namespace annotation is supported as
// of today). If annotation is not present, or annotated IP Pool not found, the driver
// will not own the request and fall back to next IPAM driver.
func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (bool, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
if d.controller == nil {
klog.Warningf("Antrea IPAM driver failed to initialize due to inconsistent configuration. Falling back to default IPAM")
return false, nil, nil, nil, nil
// return types:
// mineUnknown + PodNotFound error
// mineUnknown + InvalidIPAnnotation error
// mineFalse + nil error
// mineTrue + timeout error
// mineTrue + IPPoolNotFound error
// mineTrue + nil error
func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
// Wait for the controller to be ready to avoid inappropriate behavior on CNI requests
if err := wait.PollImmediate(500*time.Millisecond, 5*time.Second, func() (bool, error) {
d.controllerMutex.RLock()
defer d.controllerMutex.RUnlock()
if d.controller == nil {
klog.Warningf("Antrea IPAM driver is not ready.")
return false, nil
}
return true, nil
}); err != nil {
// Return mineTrue so that this request fails and kubelet retries it
return mineTrue, nil, nil, nil, err
}

// As of today, only Namespace annotation is supported
Expand All @@ -206,16 +224,7 @@ func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (bool, *poolallocator.IPPoo
namespace := string(k8sArgs.K8S_POD_NAMESPACE)
podName := string(k8sArgs.K8S_POD_NAME)
klog.V(2).InfoS("Inspecting IPAM annotation", "Namespace", namespace, "Pod", podName)
allocator, ips, reservedOwner, err := d.controller.getPoolAllocatorByPod(namespace, podName)
if err != nil {
// Failed to find pool - error should be returned from this driver
return true, nil, nil, nil, err
}
if allocator == nil {
// No pool annotation for this namespace
return false, nil, nil, nil, nil
}
return true, allocator, ips, reservedOwner, nil
return d.controller.getPoolAllocatorByPod(namespace, podName)
}

func init() {
Expand Down
13 changes: 7 additions & 6 deletions pkg/agent/cniserver/ipam/antrea_ipam_controller.go
Expand Up @@ -105,8 +105,6 @@ func InitializeAntreaIPAMController(kubeClient clientset.Interface, crdClient cl
return nil, fmt.Errorf("Antrea IPAM driver failed to initialize")
}

antreaIPAMDriver.setController(antreaIPAMController)

return antreaIPAMController, nil
}

Expand All @@ -121,6 +119,7 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.namespaceInformer.Informer().HasSynced, c.ipPoolInformer.Informer().HasSynced, c.podInformer.HasSynced) {
return
}
antreaIPAMDriver.setController(c)

<-stopCh
}
Expand Down Expand Up @@ -204,14 +203,16 @@ ownerReferenceLoop:
return strings.Split(annotations, AntreaIPAMAnnotationDelimiter), ips, reservedOwner, ipErr
}

func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) (*poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
poolNames, ips, reservedOwner, err := c.getIPPoolsByPod(namespace, podName)
if err != nil || len(poolNames) < 1 {
return nil, nil, nil, err
if err != nil {
return mineUnknown, nil, nil, nil, err
} else if len(poolNames) < 1 {
return mineFalse, nil, nil, nil, nil
}
// Only one pool is supported as of today
// TODO - support a pool for each IP version
ipPool := poolNames[0]
allocator, err := poolallocator.NewIPPoolAllocator(ipPool, c.crdClient, c.ipPoolLister)
return allocator, ips, reservedOwner, err
return mineTrue, allocator, ips, reservedOwner, err
}