Support defining IPAM pools using CiliumPodIPPool CRD #25824

Merged (10 commits) on Jun 7, 2023
6 changes: 4 additions & 2 deletions Makefile
@@ -203,7 +203,9 @@ CRDS_CILIUM_V2ALPHA1 := ciliumendpointslices \
ciliumloadbalancerippools \
ciliumnodeconfigs \
ciliumcidrgroups \
ciliuml2announcementpolicies
ciliuml2announcementpolicies \
ciliumpodippools

manifests: ## Generate K8s manifests e.g. CRD, RBAC etc.
$(eval TMPDIR := $(shell mktemp -d -t cilium.tmpXXXXXXXX))
$(QUIET)$(GO) run sigs.k8s.io/controller-tools/cmd/controller-gen $(CRD_OPTIONS) paths=$(CRD_PATHS) output:crd:artifacts:config="$(TMPDIR)"
@@ -687,4 +689,4 @@ force :;
# it exists here so the entire source code repo can be mounted into the container.
CILIUM_BUILDER_IMAGE=$(shell cat images/cilium/Dockerfile | grep "ARG CILIUM_BUILDER_IMAGE=" | cut -d"=" -f2)
run_bpf_tests:
docker run -v $$(pwd):/src --privileged -w /src -e RUN_WITH_SUDO=false $(CILIUM_BUILDER_IMAGE) "make" "-C" "test/" "run_bpf_tests"
docker run -v $$(pwd):/src --privileged -w /src -e RUN_WITH_SUDO=false $(CILIUM_BUILDER_IMAGE) "make" "-C" "test/" "run_bpf_tests"
6 changes: 6 additions & 0 deletions install/kubernetes/cilium/Chart.yaml
@@ -137,3 +137,9 @@ annotations:
description: |
CiliumL2AnnouncementPolicy is a policy which determines which service IPs will be announced to
the local area network, by which nodes, and via which interfaces.
- kind: CiliumPodIPPool
version: v2alpha1
name: ciliumpodippools.cilium.io
displayName: Cilium Pod IP Pool
description: |
CiliumPodIPPool defines an IP pool that can be used for pooled IPAM (i.e. the multi-pool IPAM mode).
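For illustration, here is a minimal sketch of how a consumer could list CiliumPodIPPool objects and read the fields this PR introduces (pool name, per-family CIDR lists and mask sizes). The generated clientset package path, the CiliumV2alpha1().CiliumPodIPPools() accessor, and the kubeconfig path are assumptions based on Cilium's usual client-gen layout and are not defined by this PR; the spec field access mirrors the multipool allocator code further down in the diff.

// Sketch only: list CiliumPodIPPool resources and print their spec.
// Assumed: the generated clientset at
// github.com/cilium/cilium/pkg/k8s/client/clientset/versioned exposes
// CiliumV2alpha1().CiliumPodIPPools() for this cluster-scoped CRD.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
	if err != nil {
		panic(err)
	}
	cs, err := versioned.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	pools, err := cs.CiliumV2alpha1().CiliumPodIPPools().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, p := range pools.Items {
		// Field access mirrors the multipool allocator below and assumes
		// both address families are set in the pool spec.
		fmt.Printf("pool %s: ipv4=%v/%d ipv6=%v/%d\n",
			p.Name,
			p.Spec.IPv4.CIDRs, p.Spec.IPv4.MaskSize,
			p.Spec.IPv6.CIDRs, p.Spec.IPv6.MaskSize)
	}
}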
@@ -95,6 +95,7 @@ rules:
- ciliumnodeconfigs
- ciliumcidrgroups
- ciliuml2announcementpolicies
- ciliumpodippools
verbs:
- list
- watch
@@ -197,10 +197,12 @@ rules:
- ciliumnodeconfigs.cilium.io
- ciliumcidrgroups.cilium.io
- ciliuml2announcementpolicies.cilium.io
- ciliumpodippools.cilium.io
- apiGroups:
- cilium.io
resources:
- ciliumloadbalancerippools
- ciliumpodippools
verbs:
- get
- list
@@ -95,6 +95,7 @@ rules:
- ciliumnodeconfigs
- ciliumcidrgroups
- ciliuml2announcementpolicies
- ciliumpodippools
verbs:
- list
- watch
6 changes: 6 additions & 0 deletions operator/cmd/root.go
@@ -459,6 +459,12 @@ func (legacy *legacyOnLeader) onStart(_ hive.HookContext) error {
log.WithError(err).Fatalf("Unable to init %s allocator", ipamMode)
}

if pooledAlloc, ok := alloc.(operatorWatchers.PooledAllocatorProvider); ok {
// The following operation will block until all pools are restored, thus it
// is safe to continue starting node allocation right after return.
operatorWatchers.StartIPPoolAllocator(legacy.ctx, legacy.clientset, pooledAlloc, legacy.resources.CiliumPodIPPools)
}

nm, err := alloc.Start(legacy.ctx, &ciliumNodeUpdateImplementation{legacy.clientset})
if err != nil {
log.WithError(err).Fatalf("Unable to start %s allocator", ipamMode)
10 changes: 6 additions & 4 deletions operator/k8s/resources.go
@@ -27,6 +27,7 @@ var (
k8s.EndpointsResource,
k8s.LBIPPoolsResource,
k8s.CiliumIdentityResource,
k8s.CiliumPodIPPoolResource,
),
)
)
@@ -35,8 +36,9 @@ var (
type Resources struct {
cell.In

Services resource.Resource[*slim_corev1.Service]
Endpoints resource.Resource[*k8s.Endpoints]
LBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
Identities resource.Resource[*cilium_api_v2.CiliumIdentity]
Services resource.Resource[*slim_corev1.Service]
Endpoints resource.Resource[*k8s.Endpoints]
LBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
Identities resource.Resource[*cilium_api_v2.CiliumIdentity]
CiliumPodIPPools resource.Resource[*cilium_api_v2alpha1.CiliumPodIPPool]
}
58 changes: 58 additions & 0 deletions operator/watchers/cilium_podippool.go
@@ -0,0 +1,58 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package watchers

import (
"context"

cilium_v2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/resource"
)

// PooledAllocatorProvider defines the functions of IPAM provider front-end which additionally allow
// definition of IP pools at runtime.
// This is implemented by e.g. pkg/ipam/allocator/multipool
type PooledAllocatorProvider interface {
UpsertPool(ctx context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error
DeletePool(ctx context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error
}

func StartIPPoolAllocator(
Review thread:

Contributor:
How hard would it be to implement this as a hive module instead? I suppose we'd need to pull PooledAllocatorProvider out of the legacy code first, and make alloc.Start depend on IPPoolAllocator to make sure it's started first?

Member Author:
I initially thought about implementing this as a hive cell. Unfortunately, it would require lifting quite a bit of IPAM code around. For example, I think we cannot make alloc.Start depend on IPPoolAllocator unconditionally, since only one specific implementation (namely ipam/allocator/multipool.Allocator) requires the pooled allocator, while all the others don't. Initially, I wanted to include IPPoolAllocator in ipam/allocator.AllocatorProvider, but IIRC that led to nasty cyclic dependencies. Also, given that this PR is already quite large and should make it into the tree for 1.14, I didn't want to include additional refactoring.

Admittedly it's not ideal to introduce new code in a non-modular fashion, but based on the above I'd defer addressing modularization of this code together with the other IPAM bits to a follow-up PR. I'll discuss the specifics with @gandro and @cilium/sig-foundations in a Slack thread or GH issue. Hope that's OK?

Member:
Yeah, given the feature freeze coming up in a week, I don't think it's realistic to tackle a larger refactor at the moment unfortunately 😿

The upside is that this is very little and simple code, so it should not make any future refactor more complicated

ctx context.Context,
clientset client.Clientset,
allocator PooledAllocatorProvider,
ipPools resource.Resource[*cilium_v2alpha1.CiliumPodIPPool],
) {
log.Info("Starting CiliumPodIPPool allocator watcher")

synced := make(chan struct{})

go func() {
for ev := range ipPools.Events(ctx) {
var err error
var action string

switch ev.Kind {
case resource.Sync:
close(synced)
case resource.Upsert:
err = allocator.UpsertPool(ctx, ev.Object)
action = "upsert"
case resource.Delete:
err = allocator.DeletePool(ctx, ev.Object)
action = "delete"
}
ev.Done(err)
if err != nil {
log.WithError(err).Errorf("failed to %s pool %q", action, ev.Key)
}
}
}()

// Block until all pools are restored, so callers can safely start node allocation
// right after return.
<-synced
log.Info("All CiliumPodIPPool resources synchronized")
}
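To make the interface contract above concrete, here is a minimal, illustrative PooledAllocatorProvider that merely logs pool events; it is a stand-in for the real implementation added in pkg/ipam/allocator/multipool below, and anything satisfying these two methods can be handed to StartIPPoolAllocator. The example package name and the logging behaviour are assumptions made for this sketch, not part of the PR.

// Sketch only: a stub PooledAllocatorProvider.
package podippoolexample

import (
	"context"
	"fmt"

	operatorWatchers "github.com/cilium/cilium/operator/watchers"
	cilium_v2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
)

// Compile-time check that the stub satisfies the interface defined above.
var _ operatorWatchers.PooledAllocatorProvider = (*loggingPoolProvider)(nil)

type loggingPoolProvider struct{}

func (p *loggingPoolProvider) UpsertPool(_ context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error {
	// Mirrors the spec fields consumed by the multipool allocator further below.
	fmt.Printf("upsert pool %s: ipv4=%v/%d ipv6=%v/%d\n",
		pool.Name,
		pool.Spec.IPv4.CIDRs, pool.Spec.IPv4.MaskSize,
		pool.Spec.IPv6.CIDRs, pool.Spec.IPv6.MaskSize)
	return nil
}

func (p *loggingPoolProvider) DeletePool(_ context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error {
	fmt.Printf("delete pool %s\n", pool.Name)
	return nil
}

Because StartIPPoolAllocator only returns once the resource.Sync event has closed the synced channel, the operator can start node allocation immediately after it returns, knowing that every pre-existing pool has already been replayed into the provider; this is the ordering that operator/cmd/root.go above relies on.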
3 changes: 3 additions & 0 deletions pkg/ipam/allocator/clusterpool/cidralloc/cidralloc.go
@@ -6,6 +6,7 @@ package cidralloc
import (
"fmt"
"net"
"net/netip"

"github.com/cilium/cilium/pkg/ip"
"github.com/cilium/cilium/pkg/ipam/cidrset"
@@ -20,6 +21,8 @@ type CIDRAllocator interface {
IsAllocated(cidr *net.IPNet) (bool, error)
IsFull() bool
InRange(cidr *net.IPNet) bool
IsClusterCIDR(cidr *net.IPNet) bool
Prefix() netip.Prefix
}

type ErrCIDRCollision struct {
46 changes: 41 additions & 5 deletions pkg/ipam/allocator/multipool/allocator.go
@@ -15,6 +15,7 @@ import (
operatorOption "github.com/cilium/cilium/operator/option"
"github.com/cilium/cilium/pkg/ipam"
"github.com/cilium/cilium/pkg/ipam/allocator"
cilium_v2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
)

// Allocator implements allocator.AllocatorProvider
@@ -86,11 +87,11 @@ func (a *Allocator) Init(ctx context.Context) (err error) {
}

log.WithFields(logrus.Fields{
"pool-name": poolName,
"ipv4-cidrs": pool.ipv4CIDRs,
"ipv4-masksize": pool.ipv4MaskSize,
"ipv6-cidrs": pool.ipv6CIDRs,
"ipv6-masksize": pool.ipv6MaskSize,
"pool-name": poolName,
"ipv4-cidrs": pool.ipv4CIDRs,
"ipv4-mask-size": pool.ipv4MaskSize,
"ipv6-cidrs": pool.ipv6CIDRs,
"ipv6-mask-size": pool.ipv6MaskSize,
}).Debug("adding pool")
if addErr := a.poolAlloc.AddPool(poolName, pool.ipv4CIDRs, pool.ipv4MaskSize, pool.ipv6CIDRs, pool.ipv6MaskSize); addErr != nil {
err = errors.Join(err, fmt.Errorf("failed to add IP pool %s to allocator: %w", poolName, addErr))
@@ -104,3 +105,38 @@ func (a *Allocator) Init(ctx context.Context) (err error) {
func (a *Allocator) Start(ctx context.Context, getterUpdater ipam.CiliumNodeGetterUpdater) (allocator.NodeEventHandler, error) {
return NewNodeHandler(a.poolAlloc, getterUpdater), nil
}

func (a *Allocator) UpsertPool(ctx context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error {
log.WithFields(logrus.Fields{
"pool-name": pool.Name,
"ipv4-cidrs": pool.Spec.IPv4.CIDRs,
"ipv4-mask-size": pool.Spec.IPv4.MaskSize,
"ipv6-cidrs": pool.Spec.IPv6.CIDRs,
"ipv6-mask-size": pool.Spec.IPv6.MaskSize,
}).Debug("upserting pool")

ipv4CIDRs := make([]string, len(pool.Spec.IPv4.CIDRs))
for i, cidr := range pool.Spec.IPv4.CIDRs {
ipv4CIDRs[i] = string(cidr)
}
ipv6CIDRs := make([]string, len(pool.Spec.IPv6.CIDRs))
for i, cidr := range pool.Spec.IPv6.CIDRs {
ipv6CIDRs[i] = string(cidr)
}

return a.poolAlloc.UpsertPool(
pool.Name,
ipv4CIDRs,
int(pool.Spec.IPv4.MaskSize),
ipv6CIDRs,
int(pool.Spec.IPv6.MaskSize),
)
}

func (a *Allocator) DeletePool(ctx context.Context, pool *cilium_v2alpha1.CiliumPodIPPool) error {
log.WithFields(logrus.Fields{
"pool-name": pool.Name,
}).Debug("deleting pool")

return a.poolAlloc.DeletePool(pool.Name)
}
67 changes: 31 additions & 36 deletions pkg/ipam/allocator/multipool/node_handler.go
@@ -12,11 +12,9 @@ import (
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/ipam"
"github.com/cilium/cilium/pkg/ipam/allocator"

"github.com/cilium/cilium/pkg/logging/logfields"

v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
)

type NodeHandler struct {
@@ -26,7 +24,7 @@ type NodeHandler struct {
nodeUpdater ipam.CiliumNodeGetterUpdater

nodesPendingAllocation map[string]*v2.CiliumNode
nodesPendingK8sUpdate map[string]*v2.CiliumNode
restoreFinished bool

controllerManager *controller.Manager
}
@@ -38,7 +36,6 @@ func NewNodeHandler(manager *PoolAllocator, nodeUpdater ipam.CiliumNodeGetterUpd
poolManager: manager,
nodeUpdater: nodeUpdater,
nodesPendingAllocation: map[string]*v2.CiliumNode{},
nodesPendingK8sUpdate: map[string]*v2.CiliumNode{},
controllerManager: controller.NewManager(),
}
}
@@ -66,77 +63,75 @@ func (n *NodeHandler) Delete(resource *v2.CiliumNode) {
Warning("Errors while release node and its CIDRs")
}

delete(n.nodesPendingAllocation, resource.Name)

// Make sure any pending update controller is stopped
n.controllerManager.RemoveController(controllerName(resource.Name))
}

func (n *NodeHandler) Resync(ctx context.Context, time time.Time) {
func (n *NodeHandler) Resync(context.Context, time.Time) {
n.mutex.Lock()
defer n.mutex.Unlock()

n.poolManager.RestoreFinished()
for _, cn := range n.nodesPendingAllocation {
delete(n.nodesPendingAllocation, cn.Name)
n.upsertLocked(cn)
n.createUpsertController(cn)
}
n.restoreFinished = true
n.nodesPendingAllocation = nil
}

func (n *NodeHandler) upsertLocked(resource *v2.CiliumNode) bool {
err := n.poolManager.AllocateToNode(resource)
if err != nil {
if errors.Is(err, ErrAllocatorNotReady) {
n.nodesPendingAllocation[resource.Name] = resource
return false // try again later
} else {
log.WithField(logfields.NodeName, resource.Name).WithError(err).
Warning("Failed to allocate PodCIDRs to node")
}
}

// refreshNode is set to true if the node needs to be refreshed before
// performing the update
refreshNode := false
// errorMessage is written to the resource status
errorMessage := ""
if err != nil {
errorMessage = err.Error()
if !n.restoreFinished {
n.nodesPendingAllocation[resource.Name] = resource
err := n.poolManager.AllocateToNode(resource)
return err == nil
}
n.createUpsertController(resource)
return true
}

func (n *NodeHandler) createUpsertController(resource *v2.CiliumNode) {
// This controller serves two purposes:
// 1. It will retry allocations upon failure, e.g. if a pool does not exist yet.
// 2. Will try to synchronize the allocator's state with the CiliumNode CRD in k8s.
n.controllerManager.UpdateController(controllerName(resource.Name), controller.ControllerParams{
DoFunc: func(ctx context.Context) error {
if refreshNode {
resource, err = n.nodeUpdater.Get(resource.Name)
if err != nil {
return fmt.Errorf("failed to refresh node: %w", err)
}
// errorMessage is written to the resource status
errorMessage := ""
var controllerErr error

err := n.poolManager.AllocateToNode(resource)
if err != nil {
log.WithField(logfields.NodeName, resource.Name).WithError(err).
Warning("Failed to allocate PodCIDRs to node")
errorMessage = err.Error()
controllerErr = err
}

newResource := resource.DeepCopy()
newResource.Status.IPAM.OperatorStatus.Error = errorMessage

n.mutex.Lock()
newResource.Spec.IPAM.Pools.Allocated = n.poolManager.AllocatedPools(newResource.Name)
n.mutex.Unlock()

var controllerErr error
if !newResource.Spec.IPAM.Pools.DeepEqual(&resource.Spec.IPAM.Pools) {
_, err = n.nodeUpdater.Update(resource, newResource)
if err != nil {
refreshNode = true
controllerErr = errors.Join(controllerErr, fmt.Errorf("failed to update spec: %w", err))
}
}

if !newResource.Status.IPAM.OperatorStatus.DeepEqual(&resource.Status.IPAM.OperatorStatus) {
_, err = n.nodeUpdater.UpdateStatus(resource, newResource)
if err != nil {
refreshNode = true
controllerErr = errors.Join(controllerErr, fmt.Errorf("failed to update status: %w", err))
}
}

return controllerErr
},
})
return true
}

func controllerName(nodeName string) string {
18 changes: 18 additions & 0 deletions pkg/ipam/allocator/multipool/pool.go
@@ -84,6 +84,16 @@ func releaseCIDR(allocators []cidralloc.CIDRAllocator, cidr netip.Prefix) error
return fmt.Errorf("released cidr %s was not part the pool", cidr)
}

func hasCIDR(allocators []cidralloc.CIDRAllocator, cidr netip.Prefix) bool {
ipnet := ip.PrefixToIPNet(cidr)
for _, alloc := range allocators {
if alloc.IsClusterCIDR(ipnet) {
return true
}
}
return false
}

func (c *cidrPool) allocCIDR(family ipam.Family) (netip.Prefix, error) {
switch family {
case ipam.IPv4:
@@ -110,3 +120,11 @@ func (c *cidrPool) releaseCIDR(cidr netip.Prefix) error {
return releaseCIDR(c.v6, cidr)
}
}

func (c *cidrPool) hasCIDR(cidr netip.Prefix) bool {
if cidr.Addr().Is4() {
return hasCIDR(c.v4, cidr)
} else {
return hasCIDR(c.v6, cidr)
}
}