Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator: Improve removing stale CEPs from CESs entries on start. #24596

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 56 additions & 12 deletions operator/pkg/ciliumendpointslice/endpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cilium/cilium/operator/metrics"
operatorOption "github.com/cilium/cilium/operator/option"
cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
capi_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (c *CiliumEndpointSliceController) Run(ces cache.Indexer, stopCh <-chan str
c.ciliumEndpointStore = ces

// On operator warm boot, remove stale CEP entries present in CES
c.removeStaleCEPEntries()
c.removeStaleAndDuplicatedCEPEntries()

log.WithFields(logrus.Fields{
logfields.CESSliceMode: c.slicingMode,
Expand All @@ -182,24 +183,67 @@ func (c *CiliumEndpointSliceController) Run(ces cache.Indexer, stopCh <-chan str
// Upon warm boot[restart], Iterate over all CEPs which we got from the api-server
// and compare it with CEPs packed inside CES.
// If there are any stale CEPs present in CESs, remove them from their CES.
func (c *CiliumEndpointSliceController) removeStaleCEPEntries() {
log.Info("Remove stale CEP entries in CES")
// If there are any duplicated CEPs present in CESs, remove all but one trying
// to keep the CEP with matching identity if it's present.
func (c *CiliumEndpointSliceController) removeStaleAndDuplicatedCEPEntries() {
log.Info("Remove stale and duplicated CEP entries in CES")

type cepMapping struct {
identity int64
cesName string
}

cepsMapping := make(map[string][]cepMapping)

// Get all CEPs from local datastore
staleCEPs := c.Manager.getAllCEPNames()
// Map CEP Names to list of whole structure + CES Name
for _, ces := range c.Manager.getAllCESs() {
for _, cep := range ces.getAllCEPs() {
cepName := ces.getCEPNameFromCCEP(&cep)
cepsMapping[cepName] = append(cepsMapping[cepName], cepMapping{identity: cep.IdentityID, cesName: ces.getCESName()})
}
}

// Remove stale CEP entries present in CES
for _, cepName := range staleCEPs {
for cepName, mappings := range cepsMapping {
storeCep, exists, err := c.ciliumEndpointStore.GetByKey(cepName)
// Ignore error from below api, this is added to avoid accidental cep rmeoval from cache
if _, exists, err := c.ciliumEndpointStore.GetByKey(cepName); err == nil && exists || err != nil {
if err != nil {
continue
}
log.WithFields(logrus.Fields{
logfields.CEPName: cepName,
}).Debug("Removing stale CEP entry.")
c.Manager.RemoveCEPFromCache(cepName, DefaultCESSyncTime)
if !exists {
// Remove stale CEP entries present in CES
for _, mapping := range mappings {
log.WithFields(logrus.Fields{
logfields.CEPName: cepName,
}).Debug("Removing stale CEP entry.")
c.Manager.removeCEPFromCES(cepName, mapping.cesName, DefaultCESSyncTime, 0, false)
}
} else if len(mappings) > 1 {
// Remove duplicated CEP entries present in CES
found := false
cep := storeCep.(*cilium_api_v2.CiliumEndpoint)
// Skip first element for now
for _, mapping := range mappings[1:] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it maybe simpler if it goes through the mappings twice? First to find the CEP with the matching ID, and then to remove all but selected (the first CEP or with matching ID).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be multiple CEPs with matching ID or no CEPs with matching ID.
The code would be something like:
matchingIdIndex := 0
for loop {
if matches id {
matchingIdIndex = currentIndex
}
for loop {
if currentIndex == matchingIdIndex {
keep
} else {
remove
}
}

I don't think it would be much simpler

if !found && mapping.identity == cep.Status.Identity.ID {
// Don't remove the first element for which identity matches
found = true
// All others elements will be removed so update mapping to make sure
// it points to the element that was kept
c.Manager.updateCEPToCESMapping(cepName, mapping.cesName)
continue
}
c.Manager.removeCEPFromCES(cepName, mapping.cesName, DefaultCESSyncTime, mapping.identity, true)
}
if found {
// Remove first element if element with matching identity was found
c.Manager.removeCEPFromCES(cepName, mappings[0].cesName, DefaultCESSyncTime, mappings[0].identity, true)
} else {
// All others elements were removed so update mapping to make sure
// it points to the only element left
c.Manager.updateCEPToCESMapping(cepName, mappings[0].cesName)
}
}
}
return
}

// Sync all CESs from cesStore to manager cache.
Expand Down
216 changes: 216 additions & 0 deletions operator/pkg/ciliumendpointslice/endpointslice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package ciliumendpointslice

import (
"context"
"errors"
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/assert"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
capi_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/client"
)

func createManagerEndpoint(name string, identity int64) capi_v2a1.CoreCiliumEndpoint {
return capi_v2a1.CoreCiliumEndpoint{
Name: name,
IdentityID: identity,
}
}

func createStoreEndpoint(name string, namespace string, identity int64) *v2.CiliumEndpoint {
return &v2.CiliumEndpoint{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Status: v2.EndpointStatus{
Identity: &v2.EndpointIdentity{
ID: identity,
},
},
}
}

func createStoreEndpointSlice(name string, namespace string, endpoints []capi_v2a1.CoreCiliumEndpoint) *capi_v2a1.CiliumEndpointSlice {
return &capi_v2a1.CiliumEndpointSlice{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
},
Namespace: namespace,
Endpoints: endpoints,
}
}

func TestRemoveStaleCEPEntries(t *testing.T) {
namespace := "ns"
testCases := []struct {
desc string
storeCESs []*capi_v2a1.CiliumEndpointSlice
storeCEPs []*v2.CiliumEndpoint
want map[string]string
}{
{
desc: "No stale CEPs",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1)}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": "slice1"},
},
{
desc: "Remove stale CEP",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
createManagerEndpoint("cep2", 2),
}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": "slice1"},
},
{
desc: "Remove duplicated CEP from single slice",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
createManagerEndpoint("cep1", 1),
}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": "slice1"},
},
{
desc: "Remove duplicated CEP from separate slice",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
}),
createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": ""}, // empty ces name as any ces is ok
},
{
desc: "Remove old CEP from first slice",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 2),
}),
createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": "slice2"},
},
{
desc: "Remove old CEP from second slice",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
}),
createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 2),
}),
},
storeCEPs: []*v2.CiliumEndpoint{createStoreEndpoint("cep1", namespace, 1)},
want: map[string]string{"ns/cep1": "slice1"},
},
{
desc: "Big remove case with multiple stale and duplicated CEPs from multiple slices",
storeCESs: []*capi_v2a1.CiliumEndpointSlice{
createStoreEndpointSlice("slice1", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 1),
createManagerEndpoint("cep2", 1),
createManagerEndpoint("cep3", 1),
}),
createStoreEndpointSlice("slice2", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 2),
createManagerEndpoint("cep4", 2),
createManagerEndpoint("cep5", 2),
createManagerEndpoint("cep6", 2),
}),
createStoreEndpointSlice("slice3", namespace, []capi_v2a1.CoreCiliumEndpoint{
createManagerEndpoint("cep1", 2),
createManagerEndpoint("cep4", 2),
createManagerEndpoint("cep7", 2),
createManagerEndpoint("cep8", 2),
createManagerEndpoint("cep9", 2),
}),
},
storeCEPs: []*v2.CiliumEndpoint{
createStoreEndpoint("cep1", namespace, 1),
createStoreEndpoint("cep2", namespace, 1),
createStoreEndpoint("cep3", namespace, 1),
createStoreEndpoint("cep4", namespace, 2),
createStoreEndpoint("cep5", namespace, 2),
},
want: map[string]string{
"ns/cep1": "slice1",
"ns/cep2": "slice1",
"ns/cep3": "slice1",
"ns/cep4": "",
"ns/cep5": "slice2",
},
},
}
alan-kut marked this conversation as resolved.
Show resolved Hide resolved
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, client := client.NewFakeClientset()
ciliumEndpointStore := cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
"identity": func(obj interface{}) ([]string, error) {
endpointObj, ok := obj.(*v2.CiliumEndpoint)
if !ok {
return nil, errors.New("failed to convert cilium endpoint")
}
identityID := "0"
if endpointObj.Status.Identity != nil {
identityID = strconv.FormatInt(endpointObj.Status.Identity.ID, 10)
}
return []string{identityID}, nil
},
},
)
cesController := NewCESController(context.Background(), &sync.WaitGroup{}, client, 5, cesIdentityBasedSlicing, 10, 20)
cesController.ciliumEndpointStore = ciliumEndpointStore
manager := cesController.Manager.(*cesManagerIdentity)
for _, cep := range tc.storeCEPs {
ciliumEndpointStore.Add(cep)
}
for _, ces := range tc.storeCESs {
cesController.ciliumEndpointSliceStore.Add(ces)
}
syncCESsInLocalCache(cesController.ciliumEndpointSliceStore, manager)
cesController.removeStaleAndDuplicatedCEPEntries()
wantedCEPs := make([]string, len(tc.want))
i := 0
for cep := range tc.want {
wantedCEPs[i] = cep
i++
}
assert.ElementsMatch(t, wantedCEPs, manager.getAllCEPNames())
for cep := range tc.want {
actualCES, exists := manager.desiredCESs.getCESName(cep)
assert.True(t, exists)
if tc.want[cep] != "" {
assert.Equal(t, tc.want[cep], actualCES)
}
}
})
}
}