diff --git a/cilium/cmd/preflight_identity_crd_migrate.go b/cilium/cmd/preflight_identity_crd_migrate.go
index bd5223631364..75869cf09b8d 100644
--- a/cilium/cmd/preflight_identity_crd_migrate.go
+++ b/cilium/cmd/preflight_identity_crd_migrate.go
@@ -167,7 +167,7 @@ func migrateIdentities() {
 		ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
 		defer cancel()
 
-		newID, actuallyAllocated, err := crdAllocator.Allocate(ctx, key)
+		newID, actuallyAllocated, _, err := crdAllocator.Allocate(ctx, key)
 		switch {
 		case err != nil:
 			log.WithError(err).Errorf("Cannot allocate new CRD ID for %v", key)
diff --git a/pkg/allocator/allocator.go b/pkg/allocator/allocator.go
index 6c731251f92f..c853c2c70cd1 100644
--- a/pkg/allocator/allocator.go
+++ b/pkg/allocator/allocator.go
@@ -449,13 +449,21 @@ func (a *Allocator) encodeKey(key AllocatorKey) string {
 	return a.backend.Encode(key.GetKey())
 }
 
-func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, error) {
+// Return values:
+// 1. allocated ID
+// 2. whether the ID is newly allocated from kvstore
+// 3. whether this is the first owner that holds a reference to the key in
+//    localkeys store
+// 4. error in case of failure
+func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, bool, error) {
+	var firstUse bool
+
 	kvstore.Trace("Allocating key in kvstore", nil, logrus.Fields{fieldKey: key})
 
 	k := a.encodeKey(key)
 	lock, err := a.backend.Lock(ctx, key)
 	if err != nil {
-		return 0, false, err
+		return 0, false, false, err
 	}
 
 	defer lock.Unlock(context.Background())
@@ -464,7 +472,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 	// node suffix
 	value, err := a.GetIfLocked(ctx, key, lock)
 	if err != nil {
-		return 0, false, err
+		return 0, false, false, err
 	}
 
 	kvstore.Trace("kvstore state is: ", nil, logrus.Fields{fieldID: value})
@@ -480,13 +488,19 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 		if value != 0 {
 			// re-create master key
 			if err := a.backend.UpdateKeyIfLocked(ctx, value, key, true, lock); err != nil {
-				return 0, false, fmt.Errorf("unable to re-create missing master key '%s': %s while allocating ID: %s", key, value, err)
+				return 0, false, false, fmt.Errorf("unable to re-create missing master key '%s': %s while allocating ID: %s", key, value, err)
 			}
 		}
 	} else {
-		_, err := a.localKeys.allocate(k, key, value)
+		_, firstUse, err = a.localKeys.allocate(k, key, value)
 		if err != nil {
-			return 0, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
+			return 0, false, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
+		}
+
+		if firstUse {
+			log.WithField(fieldKey, k).Info("Reserved new local key")
+		} else {
+			log.WithField(fieldKey, k).Info("Reusing existing local key")
 		}
 	}
 
@@ -495,7 +509,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 
 		if err = a.backend.AcquireReference(ctx, value, key, lock); err != nil {
 			a.localKeys.release(k)
-			return 0, false, fmt.Errorf("unable to create slave key '%s': %s", k, err)
+			return 0, false, false, fmt.Errorf("unable to create slave key '%s': %s", k, err)
 		}
 
 		// mark the key as verified in the local cache
@@ -503,13 +517,13 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 			log.WithError(err).Error("BUG: Unable to verify local key")
 		}
 
-		return value, false, nil
+		return value, false, firstUse, nil
 	}
 
 	log.WithField(fieldKey, k).Debug("Allocating new master ID")
 	id, strID, unmaskedID := a.selectAvailableID()
 	if id == 0 {
-		return 0, false, fmt.Errorf("no more available IDs in configured space")
+		return 0, false, false, fmt.Errorf("no more available IDs in configured space")
 	}
 
 	kvstore.Trace("Selected available key ID", nil, logrus.Fields{fieldID: id})
@@ -519,17 +533,17 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 		a.idPool.Release(unmaskedID) // This returns this ID to be re-used for other keys
 	}
 
-	oldID, err := a.localKeys.allocate(k, key, id)
+	oldID, firstUse, err := a.localKeys.allocate(k, key, id)
 	if err != nil {
 		a.idPool.Release(unmaskedID)
-		return 0, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
+		return 0, false, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
 	}
 
 	// Another local writer beat us to allocating an ID for the same key,
 	// start over
 	if id != oldID {
 		releaseKeyAndID()
-		return 0, false, fmt.Errorf("another writer has allocated key %s", k)
+		return 0, false, false, fmt.Errorf("another writer has allocated key %s", k)
 	}
 
 	// Check that this key has not been allocated in the cluster during our
@@ -537,11 +551,11 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 	value, err = a.GetNoCache(ctx, key)
 	if err != nil {
 		releaseKeyAndID()
-		return 0, false, err
+		return 0, false, false, err
 	}
 	if value != 0 {
 		releaseKeyAndID()
-		return 0, false, fmt.Errorf("Found master key after proceeding with new allocation for %s", k)
+		return 0, false, false, fmt.Errorf("Found master key after proceeding with new allocation for %s", k)
 	}
 
 	err = a.backend.AllocateIDIfLocked(ctx, id, key, lock)
@@ -549,7 +563,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 		// Creation failed. Another agent most likely beat us to allocting this
 		// ID, retry.
 		releaseKeyAndID()
-		return 0, false, fmt.Errorf("unable to allocate ID %s for key %s: %s", strID, key, err)
+		return 0, false, false, fmt.Errorf("unable to allocate ID %s for key %s: %s", strID, key, err)
 	}
 
 	// Notify pool that leased ID is now in-use.
@@ -560,7 +574,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 		// exposed and may be in use by other nodes. The garbage
 		// collector will release it again.
 		releaseKeyAndID()
-		return 0, false, fmt.Errorf("slave key creation failed '%s': %s", k, err)
+		return 0, false, false, fmt.Errorf("slave key creation failed '%s': %s", k, err)
 	}
 
 	// mark the key as verified in the local cache
@@ -570,7 +584,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 
 	log.WithField(fieldKey, k).Info("Allocated new global key")
 
-	return id, true, nil
+	return id, true, firstUse, nil
 }
 
 // Allocate will retrieve the ID for the provided key. If no ID has been
@@ -578,14 +592,19 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
 // most likely due to a parallel allocation of the same ID by another user,
 // allocation is re-attempted for maxAllocAttempts times.
 //
-// Returns the ID allocated to the key, if the ID had to be allocated, then
-// true is returned. An error is returned in case of failure.
-func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, error) {
+// Return values:
+// 1. allocated ID
+// 2. whether the ID is newly allocated from kvstore
+// 3. whether this is the first owner that holds a reference to the key in
+//    localkeys store
+// 4. error in case of failure
+func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, bool, error) {
 	var (
-		err   error
-		value idpool.ID
-		isNew bool
-		k     = a.encodeKey(key)
+		err      error
+		value    idpool.ID
+		isNew    bool
+		firstUse bool
+		k        = a.encodeKey(key)
 	)
 
 	log.WithField(fieldKey, key).Debug("Allocating key")
@@ -593,7 +612,7 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
 	select {
 	case <-a.initialListDone:
 	case <-ctx.Done():
-		return 0, false, fmt.Errorf("allocation was cancelled while waiting for initial key list to be received: %s", ctx.Err())
+		return 0, false, false, fmt.Errorf("allocation was cancelled while waiting for initial key list to be received: %s", ctx.Err())
 	}
 
 	kvstore.Trace("Allocating from kvstore", nil, logrus.Fields{fieldKey: key})
@@ -613,15 +632,15 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
 		if val := a.localKeys.use(k); val != idpool.NoID {
 			kvstore.Trace("Reusing local id", nil, logrus.Fields{fieldID: val, fieldKey: key})
 			a.mainCache.insert(key, val)
-			return val, false, nil
+			return val, false, false, nil
 		}
 
 		// FIXME: Add non-locking variant
-		value, isNew, err = a.lockedAllocate(ctx, key)
+		value, isNew, firstUse, err = a.lockedAllocate(ctx, key)
 		if err == nil {
 			a.mainCache.insert(key, value)
 			log.WithField(fieldKey, key).WithField(fieldID, value).Debug("Allocated key")
-			return value, isNew, nil
+			return value, isNew, firstUse, nil
 		}
 
 		scopedLog := log.WithFields(logrus.Fields{
@@ -632,7 +651,7 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
 		select {
 		case <-ctx.Done():
 			scopedLog.WithError(ctx.Err()).Warning("Ongoing key allocation has been cancelled")
-			return 0, false, fmt.Errorf("key allocation cancelled: %s", ctx.Err())
+			return 0, false, false, fmt.Errorf("key allocation cancelled: %s", ctx.Err())
 		default:
 			scopedLog.WithError(err).Warning("Key allocation attempt failed")
 		}
@@ -640,11 +659,11 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
 		kvstore.Trace("Allocation attempt failed", err, logrus.Fields{fieldKey: key, logfields.Attempt: attempt})
 
 		if waitErr := boff.Wait(ctx); waitErr != nil {
-			return 0, false, waitErr
+			return 0, false, false, waitErr
 		}
 	}
 
-	return 0, false, err
+	return 0, false, false, err
 }
 
 // GetIfLocked returns the ID which is allocated to a key. Returns an ID of NoID if no ID
diff --git a/pkg/allocator/allocator_test.go b/pkg/allocator/allocator_test.go
index d545ed259798..740d4312edb6 100644
--- a/pkg/allocator/allocator_test.go
+++ b/pkg/allocator/allocator_test.go
@@ -265,10 +265,11 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
 	// allocate all available IDs
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := allocator.Allocate(context.Background(), key)
+		id, new, firstUse, err := allocator.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, true)
+		c.Assert(firstUse, Equals, true)
 
 		// refcnt must be 1
 		c.Assert(allocator.localKeys.keys[allocator.encodeKey(key)].refcnt, Equals, uint64(1))
@@ -278,19 +279,21 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
 	allocator.backoffTemplate.Factor = 1.0
 
 	// we should be out of id space here
-	_, new, err := allocator.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", maxID+1)))
+	_, new, firstUse, err := allocator.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", maxID+1)))
 	c.Assert(err, Not(IsNil))
 	c.Assert(new, Equals, false)
+	c.Assert(firstUse, Equals, false)
 
 	allocator.backoffTemplate.Factor = saved
 
 	// allocate all IDs again using the same set of keys, refcnt should go to 2
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := allocator.Allocate(context.Background(), key)
+		id, new, firstUse, err := allocator.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, false)
+		c.Assert(firstUse, Equals, false)
 
 		// refcnt must now be 2
 		c.Assert(allocator.localKeys.keys[allocator.encodeKey(key)].refcnt, Equals, uint64(2))
@@ -304,10 +307,11 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
 	// allocate all IDs again using the same set of keys, refcnt should go to 2
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := allocator2.Allocate(context.Background(), key)
+		id, new, firstUse, err := allocator2.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, false)
+		c.Assert(firstUse, Equals, true)
 
 		localKey := allocator2.localKeys.keys[allocator.encodeKey(key)]
 		c.Assert(localKey, Not(IsNil))
diff --git a/pkg/allocator/localkeys.go b/pkg/allocator/localkeys.go
index 251c18e42c2c..f6402f739b71 100644
--- a/pkg/allocator/localkeys.go
+++ b/pkg/allocator/localkeys.go
@@ -51,25 +51,28 @@ func newLocalKeys() *localKeys {
 // allocate creates an entry for key in localKeys if needed and increments the
 // refcnt. The value associated with the key must match the local cache or an
 // error is returned
-func (lk *localKeys) allocate(keyString string, key AllocatorKey, val idpool.ID) (idpool.ID, error) {
+func (lk *localKeys) allocate(keyString string, key AllocatorKey, val idpool.ID) (idpool.ID, bool, error) {
 	lk.Lock()
 	defer lk.Unlock()
 
+	var firstUse bool
+
 	if k, ok := lk.keys[keyString]; ok {
 		if val != k.val {
-			return idpool.NoID, fmt.Errorf("local key already allocated with different value (%s != %s)", val, k.val)
+			return idpool.NoID, firstUse, fmt.Errorf("local key already allocated with different value (%s != %s)", val, k.val)
 		}
 
 		k.refcnt++
 		kvstore.Trace("Incremented local key refcnt", nil, logrus.Fields{fieldKey: keyString, fieldID: val, fieldRefCnt: k.refcnt})
-		return k.val, nil
+		return k.val, firstUse, nil
 	}
 
+	firstUse = true
 	k := &localKey{key: key, val: val, refcnt: 1}
 	lk.keys[keyString] = k
 	lk.ids[val] = k
 	kvstore.Trace("New local key", nil, logrus.Fields{fieldKey: keyString, fieldID: val, fieldRefCnt: 1})
-	return val, nil
+	return val, firstUse, nil
 }
 
 func (lk *localKeys) verify(key string) error {
diff --git a/pkg/allocator/localkeys_test.go b/pkg/allocator/localkeys_test.go
index e63f85cec4d0..64fe25ed9524 100644
--- a/pkg/allocator/localkeys_test.go
+++ b/pkg/allocator/localkeys_test.go
@@ -30,9 +30,10 @@ func (s *AllocatorSuite) TestLocalKeys(c *C) {
 	v := k.use(key.GetKey())
 	c.Assert(v, Equals, idpool.NoID)
 
-	v, err := k.allocate(key.GetKey(), key, val) // refcnt=1
+	v, firstUse, err := k.allocate(key.GetKey(), key, val) // refcnt=1
 	c.Assert(err, IsNil)
 	c.Assert(v, Equals, val)
+	c.Assert(firstUse, Equals, true)
 
 	c.Assert(k.verify(key.GetKey()), IsNil)
 
@@ -40,20 +41,22 @@ func (s *AllocatorSuite) TestLocalKeys(c *C) {
 	c.Assert(v, Equals, val)
 	k.release(key.GetKey()) // refcnt=1
 
-	v, err = k.allocate(key.GetKey(), key, val) // refcnt=2
+	v, firstUse, err = k.allocate(key.GetKey(), key, val) // refcnt=2
 	c.Assert(err, IsNil)
 	c.Assert(v, Equals, val)
+	c.Assert(firstUse, Equals, false)
 
-	v, err = k.allocate(key2.GetKey(), key2, val2) // refcnt=1
+	v, firstUse, err = k.allocate(key2.GetKey(), key2, val2) // refcnt=1
 	c.Assert(err, IsNil)
 	c.Assert(v, Equals, val2)
+	c.Assert(firstUse, Equals, true)
 
 	// only one of the two keys is verified yet
 	ids := k.getVerifiedIDs()
 	c.Assert(len(ids), Equals, 1)
 
 	// allocate with different value must fail
-	_, err = k.allocate(key2.GetKey(), key2, val)
+	_, _, err = k.allocate(key2.GetKey(), key2, val)
 	c.Assert(err, Not(IsNil))
 
 	k.release(key.GetKey()) // refcnt=1
diff --git a/pkg/identity/cache/allocator.go b/pkg/identity/cache/allocator.go
index ebfcd8e3817f..ec6f26da967f 100644
--- a/pkg/identity/cache/allocator.go
+++ b/pkg/identity/cache/allocator.go
@@ -300,13 +300,18 @@ func (m *CachingIdentityAllocator) WaitForInitialGlobalIdentities(ctx context.Co
 // re-used and reference counting is performed, otherwise a new identity is
 // allocated via the kvstore.
 func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls labels.Labels, notifyOwner bool) (id *identity.Identity, allocated bool, err error) {
+	isNewLocally := false
+
 	// Notify the owner of the newly added identities so that the
 	// cached identities can be updated ASAP, rather than just
 	// relying on the kv-store update events.
 	defer func() {
-		if err == nil && allocated {
-			metrics.IdentityCount.Inc()
-			if notifyOwner {
+		if err == nil {
+			if allocated || isNewLocally {
+				metrics.IdentityCount.Inc()
+			}
+
+			if allocated && notifyOwner {
 				added := IdentityCache{
 					id.ID: id.LabelArray,
 				}
@@ -349,7 +354,7 @@ func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls la
 		return nil, false, fmt.Errorf("allocator not initialized")
 	}
 
-	idp, isNew, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{lbls.LabelArray()})
+	idp, isNew, isNewLocally, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{lbls.LabelArray()})
 	if err != nil {
 		return nil, false, err
 	}
@@ -359,6 +364,7 @@
 			logfields.Identity:       idp,
 			logfields.IdentityLabels: lbls.String(),
 			"isNew":                  isNew,
+			"isNewLocally":           isNewLocally,
 		}).Debug("Resolved identity")
 	}
 
diff --git a/pkg/kvstore/allocator/allocator_test.go b/pkg/kvstore/allocator/allocator_test.go
index c162ff935b09..d07ae28ac000 100644
--- a/pkg/kvstore/allocator/allocator_test.go
+++ b/pkg/kvstore/allocator/allocator_test.go
@@ -111,7 +111,7 @@ func (s *AllocatorSuite) BenchmarkAllocate(c *C) {
 
 	c.ResetTimer()
 	for i := 0; i < c.N; i++ {
-		_, _, err := a.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", i)))
+		_, _, _, err := a.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", i)))
 		c.Assert(err, IsNil)
 	}
 	c.StopTimer()
@@ -257,12 +257,12 @@ func (s *AllocatorSuite) TestGC(c *C) {
 	allocator.DeleteAllKeys()
 
 	shortKey := TestAllocatorKey("1;")
-	shortID, _, err := allocator.Allocate(context.Background(), shortKey)
+	shortID, _, _, err := allocator.Allocate(context.Background(), shortKey)
 	c.Assert(err, IsNil)
 	c.Assert(shortID, Not(Equals), 0)
 
 	longKey := TestAllocatorKey("1;2;")
-	longID, _, err := allocator.Allocate(context.Background(), longKey)
+	longID, _, _, err := allocator.Allocate(context.Background(), longKey)
 	c.Assert(err, IsNil)
 	c.Assert(longID, Not(Equals), 0)
 
@@ -305,19 +305,21 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
 	// allocate all available IDs
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := a.Allocate(context.Background(), key)
+		id, new, newLocally, err := a.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, true)
+		c.Assert(newLocally, Equals, true)
 	}
 
 	// allocate all IDs again using the same set of keys, refcnt should go to 2
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := a.Allocate(context.Background(), key)
+		id, new, newLocally, err := a.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, false)
+		c.Assert(newLocally, Equals, false)
 	}
 
 	// Create a 2nd allocator, refill it
@@ -331,10 +333,11 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
 	// allocate all IDs again using the same set of keys, refcnt should go to 2
 	for i := idpool.ID(1); i <= maxID; i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		id, new, err := a2.Allocate(context.Background(), key)
+		id, new, newLocally, err := a2.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 		c.Assert(id, Not(Equals), 0)
 		c.Assert(new, Equals, false)
+		c.Assert(newLocally, Equals, true)
 
 		a2.Release(context.Background(), key)
 	}
@@ -416,10 +419,11 @@ func testGetNoCache(c *C, maxID idpool.ID, suffix string) {
 
 	labelsLong := "foo;/;bar;"
 	key := TestAllocatorKey(fmt.Sprintf("%s%010d", labelsLong, 0))
-	longID, new, err := allocator.Allocate(context.Background(), key)
+	longID, new, newLocally, err := allocator.Allocate(context.Background(), key)
 	c.Assert(err, IsNil)
 	c.Assert(longID, Not(Equals), 0)
 	c.Assert(new, Equals, true)
+	c.Assert(newLocally, Equals, true)
 
 	observedID, err := allocator.GetNoCache(context.Background(), key)
 	c.Assert(err, IsNil)
@@ -431,10 +435,11 @@ func testGetNoCache(c *C, maxID idpool.ID, suffix string) {
 	c.Assert(err, IsNil)
 	c.Assert(observedID, Equals, idpool.NoID)
 
-	shortID, new, err := allocator.Allocate(context.Background(), shortKey)
+	shortID, new, newLocally, err := allocator.Allocate(context.Background(), shortKey)
 	c.Assert(err, IsNil)
 	c.Assert(shortID, Not(Equals), 0)
 	c.Assert(new, Equals, true)
+	c.Assert(newLocally, Equals, true)
 
 	observedID, err = allocator.GetNoCache(context.Background(), shortKey)
 	c.Assert(err, IsNil)
@@ -496,7 +501,7 @@ func (s *AllocatorSuite) TestRemoteCache(c *C) {
 	// allocate all available IDs
 	for i := idpool.ID(1); i <= idpool.ID(4); i++ {
 		key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
-		_, _, err := a.Allocate(context.Background(), key)
+		_, _, _, err := a.Allocate(context.Background(), key)
 		c.Assert(err, IsNil)
 	}
 
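
Note on the new contract: Allocate and lockedAllocate now return four values instead of three. The added third value (firstUse) reports whether this node just took its first local reference to the key, independent of whether the master key was newly created in the kvstore (isNew) — a key can be globally old but locally new, which is exactly what the allocator2/a2 test cases above assert (new == false, firstUse/newLocally == true). A minimal caller-side sketch of the updated signature; the helper name allocateAndTrack and the variable names are illustrative and not part of this change:

	import (
		"context"

		"github.com/cilium/cilium/pkg/allocator"
		"github.com/cilium/cilium/pkg/idpool"
	)

	// allocateAndTrack is a hypothetical caller showing the four return values.
	func allocateAndTrack(ctx context.Context, alloc *allocator.Allocator, key allocator.AllocatorKey) (idpool.ID, error) {
		// id:       the allocated idpool.ID
		// isNew:    true only if this call created the master key in the
		//           kvstore, i.e. the ID is new cluster-wide
		// firstUse: true if this is the first local reference to the key
		//           (localKeys refcnt went 0 -> 1), even when the ID
		//           already existed globally
		id, isNew, firstUse, err := alloc.Allocate(ctx, key)
		if err != nil {
			return idpool.NoID, err
		}
		if firstUse {
			// First reference on this node: per-node bookkeeping belongs
			// here. pkg/identity/cache uses this signal (as isNewLocally)
			// to increment metrics.IdentityCount even when the identity
			// is not globally new.
		}
		if isNew {
			// This node allocated the ID cluster-wide; owner notification
			// in pkg/identity/cache remains gated on this flag.
		}
		return id, nil
	}

This split keeps the identity count metric accurate per node (incremented on allocated || isNewLocally) while owner notification still fires only for genuinely new global identities (allocated && notifyOwner).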