Skip to content

Commit

Permalink
[ upstream commit e39fcae ]
Browse files Browse the repository at this point in the history
[ Backporter's notes: Ignoring changes from cilium#23064, replaced maps.Clone() with Go 1.19 compatible code ]

pkg/allocator: Improve 'Key allocation attempt failed' handling for CRD mode

In CRD mode, the Cilium agent uses CRD to create identities. After an
identity is created, the agent acquires a reference for that key. This
involves fetching the CRD from the local Kubernetes cache and checking
for an annotation applied by cilium-operator to mark the identity for
deletion. However, there may be a delay before the Cilium Identity is
cached locally, leading to the 'Key allocation attempt failed' error. This
patch ensures that we fallback to the newly allocated Cilium Identity if
it's not found in the Kubernetes cache.

Signed-off-by: André Martins <andre@cilium.io>
Signed-off-by: Anton Ippolitov <anton.ippolitov@datadoghq.com>
  • Loading branch information
aanm authored and antonipp committed Jan 5, 2024
1 parent 518add5 commit 41aa568
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cilium/cmd/preflight_identity_crd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func migrateIdentities(ctx hive.HookContext, clientset k8sClient.Clientset, shut
})

ctx, cancel := context.WithTimeout(ctx, opTimeout)
err := crdBackend.AllocateID(ctx, id, key)
_, err := crdBackend.AllocateID(ctx, id, key)
switch {
case err != nil && k8serrors.IsAlreadyExists(err):
alreadyAllocatedKeys[id] = key
Expand Down
21 changes: 18 additions & 3 deletions pkg/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,19 @@ type Backend interface {
// error in that case is not expected to be fatal. The actual ID is obtained
// by Allocator from the local idPool, which is updated with used-IDs as the
// Backend makes calls to the handler in ListAndWatch.
AllocateID(ctx context.Context, id idpool.ID, key AllocatorKey) error
// The implementation of the backend might return an AllocatorKey that is
// a copy of 'key' with an internal reference of the backend key or, if it
// doesn't use the internal reference of the backend key it simply returns
// 'key'. In case of an error the returned 'AllocatorKey' should be nil.
AllocateID(ctx context.Context, id idpool.ID, key AllocatorKey) (AllocatorKey, error)

// AllocateIDIfLocked behaves like AllocateID but when lock is non-nil the
// operation proceeds only if it is still valid.
AllocateIDIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) error
// The implementation of the backend might return an AllocatorKey that is
// a copy of 'key' with an internal reference of the backend key or, if it
// doesn't use the internal reference of the backend key it simply returns
// 'key'. In case of an error the returned 'AllocatorKey' should be nil.
AllocateIDIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) (AllocatorKey, error)

// AcquireReference records that this node is using this key->ID mapping.
// This is distinct from any reference counting within this agent; only one
Expand Down Expand Up @@ -450,6 +458,13 @@ type AllocatorKey interface {
// PutKeyFromMap stores the labels in v into the key to be used later. This
// is the inverse operation to GetAsMap.
PutKeyFromMap(v map[string]string) AllocatorKey

// PutValue puts metadata inside the global identity for the given 'key' with
// the given 'value'.
PutValue(key any, value any) AllocatorKey

// Value returns the value stored in the metadata map.
Value(key any) any
}

func (a *Allocator) encodeKey(key AllocatorKey) string {
Expand Down Expand Up @@ -565,7 +580,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
return 0, false, false, fmt.Errorf("Found master key after proceeding with new allocation for %s", k)
}

