Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use resource for CNPs and CCNPs #24509

Merged
merged 3 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 26 additions & 6 deletions pkg/k8s/shared_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ var (
namespaceResource,
lbIPPoolsResource,
ciliumIdentityResource,
ciliumNetworkPolicy,
ciliumClusterwideNetworkPolicy,
),
)
)

type SharedResources struct {
cell.In
LocalNode *LocalNodeResource
LocalCiliumNode *LocalCiliumNodeResource
Services resource.Resource[*slim_corev1.Service]
Namespaces resource.Resource[*slim_corev1.Namespace]
LBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
Identities resource.Resource[*cilium_api_v2.CiliumIdentity]
LocalNode *LocalNodeResource
LocalCiliumNode *LocalCiliumNodeResource
Services resource.Resource[*slim_corev1.Service]
Namespaces resource.Resource[*slim_corev1.Namespace]
LBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
Identities resource.Resource[*cilium_api_v2.CiliumIdentity]
CiliumNetworkPolicies resource.Resource[*cilium_api_v2.CiliumNetworkPolicy]
CiliumClusterwideNetworkPolicies resource.Resource[*cilium_api_v2.CiliumClusterwideNetworkPolicy]
}

func serviceResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*slim_corev1.Service], error) {
Expand Down Expand Up @@ -120,3 +124,19 @@ func ciliumIdentityResource(lc hive.Lifecycle, cs client.Clientset) (resource.Re
)
return resource.New[*cilium_api_v2.CiliumIdentity](lc, lw), nil
}

func ciliumNetworkPolicy(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*cilium_api_v2.CiliumNetworkPolicy], error) {
if !cs.IsEnabled() {
return nil, nil
}
lw := utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumNetworkPolicyList](cs.CiliumV2().CiliumNetworkPolicies(""))
return resource.New[*cilium_api_v2.CiliumNetworkPolicy](lc, lw), nil
}

func ciliumClusterwideNetworkPolicy(lc hive.Lifecycle, cs client.Clientset) (resource.Resource[*cilium_api_v2.CiliumClusterwideNetworkPolicy], error) {
if !cs.IsEnabled() {
return nil, nil
}
lw := utils.ListerWatcherFromTyped[*cilium_api_v2.CiliumClusterwideNetworkPolicyList](cs.CiliumV2().CiliumClusterwideNetworkPolicies())
return resource.New[*cilium_api_v2.CiliumClusterwideNetworkPolicy](lc, lw), nil
}
122 changes: 38 additions & 84 deletions pkg/k8s/watchers/cilium_clusterwide_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,96 +4,50 @@
package watchers

import (
"time"
"context"
"sync/atomic"

"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
)

func (k *K8sWatcher) ciliumClusterwideNetworkPoliciesInit(ciliumNPClient client.Clientset) {
func (k *K8sWatcher) ciliumClusterwideNetworkPoliciesInit(ctx context.Context, cs client.Clientset) {
var hasSynced atomic.Bool
apiGroup := k8sAPIGroupCiliumClusterwideNetworkPolicyV2
_, ciliumV2ClusterwidePolicyController := informer.NewInformer(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumClusterwideNetworkPolicyList](
ciliumNPClient.CiliumV2().CiliumClusterwideNetworkPolicies()),
&cilium_v2.CiliumClusterwideNetworkPolicy{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() {
k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricCreate, valid, equal)
}()
if cnp := k8s.ObjToSlimCNP(obj); cnp != nil {
valid = true
if cnp.RequiresDerivative() {
return
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
cnpCpy := cnp.DeepCopy()

err := k.addCiliumNetworkPolicyV2(ciliumNPClient, cnpCpy, initialRecvTime)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricCreate, err == nil)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricUpdate, valid, equal) }()
if oldCNP := k8s.ObjToSlimCNP(oldObj); oldCNP != nil {
if newCNP := k8s.ObjToSlimCNP(newObj); newCNP != nil {
valid = true
if oldCNP.DeepEqual(newCNP) {
equal = true
return
}

if newCNP.RequiresDerivative() {
return
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
oldCNPCpy := oldCNP.DeepCopy()
newCNPCpy := newCNP.DeepCopy()

err := k.updateCiliumNetworkPolicyV2(ciliumNPClient, oldCNPCpy, newCNPCpy, initialRecvTime)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricUpdate, err == nil)
}
}
},
DeleteFunc: func(obj interface{}) {
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCCNP, resources.MetricDelete, valid, equal) }()
cnp := k8s.ObjToSlimCNP(obj)
if cnp == nil {
return
}
valid = true
err := k.deleteCiliumNetworkPolicyV2(cnp)
k.K8sEventProcessed(resources.MetricCCNP, resources.MetricDelete, err == nil)
},
},
k8s.ConvertToCCNP,
)

