Skip to content

Commit

Permalink
ipam/multipool: Fix bug where allocator was unable to update CiliumNode
Browse files Browse the repository at this point in the history
This commit fixes an issue where the multi-pool allocator was unable to
update a CiliumNode resource because of concurrent writes. This
manifested in the following error being emitted repeatedly:

```
level=debug msg="Controller run failed" consecutiveErrors=48
error="failed to update spec: Operation cannot be fulfilled on
ciliumnodes.cilium.io \"kind-worker\": the object has been modified;
please apply your changes to the latest version and try again"
name=ipam-multi-pool-sync-kind-worker subsys=controller
uuid=12ba9a52-d36f-48fe-a7b7-3cf97c2cdb26
```

This would happen because the operator CiliumNode watcher does not call
the `Upsert` function if only the metadata (e.g. resource version,
labels, annotations, etc) of a node changes. This meant that the
allocator was working with a stale `resourceVersion` of the CiliumNode
object, causing any updates to fail until `Upsert` would be called again
because some non-metadata field changed.

This commit fixes that issue by having the controller fetch the most
recent version of the `CiliumNode` if the Kubernetes API reports that
there have been concurrent changes. This behavior matches the behavior
of the cluster-pool and ENI/Azure/AlibabaCloud implementations, which
already correctly fetched the resource again upon conflicts.

In addition, this commit adds a unit test that covers this new
behavior.

Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
  • Loading branch information
gandro committed Sep 7, 2023
1 parent 4bb74a7 commit 525b1ea
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 3 deletions.
27 changes: 24 additions & 3 deletions pkg/ipam/allocator/multipool/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"time"

k8sErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/ipam"
"github.com/cilium/cilium/pkg/ipam/allocator"
Expand All @@ -26,7 +28,8 @@ type NodeHandler struct {
nodesPendingAllocation map[string]*v2.CiliumNode
restoreFinished bool

controllerManager *controller.Manager
controllerManager *controller.Manager
controllerErrorRetryBaseDuration time.Duration // only set in unit tests
}

