Skip to content

Commit

Permalink
policy: track rules by resource, not by labels
Browse files Browse the repository at this point in the history
When upserting a CNP or KNP, we identify existing rules in the
repository by a set of labels. However, evaluating this set of labels is
expensive, especially as we must check against all label selectors every
time we want to add or remove a policy.

Rather than using label selectors internally, track policies by
owning resource, much the way that prefixes are tracked in the ipcache.
Then, when upserting policies, the set of existing rules attached to a
given resource can be easily retrieved.

The existing behavior is preserved, as it is also exposed via the local
gRPC API. However, the k8s handlers no longer use it.

Signed-off-by: Casey Callendrello <cdc@isovalent.com>
  • Loading branch information
squeed committed May 30, 2024
1 parent adbba6d commit df42a7d
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 82 deletions.
124 changes: 88 additions & 36 deletions daemon/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
// updated.
var policySelectionWG sync.WaitGroup

// newRev is the new policy revision after rule updates
var newRev uint64

// Get all endpoints at the time rules were added / updated so we can figure
// out which endpoints to regenerate / bump policy revision.
allEndpoints := d.endpointManager.GetPolicyEndpoints()
Expand All @@ -289,37 +292,69 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,

endpointsToRegen := policy.NewEndpointSet(nil)

if opts != nil {
if opts.Replace {
for _, r := range sourceRules {
oldRules := d.policy.SearchRLocked(r.Labels)
// Policies can be upserted one of two ways: by labels or by resource.
// Here we replace by resource if specified.
// This block of code is, sadly, copy-pasty because DeleteByLabels / AddList return an unexported type.
if opts != nil && opts.ReplaceByResource && len(opts.Resource) > 0 {
// Update the policy repository with the new rules
addedRules, deletedRules, rev := d.policy.ReplaceByResourceLocked(sourceRules, opts.Resource)
newRev = rev

if len(deletedRules) > 0 {
// Record any prefix allocations that should be deleted
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())...)

// Determine which endpoints, if any, need to be regenerated due to removing these rules
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}

// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}

// Determine which endpoints, if any, need to be regenerated due to being selected by a new rule
addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)

} else {
// Replacing by labels
// This only happens if a policy is specified via the gRPC API. It is much less efficient
// due to needing to scan the entire repository to find matching labels.
if opts != nil {
if opts.Replace {
for _, r := range sourceRules {
oldRules := d.policy.SearchRLocked(r.Labels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(r.Labels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}
if len(opts.ReplaceWithLabels) > 0 {
oldRules := d.policy.SearchRLocked(opts.ReplaceWithLabels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(r.Labels)
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(opts.ReplaceWithLabels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}
if len(opts.ReplaceWithLabels) > 0 {
oldRules := d.policy.SearchRLocked(opts.ReplaceWithLabels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(opts.ReplaceWithLabels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}

addedRules, newRev := d.policy.AddListLocked(sourceRules)
addedRules, rev := d.policy.AddListLocked(sourceRules)
newRev = rev

// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}
// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}

addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}

d.policy.Mutex.Unlock()

Expand Down Expand Up @@ -536,24 +571,42 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, opts *policy.DeleteOptio

endpointsToRegen := policy.NewEndpointSet(nil)

deletedRules, rev, deleted := d.policy.DeleteByLabelsLocked(labels)
var deleted int
var rev uint64
var prefixes []netip.Prefix

// Return an error if a label filter was provided and there are no
// rules matching it. A deletion request for all policy entries should
// not fail if no policies are loaded.
if len(deletedRules) == 0 && len(labels) != 0 {
rev := d.policy.GetRevision()
d.policy.Mutex.Unlock()
if opts.DeleteByResource && len(opts.Resource) > 0 {
deletedRules, newRev := d.policy.DeleteByResourceLocked(opts.Resource)
rev = newRev
deleted = len(deletedRules)

err := api.New(DeletePolicyNotFoundCode, "policy not found")
deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)
prefixes = policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
} else {

res <- &PolicyDeleteResult{
newRev: rev,
err: err,
deletedRules, newRev, _ := d.policy.DeleteByLabelsLocked(labels)
rev = newRev
deleted = len(deletedRules)

// Return an error if a label filter was provided and there are no
// rules matching it. A deletion request for all policy entries should
// not fail if no policies are loaded.
if len(deletedRules) == 0 && len(labels) != 0 {
rev := d.policy.GetRevision()
d.policy.Mutex.Unlock()

err := api.New(DeletePolicyNotFoundCode, "policy not found")

res <- &PolicyDeleteResult{
newRev: rev,
err: err,
}
return
}
return

deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)
prefixes = policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
}
deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)

res <- &PolicyDeleteResult{
newRev: rev,
Expand All @@ -568,7 +621,6 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, opts *policy.DeleteOptio
// We don't treat failures to clean up identities as API failures,
// because the policy can still successfully be updated. We're just
// not appropriately performing garbage collection.
prefixes := policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
log.WithField("prefixes", prefixes).Debug("Policy deleted via API, found prefixes...")

// Updates to the datapath are serialized via the policy reaction queue.
Expand Down
9 changes: 9 additions & 0 deletions pkg/policy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type AddOptions struct {
// ReplaceWithLabels if present indicates that existing rules with the
// given LabelArray should be deleted.
ReplaceWithLabels labels.LabelArray

// Generated should be set as true to signalize a the policy being inserted
// was generated by cilium-agent, e.g. dns poller.
Generated bool
Expand All @@ -58,6 +59,10 @@ type AddOptions struct {
// Resource provides the object ID for the underlying object that backs
// this information from 'source'.
Resource ipcacheTypes.ResourceID

// ReplaceByResource indicates the policy repository should replace any
// rules owned by the given Resource with the new set of rules
ReplaceByResource bool
}

// DeleteOptions are options which can be passed to PolicyDelete
Expand All @@ -68,4 +73,8 @@ type DeleteOptions struct {
// Resource provides the object ID for the underlying object that backs
// this information from 'source'.
Resource ipcacheTypes.ResourceID

// DeleteByResource should be true if the resource should be used to identify
// which rules should be deleted.
DeleteByResource bool
}
9 changes: 5 additions & 4 deletions pkg/policy/k8s/cilium_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func (p *policyWatcher) upsertCiliumNetworkPolicyV2(cnp *types.SlimCNP, initialR
rules, policyImportErr := cnp.Parse()
if policyImportErr == nil {
_, policyImportErr = p.policyManager.PolicyAdd(rules, &policy.AddOptions{
ReplaceWithLabels: cnp.GetIdentityLabels(),
Source: source.CustomResource,
ProcessingStartTime: initialRecvTime,
Resource: resourceID,
ReplaceByResource: true,
})
}

Expand All @@ -163,9 +163,10 @@ func (p *policyWatcher) deleteCiliumNetworkPolicyV2(cnp *types.SlimCNP, resource

scopedLog.Debug("Deleting CiliumNetworkPolicy")

_, err := p.policyManager.PolicyDelete(cnp.GetIdentityLabels(), &policy.DeleteOptions{
Source: source.CustomResource,
Resource: resourceID,
_, err := p.policyManager.PolicyDelete(nil, &policy.DeleteOptions{
Source: source.CustomResource,
Resource: resourceID,
DeleteByResource: true,
})
if err == nil {
scopedLog.Info("Deleted CiliumNetworkPolicy")
Expand Down
9 changes: 5 additions & 4 deletions pkg/policy/k8s/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func (p *policyWatcher) addK8sNetworkPolicyV1(k8sNP *slim_networkingv1.NetworkPo
}

opts := policy.AddOptions{
Replace: true,
Source: source.Kubernetes,
Source: source.Kubernetes,
Resource: ipcacheTypes.NewResourceID(
ipcacheTypes.ResourceKindNetpol,
k8sNP.ObjectMeta.Namespace,
k8sNP.ObjectMeta.Name,
),
ReplaceByResource: true,
}
if _, err := p.policyManager.PolicyAdd(rules, &opts); err != nil {
metrics.PolicyChangeTotal.WithLabelValues(metrics.LabelValueOutcomeFail).Inc()
Expand Down Expand Up @@ -74,8 +74,9 @@ func (p *policyWatcher) deleteK8sNetworkPolicyV1(k8sNP *slim_networkingv1.Networ
logfields.K8sAPIVersion: k8sNP.TypeMeta.APIVersion,
logfields.Labels: logfields.Repr(labels),
})
if _, err := p.policyManager.PolicyDelete(labels, &policy.DeleteOptions{
Source: source.Kubernetes,
if _, err := p.policyManager.PolicyDelete(nil, &policy.DeleteOptions{
Source: source.Kubernetes,
DeleteByResource: true,
Resource: ipcacheTypes.NewResourceID(
ipcacheTypes.ResourceKindNetpol,
k8sNP.ObjectMeta.Namespace,
Expand Down
Loading

0 comments on commit df42a7d

Please sign in to comment.