metrics: fix negative identity count
Identity allocation uses cache and refcnt mechanisms: if the identity info is
already in the remote kvstore and the localkeys store, it just increases the
refcnt and notifies the caller that this identity is reused. The caller then
does not bump the identity counter. However, one corner case is not handled:
the refcnt going from 0 to 1, which results in a negative identity count in
the metrics output.

This patch fixes the problem by returning an additional flag that indicates
whether the identity is referenced for the first time (refcnt going from 0 to
1). The caller then uses this information to decide whether to increase the
counter.

Signed-off-by: arthurchiao <arthurchiao@hotmail.com>
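
As a concrete illustration of the accounting described in the commit message, here is a minimal, self-contained Go sketch (illustrative names such as keyStore and gauge, not Cilium's types): a refcounted store whose allocate reports the 0-to-1 transition, and a gauge that is incremented only on that first reference and decremented only when the last reference is released, so it cannot go negative when keys are reused.

```go
package main

import "fmt"

// keyStore is a minimal stand-in for the localkeys store: it only tracks
// reference counts per key. Names and structure are illustrative, not
// Cilium's real implementation.
type keyStore struct {
	refcnt map[string]int
}

// allocate bumps the refcnt and reports whether this was the first
// reference (refcnt going from 0 to 1) -- the extra flag this commit adds.
func (s *keyStore) allocate(key string) (firstUse bool) {
	s.refcnt[key]++
	return s.refcnt[key] == 1
}

// release drops the refcnt and reports whether the last reference is gone.
func (s *keyStore) release(key string) (lastUse bool) {
	s.refcnt[key]--
	return s.refcnt[key] == 0
}

func main() {
	s := &keyStore{refcnt: map[string]int{}}
	gauge := 0 // plays the role of the identity count metric

	// Increment the gauge only on the 0 -> 1 transition ...
	for _, key := range []string{"id-a", "id-a", "id-b"} {
		if s.allocate(key) {
			gauge++
		}
	}
	// ... and decrement it only on the 1 -> 0 transition, keeping the
	// gauge non-negative even when keys are reused.
	for _, key := range []string{"id-a", "id-a", "id-b"} {
		if s.release(key) {
			gauge--
		}
	}
	fmt.Println("gauge:", gauge) // 0
}
```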
ArthurChiao authored and aanm committed Jul 1, 2020
1 parent 5cfa8a8 commit 9673c48
Showing 7 changed files with 97 additions and 57 deletions.
2 changes: 1 addition & 1 deletion cilium/cmd/preflight_identity_crd_migrate.go
@@ -167,7 +167,7 @@ func migrateIdentities() {

ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
defer cancel()
newID, actuallyAllocated, err := crdAllocator.Allocate(ctx, key)
newID, actuallyAllocated, _, err := crdAllocator.Allocate(ctx, key)
switch {
case err != nil:
log.WithError(err).Errorf("Cannot allocate new CRD ID for %v", key)
81 changes: 50 additions & 31 deletions pkg/allocator/allocator.go
@@ -449,13 +449,21 @@ func (a *Allocator) encodeKey(key AllocatorKey) string {
return a.backend.Encode(key.GetKey())
}

func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, error) {
// Return values:
// 1. allocated ID
// 2. whether the ID is newly allocated from kvstore
// 3. whether this is the first owner that holds a reference to the key in
// localkeys store
// 4. error in case of failure
func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, bool, error) {
var firstUse bool

kvstore.Trace("Allocating key in kvstore", nil, logrus.Fields{fieldKey: key})

k := a.encodeKey(key)
lock, err := a.backend.Lock(ctx, key)
if err != nil {
return 0, false, err
return 0, false, false, err
}

defer lock.Unlock(context.Background())
@@ -464,7 +472,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
// node suffix
value, err := a.GetIfLocked(ctx, key, lock)
if err != nil {
return 0, false, err
return 0, false, false, err
}

kvstore.Trace("kvstore state is: ", nil, logrus.Fields{fieldID: value})
@@ -480,13 +488,19 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
if value != 0 {
// re-create master key
if err := a.backend.UpdateKeyIfLocked(ctx, value, key, true, lock); err != nil {
return 0, false, fmt.Errorf("unable to re-create missing master key '%s': %s while allocating ID: %s", key, value, err)
return 0, false, false, fmt.Errorf("unable to re-create missing master key '%s': %s while allocating ID: %s", key, value, err)
}
}
} else {
_, err := a.localKeys.allocate(k, key, value)
_, firstUse, err = a.localKeys.allocate(k, key, value)
if err != nil {
return 0, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
return 0, false, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
}

if firstUse {
log.WithField(fieldKey, k).Info("Reserved new local key")
} else {
log.WithField(fieldKey, k).Info("Reusing existing local key")
}
}

@@ -495,21 +509,21 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo

if err = a.backend.AcquireReference(ctx, value, key, lock); err != nil {
a.localKeys.release(k)
return 0, false, fmt.Errorf("unable to create slave key '%s': %s", k, err)
return 0, false, false, fmt.Errorf("unable to create slave key '%s': %s", k, err)
}

// mark the key as verified in the local cache
if err := a.localKeys.verify(k); err != nil {
log.WithError(err).Error("BUG: Unable to verify local key")
}

return value, false, nil
return value, false, firstUse, nil
}

log.WithField(fieldKey, k).Debug("Allocating new master ID")
id, strID, unmaskedID := a.selectAvailableID()
if id == 0 {
return 0, false, fmt.Errorf("no more available IDs in configured space")
return 0, false, false, fmt.Errorf("no more available IDs in configured space")
}

kvstore.Trace("Selected available key ID", nil, logrus.Fields{fieldID: id})
@@ -519,37 +533,37 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
a.idPool.Release(unmaskedID) // This returns this ID to be re-used for other keys
}

oldID, err := a.localKeys.allocate(k, key, id)
oldID, firstUse, err := a.localKeys.allocate(k, key, id)
if err != nil {
a.idPool.Release(unmaskedID)
return 0, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
return 0, false, false, fmt.Errorf("unable to reserve local key '%s': %s", k, err)
}

// Another local writer beat us to allocating an ID for the same key,
// start over
if id != oldID {
releaseKeyAndID()
return 0, false, fmt.Errorf("another writer has allocated key %s", k)
return 0, false, false, fmt.Errorf("another writer has allocated key %s", k)
}

// Check that this key has not been allocated in the cluster during our
// operation here
value, err = a.GetNoCache(ctx, key)
if err != nil {
releaseKeyAndID()
return 0, false, err
return 0, false, false, err
}
if value != 0 {
releaseKeyAndID()
return 0, false, fmt.Errorf("Found master key after proceeding with new allocation for %s", k)
return 0, false, false, fmt.Errorf("Found master key after proceeding with new allocation for %s", k)
}

err = a.backend.AllocateIDIfLocked(ctx, id, key, lock)
if err != nil {
// Creation failed. Another agent most likely beat us to allocting this
// ID, retry.
releaseKeyAndID()
return 0, false, fmt.Errorf("unable to allocate ID %s for key %s: %s", strID, key, err)
return 0, false, false, fmt.Errorf("unable to allocate ID %s for key %s: %s", strID, key, err)
}

// Notify pool that leased ID is now in-use.
@@ -560,7 +574,7 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo
// exposed and may be in use by other nodes. The garbage
// collector will release it again.
releaseKeyAndID()
return 0, false, fmt.Errorf("slave key creation failed '%s': %s", k, err)
return 0, false, false, fmt.Errorf("slave key creation failed '%s': %s", k, err)
}

// mark the key as verified in the local cache
@@ -570,30 +584,35 @@ func (a *Allocator) lockedAllocate(ctx context.Context, key AllocatorKey) (idpoo

log.WithField(fieldKey, k).Info("Allocated new global key")

return id, true, nil
return id, true, firstUse, nil
}

// Allocate will retrieve the ID for the provided key. If no ID has been
// allocated for this key yet, a key will be allocated. If allocation fails,
// most likely due to a parallel allocation of the same ID by another user,
// allocation is re-attempted for maxAllocAttempts times.
//
// Returns the ID allocated to the key, if the ID had to be allocated, then
// true is returned. An error is returned in case of failure.
func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, error) {
// Return values:
// 1. allocated ID
// 2. whether the ID is newly allocated from kvstore
// 3. whether this is the first owner that holds a reference to the key in
// localkeys store
// 4. error in case of failure
func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, bool, error) {
var (
err error
value idpool.ID
isNew bool
k = a.encodeKey(key)
err error
value idpool.ID
isNew bool
firstUse bool
k = a.encodeKey(key)
)

log.WithField(fieldKey, key).Debug("Allocating key")

select {
case <-a.initialListDone:
case <-ctx.Done():
return 0, false, fmt.Errorf("allocation was cancelled while waiting for initial key list to be received: %s", ctx.Err())
return 0, false, false, fmt.Errorf("allocation was cancelled while waiting for initial key list to be received: %s", ctx.Err())
}

kvstore.Trace("Allocating from kvstore", nil, logrus.Fields{fieldKey: key})
@@ -613,15 +632,15 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
if val := a.localKeys.use(k); val != idpool.NoID {
kvstore.Trace("Reusing local id", nil, logrus.Fields{fieldID: val, fieldKey: key})
a.mainCache.insert(key, val)
return val, false, nil
return val, false, false, nil
}

// FIXME: Add non-locking variant
value, isNew, err = a.lockedAllocate(ctx, key)
value, isNew, firstUse, err = a.lockedAllocate(ctx, key)
if err == nil {
a.mainCache.insert(key, value)
log.WithField(fieldKey, key).WithField(fieldID, value).Debug("Allocated key")
return value, isNew, nil
return value, isNew, firstUse, nil
}

scopedLog := log.WithFields(logrus.Fields{
@@ -632,19 +651,19 @@ func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID,
select {
case <-ctx.Done():
scopedLog.WithError(ctx.Err()).Warning("Ongoing key allocation has been cancelled")
return 0, false, fmt.Errorf("key allocation cancelled: %s", ctx.Err())
return 0, false, false, fmt.Errorf("key allocation cancelled: %s", ctx.Err())
default:
scopedLog.WithError(err).Warning("Key allocation attempt failed")
}

kvstore.Trace("Allocation attempt failed", err, logrus.Fields{fieldKey: key, logfields.Attempt: attempt})

if waitErr := boff.Wait(ctx); waitErr != nil {
return 0, false, waitErr
return 0, false, false, waitErr
}
}

return 0, false, err
return 0, false, false, err
}

// GetIfLocked returns the ID which is allocated to a key. Returns an ID of NoID if no ID
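
To keep the two new booleans apart while reading the test changes below, here is a rough, self-contained caller-side sketch of the four-value Allocate contract introduced above (stubbed allocator and assumed names, not Cilium code): isNew reports that the ID was created in the kvstore, while firstUse reports that this node took its first local reference; the previously unhandled case is isNew == false with firstUse == true.

```go
package main

import (
	"context"
	"fmt"
)

// ID stands in for idpool.ID; allocateFunc mirrors the new four-value
// contract (id, isNew, firstUse, err) of Allocate. Both are assumptions
// made for this sketch, not Cilium's real types.
type ID int64
type allocateFunc func(ctx context.Context, key string) (id ID, isNew, firstUse bool, err error)

func describe(ctx context.Context, alloc allocateFunc, key string) {
	id, isNew, firstUse, err := alloc(ctx, key)
	if err != nil {
		fmt.Println("allocation failed:", err)
		return
	}
	switch {
	case isNew:
		fmt.Printf("%s -> %d: new ID created in the kvstore\n", key, id)
	case firstUse:
		// The corner case fixed by this commit: the ID already exists in
		// the kvstore, but this is the first local reference (refcnt 0 -> 1),
		// so per-node accounting (e.g. the identity gauge) must be bumped.
		fmt.Printf("%s -> %d: reused from kvstore, first local reference\n", key, id)
	default:
		fmt.Printf("%s -> %d: reused, refcnt already > 0 locally\n", key, id)
	}
}

func main() {
	remote := map[string]ID{"k8s:app=web": 1001} // already known cluster-wide
	localRefcnt := map[string]int{}
	next := ID(2000)

	// Stubbed allocator implementing the contract above.
	alloc := func(ctx context.Context, key string) (ID, bool, bool, error) {
		localRefcnt[key]++
		firstUse := localRefcnt[key] == 1
		id, ok := remote[key]
		if !ok {
			id = next
			next++
			remote[key] = id
			return id, true, firstUse, nil
		}
		return id, false, firstUse, nil
	}

	ctx := context.Background()
	describe(ctx, alloc, "k8s:app=web") // reused from kvstore, first local reference
	describe(ctx, alloc, "k8s:app=web") // reused, refcnt already > 0 locally
	describe(ctx, alloc, "k8s:app=db")  // new ID created in the kvstore
}
```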
12 changes: 8 additions & 4 deletions pkg/allocator/allocator_test.go
@@ -265,10 +265,11 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
// allocate all available IDs
for i := idpool.ID(1); i <= maxID; i++ {
key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
id, new, err := allocator.Allocate(context.Background(), key)
id, new, firstUse, err := allocator.Allocate(context.Background(), key)
c.Assert(err, IsNil)
c.Assert(id, Not(Equals), 0)
c.Assert(new, Equals, true)
c.Assert(firstUse, Equals, true)

// refcnt must be 1
c.Assert(allocator.localKeys.keys[allocator.encodeKey(key)].refcnt, Equals, uint64(1))
@@ -278,19 +279,21 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
allocator.backoffTemplate.Factor = 1.0

// we should be out of id space here
_, new, err := allocator.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", maxID+1)))
_, new, firstUse, err := allocator.Allocate(context.Background(), TestAllocatorKey(fmt.Sprintf("key%04d", maxID+1)))
c.Assert(err, Not(IsNil))
c.Assert(new, Equals, false)
c.Assert(firstUse, Equals, false)

allocator.backoffTemplate.Factor = saved

// allocate all IDs again using the same set of keys, refcnt should go to 2
for i := idpool.ID(1); i <= maxID; i++ {
key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
id, new, err := allocator.Allocate(context.Background(), key)
id, new, firstUse, err := allocator.Allocate(context.Background(), key)
c.Assert(err, IsNil)
c.Assert(id, Not(Equals), 0)
c.Assert(new, Equals, false)
c.Assert(firstUse, Equals, false)

// refcnt must now be 2
c.Assert(allocator.localKeys.keys[allocator.encodeKey(key)].refcnt, Equals, uint64(2))
@@ -304,10 +307,11 @@ func testAllocator(c *C, maxID idpool.ID, allocatorName string, suffix string) {
// allocate all IDs again using the same set of keys, refcnt should go to 2
for i := idpool.ID(1); i <= maxID; i++ {
key := TestAllocatorKey(fmt.Sprintf("key%04d", i))
id, new, err := allocator2.Allocate(context.Background(), key)
id, new, firstUse, err := allocator2.Allocate(context.Background(), key)
c.Assert(err, IsNil)
c.Assert(id, Not(Equals), 0)
c.Assert(new, Equals, false)
c.Assert(firstUse, Equals, true)

localKey := allocator2.localKeys.keys[allocator.encodeKey(key)]
c.Assert(localKey, Not(IsNil))
11 changes: 7 additions & 4 deletions pkg/allocator/localkeys.go
@@ -51,25 +51,28 @@ func newLocalKeys() *localKeys {
// allocate creates an entry for key in localKeys if needed and increments the
// refcnt. The value associated with the key must match the local cache or an
// error is returned
func (lk *localKeys) allocate(keyString string, key AllocatorKey, val idpool.ID) (idpool.ID, error) {
func (lk *localKeys) allocate(keyString string, key AllocatorKey, val idpool.ID) (idpool.ID, bool, error) {
lk.Lock()
defer lk.Unlock()

var firstUse bool

if k, ok := lk.keys[keyString]; ok {
if val != k.val {
return idpool.NoID, fmt.Errorf("local key already allocated with different value (%s != %s)", val, k.val)
return idpool.NoID, firstUse, fmt.Errorf("local key already allocated with different value (%s != %s)", val, k.val)
}

k.refcnt++
kvstore.Trace("Incremented local key refcnt", nil, logrus.Fields{fieldKey: keyString, fieldID: val, fieldRefCnt: k.refcnt})
return k.val, nil
return k.val, firstUse, nil
}

firstUse = true
k := &localKey{key: key, val: val, refcnt: 1}
lk.keys[keyString] = k
lk.ids[val] = k
kvstore.Trace("New local key", nil, logrus.Fields{fieldKey: keyString, fieldID: val, fieldRefCnt: 1})
return val, nil
return val, firstUse, nil
}

func (lk *localKeys) verify(key string) error {
11 changes: 7 additions & 4 deletions pkg/allocator/localkeys_test.go
@@ -30,30 +30,33 @@ func (s *AllocatorSuite) TestLocalKeys(c *C) {
v := k.use(key.GetKey())
c.Assert(v, Equals, idpool.NoID)

v, err := k.allocate(key.GetKey(), key, val) // refcnt=1
v, firstUse, err := k.allocate(key.GetKey(), key, val) // refcnt=1
c.Assert(err, IsNil)
c.Assert(v, Equals, val)
c.Assert(firstUse, Equals, true)

c.Assert(k.verify(key.GetKey()), IsNil)

v = k.use(key.GetKey()) // refcnt=2
c.Assert(v, Equals, val)
k.release(key.GetKey()) // refcnt=1

v, err = k.allocate(key.GetKey(), key, val) // refcnt=2
v, firstUse, err = k.allocate(key.GetKey(), key, val) // refcnt=2
c.Assert(err, IsNil)
c.Assert(v, Equals, val)
c.Assert(firstUse, Equals, false)

v, err = k.allocate(key2.GetKey(), key2, val2) // refcnt=1
v, firstUse, err = k.allocate(key2.GetKey(), key2, val2) // refcnt=1
c.Assert(err, IsNil)
c.Assert(v, Equals, val2)
c.Assert(firstUse, Equals, true)

// only one of the two keys is verified yet
ids := k.getVerifiedIDs()
c.Assert(len(ids), Equals, 1)

// allocate with different value must fail
_, err = k.allocate(key2.GetKey(), key2, val)
_, _, err = k.allocate(key2.GetKey(), key2, val)
c.Assert(err, Not(IsNil))

k.release(key.GetKey()) // refcnt=1
14 changes: 10 additions & 4 deletions pkg/identity/cache/allocator.go
@@ -300,13 +300,18 @@ func (m *CachingIdentityAllocator) WaitForInitialGlobalIdentities(ctx context.Co
// re-used and reference counting is performed, otherwise a new identity is
// allocated via the kvstore.
func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls labels.Labels, notifyOwner bool) (id *identity.Identity, allocated bool, err error) {
isNewLocally := false

// Notify the owner of the newly added identities so that the
// cached identities can be updated ASAP, rather than just
// relying on the kv-store update events.
defer func() {
if err == nil && allocated {
metrics.IdentityCount.Inc()
if notifyOwner {
if err == nil {
if allocated || isNewLocally {
metrics.IdentityCount.Inc()
}

if allocated && notifyOwner {
added := IdentityCache{
id.ID: id.LabelArray,
}
@@ -349,7 +354,7 @@ func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls la
return nil, false, fmt.Errorf("allocator not initialized")
}

idp, isNew, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{lbls.LabelArray()})
idp, isNew, isNewLocally, err := m.IdentityAllocator.Allocate(ctx, GlobalIdentity{lbls.LabelArray()})
if err != nil {
return nil, false, err
}
@@ -359,6 +364,7 @@ func (m *CachingIdentityAllocator) AllocateIdentity(ctx context.Context, lbls la
logfields.Identity: idp,
logfields.IdentityLabels: lbls.String(),
"isNew": isNew,
"isNewLocally": isNewLocally,
}).Debug("Resolved identity")
}

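
The AllocateIdentity change above ties the metric increment to allocated || isNewLocally inside a deferred closure, while owner notification stays tied to allocated only. A condensed, hypothetical sketch of that pattern (simplified signatures and names; the real function also resolves labels and returns the identity):

```go
package main

import "fmt"

var identityCount int // stand-in for metrics.IdentityCount

// allocateIdentity shows the shape of the pattern only: named return values
// inspected in a deferred closure once the allocation result is known.
func allocateIdentity(doAlloc func() (id int, allocated, isNewLocally bool, err error)) (id int, allocated bool, err error) {
	var isNewLocally bool

	defer func() {
		if err == nil && (allocated || isNewLocally) {
			identityCount++ // keeps the gauge in step with local references
		}
		if err == nil && allocated {
			fmt.Println("notify owner about identity", id)
		}
	}()

	id, allocated, isNewLocally, err = doAlloc()
	return id, allocated, err
}

func main() {
	// Identity already present in the kvstore, but first local reference:
	// the pre-fix code would skip the increment here and later decrement on
	// release, driving the gauge negative.
	allocateIdentity(func() (int, bool, bool, error) { return 42, false, true, nil })
	fmt.Println("identityCount:", identityCount) // 1
}
```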
