Fix identity leak via FQDN selectors #17699

Merged
11 changes: 6 additions & 5 deletions daemon/cmd/daemon.go
@@ -38,7 +38,6 @@ import (
"github.com/cilium/cilium/pkg/fqdn"
"github.com/cilium/cilium/pkg/hubble/observer"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
"github.com/cilium/cilium/pkg/ipam"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
@@ -147,7 +146,7 @@ type Daemon struct {

endpointManager *endpointmanager.EndpointManager

identityAllocator *cache.CachingIdentityAllocator
identityAllocator CachingIdentityAllocator

k8sWatcher *watchers.K8sWatcher

@@ -409,8 +408,9 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
return nil, nil, fmt.Errorf("error while initializing BPF pcap recorder: %w", err)
}

d.identityAllocator = cache.NewCachingIdentityAllocator(&d)
d.policy = policy.NewPolicyRepository(d.identityAllocator.GetIdentityCache(),
d.identityAllocator = NewCachingIdentityAllocator(&d)
d.policy = policy.NewPolicyRepository(d.identityAllocator,
d.identityAllocator.GetIdentityCache(),
certificatemanager.NewManager(option.Config.CertDirectory, k8s.Client()))
d.policy.SetEnvoyRulesFunc(envoy.GetEnvoyHTTPRules)

@@ -831,7 +831,8 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
// well known identities have already been initialized above.
// Ignore the channel returned by this function, as we want the global
// identity allocator to run asynchronously.
d.identityAllocator.InitIdentityAllocator(k8s.CiliumClient(), nil)
realIdentityAllocator := d.identityAllocator
realIdentityAllocator.InitIdentityAllocator(k8s.CiliumClient(), nil)

d.bootstrapClusterMesh(nodeMngr)
}
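The change above swaps the daemon's concrete *cache.CachingIdentityAllocator field for the CachingIdentityAllocator interface (defined in daemon/cmd/identity.go further down), which is the seam that lets the new tests inject a fake allocator. A minimal sketch of that injection pattern, using purely illustrative names rather than Cilium APIs:

```go
package main

import "fmt"

// Allocator is the seam: the daemon depends on this interface instead
// of a concrete allocator type, so tests can substitute a fake.
type Allocator interface {
	AllocateForIP(ip string) (id int)
}

// fakeAllocator records the IPs it saw and hands out a fixed identity.
type fakeAllocator struct {
	calls []string
}

func (f *fakeAllocator) AllocateForIP(ip string) int {
	f.calls = append(f.calls, ip)
	return 1000
}

// daemon holds the interface; production code injects the real
// allocator, tests inject the fake.
type daemon struct {
	alloc Allocator
}

func main() {
	d := daemon{alloc: &fakeAllocator{}}
	fmt.Println(d.alloc.AllocateForIP("192.0.2.3")) // prints 1000
}
```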
31 changes: 29 additions & 2 deletions daemon/cmd/fqdn.go
@@ -55,7 +55,7 @@ const (
dnsSourceConnection = "connection"
)

func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSelector][]net.IP, identityAllocator *secIDCache.CachingIdentityAllocator) (map[policyApi.FQDNSelector][]*identity.Identity, map[string]*identity.Identity, error) {
func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSelector][]net.IP, identityAllocator secIDCache.IdentityAllocator) (map[policyApi.FQDNSelector][]*identity.Identity, map[string]*identity.Identity, error) {
var err error

// Used to track identities which are allocated in calls to
@@ -67,13 +67,40 @@ func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSel
newlyAllocatedIdentities := make(map[string]*identity.Identity)

// Allocate identities for each IPNet and then map to selector
//
// The incoming IPs may already have had corresponding identities
// allocated for them by a prior call to this function, even with the
// exact same selector. In that case, this function allocates new
// references to the same identities again. Ideally we would avoid
// this, but at this layer we do not know which of the IPs has already
// had an identity allocated to it via this selector code.
//
// One might be tempted to think that if an Identity shows up in
// 'newlyAllocatedIdentities', it was newly allocated by the selector
// (hence this code is responsible for its release), and that if an
// Identity is *not* in this map, the selector had already allocated
// it (hence this code is not responsible for its release). However,
// the Identity could have been previously allocated by some other
// path, such as regular CIDR policy. If that were the case and we
// used 'newlyAllocatedIdentities' to detect duplicate allocations
// from the same selector, then a user deleting the CIDR policy could
// cause us to release the last reference to that identity, even
// though the selector referenced here is still using it.
//
// Therefore, for now we let the duplicate allocations go through and
// balance the dereferences in the corresponding
// SelectorCache.updateFQDNSelector() call, where we have access to
// both the existing set of allocated identities and the newly
// allocated set. This way we can ensure that each identity is
// referenced exactly once from each selector that selects it.
for selector, selectorIPs := range selectorsWithIPsToUpdate {
log.WithFields(logrus.Fields{
"fqdnSelector": selector,
"ips": selectorIPs,
}).Debug("getting identities for IPs associated with FQDNSelector")
var currentlyAllocatedIdentities []*identity.Identity
if currentlyAllocatedIdentities, err = ipcache.AllocateCIDRsForIPs(selectorIPs, newlyAllocatedIdentities); err != nil {
if currentlyAllocatedIdentities, err = identityAllocator.AllocateCIDRsForIPs(selectorIPs, newlyAllocatedIdentities); err != nil {
identityAllocator.ReleaseSlice(context.TODO(), nil, usedIdentities)
log.WithError(err).WithField("prefixes", selectorIPs).Warn(
"failed to allocate identities for IPs")
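The long comment added above is the heart of this fix: duplicate references taken by repeated DNS polls are tolerated at allocation time and balanced later in SelectorCache.updateFQDNSelector(), so that each selector ends up holding exactly one reference per selected identity. A minimal sketch of that acquire-new-then-release-old discipline, assuming a plain map-based counter instead of the real allocator:

```go
package main

import "fmt"

// refCounter stands in for the identity allocator's reference counts.
type refCounter map[int]int

func (rc refCounter) acquire(id int) {
	rc[id]++
}

func (rc refCounter) release(id int) {
	rc[id]--
	if rc[id] <= 0 {
		delete(rc, id)
	}
}

// updateSelector acquires references for the new identity set before
// releasing the old set, mirroring how updateFQDNSelector() balances
// the duplicate allocations made in identitiesForFQDNSelectorIPs().
func updateSelector(rc refCounter, oldIDs, newIDs []int) []int {
	for _, id := range newIDs {
		rc.acquire(id)
	}
	for _, id := range oldIDs {
		rc.release(id)
	}
	return newIDs
}

func main() {
	rc := refCounter{}
	var held []int
	held = updateSelector(rc, held, []int{1000}) // first DNS poll
	held = updateSelector(rc, held, []int{1000}) // same IP again: count stays 1
	fmt.Println(rc) // map[1000:1]
	updateSelector(rc, held, nil) // selector deleted: last reference dropped
	fmt.Println(rc) // map[]
}
```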
164 changes: 164 additions & 0 deletions daemon/cmd/fqdn_test.go
@@ -7,15 +7,117 @@
package cmd

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/cilium/cilium/pkg/allocator"
"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/counter"
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/fqdn"
"github.com/cilium/cilium/pkg/fqdn/dns"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/policy"
"github.com/cilium/cilium/pkg/policy/api"
testidentity "github.com/cilium/cilium/pkg/testutils/identity"

. "gopkg.in/check.v1"
k8sCache "k8s.io/client-go/tools/cache"
)

type DaemonFQDNSuite struct {
d *Daemon
}

var _ = Suite(&DaemonFQDNSuite{})

type FakeRefcountingIdentityAllocator struct {
*testidentity.FakeIdentityAllocator

// We create a simple identity allocator here to validate that identity
// allocation and release are balanced.
currentID int
ipToIdentity map[string]int
identityCount counter.IntCounter
}

func NewFakeIdentityAllocator(c cache.IdentityCache) *FakeRefcountingIdentityAllocator {
return &FakeRefcountingIdentityAllocator{
FakeIdentityAllocator: testidentity.NewFakeIdentityAllocator(c),
currentID: 1000,
ipToIdentity: make(map[string]int),
identityCount: make(counter.IntCounter),
}
}

// AllocateCIDRsForIPs performs reference counting for IP/identity allocation,
// but doesn't interact with pkg/identity or pkg/ipcache.
// 'newlyAllocatedIdentities' is not properly mocked out.
//
// The resulting identities are not guaranteed to have all fields populated.
func (f *FakeRefcountingIdentityAllocator) AllocateCIDRsForIPs(IPs []net.IP, newlyAllocatedIdentities map[string]*identity.Identity) ([]*identity.Identity, error) {
result := make([]*identity.Identity, 0, len(IPs))
for _, ip := range IPs {
id, ok := f.ipToIdentity[ip.String()]
if !ok {
id = f.currentID
f.ipToIdentity[ip.String()] = id
f.currentID = id + 1
}
f.identityCount.Add(id)
cidrLabels := append([]string{}, ip.String())
result = append(result, &identity.Identity{
ID: identity.NumericIdentity(id),
CIDRLabel: labels.NewLabelsFromModel(cidrLabels),
})
}
return result, nil
}

// ReleaseCIDRIdentitiesByID performs reference counting for IP/identity
// allocation, but doesn't interact with pkg/identity or pkg/ipcache.
func (f *FakeRefcountingIdentityAllocator) ReleaseCIDRIdentitiesByID(ctx context.Context, identities []identity.NumericIdentity) {
// Leave the ipToIdentity mapping alone since we don't have enough info
// to clean it up. That's fine, it's not necessary for current testing.
for _, id := range identities {
f.identityCount.Delete(int(id))
}
}

func (f *FakeRefcountingIdentityAllocator) IdentityReferenceCounter() counter.IntCounter {
return f.identityCount
}

func (f *FakeRefcountingIdentityAllocator) Close() {
}
func (f *FakeRefcountingIdentityAllocator) InitIdentityAllocator(versioned.Interface, k8sCache.Store) <-chan struct{} {
return nil
}
func (f *FakeRefcountingIdentityAllocator) WatchRemoteIdentities(kvstore.BackendOperations) (*allocator.RemoteCache, error) {
return nil, nil
}

func (ds *DaemonFQDNSuite) SetUpTest(c *C) {
d := &Daemon{}
d.identityAllocator = NewFakeIdentityAllocator(nil)
d.policy = policy.NewPolicyRepository(d.identityAllocator, nil, nil)
d.dnsNameManager = fqdn.NewNameManager(fqdn.Config{
MinTTL: 1,
Cache: fqdn.NewDNSCache(0),
UpdateSelectors: d.updateSelectors,
})
d.endpointManager = WithCustomEndpointManager(&dummyEpSyncher{})
d.policy.GetSelectorCache().SetLocalIdentityNotifier(d.dnsNameManager)
ds.d = d
}

// makeIPs generates count sequential IPv4 IPs
func makeIPs(count uint32) []net.IP {
ips := make([]net.IP, 0, count)
@@ -47,3 +149,65 @@ func (ds *DaemonSuite) BenchmarkFqdnCache(c *C) {

extractDNSLookups(endpoints, "0.0.0.0/0", "*")
}

func (ds *DaemonFQDNSuite) TestFQDNIdentityReferenceCounting(c *C) {
var (
idAllocator = ds.d.identityAllocator.(*FakeRefcountingIdentityAllocator)
nameManager = ds.d.dnsNameManager
ciliumIOSel = api.FQDNSelector{MatchName: "cilium.io"}
ciliumIOSelMatchPattern = api.FQDNSelector{MatchPattern: "*cilium.io."}
ebpfIOSel = api.FQDNSelector{MatchName: "ebpf.io"}
ciliumDNSRecord = map[string]*fqdn.DNSIPRecords{
dns.FQDN("cilium.io"): {TTL: 60, IPs: []net.IP{net.ParseIP("192.0.2.3")}},
}
ebpfDNSRecord = map[string]*fqdn.DNSIPRecords{
dns.FQDN("ebpf.io"): {TTL: 60, IPs: []net.IP{net.ParseIP("192.0.2.4")}},
}
)

// add rules
selectorsToAdd := api.FQDNSelectorSlice{ciliumIOSel, ciliumIOSelMatchPattern, ebpfIOSel}
nameManager.Lock()
for _, sel := range selectorsToAdd {
ids := nameManager.RegisterForIdentityUpdatesLocked(idAllocator, sel)
c.Assert(ids, Not(IsNil))
}
nameManager.Unlock()

// poll DNS once, check that we only generate 1 IP for cilium.io
_, _, err := nameManager.UpdateGenerateDNS(context.Background(), time.Now(), ciliumDNSRecord)
c.Assert(err, IsNil, Commentf("Error mapping selectors to IPs"))
c.Assert(len(idAllocator.IdentityReferenceCounter()), Equals, 1,
Commentf("Unexpected number of identities allocated during DNS name event handler"))

// Same thing, new reference for same identity but otherwise the same.
_, _, err = nameManager.UpdateGenerateDNS(context.Background(), time.Now(), ciliumDNSRecord)
c.Assert(err, IsNil, Commentf("Error mapping selectors to IPs"))
c.Assert(len(idAllocator.IdentityReferenceCounter()), Equals, 1,
Commentf("Unexpected number of identities allocated during DNS name event handler"))

// poll DNS for ebpf.io, check that we now have two different identities referenced
_, _, err = nameManager.UpdateGenerateDNS(context.Background(), time.Now(), ebpfDNSRecord)
c.Assert(err, IsNil, Commentf("Error mapping selectors to IPs"))
c.Assert(len(idAllocator.IdentityReferenceCounter()), Equals, 2,
Commentf("Unexpected number of identities allocated during DNS name event handler"))

// Two selectors are selecting the same identity. If we remove one of
// them, then the identity should remain referenced by the other
// existing selector.
var wg sync.WaitGroup
ds.d.policy.GetSelectorCache().UpdateFQDNSelector(ciliumIOSel, nil, &wg)
wg.Wait()
c.Assert(len(idAllocator.IdentityReferenceCounter()), Equals, 2,
Commentf("Unexpected number of identities allocated during DNS name event handler"))

// Similar to FQDN garbage collection, set the list of identities that
// each selector would select to the empty set and then observe that
// the outstanding identity references are released.
for _, sel := range selectorsToAdd {
ds.d.policy.GetSelectorCache().UpdateFQDNSelector(sel, nil, &wg)
}
wg.Wait()
c.Assert(idAllocator.IdentityReferenceCounter(), checker.DeepEquals, counter.IntCounter{},
Commentf("The Daemon code leaked references to one or more identities"))
}
41 changes: 39 additions & 2 deletions daemon/cmd/identity.go
@@ -4,16 +4,24 @@
package cmd

import (
"context"
"net"

"github.com/cilium/cilium/api/v1/models"
. "github.com/cilium/cilium/api/v1/server/restapi/policy"
"github.com/cilium/cilium/pkg/allocator"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
identitymodel "github.com/cilium/cilium/pkg/identity/model"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/logging/logfields"

"github.com/go-openapi/runtime/middleware"
k8sCache "k8s.io/client-go/tools/cache"
)

type getIdentity struct {
@@ -43,10 +51,10 @@ func (h *getIdentity) Handle(params GetIdentityParams) middleware.Responder {
}

type getIdentityID struct {
c *cache.CachingIdentityAllocator
c cache.IdentityAllocator
}

func newGetIdentityIDHandler(c *cache.CachingIdentityAllocator) GetIdentityIDHandler {
func newGetIdentityIDHandler(c cache.IdentityAllocator) GetIdentityIDHandler {
return &getIdentityID{c: c}
}

@@ -79,3 +87,32 @@ func (h *getIdentityEndpoints) Handle(params GetIdentityEndpointsParams) middlew

return NewGetIdentityEndpointsOK().WithPayload(identities)
}

// CachingIdentityAllocator provides an abstraction over the concrete type in
// pkg/identity/cache so that the underlying implementation can be mocked out
// in unit tests.
type CachingIdentityAllocator interface {
cache.IdentityAllocator

InitIdentityAllocator(versioned.Interface, k8sCache.Store) <-chan struct{}
WatchRemoteIdentities(kvstore.BackendOperations) (*allocator.RemoteCache, error)
Close()
}

type cachingIdentityAllocator struct {
*cache.CachingIdentityAllocator
}

func NewCachingIdentityAllocator(d *Daemon) cachingIdentityAllocator {
return cachingIdentityAllocator{
CachingIdentityAllocator: cache.NewCachingIdentityAllocator(d),
}
}

func (c cachingIdentityAllocator) AllocateCIDRsForIPs(ips []net.IP, newlyAllocatedIdentities map[string]*identity.Identity) ([]*identity.Identity, error) {
return ipcache.AllocateCIDRsForIPs(ips, newlyAllocatedIdentities)
}

func (c cachingIdentityAllocator) ReleaseCIDRIdentitiesByID(ctx context.Context, identities []identity.NumericIdentity) {
ipcache.ReleaseCIDRIdentitiesByID(ctx, identities)
}
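The unexported cachingIdentityAllocator wrapper above exists because AllocateCIDRsForIPs and ReleaseCIDRIdentitiesByID are package-level functions in pkg/ipcache, and free functions cannot satisfy an interface in Go; a thin adapter type turns them into methods. This also keeps daemon/cmd depending only on the interface, so the unit tests never have to touch ipcache's global state. A generic sketch of the adapter idiom, with hypothetical names:

```go
package main

import "fmt"

// CIDRReleaser is the interface the rest of the code consumes.
type CIDRReleaser interface {
	ReleaseByID(id int)
}

// releaseByID plays the role of a package-level function such as
// ipcache.ReleaseCIDRIdentitiesByID; on its own it cannot implement
// CIDRReleaser.
func releaseByID(id int) {
	fmt.Println("released identity", id)
}

// adapter forwards the interface method to the free function.
type adapter struct{}

func (adapter) ReleaseByID(id int) {
	releaseByID(id)
}

func main() {
	var r CIDRReleaser = adapter{}
	r.ReleaseByID(1000) // the ID is arbitrary here
}
```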
9 changes: 6 additions & 3 deletions daemon/cmd/policy.go
@@ -291,6 +291,9 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
// With SelectiveRegeneration this is postponed to the rule reaction queue to be done
// after the affected endpoints have been regenerated, otherwise new identities are
// upserted to the ipcache before we return.
//
// Release of these identities will be tied to the corresponding policy
// in the policy.Repository and released upon policyDelete().
newlyAllocatedIdentities := make(map[string]*identity.Identity)
if _, err := ipcache.AllocateCIDRs(prefixes, newlyAllocatedIdentities); err != nil {
_ = d.prefixLengths.Delete(prefixes)
@@ -386,7 +389,7 @@ func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions,
// and will trigger deletions for those that are no longer used.
if len(removedPrefixes) > 0 {
logger.WithField("prefixes", removedPrefixes).Debug("Decrementing replaced CIDR refcounts when adding rules")
ipcache.ReleaseCIDRs(removedPrefixes)
ipcache.ReleaseCIDRIdentitiesByCIDR(removedPrefixes)
d.prefixLengths.Delete(removedPrefixes)
}

@@ -473,7 +476,7 @@ func reactToRuleUpdates(epsToBumpRevision, epsToRegen *policy.EndpointSet, rev u
// the stale identities are not used in policy map classifications after we regenerate the
// endpoints below.
if len(releasePrefixes) != 0 {
ipcache.ReleaseCIDRs(releasePrefixes)
ipcache.ReleaseCIDRIdentitiesByCIDR(releasePrefixes)
}

// Bump revision of endpoints which don't need to be regenerated.
@@ -652,7 +655,7 @@ func (d *Daemon) policyDelete(labels labels.LabelArray, res chan interface{}) {
log.WithError(err).WithField(logfields.PolicyRevision, rev).Error("enqueue of RuleReactionEvent failed")
}
} else {
ipcache.ReleaseCIDRs(prefixes)
ipcache.ReleaseCIDRIdentitiesByCIDR(prefixes)
d.TriggerPolicyUpdates(true, "policy rules deleted")
}

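The comment added to policyAdd() ties the lifetime of these CIDR identities to the rule that triggered their allocation: the add path acquires the references and policyDelete() releases the same set. A minimal sketch of that ownership pairing, assuming hypothetical types rather than the real policy.Repository:

```go
package main

import "fmt"

// rule remembers the identity references acquired when it was added,
// so that deletion can release exactly the same set.
type rule struct {
	heldIDs []int
}

type repo struct {
	refs  map[int]int // identity ID -> reference count
	rules map[string]*rule
}

func (r *repo) policyAdd(name string, ids []int) {
	for _, id := range ids {
		r.refs[id]++
	}
	r.rules[name] = &rule{heldIDs: ids}
}

func (r *repo) policyDelete(name string) {
	for _, id := range r.rules[name].heldIDs {
		r.refs[id]--
		if r.refs[id] == 0 {
			delete(r.refs, id)
		}
	}
	delete(r.rules, name)
}

func main() {
	r := &repo{refs: map[int]int{}, rules: map[string]*rule{}}
	r.policyAdd("allow-cidr", []int{1000})
	r.policyDelete("allow-cidr")
	fmt.Println(r.refs) // map[]: no identities leaked
}
```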