Skip to content

Commit

Permalink
daemon: Switch policy over to new IPCache interface
Browse files Browse the repository at this point in the history
Convert the daemon over to using the new IPCache interface which handles
merging of metadata from multiple sources. As a side effect, we can
clean up some of the identity refcounting/management handling code from
the daemon in favour of the core code in pkg/ipcache.

This patch may subtly change the ordering of operations for policies
that have {to,from}CIDR{,Set} rules in them. Previously:

1. Figure out the prefixes
2. Synchronously allocate identities for the prefixes
3. Regenerate policy for all relevant endpoints
4. Trigger IPCache updates asynchronously, including
4a. Trigger SelectorCache / policy updates via IPCache
4b. Update datapath IPCache

Step (2) may now occur after (3) but will continue to occur before (4).
This means that step (3) will only prepare the SelectorCache and policy
constructs associated with the endpoints, but not associate the new
identities for the CIDRs to those policy constructs. The latter step is
deferred to the incremental "identity update" logic in (4a).

Signed-off-by: Joe Stringer <joe@cilium.io>
  • Loading branch information
joestringer authored and christarazi committed Apr 26, 2023
1 parent b6cef92 commit 2820a1f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 50 deletions.
87 changes: 44 additions & 43 deletions daemon/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/cilium/cilium/pkg/eventqueue"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/ipcache"
ipcacheTypes "github.com/cilium/cilium/pkg/ipcache/types"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand Down Expand Up @@ -274,26 +274,6 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
return
}

// Any newly allocated identities MUST be upserted to the ipcache if
// no error is returned. This is postponed to the rule reaction queue
// to be done after the affected endpoints have been regenerated,
// otherwise new identities are upserted to the ipcache before we
// return.
//
// Release of these identities will be tied to the corresponding policy
// in the policy.Repository and released upon policyDelete().
newlyAllocatedIdentities := make(map[netip.Prefix]*identity.Identity)
if _, err := d.ipcache.AllocateCIDRs(prefixes, nil, newlyAllocatedIdentities); err != nil {
d.prefixLengths.Delete(prefixes)
logger.WithError(err).WithField("prefixes", prefixes).Warn(
"Failed to allocate identities for CIDRs during policy add")
resChan <- &PolicyAddResult{
newRev: 0,
err: err,
}
return
}

// No errors past this point!

d.policy.Mutex.Lock()
Expand Down Expand Up @@ -363,12 +343,9 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
metrics.PolicyImplementationDelay.WithLabelValues(source).Observe(duration.Seconds())
})

// remove prefixes of replaced rules above. Refcounts have been incremented
// above, so any decrements here will be no-ops for CIDRs that are re-added,
// and will trigger deletions for those that are no longer used.
// Remove prefixes of replaced rules above from the d.prefixLengths tracker.
if len(removedPrefixes) > 0 {
logger.WithField("prefixes", removedPrefixes).Debug("Decrementing replaced CIDR refcounts when adding rules")
d.ipcache.ReleaseCIDRIdentitiesByCIDR(removedPrefixes)
d.prefixLengths.Delete(removedPrefixes)
}

Expand Down Expand Up @@ -399,7 +376,10 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
epsToBumpRevision: endpointsToBumpRevision,
endpointsToRegen: endpointsToRegen,
newRev: newRev,
upsertIdentities: newlyAllocatedIdentities,
upsertPrefixes: prefixes,
releasePrefixes: removedPrefixes,
source: opts.Source,
resource: opts.Resource,
}

ev := eventqueue.NewEvent(r)
Expand All @@ -423,16 +403,18 @@ type PolicyReactionEvent struct {
epsToBumpRevision *policy.EndpointSet
endpointsToRegen *policy.EndpointSet
newRev uint64
upsertIdentities map[netip.Prefix]*identity.Identity // deferred CIDR identity upserts, if any
releasePrefixes []netip.Prefix // deferred CIDR identity deletes, if any
upsertPrefixes []netip.Prefix
releasePrefixes []netip.Prefix
source source.Source
resource ipcacheTypes.ResourceID
}

