Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PolicyRepository: index and replace rules by resource. #32703

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 88 additions & 36 deletions daemon/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
// updated.
var policySelectionWG sync.WaitGroup

// newRev is the new policy revision after rule updates
var newRev uint64

// Get all endpoints at the time rules were added / updated so we can figure
// out which endpoints to regenerate / bump policy revision.
allEndpoints := d.endpointManager.GetPolicyEndpoints()
Expand All @@ -289,37 +292,69 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,

endpointsToRegen := policy.NewEndpointSet(nil)

if opts != nil {
if opts.Replace {
for _, r := range sourceRules {
oldRules := d.policy.SearchRLocked(r.Labels)
// Policies can be upserted one of two ways: by labels or by resource.
// Here we replace by resource if specified.
// This block of code is, sadly, copy-pasty because DeleteByLabels / AddList return an unexported type.
if opts != nil && opts.ReplaceByResource && len(opts.Resource) > 0 {
// Update the policy repository with the new rules
addedRules, deletedRules, rev := d.policy.ReplaceByResourceLocked(sourceRules, opts.Resource)
newRev = rev

if len(deletedRules) > 0 {
// Record any prefix allocations that should be deleted
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())...)

// Determine which endpoints, if any, need to be regenerated due to removing these rules
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}

// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}

// Determine which endpoints, if any, need to be regenerated due to being selected by a new rule
addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)

} else {
// Replacing by labels
// This only happens if a policy is specified via the gRPC API. It is much less efficient
// due to needing to scan the entire repository to find matching labels.
if opts != nil {
if opts.Replace {
for _, r := range sourceRules {
oldRules := d.policy.SearchRLocked(r.Labels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(r.Labels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}
if len(opts.ReplaceWithLabels) > 0 {
oldRules := d.policy.SearchRLocked(opts.ReplaceWithLabels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(r.Labels)
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(opts.ReplaceWithLabels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}
if len(opts.ReplaceWithLabels) > 0 {
oldRules := d.policy.SearchRLocked(opts.ReplaceWithLabels)
removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
if len(oldRules) > 0 {
deletedRules, _, _ := d.policy.DeleteByLabelsLocked(opts.ReplaceWithLabels)
deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}
}
}

addedRules, newRev := d.policy.AddListLocked(sourceRules)
addedRules, rev := d.policy.AddListLocked(sourceRules)
newRev = rev

// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}
// The information needed by the caller is available at this point, signal
// accordingly.
resChan <- &PolicyAddResult{
newRev: newRev,
err: nil,
}

addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
}

d.policy.Mutex.Unlock()

Expand Down Expand Up @@ -536,24 +571,42 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, opts *policy.DeleteOptio

endpointsToRegen := policy.NewEndpointSet(nil)

deletedRules, rev, deleted := d.policy.DeleteByLabelsLocked(labels)
var deleted int
var rev uint64
var prefixes []netip.Prefix

// Return an error if a label filter was provided and there are no
// rules matching it. A deletion request for all policy entries should
// not fail if no policies are loaded.
if len(deletedRules) == 0 && len(labels) != 0 {
rev := d.policy.GetRevision()
d.policy.Mutex.Unlock()
if opts.DeleteByResource && len(opts.Resource) > 0 {
deletedRules, newRev := d.policy.DeleteByResourceLocked(opts.Resource)
rev = newRev
deleted = len(deletedRules)

err := api.New(DeletePolicyNotFoundCode, "policy not found")
deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)
prefixes = policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
} else {

res <- &PolicyDeleteResult{
newRev: rev,
err: err,
deletedRules, newRev, _ := d.policy.DeleteByLabelsLocked(labels)
rev = newRev
deleted = len(deletedRules)

// Return an error if a label filter was provided and there are no
// rules matching it. A deletion request for all policy entries should
// not fail if no policies are loaded.
if len(deletedRules) == 0 && len(labels) != 0 {
rev := d.policy.GetRevision()
d.policy.Mutex.Unlock()

err := api.New(DeletePolicyNotFoundCode, "policy not found")

res <- &PolicyDeleteResult{
newRev: rev,
err: err,
}
return
}
return

deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)
prefixes = policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
}
deletedRules.UpdateRulesEndpointsCaches(epsToBumpRevision, endpointsToRegen, &policySelectionWG)

res <- &PolicyDeleteResult{
newRev: rev,
Expand All @@ -568,7 +621,6 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, opts *policy.DeleteOptio
// We don't treat failures to clean up identities as API failures,
// because the policy can still successfully be updated. We're just
// not appropriately performing garbage collection.
prefixes := policy.GetCIDRPrefixes(deletedRules.AsPolicyRules())
log.WithField("prefixes", prefixes).Debug("Policy deleted via API, found prefixes...")

// Updates to the datapath are serialized via the policy reaction queue.
Expand Down
9 changes: 9 additions & 0 deletions pkg/policy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type AddOptions struct {
// ReplaceWithLabels if present indicates that existing rules with the
// given LabelArray should be deleted.
ReplaceWithLabels labels.LabelArray

// Generated should be set as true to signalize a the policy being inserted
// was generated by cilium-agent, e.g. dns poller.
Generated bool
Expand All @@ -58,6 +59,10 @@ type AddOptions struct {
// Resource provides the object ID for the underlying object that backs
// this information from 'source'.
Resource ipcacheTypes.ResourceID

// ReplaceByResource indicates the policy repository should replace any
// rules owned by the given Resource with the new set of rules
ReplaceByResource bool
}

// DeleteOptions are options which can be passed to PolicyDelete
Expand All @@ -68,4 +73,8 @@ type DeleteOptions struct {
// Resource provides the object ID for the underlying object that backs
// this information from 'source'.
Resource ipcacheTypes.ResourceID

// DeleteByResource should be true if the resource should be used to identify
// which rules should be deleted.
DeleteByResource bool
}
3 changes: 2 additions & 1 deletion pkg/policy/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func FuzzResolveEgressPolicy(f *testing.F) {
}
rule := &rule{Rule: r}
state := traceState{}
_, _ = rule.resolveEgressPolicy(testPolicyContext, fromBar, &state, L4PolicyMap{}, nil, nil)
td := newTestData()
squeed marked this conversation as resolved.
Show resolved Hide resolved
_, _ = rule.resolveEgressPolicy(td.testPolicyContext, fromBar, &state, L4PolicyMap{}, nil, nil)

})
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/policy/k8s/cilium_network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func (p *policyWatcher) upsertCiliumNetworkPolicyV2(cnp *types.SlimCNP, initialR
rules, policyImportErr := cnp.Parse()
if policyImportErr == nil {
_, policyImportErr = p.policyManager.PolicyAdd(rules, &policy.AddOptions{
ReplaceWithLabels: cnp.GetIdentityLabels(),
Source: source.CustomResource,
ProcessingStartTime: initialRecvTime,
Resource: resourceID,
ReplaceByResource: true,
})
}

Expand All @@ -163,9 +163,10 @@ func (p *policyWatcher) deleteCiliumNetworkPolicyV2(cnp *types.SlimCNP, resource

scopedLog.Debug("Deleting CiliumNetworkPolicy")

_, err := p.policyManager.PolicyDelete(cnp.GetIdentityLabels(), &policy.DeleteOptions{
Source: source.CustomResource,
Resource: resourceID,
_, err := p.policyManager.PolicyDelete(nil, &policy.DeleteOptions{
Source: source.CustomResource,
Resource: resourceID,
DeleteByResource: true,
})
if err == nil {
scopedLog.Info("Deleted CiliumNetworkPolicy")
Expand Down
9 changes: 5 additions & 4 deletions pkg/policy/k8s/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func (p *policyWatcher) addK8sNetworkPolicyV1(k8sNP *slim_networkingv1.NetworkPo
}

opts := policy.AddOptions{
Replace: true,
Source: source.Kubernetes,
Source: source.Kubernetes,
Resource: ipcacheTypes.NewResourceID(
ipcacheTypes.ResourceKindNetpol,
k8sNP.ObjectMeta.Namespace,
k8sNP.ObjectMeta.Name,
),
ReplaceByResource: true,
}
if _, err := p.policyManager.PolicyAdd(rules, &opts); err != nil {
metrics.PolicyChangeTotal.WithLabelValues(metrics.LabelValueOutcomeFail).Inc()
Expand Down Expand Up @@ -74,8 +74,9 @@ func (p *policyWatcher) deleteK8sNetworkPolicyV1(k8sNP *slim_networkingv1.Networ
logfields.K8sAPIVersion: k8sNP.TypeMeta.APIVersion,
logfields.Labels: logfields.Repr(labels),
})
if _, err := p.policyManager.PolicyDelete(labels, &policy.DeleteOptions{
Source: source.Kubernetes,
if _, err := p.policyManager.PolicyDelete(nil, &policy.DeleteOptions{
Source: source.Kubernetes,
DeleteByResource: true,
Resource: ipcacheTypes.NewResourceID(
ipcacheTypes.ResourceKindNetpol,
k8sNP.ObjectMeta.Namespace,
Expand Down
Loading
Loading