Skip to content

Commit

Permalink
Operator: Improve removing stale CEPs from CESs entries on start.
Browse files Browse the repository at this point in the history
Handle situation when the CEP is duplicated in CESs.
Keep only one CEP if the CEP still exists or
delete all the instances if it doesn't exist anymore.
Prefer keeping CEP with the correct identity if such exists.

Signed-off-by: Alan Kutniewski <kutniewski@google.com>
  • Loading branch information
alan-kut authored and dylandreimerink committed Apr 11, 2023
1 parent 0a984a7 commit 80c6ebd
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 44 deletions.
68 changes: 56 additions & 12 deletions operator/pkg/ciliumendpointslice/endpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cilium/cilium/operator/metrics"
operatorOption "github.com/cilium/cilium/operator/option"
cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
capi_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (c *CiliumEndpointSliceController) Run(ces cache.Indexer, stopCh <-chan str
c.ciliumEndpointStore = ces

// On operator warm boot, remove stale CEP entries present in CES
c.removeStaleCEPEntries()
c.removeStaleAndDuplicatedCEPEntries()

log.WithFields(logrus.Fields{
logfields.CESSliceMode: c.slicingMode,
Expand All @@ -182,24 +183,67 @@ func (c *CiliumEndpointSliceController) Run(ces cache.Indexer, stopCh <-chan str
// NOTE(review): this listing appears to be a diff rendered without +/- markers,
// so lines of the replaced removeStaleCEPEntries implementation are interleaved
// with the new removeStaleAndDuplicatedCEPEntries implementation — confirm
// against the repository before treating any single line as current code.
//
// Upon warm boot (restart), iterate over all CEPs which we got from the
// api-server and compare them with the CEPs packed inside CESs.
// If there are any stale CEPs present in CESs, remove them from their CES.
func (c *CiliumEndpointSliceController) removeStaleCEPEntries() {
log.Info("Remove stale CEP entries in CES")
// If there are any duplicated CEPs present in CESs, remove all but one, trying
// to keep the CEP with the matching identity if it's present.
func (c *CiliumEndpointSliceController) removeStaleAndDuplicatedCEPEntries() {
log.Info("Remove stale and duplicated CEP entries in CES")

// cepMapping records one occurrence of a CEP inside a CES: the identity
// stored in that CES entry and the name of the CES holding it.
type cepMapping struct {
identity int64
cesName string
}

// cepsMapping is keyed by CEP name; a value with more than one element means
// the CEP occurs more than once across the CESs known to the manager.
cepsMapping := make(map[string][]cepMapping)

// Get all CEPs from local datastore
staleCEPs := c.Manager.getAllCEPNames()
// Map CEP names to the list of (identity, CES name) occurrences.
for _, ces := range c.Manager.getAllCESs() {
for _, cep := range ces.getAllCEPs() {
cepName := ces.getCEPNameFromCCEP(&cep)
cepsMapping[cepName] = append(cepsMapping[cepName], cepMapping{identity: cep.IdentityID, cesName: ces.getCESName()})
}
}

// Remove stale CEP entries present in CES
for _, cepName := range staleCEPs {
for cepName, mappings := range cepsMapping {
// Look the CEP up in the CiliumEndpoint store to decide whether it still exists.
storeCep, exists, err := c.ciliumEndpointStore.GetByKey(cepName)
// Ignore errors from the lookup above; skipping the CEP on error avoids
// accidental CEP removal from the cache.
if _, exists, err := c.ciliumEndpointStore.GetByKey(cepName); err == nil && exists || err != nil {
if err != nil {
continue
}
log.WithFields(logrus.Fields{
logfields.CEPName: cepName,
}).Debug("Removing stale CEP entry.")
c.Manager.RemoveCEPFromCache(cepName, DefaultCESSyncTime)
if !exists {
// The CEP is no longer in the store: remove every occurrence of it
// from the CESs that still reference it.
for _, mapping := range mappings {
log.WithFields(logrus.Fields{
logfields.CEPName: cepName,
}).Debug("Removing stale CEP entry.")
c.Manager.removeCEPFromCES(cepName, mapping.cesName, DefaultCESSyncTime, 0, false)
}
} else if len(mappings) > 1 {
// The CEP still exists but occurs more than once: keep exactly one occurrence.
found := false
// NOTE(review): unchecked type assertion — panics if the store holds
// anything other than *CiliumEndpoint; confirm the store contents.
cep := storeCep.(*cilium_api_v2.CiliumEndpoint)
// Skip the first element for now; whether it is kept is decided after the
// remaining elements have been examined.
for _, mapping := range mappings[1:] {
// NOTE(review): Status.Identity is dereferenced without a nil check —
// confirm it can never be nil for CEPs in the store.
if !found && mapping.identity == cep.Status.Identity.ID {
// Don't remove the first element for which identity matches
found = true
// All other elements will be removed, so update the mapping to make sure
// it points to the element that was kept
c.Manager.updateCEPToCESMapping(cepName, mapping.cesName)
continue
}
c.Manager.removeCEPFromCES(cepName, mapping.cesName, DefaultCESSyncTime, mapping.identity, true)
}
if found {
// Remove the first element, since a later element with a matching
// identity was found and kept instead.
c.Manager.removeCEPFromCES(cepName, mappings[0].cesName, DefaultCESSyncTime, mappings[0].identity, true)
} else {
// All other elements were removed, so update the mapping to make sure
// it points to the only element left
c.Manager.updateCEPToCESMapping(cepName, mappings[0].cesName)
}
}
}
// Redundant: the function has no results and ends here anyway.
return
}

// Sync all CESs from cesStore to manager cache.
Expand Down
216 changes: 216 additions & 0 deletions operator/pkg/ciliumendpointslice/endpointslice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package ciliumendpointslice

import (
"context"
"errors"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/assert"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
capi_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
)

// createManagerEndpoint builds a CoreCiliumEndpoint test fixture carrying only
// the given name and identity ID; every other field keeps its zero value.
func createManagerEndpoint(name string, identity int64) capi_v2a1.CoreCiliumEndpoint {
	var endpoint capi_v2a1.CoreCiliumEndpoint
	endpoint.Name = name
	endpoint.IdentityID = identity
	return endpoint
}

// createStoreEndpoint builds a CiliumEndpoint test fixture with the given
// name and namespace in its object metadata and a status identity whose ID is
// the given identity.
func createStoreEndpoint(name string, namespace string, identity int64) *v2.CiliumEndpoint {
	endpoint := new(v2.CiliumEndpoint)
	endpoint.ObjectMeta.Name = name
	endpoint.ObjectMeta.Namespace = namespace
	endpoint.Status.Identity = &v2.EndpointIdentity{ID: identity}
	return endpoint
}

// createStoreEndpointSlice builds a CiliumEndpointSlice test fixture with the
// given metadata name, slice-level namespace and packed endpoints.
func createStoreEndpointSlice(name string, namespace string, endpoints []capi_v2a1.CoreCiliumEndpoint) *capi_v2a1.CiliumEndpointSlice {
	slice := new(capi_v2a1.CiliumEndpointSlice)
	slice.ObjectMeta.Name = name
	// The slice's own Namespace field is set here; ObjectMeta.Namespace is
	// deliberately left empty, as in the composite-literal form.
	slice.Namespace = namespace
	slice.Endpoints = endpoints
	return slice
}

// TestRemoveStaleCEPEntries checks removeStaleAndDuplicatedCEPEntries against
// a table of CES/CEP store contents.
//
// For each case the CEP store and CES store are populated from storeCEPs and
// storeCESs, the CESs are synced into the manager cache, and the cleanup is
// run. Afterwards the manager must know exactly the CEPs listed as keys of
// want ("namespace/name"), and each of them must be mapped to the CES named by
// the corresponding value. An empty value means any CES is acceptable.
func TestRemoveStaleCEPEntries(t *testing.T) {
	namespace := "ns"
	testCases := []struct {
		desc      string
		storeCESs []*capi_v2a1.CiliumEndpointSlice
		storeCEPs []*v2.CiliumEndpoint
		want      map[string]string
	}{
		{
			desc: "No stale CEPs",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1)}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": "slice1"},
		},
		{
			desc: "Remove stale CEP",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
					createManagerEndpoint("cep2", 2),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": "slice1"},
		},
		{
			desc: "Remove duplicated CEP from single slice",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
					createManagerEndpoint("cep1", 1),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": "slice1"},
		},
		{
			desc: "Remove duplicated CEP from separate slice",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
				}),
				createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": ""}, // empty ces name as any ces is ok
		},
		{
			desc: "Remove old CEP from first slice",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 2),
				}),
				createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": "slice2"},
		},
		{
			desc: "Remove old CEP from second slice",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
				}),
				createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 2),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
			want:      map[string]string{"ns/cep1": "slice1"},
		},
		{
			desc: "Big remove case with multiple stale and duplicated CEPs from multiple slices",
			storeCESs: []*capi_v2a1.CiliumEndpointSlice{
				createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 1),
					createManagerEndpoint("cep2", 1),
					createManagerEndpoint("cep3", 1),
				}),
				createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 2),
					createManagerEndpoint("cep4", 2),
					createManagerEndpoint("cep5", 2),
					createManagerEndpoint("cep6", 2),
				}),
				createStoreEndpointSlice("slice3", namespace, []capi_v2a1.CoreCiliumEndpoint{
					createManagerEndpoint("cep1", 2),
					createManagerEndpoint("cep4", 2),
					createManagerEndpoint("cep7", 2),
					createManagerEndpoint("cep8", 2),
					createManagerEndpoint("cep9", 2),
				}),
			},
			storeCEPs: []*v2.CiliumEndpoint{
				createStoreEndpoint("cep1", namespace, 1),
				createStoreEndpoint("cep2", namespace, 1),
				createStoreEndpoint("cep3", namespace, 1),
				createStoreEndpoint("cep4", namespace, 2),
				createStoreEndpoint("cep5", namespace, 2),
			},
			want: map[string]string{
				"ns/cep1": "slice1",
				"ns/cep2": "slice1",
				"ns/cep3": "slice1",
				"ns/cep4": "",
				"ns/cep5": "slice2",
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			// Named fakeClient so the imported client package is not shadowed.
			_, fakeClient := client.NewFakeClientset()
			ciliumEndpointStore := cache.NewIndexer(
				cache.DeletionHandlingMetaNamespaceKeyFunc,
				cache.Indexers{
					cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
					"identity": func(obj interface{}) ([]string, error) {
						endpointObj, ok := obj.(*v2.CiliumEndpoint)
						if !ok {
							return nil, errors.New("failed to convert cilium endpoint")
						}
						// Endpoints without an identity are indexed under "0".
						identityID := "0"
						if endpointObj.Status.Identity != nil {
							identityID = strconv.FormatInt(endpointObj.Status.Identity.ID, 10)
						}
						return []string{identityID}, nil
					},
				},
			)
			cesController := NewCESController(context.Background(), &sync.WaitGroup{}, fakeClient, 5, cesIdentityBasedSlicing, 10, 20)
			cesController.ciliumEndpointStore = ciliumEndpointStore
			manager, ok := cesController.Manager.(*cesManagerIdentity)
			if !ok {
				t.Fatalf("unexpected manager type %T", cesController.Manager)
			}

			// Fail fast if a fixture cannot be stored; otherwise the
			// assertions below would fail for a misleading reason.
			for _, cep := range tc.storeCEPs {
				if err := ciliumEndpointStore.Add(cep); err != nil {
					t.Fatalf("adding CEP %s/%s to store: %v", cep.Namespace, cep.Name, err)
				}
			}
			for _, ces := range tc.storeCESs {
				if err := cesController.ciliumEndpointSliceStore.Add(ces); err != nil {
					t.Fatalf("adding CES %s to store: %v", ces.Name, err)
				}
			}

			syncCESsInLocalCache(cesController.ciliumEndpointSliceStore, manager)
			cesController.removeStaleAndDuplicatedCEPEntries()

			// The manager must track exactly the wanted CEPs, in any order.
			wantedCEPs := make([]string, 0, len(tc.want))
			for cep := range tc.want {
				wantedCEPs = append(wantedCEPs, cep)
			}
			assert.ElementsMatch(t, wantedCEPs, manager.getAllCEPNames())

			for cep, wantCES := range tc.want {
				actualCES, exists := manager.desiredCESs.getCESName(cep)
				assert.True(t, exists, "CEP %s is not mapped to any CES", cep)
				// An empty expectation means any CES is acceptable.
				if wantCES != "" {
					assert.Equal(t, wantCES, actualCES)
				}
			}
		})
	}
}

0 comments on commit 80c6ebd

Please sign in to comment.