// Handle implements pkg/eventqueue/EventHandler interface.
func (r *PolicyReactionEvent) Handle(res chan interface{}) {
// Wait until we have calculated which endpoints need to be selected
// across multiple goroutines.
r.wg.Wait()
r.d.reactToRuleUpdates(r.epsToBumpRevision, r.endpointsToRegen, r.newRev, r.upsertIdentities, r.releasePrefixes)
r.reactToRuleUpdates(r.epsToBumpRevision, r.endpointsToRegen, r.newRev, r.upsertPrefixes, r.releasePrefixes)
}

// reactToRuleUpdates does the following:
Expand All @@ -441,14 +423,23 @@ func (r *PolicyReactionEvent) Handle(res chan interface{}) {
// in allEps, to revision rev.
// - wait for the regenerations to be finished
// - upsert or delete CIDR identities to the ipcache, as needed.
func (d *Daemon) reactToRuleUpdates(epsToBumpRevision, epsToRegen *policy.EndpointSet, rev uint64, upsertIdentities map[netip.Prefix]*identity.Identity, releasePrefixes []netip.Prefix) {
func (r *PolicyReactionEvent) reactToRuleUpdates(epsToBumpRevision, epsToRegen *policy.EndpointSet, rev uint64, upsertPrefixes, releasePrefixes []netip.Prefix) {
var enqueueWaitGroup sync.WaitGroup

// Release CIDR identities before regenerations have been started, if any. This makes sure
// the stale identities are not used in policy map classifications after we regenerate the
// endpoints below.
// Asynchronously remove the CIDRs from the IPCache, potentially
// causing release of the corresponding identities if now unused.
// We can proceed with policy regeneration for endpoints even without
// ensuring that the ipcache is updated because:
// - If another policy still selects the CIDR, the corresponding
// identity will remain live due to the other CIDR. Policy update
// is a no-op for that CIDR.
// - If the policy being deleted is the last policy referring to this
// CIDR, then the policy rules will be updated to remove the allow
// for the CIDR below. The traffic would begin to be dropped after
// this operation completes regardless of whether the BPF ipcache or
// policymap gets updated first, so the ordering is not consequential.
if len(releasePrefixes) != 0 {
d.ipcache.ReleaseCIDRIdentitiesByCIDR(releasePrefixes)
r.d.ipcache.RemovePrefixes(releasePrefixes, r.source, r.resource)
}

// Bump revision of endpoints which don't need to be regenerated.
Expand All @@ -460,6 +451,15 @@ func (d *Daemon) reactToRuleUpdates(epsToBumpRevision, epsToRegen *policy.Endpoi
})

// Regenerate all other endpoints.
//
// This recalculates the policy for the endpoints, taking into account
// the latest changes from this event. Any references to new CIDRs
// will be processed to determine the selectors for those CIDRs and
// prepare the SelectorCache for the CIDR identities. However, at this
// point the CIDR identities may not yet exist. They'll be created in
// ipcache.UpsertPrefixes() below, which will separately update the
// SelectorCache and plumb the datapath for the corresponding BPF
// policymap and ipcache map entries.
regenMetadata := &regeneration.ExternalRegenerationMetadata{
Reason: "policy rules added",
RegenerationLevel: regeneration.RegenerateWithoutDatapath,
Expand All @@ -479,11 +479,11 @@ func (d *Daemon) reactToRuleUpdates(epsToBumpRevision, epsToRegen *policy.Endpoi

enqueueWaitGroup.Wait()

// Upsert new identities after regeneration has completed, if any. This makes sure the
// policy maps are ready to classify packets using the newly allocated identities before
// they are upserted to the ipcache here.
if upsertIdentities != nil {
d.ipcache.UpsertGeneratedIdentities(upsertIdentities, nil)
// Asynchronously allocate identities for new CIDRs and notify the
// SelectorCache / Endpoints to do an incremental identity update to
// the datapath maps (if necessary).
if len(upsertPrefixes) != 0 {
r.d.ipcache.UpsertPrefixes(upsertPrefixes, r.source, r.resource)
}
}

Expand Down Expand Up @@ -589,17 +589,18 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, opts *policy.DeleteOptio

d.prefixLengths.Delete(prefixes)

// Releasing prefixes from ipcache is serialized with the corresponding
// ipcache upserts via the policy reaction queue. Execution order
// w.r.t. to endpoint regenerations remains the same, endpoints are
// regenerated after any prefixes have been removed from the ipcache.
// Updates to the datapath are serialized via the policy reaction queue.
// This way there is a canonical ordering for policy updates and hence
// the subsequent Endpoint regenerations and ipcache updates.
r := &PolicyReactionEvent{
d: d,
wg: &policySelectionWG,
epsToBumpRevision: epsToBumpRevision,
endpointsToRegen: endpointsToRegen,
newRev: rev,
releasePrefixes: prefixes,
source: opts.Source,
resource: opts.Resource,
}

ev := eventqueue.NewEvent(r)
Expand Down
16 changes: 9 additions & 7 deletions daemon/cmd/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var (
testQAEndpointID = uint16(1)
testProdEndpointID = uint16(2)

policyAddOptions = &policy.AddOptions{}

regenerationMetadata = &regeneration.ExternalRegenerationMetadata{
Reason: "test",
RegenerationLevel: regeneration.RegenerateWithoutDatapath,
Expand Down Expand Up @@ -294,7 +296,7 @@ func (ds *DaemonSuite) TestUpdateConsumerMap(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err3 := ds.d.PolicyAdd(rules, nil)
_, err3 := ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err3, Equals, nil)

// Prepare the identities necessary for testing
Expand Down Expand Up @@ -468,7 +470,7 @@ func (ds *DaemonSuite) TestL4_L7_Shadowing(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err = ds.d.PolicyAdd(rules, nil)
_, err = ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err, Equals, nil)

// Prepare endpoints
Expand Down Expand Up @@ -552,7 +554,7 @@ func (ds *DaemonSuite) TestL4_L7_ShadowingShortCircuit(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err = ds.d.PolicyAdd(rules, nil)
_, err = ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err, Equals, nil)

// Prepare endpoints
Expand Down Expand Up @@ -639,7 +641,7 @@ func (ds *DaemonSuite) TestL3_dependent_L7(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err = ds.d.PolicyAdd(rules, nil)
_, err = ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err, Equals, nil)

// Prepare endpoints
Expand Down Expand Up @@ -712,7 +714,7 @@ func (ds *DaemonSuite) TestReplacePolicy(c *C) {
},
}

_, err := ds.d.PolicyAdd(rules, nil)
_, err := ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err, IsNil)
ds.d.policy.Mutex.RLock()
c.Assert(len(ds.d.policy.SearchRLocked(lbls)), Equals, 2)
Expand Down Expand Up @@ -814,7 +816,7 @@ func (ds *DaemonSuite) TestRemovePolicy(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err3 := ds.d.PolicyAdd(rules, nil)
_, err3 := ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err3, Equals, nil)

cleanup, err2 := prepareEndpointDirs()
Expand Down Expand Up @@ -899,7 +901,7 @@ func (ds *DaemonSuite) TestIncrementalPolicy(c *C) {

ds.d.l7Proxy.RemoveAllNetworkPolicies()

_, err3 := ds.d.PolicyAdd(rules, nil)
_, err3 := ds.d.PolicyAdd(rules, policyAddOptions)
c.Assert(err3, Equals, nil)

cleanup, err2 := prepareEndpointDirs()
Expand Down

0 comments on commit 2820a1f

Please sign in to comment.