err = a.backend.AllocateIDIfLocked(ctx, id, key, lock)
key, err = a.backend.AllocateIDIfLocked(ctx, id, key, lock)
if err != nil {
// Creation failed. Another agent most likely beat us to allocting this
// ID, retry.
Expand Down
16 changes: 12 additions & 4 deletions pkg/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func (d *dummyBackend) DeleteAllKeys(ctx context.Context) {
d.identities = map[idpool.ID]AllocatorKey{}
}

func (d *dummyBackend) AllocateID(ctx context.Context, id idpool.ID, key AllocatorKey) error {
func (d *dummyBackend) AllocateID(ctx context.Context, id idpool.ID, key AllocatorKey) (AllocatorKey, error) {
d.mutex.Lock()
defer d.mutex.Unlock()

if _, ok := d.identities[id]; ok {
return fmt.Errorf("identity already exists")
return nil, fmt.Errorf("identity already exists")
}

d.identities[id] = key
Expand All @@ -66,10 +66,10 @@ func (d *dummyBackend) AllocateID(ctx context.Context, id idpool.ID, key Allocat
d.handler.OnAdd(id, key)
}

return nil
return key, nil
}

func (d *dummyBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) error {
func (d *dummyBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) (AllocatorKey, error) {
return d.AllocateID(ctx, id, key)
}

Expand Down Expand Up @@ -194,6 +194,14 @@ func (t TestAllocatorKey) PutKeyFromMap(m map[string]string) AllocatorKey {
panic("empty map")
}

func (t TestAllocatorKey) PutValue(key any, value any) AllocatorKey {
panic("not implemented")
}

func (t TestAllocatorKey) Value(any) any {
panic("not implemented")
}

func randomTestName() string {
return rand.RandomStringWithPrefix(testPrefix, 12)
}
Expand Down
32 changes: 28 additions & 4 deletions pkg/identity/cache/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
// GlobalIdentity is the structure used to store an identity
type GlobalIdentity struct {
labels.LabelArray

// metadata contains metadata that are stored for example by the backends.
metadata map[any]any
}

// GetKey encodes an Identity as string
Expand All @@ -56,15 +59,36 @@ func (gi GlobalIdentity) GetAsMap() map[string]string {

// PutKey decodes an Identity from its string representation
func (gi GlobalIdentity) PutKey(v string) allocator.AllocatorKey {
return GlobalIdentity{labels.NewLabelArrayFromSortedList(v)}
return GlobalIdentity{LabelArray: labels.NewLabelArrayFromSortedList(v)}
}

// PutKeyFromMap decodes an Identity from a map of key to value. Output
// from GetAsMap can be parsed.
// Note: NewLabelArrayFromMap will parse the ':' separated label source from
// the keys because the source parameter is ""
func (gi GlobalIdentity) PutKeyFromMap(v map[string]string) allocator.AllocatorKey {
return GlobalIdentity{labels.Map2Labels(v, "").LabelArray()}
return GlobalIdentity{LabelArray: labels.Map2Labels(v, "").LabelArray()}
}

// PutValue puts metadata inside the global identity for the given 'key' with
// the given 'value'.
func (gi GlobalIdentity) PutValue(key, value any) allocator.AllocatorKey {
newMap := map[any]any{}
if gi.metadata != nil {
for k, v := range gi.metadata {
newMap[k] = v
}
}
newMap[key] = value
return GlobalIdentity{
LabelArray: gi.LabelArray,
metadata: newMap,
}
}

// Value returns the value stored in the metadata map.
func (gi GlobalIdentity) Value(key any) any {
return gi.metadata[key]
}

// CachingIdentityAllocator manages the allocation of identities for both
Expand Down Expand Up @@ -379,7 +403,7 @@ func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls la
return nil, false, fmt.Errorf("allocator not initialized")
}

idp, isNew, isNewLocally, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{lbls.LabelArray()})
idp, isNew, isNewLocally, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{LabelArray: lbls.LabelArray()})
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -446,7 +470,7 @@ func (m *CachingIdentityAllocator) Release(ctx context.Context, id *identity.Ide
// ID is no longer used locally, it may still be used by
// remote nodes, so we can't rely on the locally computed
// "lastUse".
return m.IdentityAllocator.Release(ctx, GlobalIdentity{id.LabelArray})
return m.IdentityAllocator.Release(ctx, GlobalIdentity{LabelArray: id.LabelArray})
}

// ReleaseSlice attempts to release a set of identities. It is a helper
Expand Down
2 changes: 1 addition & 1 deletion pkg/identity/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (m *CachingIdentityAllocator) LookupIdentity(ctx context.Context, lbls labe
}

lblArray := lbls.LabelArray()
id, err := m.IdentityAllocator.GetIncludeRemoteCaches(ctx, GlobalIdentity{lblArray})
id, err := m.IdentityAllocator.GetIncludeRemoteCaches(ctx, GlobalIdentity{LabelArray: lblArray})
if err != nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/identity/cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (l *localIdentityCache) lookupOrCreate(lbls labels.Labels, oldNID identity.
l.events <- allocator.AllocatorEvent{
Typ: kvstore.EventTypeCreate,
ID: idpool.ID(id.ID),
Key: GlobalIdentity{id.LabelArray},
Key: GlobalIdentity{LabelArray: id.LabelArray},
}
}

Expand Down
24 changes: 18 additions & 6 deletions pkg/k8s/identitybackend/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (

k8sPrefix = labels.LabelSourceK8s + ":"
k8sNamespaceLabelPrefix = labels.LabelSourceK8s + ":" + k8sConst.PodNamespaceMetaLabels + labels.PathDelimiter

// MetadataKeyBackendKey is the key used to store the backend key.
MetadataKeyBackendKey = 0
)

func NewCRDBackend(c CRDBackendConfiguration) (allocator.Backend, error) {
Expand Down Expand Up @@ -85,7 +88,8 @@ func sanitizeK8sLabels(old map[string]string) (selected, skipped map[string]stri
// AllocateID will create an identity CRD, thus creating the identity for this
// key-> ID mapping.
// Note: the lock field is not supported with the k8s CRD allocator.
func (c *crdBackend) AllocateID(ctx context.Context, id idpool.ID, key allocator.AllocatorKey) error {
// Returns an allocator key with the cilium identity stored in it.
func (c *crdBackend) AllocateID(ctx context.Context, id idpool.ID, key allocator.AllocatorKey) (allocator.AllocatorKey, error) {
selectedLabels, skippedLabels := sanitizeK8sLabels(key.GetAsMap())
log.WithField(logfields.Labels, skippedLabels).Info("Skipped non-kubernetes labels when labelling ciliumidentity. All labels will still be used in identity determination")

Expand All @@ -97,11 +101,14 @@ func (c *crdBackend) AllocateID(ctx context.Context, id idpool.ID, key allocator
SecurityLabels: key.GetAsMap(),
}

_, err := c.Client.CiliumV2().CiliumIdentities().Create(ctx, identity, metav1.CreateOptions{})
return err
ci, err := c.Client.CiliumV2().CiliumIdentities().Create(ctx, identity, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return key.PutValue(MetadataKeyBackendKey, ci), nil
}

func (c *crdBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key allocator.AllocatorKey, lock kvstore.KVLocker) error {
func (c *crdBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key allocator.AllocatorKey, lock kvstore.KVLocker) (allocator.AllocatorKey, error) {
return c.AllocateID(ctx, id, key)
}

Expand Down Expand Up @@ -133,7 +140,12 @@ func (c *crdBackend) AcquireReference(ctx context.Context, id idpool.ID, key all
return err
}
if !exists {
return fmt.Errorf("identity (id:%q,key:%q) does not exist", id, key)
// fall back to the key stored in the allocator key. If it's not present
// then return the error.
ci, ok = key.Value(MetadataKeyBackendKey).(*v2.CiliumIdentity)
if !ok {
return fmt.Errorf("identity (id:%q,key:%q) does not exist", id, key)
}
}
ci = ci.DeepCopy()

Expand Down Expand Up @@ -180,7 +192,7 @@ func (c *crdBackend) UpdateKey(ctx context.Context, id idpool.ID, key allocator.

if reliablyMissing {
// Recreate a missing master key
if err = c.AllocateID(ctx, id, key); err != nil {
if _, err = c.AllocateID(ctx, id, key); err != nil {
return fmt.Errorf("Unable recreate missing CRD identity %q->%q: %s", key, id, err)
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/kvstore/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,29 +127,29 @@ func (k *kvstoreBackend) DeleteAllKeys(ctx context.Context) {
}

// AllocateID allocates a key->ID mapping in the kvstore.
func (k *kvstoreBackend) AllocateID(ctx context.Context, id idpool.ID, key allocator.AllocatorKey) error {
func (k *kvstoreBackend) AllocateID(ctx context.Context, id idpool.ID, key allocator.AllocatorKey) (allocator.AllocatorKey, error) {
// create /id/<ID> and fail if it already exists
keyPath := path.Join(k.idPrefix, id.String())
keyEncoded := []byte(k.backend.Encode([]byte(key.GetKey())))
success, err := k.backend.CreateOnly(ctx, keyPath, keyEncoded, false)
if err != nil || !success {
return fmt.Errorf("unable to create master key '%s': %s", keyPath, err)
return nil, fmt.Errorf("unable to create master key '%s': %s", keyPath, err)
}

return nil
return key, nil
}

// AllocateID allocates a key->ID mapping in the kvstore.
func (k *kvstoreBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key allocator.AllocatorKey, lock kvstore.KVLocker) error {
func (k *kvstoreBackend) AllocateIDIfLocked(ctx context.Context, id idpool.ID, key allocator.AllocatorKey, lock kvstore.KVLocker) (allocator.AllocatorKey, error) {
// create /id/<ID> and fail if it already exists
keyPath := path.Join(k.idPrefix, id.String())
keyEncoded := []byte(k.backend.Encode([]byte(key.GetKey())))
success, err := k.backend.CreateOnlyIfLocked(ctx, keyPath, keyEncoded, false, lock)
if err != nil || !success {
return fmt.Errorf("unable to create master key '%s': %s", keyPath, err)
return nil, fmt.Errorf("unable to create master key '%s': %s", keyPath, err)
}

return nil
return key, nil
}

// AcquireReference marks that this node is using this key->ID mapping in the kvstore.
Expand Down
8 changes: 8 additions & 0 deletions pkg/kvstore/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func (t TestAllocatorKey) PutKeyFromMap(m map[string]string) allocator.Allocator
panic("empty map")
}

func (t TestAllocatorKey) PutValue(key any, value any) allocator.AllocatorKey {
panic("not implemented")
}

func (t TestAllocatorKey) Value(any) any {
panic("not implemented")
}

func randomTestName() string {
return rand.RandomStringWithPrefix(testPrefix, 12)
}
Expand Down

0 comments on commit 41aa568

Please sign in to comment.