identity: Make identity allocations observable #26373

Merged
10 changes: 7 additions & 3 deletions daemon/cmd/policy.go
@@ -42,6 +42,7 @@ import (
policyAPI "github.com/cilium/cilium/pkg/policy/api"
"github.com/cilium/cilium/pkg/safetime"
"github.com/cilium/cilium/pkg/source"
"github.com/cilium/cilium/pkg/stream"
"github.com/cilium/cilium/pkg/trigger"
)

@@ -81,9 +82,11 @@ type policyOut struct {
IdentityAllocator CachingIdentityAllocator
CacheIdentityAllocator cache.IdentityAllocator
RemoteIdentityWatcher clustermesh.RemoteIdentityWatcher
Repository *policy.Repository
Updater *policy.Updater
IPCache *ipcache.IPCache
IdentityObservable stream.Observable[cache.IdentityChange]

Repository *policy.Repository
Updater *policy.Updater
IPCache *ipcache.IPCache
}

// newPolicyTrifecta instantiates CachingIdentityAllocator, Repository and IPCache.
@@ -145,6 +148,7 @@ func newPolicyTrifecta(params policyParams) (policyOut, error) {
IdentityAllocator: idAlloc,
CacheIdentityAllocator: idAlloc,
RemoteIdentityWatcher: idAlloc,
IdentityObservable: idAlloc,
Repository: iao.policy,
Updater: policyUpdater,
IPCache: ipc,
6 changes: 6 additions & 0 deletions pkg/allocator/allocator.go
@@ -1020,3 +1020,9 @@ func (rc *RemoteCache) NumEntries() int {
func (rc *RemoteCache) close() {
rc.cache.allocator.Delete()
}

// Observe the allocator changes. Conforms to stream.Observable.
// Replays the current state of the cache when subscribing.
func (a *Allocator) Observe(ctx context.Context, next func(AllocatorChange), complete func(error)) {
a.mainCache.Observe(ctx, next, complete)
}
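
As an illustration of the new API, here is a minimal consumer sketch that is not part of this change: it assumes package allocator, uses a hypothetical function name, and drains the change stream with stream.ToChannel the same way the test below does, first consuming the replayed state and then following live updates.

package allocator

import (
	"context"
	"fmt"

	"github.com/cilium/cilium/pkg/stream"
)

// watchAllocatorChanges is a hypothetical helper: it converts the Observable
// into a channel and prints every change until ctx is cancelled or the
// allocator is stopped, at which point the channel is closed.
func watchAllocatorChanges(ctx context.Context, a *Allocator) {
	for change := range stream.ToChannel[AllocatorChange](ctx, a) {
		switch change.Kind {
		case AllocatorChangeSync:
			// Every entry present at subscription time has been replayed.
			fmt.Println("initial allocator state synced")
		case AllocatorChangeUpsert:
			fmt.Printf("upsert id=%d key=%s\n", change.ID, change.Key)
		case AllocatorChangeDelete:
			fmt.Printf("delete id=%d key=%s\n", change.ID, change.Key)
		}
	}
}

Because stream.ToChannel closes the returned channel once the subscription completes, the range loop terminates cleanly on cancellation, which is also what the test below relies on.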
60 changes: 60 additions & 0 deletions pkg/allocator/allocator_test.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"testing"
"time"
@@ -20,6 +21,7 @@ import (
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/rand"
"github.com/cilium/cilium/pkg/rate"
"github.com/cilium/cilium/pkg/stream"
)

const (
@@ -363,6 +365,64 @@ func (s *AllocatorSuite) TestAllocateCached(c *C) {
testAllocator(c, idpool.ID(256), randomTestName(), "a") // enable use of local cache
}

func TestObserveAllocatorChanges(t *testing.T) {
backend := newDummyBackend()
allocator, err := NewAllocator(TestAllocatorKey(""), backend, WithMin(idpool.ID(1)), WithMax(idpool.ID(256)), WithoutGC())
require.NoError(t, err)
require.NotNil(t, allocator)

numAllocations := 10

// Allocate a few IDs
for i := 0; i < numAllocations; i++ {
key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
id, new, firstUse, err := allocator.Allocate(context.Background(), key)
require.NoError(t, err)
require.NotEqual(t, 0, id)
require.True(t, new)
require.True(t, firstUse)

// refcnt must be 1
require.Equal(t, uint64(1), allocator.localKeys.keys[allocator.encodeKey(key)].refcnt)
}

// Subscribe to the changes. This should replay the current state.
ctx, cancel := context.WithCancel(context.Background())
changes := stream.ToChannel[AllocatorChange](ctx, allocator)
for i := 0; i < numAllocations; i++ {
change := <-changes
// Since these are replayed in hash map traversal order, just validate that
// the fields are set.
require.True(t, strings.HasPrefix(change.Key.String(), "key0"))
require.NotEqual(t, 0, change.ID)
require.Equal(t, AllocatorChangeUpsert, change.Kind)
}

// After replay we should see a sync event.
change := <-changes
require.Equal(t, AllocatorChangeSync, change.Kind)

// Simulate changes to the allocations via the backend
go func() {
backend.(*dummyBackend).handler.OnAdd(idpool.ID(123), TestAllocatorKey("remote"))
backend.(*dummyBackend).handler.OnDelete(idpool.ID(123), TestAllocatorKey("remote"))
}()

// Check that we observe the allocation and the deletion.
change = <-changes
require.Equal(t, AllocatorChangeUpsert, change.Kind)
require.Equal(t, TestAllocatorKey("remote"), change.Key)

change = <-changes
require.Equal(t, AllocatorChangeDelete, change.Kind)
require.Equal(t, TestAllocatorKey("remote"), change.Key)

// Cancel the subscription and verify it completes.
cancel()
_, notClosed := <-changes
require.False(t, notClosed)
}

// The following tests are currently disabled as they are not 100% reliable in
// the Jenkins CI.
// These were copied from pkg/kvstore/allocator/allocator_test.go and don't
66 changes: 64 additions & 2 deletions pkg/allocator/cache.go
@@ -13,6 +13,7 @@ import (
"github.com/cilium/cilium/pkg/idpool"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/stream"
)

// backendOpTimeout is the time allowed for operations sent to backends in
@@ -58,15 +59,21 @@ type cache struct {
// watcher is started with the conditions marked as done when the
// watcher has exited
stopWatchWg sync.WaitGroup

changeSrc stream.Observable[AllocatorChange]
emitChange func(AllocatorChange)
completeChangeSrc func(error)
}

func newCache(a *Allocator) cache {
return cache{
func newCache(a *Allocator) (c cache) {
c = cache{
allocator: a,
cache: idMap{},
keyCache: keyMap{},
stopChan: make(chan struct{}),
}
c.changeSrc, c.emitChange, c.completeChangeSrc = stream.Multicast[AllocatorChange]()
return
}

type waitChan chan struct{}
@@ -122,6 +129,8 @@ func (c *cache) OnAdd(id idpool.ID, key AllocatorKey) {
}
c.allocator.idPool.Remove(id)

c.emitChange(AllocatorChange{Kind: AllocatorChangeUpsert, ID: id, Key: key})

c.sendEvent(kvstore.EventTypeCreate, id, key)
}

@@ -138,6 +147,8 @@ func (c *cache) OnModify(id idpool.ID, key AllocatorKey) {
c.nextKeyCache[c.allocator.encodeKey(key)] = id
}

c.emitChange(AllocatorChange{Kind: AllocatorChangeUpsert, ID: id, Key: key})

c.sendEvent(kvstore.EventTypeModify, id, key)
}

@@ -170,6 +181,8 @@ func (c *cache) onDeleteLocked(id idpool.ID, key AllocatorKey) {
delete(c.nextCache, id)
a.idPool.Insert(id)

c.emitChange(AllocatorChange{Kind: AllocatorChangeDelete, ID: id, Key: key})

c.sendEvent(kvstore.EventTypeDelete, id, key)
}

@@ -198,6 +211,7 @@ func (c *cache) start() waitChan {
func (c *cache) stop() {
close(c.stopChan)
c.stopWatchWg.Wait()
c.completeChangeSrc(nil)
}

// drain emits a deletion event for all known IDs. It must be called after the
@@ -274,3 +288,51 @@ func (c *cache) numEntries() int {
defer c.mutex.RUnlock()
return len(c.nextCache)
}

type AllocatorChangeKind string

const (
AllocatorChangeSync AllocatorChangeKind = "sync"
AllocatorChangeUpsert AllocatorChangeKind = "upsert"
AllocatorChangeDelete AllocatorChangeKind = "delete"
)

type AllocatorChange struct {
Kind AllocatorChangeKind
ID idpool.ID
Key AllocatorKey
}

// Observe the allocator changes. Conforms to stream.Observable.
// Replays the current state of the cache when subscribing.
func (c *cache) Observe(ctx context.Context, next func(AllocatorChange), complete func(error)) {
// This short-lived goroutine replays the current state of the cache before starting to observe the
// actual source, changeSrc. changeSrc is backed by a stream.FuncObservable that starts its own
// goroutine, so the current goroutine stops and releases the read lock on the mutex after the registration.
go func() {
// Wait until initial listing has completed before
// replaying the state.
select {
case <-c.listDone:
case <-ctx.Done():
complete(ctx.Err())
return
}

c.mutex.RLock()
defer c.mutex.RUnlock()

for id, key := range c.cache {
next(AllocatorChange{Kind: AllocatorChangeUpsert, ID: id, Key: key})
}

// Emit a sync event to inform the subscriber that it has received a consistent
// initial state.
next(AllocatorChange{Kind: AllocatorChangeSync})

// And subscribe to new events. Since we held the read-lock there won't be any
// missed or duplicate events.
c.changeSrc.Observe(ctx, next, complete)
}()

}
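
To show how the sync marker is intended to be consumed, here is a small subscriber sketch that is not part of this change; the type and method names are hypothetical, it lives in package allocator, and it relies only on the Observe contract above (replay, then a sync event, then live updates).

package allocator

import (
	"context"

	"github.com/cilium/cilium/pkg/idpool"
	"github.com/cilium/cilium/pkg/lock"
)

// cacheMirror is a hypothetical subscriber that keeps a local copy of the
// id-to-key mapping, fed entirely by the change stream.
type cacheMirror struct {
	mu  lock.RWMutex
	ids map[idpool.ID]AllocatorKey
}

func (m *cacheMirror) run(ctx context.Context, c *cache) {
	m.ids = map[idpool.ID]AllocatorKey{}
	c.Observe(ctx,
		func(change AllocatorChange) {
			m.mu.Lock()
			defer m.mu.Unlock()
			switch change.Kind {
			case AllocatorChangeUpsert:
				m.ids[change.ID] = change.Key
			case AllocatorChangeDelete:
				delete(m.ids, change.ID)
			case AllocatorChangeSync:
				// The mirror now contains every entry that existed when the
				// subscription started; later events arrive incrementally.
			}
		},
		func(err error) {
			// Stream completed: the cache was stopped or ctx was cancelled.
		},
	)
}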
68 changes: 68 additions & 0 deletions pkg/identity/cache/allocator.go
@@ -5,6 +5,7 @@ package cache

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
@@ -25,6 +26,7 @@ import (
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/stream"
)

var (
@@ -78,6 +80,9 @@ type IdentityAllocatorOwner interface {
// identities based of sets of labels, and caching information about identities
// locally.
type IdentityAllocator interface {
// Identity changes are observable.
stream.Observable[IdentityChange]

// WaitForInitialGlobalIdentities waits for the initial set of global
// security identities to have been received.
WaitForInitialGlobalIdentities(context.Context) error
@@ -482,3 +487,66 @@ func (m *CachingIdentityAllocator) RemoveRemoteIdentities(name string) {
m.IdentityAllocator.RemoveRemoteKVStore(name)
}
}

type IdentityChangeKind string

const (
IdentityChangeSync IdentityChangeKind = IdentityChangeKind(allocator.AllocatorChangeSync)
IdentityChangeUpsert IdentityChangeKind = IdentityChangeKind(allocator.AllocatorChangeUpsert)
IdentityChangeDelete IdentityChangeKind = IdentityChangeKind(allocator.AllocatorChangeDelete)
)

type IdentityChange struct {
Kind IdentityChangeKind
ID identity.NumericIdentity
Labels labels.Labels
}

// Observe the identity changes. Conforms to stream.Observable.
// Replays the current state of the cache when subscribing.
func (m *CachingIdentityAllocator) Observe(ctx context.Context, next func(IdentityChange), complete func(error)) {
// This short-lived goroutine waits for the global identity allocator to become ready before starting
// to observe the underlying allocator for changes.
// m.IdentityAllocator is backed by a stream.FuncObservable that starts its own goroutine, so the
// current goroutine stops and releases the setupMutex after the registration.
go func() {
if err := m.WaitForInitialGlobalIdentities(ctx); err != nil {
complete(ctx.Err())
return
}

m.setupMutex.Lock()
defer m.setupMutex.Unlock()

if m.IdentityAllocator == nil {
complete(errors.New("allocator no longer initialized"))
return
}

// Observe the underlying allocator for changes and map the events to identities.
stream.Map[allocator.AllocatorChange, IdentityChange](
m.IdentityAllocator,
func(change allocator.AllocatorChange) IdentityChange {
return IdentityChange{
Kind: IdentityChangeKind(change.Kind),
ID: identity.NumericIdentity(change.ID),
Labels: mapLabels(change.Key),
}
},
).Observe(ctx, next, complete)
}()
}

func mapLabels(allocatorKey allocator.AllocatorKey) labels.Labels {
var idLabels labels.Labels = nil

if allocatorKey != nil {
idLabels = labels.Labels{}
for k, v := range allocatorKey.GetAsMap() {
label := labels.ParseLabel(k + "=" + v)
idLabels[label.Key] = label
}
}

return idLabels
}
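
On the consumer side, a component that receives the stream.Observable[cache.IdentityChange] provided in daemon/cmd/policy.go could mirror identities as in the following sketch; it is not part of this change, and the package, type, and method names are hypothetical.

package example

import (
	"context"

	"github.com/cilium/cilium/pkg/identity"
	"github.com/cilium/cilium/pkg/identity/cache"
	"github.com/cilium/cilium/pkg/labels"
	"github.com/cilium/cilium/pkg/lock"
	"github.com/cilium/cilium/pkg/stream"
)

// identityTracker mirrors the set of known identities from the observable.
type identityTracker struct {
	mu   lock.RWMutex
	byID map[identity.NumericIdentity]labels.Labels
}

func (t *identityTracker) run(ctx context.Context, src stream.Observable[cache.IdentityChange]) {
	t.byID = map[identity.NumericIdentity]labels.Labels{}
	src.Observe(ctx,
		func(change cache.IdentityChange) {
			t.mu.Lock()
			defer t.mu.Unlock()
			switch change.Kind {
			case cache.IdentityChangeUpsert:
				t.byID[change.ID] = change.Labels
			case cache.IdentityChangeDelete:
				delete(t.byID, change.ID)
			case cache.IdentityChangeSync:
				// Initial replay done: byID reflects every identity known to
				// the allocator at subscription time.
			}
		},
		func(err error) {
			// Completed: the allocator was shut down or ctx was cancelled.
		},
	)
}

Since the daemon now exposes the observable through policyOut, such a consumer only needs to declare the dependency rather than reach into the allocator itself.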
4 changes: 4 additions & 0 deletions pkg/testutils/identity/allocator.go
@@ -186,3 +186,7 @@ func (f *MockIdentityAllocator) ReleaseCIDRIdentitiesByID(ctx context.Context, i
func (f *MockIdentityAllocator) GetIdentityCache() cache.IdentityCache {
return f.IdentityCache
}

func (f *MockIdentityAllocator) Observe(ctx context.Context, next func(cache.IdentityChange), complete func(error)) {
go complete(nil)
}