var ipamMultipoolSyncControllerGroup = controller.NewGroup("ipam-multi-pool-sync")
Expand Down Expand Up @@ -92,13 +95,25 @@ func (n *NodeHandler) createUpsertController(resource *v2.CiliumNode) {
// This controller serves two purposes:
// 1. It will retry allocations upon failure, e.g. if a pool does not exist yet.
// 2. Will try to synchronize the allocator's state with the CiliumNode CRD in k8s.
refetchNode := false
n.controllerManager.UpdateController(controllerName(resource.Name), controller.ControllerParams{
Group: ipamMultipoolSyncControllerGroup,
Group: ipamMultipoolSyncControllerGroup,
ErrorRetryBaseDuration: n.controllerErrorRetryBaseDuration,
DoFunc: func(ctx context.Context) error {
// errorMessage is written to the resource status
errorMessage := ""
var controllerErr error

// If a previous run of the controller failed due to a conflict,
// we need to re-fetch the node to make sure we have the latest version.
if refetchNode {
resource, controllerErr = n.nodeUpdater.Get(resource.Name)
if controllerErr != nil {
return controllerErr
}
refetchNode = false
}

err := n.poolManager.AllocateToNode(resource)
if err != nil {
log.WithField(logfields.NodeName, resource.Name).WithError(err).
Expand All @@ -116,13 +131,19 @@ func (n *NodeHandler) createUpsertController(resource *v2.CiliumNode) {
_, err = n.nodeUpdater.Update(resource, newResource)
if err != nil {
controllerErr = errors.Join(controllerErr, fmt.Errorf("failed to update spec: %w", err))
if k8sErrors.IsConflict(err) {
refetchNode = true
}
}
}

if !newResource.Status.IPAM.OperatorStatus.DeepEqual(&resource.Status.IPAM.OperatorStatus) {
if !newResource.Status.IPAM.OperatorStatus.DeepEqual(&resource.Status.IPAM.OperatorStatus) && !refetchNode {
_, err = n.nodeUpdater.UpdateStatus(resource, newResource)
if err != nil {
controllerErr = errors.Join(controllerErr, fmt.Errorf("failed to update status: %w", err))
if k8sErrors.IsConflict(err) {
refetchNode = true
}
}
}

Expand Down
217 changes: 217 additions & 0 deletions pkg/ipam/allocator/multipool/node_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package multipool

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

ipamTypes "github.com/cilium/cilium/pkg/ipam/types"
v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
)

// k8sNodeMock is a test double for the CiliumNode updater handed to
// NewNodeHandler. Every method forwards to the matching On* hook when one is
// installed and panics otherwise.
type k8sNodeMock struct {
	// OnUpdate and OnUpdateStatus back Update and UpdateStatus respectively.
	OnUpdate, OnUpdateStatus func(oldNode, newNode *v2.CiliumNode) (*v2.CiliumNode, error)

	// OnGet backs Get.
	OnGet func(node string) (*v2.CiliumNode, error)

	// OnCreate backs Create.
	OnCreate func(n *v2.CiliumNode) (*v2.CiliumNode, error)
}

// Update forwards the call to the OnUpdate hook. It panics when no hook is
// installed, which flags an Update call the test did not expect.
func (k *k8sNodeMock) Update(origNode, node *v2.CiliumNode) (*v2.CiliumNode, error) {
	hook := k.OnUpdate
	if hook == nil {
		panic("d.Update should not be called!")
	}
	return hook(origNode, node)
}

// UpdateStatus forwards the call to the OnUpdateStatus hook. It panics when
// no hook is installed, which flags an UpdateStatus call the test did not
// expect.
func (k *k8sNodeMock) UpdateStatus(origNode, node *v2.CiliumNode) (*v2.CiliumNode, error) {
	hook := k.OnUpdateStatus
	if hook == nil {
		panic("d.UpdateStatus should not be called!")
	}
	return hook(origNode, node)
}

// Get forwards the lookup of the named node to the OnGet hook. It panics when
// no hook is installed, which flags a Get call the test did not expect.
func (k *k8sNodeMock) Get(node string) (*v2.CiliumNode, error) {
	hook := k.OnGet
	if hook == nil {
		panic("d.Get should not be called!")
	}
	return hook(node)
}

// Create forwards the call to the OnCreate hook. It panics when no hook is
// installed, which flags a Create call the test did not expect.
func (k *k8sNodeMock) Create(n *v2.CiliumNode) (*v2.CiliumNode, error) {
	hook := k.OnCreate
	if hook == nil {
		panic("d.Create should not be called!")
	}
	return hook(n)
}

// mockArgs captures the pair of nodes passed to a mocked Update or
// UpdateStatus call so the test can inspect them.
//
// The field order matters: the mock hooks build this struct with a positional
// literal (old node first, new node second).
type mockArgs struct {
	oldNode, newNode *v2.CiliumNode
}

// mockResult is the reply the test sends back to a mocked k8sNodeMock call;
// the hook returns node and err to its caller unchanged.
type mockResult struct {
	// node is the CiliumNode returned by the mocked call (may be nil).
	node *v2.CiliumNode
	// err is the error returned by the mocked call (nil on success).
	err error
}

// TestNodeHandler drives a NodeHandler against a channel-backed k8sNodeMock.
//
// Every mocked Update, UpdateStatus and Get call first publishes its arguments
// on an unbuffered channel and then blocks until the test sends the result to
// return. This lets the test step through the sync controller one API call at
// a time and verify that:
//  1. nodes are only written after Resync,
//  2. a node claiming a CIDR already owned by another node is given a
//     different CIDR and an error is reported in its status,
//  3. after an update conflict the controller skips the status update,
//     re-fetches the node and retries using the fetched version.
func TestNodeHandler(t *testing.T) {
	backend := NewPoolAllocator()
	err := backend.addPool("default", []string{"10.0.0.0/8"}, 24, nil, 0)
	assert.NoError(t, err)

	onUpdateArgs := make(chan mockArgs)
	onUpdateResult := make(chan mockResult)

	onUpdateStatusArgs := make(chan mockArgs)
	onUpdateStatusResult := make(chan mockResult)

	onGetArgs := make(chan string)
	onGetResult := make(chan mockResult)
	nodeUpdater := &k8sNodeMock{
		OnUpdate: func(oldNode, newNode *v2.CiliumNode) (*v2.CiliumNode, error) {
			onUpdateArgs <- mockArgs{oldNode, newNode}
			r := <-onUpdateResult
			return r.node, r.err
		},
		OnUpdateStatus: func(oldNode, newNode *v2.CiliumNode) (*v2.CiliumNode, error) {
			onUpdateStatusArgs <- mockArgs{oldNode, newNode}
			r := <-onUpdateStatusResult
			return r.node, r.err
		},
		OnGet: func(node string) (*v2.CiliumNode, error) {
			onGetArgs <- node
			r := <-onGetResult
			return r.node, r.err
		},
	}
	nh := NewNodeHandler(backend, nodeUpdater)

	// wait 1ms instead of default 1s base duration in unit tests
	nh.controllerErrorRetryBaseDuration = 1 * time.Millisecond

	nh.Upsert(&v2.CiliumNode{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node1",
		},
		Spec: v2.NodeSpec{
			IPAM: ipamTypes.IPAMSpec{
				Pools: ipamTypes.IPAMPoolSpec{
					Requested: []ipamTypes.IPAMPoolRequest{
						{
							Pool:   "default",
							Needed: ipamTypes.IPAMPoolDemand{IPv4Addrs: 16},
						},
					},
				},
			},
		},
	})

	// Tests: Node should only be updated after Resync.
	// Non-blocking probe: it only catches an Update call that is already
	// pending on the channel at this instant.
	select {
	case <-onUpdateArgs:
		t.Fatal("Update should not have be called before Resync")
	default:
	}
	nh.Resync(context.TODO(), time.Time{})

	node1Update := <-onUpdateArgs
	assert.Equal(t, "node1", node1Update.newNode.Name)
	assert.Len(t, node1Update.newNode.Spec.IPAM.Pools.Allocated, 1)
	assert.Equal(t, "default", node1Update.newNode.Spec.IPAM.Pools.Allocated[0].Pool)
	onUpdateResult <- mockResult{node: node1Update.newNode}

	// Tests: Attempt to occupy already in-use CIDR from node1
	nh.Upsert(&v2.CiliumNode{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node2",
		},
		Spec: v2.NodeSpec{
			IPAM: ipamTypes.IPAMSpec{
				Pools: ipamTypes.IPAMPoolSpec{
					Requested: []ipamTypes.IPAMPoolRequest{
						{
							Pool:   "default",
							Needed: ipamTypes.IPAMPoolDemand{IPv4Addrs: 16},
						},
					},
					Allocated: node1Update.newNode.Spec.IPAM.Pools.Allocated,
				},
			},
		},
	})
	node2Update := <-onUpdateArgs
	assert.Equal(t, "node2", node2Update.newNode.Name)
	assert.Len(t, node2Update.newNode.Spec.IPAM.Pools.Allocated, 1)
	assert.Equal(t, "default", node2Update.newNode.Spec.IPAM.Pools.Allocated[0].Pool)
	// Compare allocation against allocation. Comparing an allocation struct
	// with a pool name string can never be equal, so the check would be
	// vacuous.
	assert.NotEqual(t, node1Update.newNode.Spec.IPAM.Pools.Allocated[0], node2Update.newNode.Spec.IPAM.Pools.Allocated[0])
	onUpdateResult <- mockResult{node: node2Update.newNode}

	// The failed attempt to reuse node1's CIDR must be reported in the status
	// that the controller writes for node2.
	node2UpdateStatus := <-onUpdateStatusArgs
	assert.Equal(t, "node2", node2UpdateStatus.newNode.Name)
	assert.Contains(t, node2UpdateStatus.newNode.Status.IPAM.OperatorStatus.Error, "unable to reuse from pool default")
	onUpdateStatusResult <- mockResult{node: node2UpdateStatus.newNode}

	// wait for the controller to retry, this time we reject the update with a conflict error
	node2Update = <-onUpdateArgs
	assert.Equal(t, "node2", node2Update.newNode.Name)
	assert.Len(t, node2Update.newNode.Spec.IPAM.Pools.Allocated, 1)
	assert.Equal(t, "default", node2Update.newNode.Spec.IPAM.Pools.Allocated[0].Pool)
	ciliumNodeSchema := schema.GroupResource{Group: v2.CustomResourceDefinitionGroup, Resource: v2.CNKindDefinition}
	conflictErr := k8sErrors.NewConflict(ciliumNodeSchema, "node2", errors.New("update refused by unit test"))
	onUpdateResult <- mockResult{err: conflictErr}

	// ensure controller does not attempt to update status of outdated resource
	// Non-blocking probe: it only catches an UpdateStatus call that is already
	// pending on the channel at this instant.
	select {
	case <-onUpdateStatusArgs:
		t.Fatal("UpdateStatus should not have be called after update conflict")
	default:
	}

	// update node2: remove occupied CIDR and add annotation
	updatedNode2 := &v2.CiliumNode{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node2",
			Annotations: map[string]string{
				"test-annotation": "test-value",
			},
		},
		Spec: v2.NodeSpec{
			IPAM: ipamTypes.IPAMSpec{
				Pools: ipamTypes.IPAMPoolSpec{
					Requested: []ipamTypes.IPAMPoolRequest{
						{
							Pool:   "default",
							Needed: ipamTypes.IPAMPoolDemand{IPv4Addrs: 16},
						},
					},
				},
			},
		},
	}

	// we now expect the controller to fetch the latest version of node2
	node2Get := <-onGetArgs
	assert.Equal(t, "node2", node2Get)
	onGetResult <- mockResult{node: updatedNode2}

	// The retried update must be based on the fetched node (it carries the
	// annotation) and must still not hand out node1's allocation.
	node2Update = <-onUpdateArgs
	assert.Equal(t, "node2", node2Update.newNode.Name)
	assert.Len(t, node2Update.newNode.Spec.IPAM.Pools.Allocated, 1)
	assert.Equal(t, "default", node2Update.newNode.Spec.IPAM.Pools.Allocated[0].Pool)
	assert.NotEqual(t, node1Update.newNode.Spec.IPAM.Pools.Allocated[0], node2Update.newNode.Spec.IPAM.Pools.Allocated[0])
	assert.Equal(t, "test-value", node2Update.newNode.Annotations["test-annotation"])
	onUpdateResult <- mockResult{node: node2Update.newNode}

	nh.Delete(node1Update.newNode)
	nh.Delete(node2Update.newNode)
}

0 comments on commit 525b1ea

Please sign in to comment.