k.blockWaitGroupToSyncResources(
k.stop,
nil,
ciliumV2ClusterwidePolicyController.HasSynced,
apiGroup,
)

go ciliumV2ClusterwidePolicyController.Run(k.stop)
metricLabel := resources.MetricCCNP
go func() {
christarazi marked this conversation as resolved.
Show resolved Hide resolved
cache := make(map[resource.Key]*types.SlimCNP)

for event := range k.sharedResources.CiliumClusterwideNetworkPolicies.Events(ctx) {
if event.Kind == resource.Sync {
hasSynced.Store(true)
event.Done(nil)
continue
}

slimCNP := &types.SlimCNP{
CiliumNetworkPolicy: &cilium_v2.CiliumNetworkPolicy{
TypeMeta: event.Object.TypeMeta,
ObjectMeta: event.Object.ObjectMeta,
Spec: event.Object.Spec,
Specs: event.Object.Specs,
},
}

var err error
switch event.Kind {
case resource.Upsert:
err = k.onUpsertCNP(slimCNP, cache, event.Key, cs, apiGroup, metricLabel)
case resource.Delete:
err = k.onDeleteCNP(slimCNP, cache, event.Key, apiGroup, metricLabel)
}
event.Done(err)
}
}()

k.blockWaitGroupToSyncResources(ctx.Done(), nil, hasSynced.Load, apiGroup)
k.k8sAPIGroups.AddAPI(apiGroup)
}
177 changes: 103 additions & 74 deletions pkg/k8s/watchers/cilium_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ package watchers
import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/k8s"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/client"
clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/k8s/utils"
k8sUtils "github.com/cilium/cilium/pkg/k8s/utils"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -82,84 +81,114 @@ func (r *ruleImportMetadataCache) get(cnp *types.SlimCNP) (policyImportMetadata,
return policyImportMeta, ok
}

