Skip to content

Commit

Permalink
k8s: Use CCNP shared resource in k8s watcher
Browse files Browse the repository at this point in the history
Refactor the k8s watchers code to rely on the Cilium Clusterwide Network
Policy shared resource instead of starting a new Informer.

Starting a new informer increases the load on api server and without a
workqueue, the handlers will block the events streaming until
completion.  The usage of the CCNP shared resource solves this issues.

Signed-off-by: Fabio Falzoi <fabio.falzoi@isovalent.com>
  • Loading branch information
pippolo84 authored and christarazi committed Mar 31, 2023
1 parent c6110fa commit d6f5504
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 85 deletions.
122 changes: 38 additions & 84 deletions pkg/k8s/watchers/cilium_clusterwide_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,96 +4,50 @@
package watchers

import (
"time"
"context"
"sync/atomic"

"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
)

func (k *K8sWatcher) ciliumClusterwideNetworkPoliciesInit(ciliumNPClient client.Clientset) {
func (k *K8sWatcher) ciliumClusterwideNetworkPoliciesInit(ctx context.Context, cs client.Clientset) {
var hasSynced atomic.Bool
apiGroup := k8sAPIGroupCiliumClusterwideNetworkPolicyV2
_, ciliumV2ClusterwidePolicyController := informer.NewInformer(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](
ciliumNPClient.CiliumV2().CiliumClusterwideNetworkPolicies()),
&cilium_v2.CiliumClusterwideNetworkPolicy{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() {
k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricCreate, valid, equal)
}()
if cnp := k8s.ObjToSlimCNP(obj); cnp != nil {
valid = true
if cnp.RequiresDerivative() {
return
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
cnpCpy := cnp.DeepCopy()

err := k.addCiliumNetworkPolicyV2(ciliumNPClient, cnpCpy, initialRecvTime)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricCreate, err == nil)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricUpdate, valid, equal) }()
if oldCNP := k8s.ObjToSlimCNP(oldObj); oldCNP != nil {
if newCNP := k8s.ObjToSlimCNP(newObj); newCNP != nil {
valid = true
if oldCNP.DeepEqual(newCNP) {
equal = true
return
}

if newCNP.RequiresDerivative() {
return
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
oldCNPCpy := oldCNP.DeepCopy()
newCNPCpy := newCNP.DeepCopy()

err := k.updateCiliumNetworkPolicyV2(ciliumNPClient, oldCNPCpy, newCNPCpy, initialRecvTime)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricUpdate, err == nil)
}
}
},
DeleteFunc: func(obj interface{}) {
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricDelete, valid, equal) }()
cnp := k8s.ObjToSlimCNP(obj)
if cnp == nil {
return
}
valid = true
err := k.deleteCiliumNetworkPolicyV2(cnp)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricDelete, err == nil)
},
},
k8s.ConvertToCCNP,
)

k.blockWaitGroupToSyncResources(
k.stop,
nil,
ciliumV2ClusterwidePolicyController.HasSynced,
apiGroup,
)

go ciliumV2ClusterwidePolicyController.Run(k.stop)
metricLabel := resources.MetricCCNP
go func() {
cache := make(map[resource.Key]*types.SlimCNP)

for event := range k.sharedResources.CiliumClusterwideNetworkPolicies.Events(ctx) {
if event.Kind == resource.Sync {
hasSynced.Store(true)
event.Done(nil)
continue
}

slimCNP := &types.SlimCNP{
CiliumNetworkPolicy: &cilium_v2.CiliumNetworkPolicy{
TypeMeta: event.Object.TypeMeta,
ObjectMeta: event.Object.ObjectMeta,
Spec: event.Object.Spec,
Specs: event.Object.Specs,
},
}

var err error
switch event.Kind {
case resource.Upsert:
err = k.onUpsertCNP(slimCNP, cache, event.Key, cs, apiGroup, metricLabel)
case resource.Delete:
err = k.onDeleteCNP(slimCNP, cache, event.Key, apiGroup, metricLabel)
}
event.Done(err)
}
}()

k.blockWaitGroupToSyncResources(ctx.Done(), nil, hasSynced.Load, apiGroup)
k.k8sAPIGroups.AddAPI(apiGroup)
}
2 changes: 1 addition & 1 deletion pkg/k8s/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (k *K8sWatcher) enableK8sWatchers(ctx context.Context, resourceNames []stri
case k8sAPIGroupCiliumNetworkPolicyV2:
k.ciliumNetworkPoliciesInit(ctx, k.clientset)
case k8sAPIGroupCiliumClusterwideNetworkPolicyV2:
k.ciliumClusterwideNetworkPoliciesInit(k.clientset)
k.ciliumClusterwideNetworkPoliciesInit(ctx, k.clientset)
case k8sAPIGroupCiliumEndpointV2:
k.initCiliumEndpointOrSlices(k.clientset, asyncControllers)
case k8sAPIGroupCiliumEndpointSliceV2Alpha1:
Expand Down

0 comments on commit d6f5504

Please sign in to comment.