func (k *K8sWatcher) ciliumNetworkPoliciesInit(cs client.Clientset) {
func (k *K8sWatcher) ciliumNetworkPoliciesInit(ctx context.Context, cs client.Clientset) {
var hasSynced atomic.Bool
apiGroup := k8sAPIGroupCiliumNetworkPolicyV2
_, ciliumV2Controller := informer.NewInformer(
utils.ListerWatcherFromTyped[*cilium_v2.CiliumNetworkPolicyList](
cs.CiliumV2().CiliumNetworkPolicies("")),
&cilium_v2.CiliumNetworkPolicy{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCNP, resources.MetricCreate, valid, equal) }()
if cnp := k8s.ObjToSlimCNP(obj); cnp != nil {
valid = true
if cnp.RequiresDerivative() {
return
}
metricLabel := resources.MetricCNP
go func() {
christarazi marked this conversation as resolved.
Show resolved Hide resolved
cache := make(map[resource.Key]*types.SlimCNP)

for event := range k.sharedResources.CiliumNetworkPolicies.Events(ctx) {
if event.Kind == resource.Sync {
hasSynced.Store(true)
event.Done(nil)
continue
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
cnpCpy := cnp.DeepCopy()
slimCNP := &types.SlimCNP{
CiliumNetworkPolicy: &cilium_v2.CiliumNetworkPolicy{
TypeMeta: event.Object.TypeMeta,
ObjectMeta: event.Object.ObjectMeta,
Spec: event.Object.Spec,
Specs: event.Object.Specs,
},
}

err := k.addCiliumNetworkPolicyV2(cs, cnpCpy, initialRecvTime)
reportCNPChangeMetrics(err)
var err error
switch event.Kind {
case resource.Upsert:
err = k.onUpsertCNP(slimCNP, cache, event.Key, cs, apiGroup, metricLabel)
case resource.Delete:
err = k.onDeleteCNP(slimCNP, cache, event.Key, apiGroup, metricLabel)
}
reportCNPChangeMetrics(err)
event.Done(err)
}
}()

k.K8sEventProcessed(resources.MetricCNP, resources.MetricCreate, err == nil)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
initialRecvTime := time.Now()
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCNP, resources.MetricUpdate, valid, equal) }()
if oldCNP := k8s.ObjToSlimCNP(oldObj); oldCNP != nil {
if newCNP := k8s.ObjToSlimCNP(newObj); newCNP != nil {
valid = true
if oldCNP.DeepEqual(newCNP) {
equal = true
return
}

if newCNP.RequiresDerivative() {
return
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
oldCNPCpy := oldCNP.DeepCopy()
newCNPCpy := newCNP.DeepCopy()

err := k.updateCiliumNetworkPolicyV2(cs, oldCNPCpy, newCNPCpy, initialRecvTime)
reportCNPChangeMetrics(err)

k.K8sEventProcessed(resources.MetricCNP, resources.MetricUpdate, err == nil)
}
}
},
DeleteFunc: func(obj interface{}) {
var valid, equal bool
defer func() { k.K8sEventReceived(apiGroup, resources.MetricCNP, resources.MetricDelete, valid, equal) }()
cnp := k8s.ObjToSlimCNP(obj)
if cnp == nil {
return
}
valid = true
err := k.deleteCiliumNetworkPolicyV2(cnp)
reportCNPChangeMetrics(err)
k.blockWaitGroupToSyncResources(ctx.Done(), nil, hasSynced.Load, apiGroup)
k.k8sAPIGroups.AddAPI(apiGroup)
}

k.K8sEventProcessed(resources.MetricCNP, resources.MetricDelete, err == nil)
},
},
k8s.ConvertToCNP,
func (k *K8sWatcher) onUpsertCNP(
cnp *types.SlimCNP,
cache map[resource.Key]*types.SlimCNP,
key resource.Key,
cs client.Clientset,
apiGroup string,
metricLabel string,
) error {
initialRecvTime := time.Now()

var (
equal bool
action string
)

k.blockWaitGroupToSyncResources(k.stop, nil, ciliumV2Controller.HasSynced, k8sAPIGroupCiliumNetworkPolicyV2)
go ciliumV2Controller.Run(k.stop)
k.k8sAPIGroups.AddAPI(k8sAPIGroupCiliumNetworkPolicyV2)
// wrap k.K8sEventReceived call into a naked func() to capture equal in the closure
defer func() {
k.K8sEventReceived(apiGroup, metricLabel, action, true, equal)
}()

oldCNP, ok := cache[key]
if !ok {
action = resources.MetricCreate
} else {
action = resources.MetricUpdate
if oldCNP.DeepEqual(cnp) {
equal = true
return nil
}
}

if cnp.RequiresDerivative() {
return nil
}

// We need to deepcopy this structure because we are writing
// fields.
// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
cnpCpy := cnp.DeepCopy()

var err error
if ok {
err = k.updateCiliumNetworkPolicyV2(cs, oldCNP, cnpCpy, initialRecvTime)
} else {
err = k.addCiliumNetworkPolicyV2(cs, cnpCpy, initialRecvTime)
}
if err == nil {
cache[key] = cnpCpy
}

k.K8sEventProcessed(metricLabel, action, err == nil)

return err
}

func (k *K8sWatcher) onDeleteCNP(
cnp *types.SlimCNP,
cache map[resource.Key]*types.SlimCNP,
key resource.Key,
apiGroup string,
metricLabel string,
) error {
err := k.deleteCiliumNetworkPolicyV2(cnp)
delete(cache, key)

k.K8sEventProcessed(metricLabel, resources.MetricDelete, err == nil)
k.K8sEventReceived(apiGroup, metricLabel, resources.MetricDelete, true, true)

return err
}

func (k *K8sWatcher) addCiliumNetworkPolicyV2(ciliumNPClient clientset.Interface, cnp *types.SlimCNP, initialRecvTime time.Time) error {
Expand Down