agent: handle dependencies between cached leases during persistent storage restore #12765
Changes from all commits
@@ -0,0 +1,3 @@
```release-note:improvement
agent/cache: Process persistent cache leases in dependency order during restore to ensure child leases are always correctly restored
```
@@ -356,6 +356,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,

index.Lease = secret.LeaseID
index.LeaseToken = req.Token
index.RequestTokenIndexID = entry.ID

index.Type = cacheboltdb.SecretLeaseType

@@ -381,6 +382,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,

parentCtx = entry.RenewCtxInfo.Ctx

index.TokenParent = req.Token
index.RequestTokenIndexID = entry.ID
}

renewCtxInfo = c.createCtxInfo(parentCtx)

@@ -556,7 +558,9 @@ func computeIndexID(req *SendRequest) (string, error) {

// Append req.Token into the byte slice. This is needed since auto-auth'ed
// requests sets the token directly into SendRequest.Token
b.Write([]byte(req.Token))
if _, err := b.Write([]byte(req.Token)); err != nil {
return "", fmt.Errorf("error writing token to hash input: %w", err)
}

return hex.EncodeToString(cryptoutil.Blake2b256Hash(string(b.Bytes()))), nil
}

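The hunk above only adds error checking around `b.Write`; for orientation, here is a standalone approximation of how the cache index ID is derived. The helper name `computeIndexIDSketch` is hypothetical, and it assumes `cryptoutil.Blake2b256Hash` is essentially a BLAKE2b-256 digest (an assumption, not confirmed by the diff):

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/blake2b"
)

// computeIndexIDSketch is a simplified stand-in for computeIndexID: hash the
// serialized request plus the client token, so the same request made with
// different auto-auth tokens maps to different cache entries.
func computeIndexIDSketch(serializedRequest []byte, token string) (string, error) {
	var b bytes.Buffer
	if _, err := b.Write(serializedRequest); err != nil {
		return "", fmt.Errorf("error writing request to hash input: %w", err)
	}
	if _, err := b.Write([]byte(token)); err != nil {
		return "", fmt.Errorf("error writing token to hash input: %w", err)
	}
	sum := blake2b.Sum256(b.Bytes())
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	id, err := computeIndexIDSketch([]byte("GET /v1/secret/data/example"), "s.example-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```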
@@ -966,101 +970,145 @@ func (c *LeaseCache) Flush() error {
return nil
}

type indexAndChannel struct {
index *cachememdb.Index
ch chan struct{}
}

type leaseMaps struct {
// Maps of index ID -> index and channel
authLeases map[string]indexAndChannel
secretLeases map[string]indexAndChannel
}

// Restore loads the cachememdb from the persistent storage passed in. Loads
// tokens first, since restoring a lease's renewal context and watcher requires
// looking up the token in the cachememdb.
func (c *LeaseCache) Restore(ctx context.Context, storage *cacheboltdb.BoltStorage) error {
var errors *multierror.Error
return c.restoreWithChannel(ctx, storage, nil)
}

func (c *LeaseCache) restoreWithChannel(ctx context.Context, storage *cacheboltdb.BoltStorage, ch chan *cachememdb.Index) error {
var errs *multierror.Error

// Process tokens first
tokens, err := storage.GetByType(ctx, cacheboltdb.TokenType)
if err != nil {
errors = multierror.Append(errors, err)
errs = multierror.Append(errs, err)
} else {
if err := c.restoreTokens(tokens); err != nil {
errors = multierror.Append(errors, err)
errs = multierror.Append(errs, err)
}
}

// Then process auth leases
authLeases, err := storage.GetByType(ctx, cacheboltdb.AuthLeaseType)
if err != nil {
errors = multierror.Append(errors, err)
} else {
if err := c.restoreLeases(authLeases); err != nil {
errors = multierror.Append(errors, err)
}
leaseMaps := &leaseMaps{
authLeases: make(map[string]indexAndChannel),
secretLeases: make(map[string]indexAndChannel),
}

// Then process secret leases
secretLeases, err := storage.GetByType(ctx, cacheboltdb.SecretLeaseType)
if err != nil {
errors = multierror.Append(errors, err)
} else {
if err := c.restoreLeases(secretLeases); err != nil {
errors = multierror.Append(errors, err)
// Fetch all the auth and secret leases we'll process upfront.
// Doing so allows us to process dependencies between the leases if there are any.
// Any lease that depends on another can wait for its parent to be restored before
// being processed itself.
// This algorithm requires that the graph of dependencies is acyclic.
for _, leaseType := range []struct {
name string
m map[string]indexAndChannel
}{
{cacheboltdb.AuthLeaseType, leaseMaps.authLeases},
{cacheboltdb.SecretLeaseType, leaseMaps.secretLeases},
} {
leases, err := storage.GetByType(ctx, leaseType.name)
if err != nil {
errs = multierror.Append(errs, err)
continue
}

for _, lease := range leases {
index, err := cachememdb.Deserialize(lease)
if err != nil {
errs = multierror.Append(errs, err)
continue
}
leaseType.m[index.ID] = indexAndChannel{
index: index,
ch: make(chan struct{}),
}
}
}

return errors.ErrorOrNil()
}
// Now restore the auth and secret leases.
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
for _, leases := range []map[string]indexAndChannel{leaseMaps.authLeases, leaseMaps.secretLeases} {
for id, lease := range leases {
wg.Add(1)
go func(id string, lease indexAndChannel) {
defer wg.Done()
defer close(lease.ch)

func (c *LeaseCache) restoreTokens(tokens [][]byte) error {
var errors *multierror.Error
c.logger.Trace("processing lease", "id", id)
// Check if this lease has already expired
expired, err := c.hasExpired(time.Now().UTC(), lease.index)
if err != nil {
c.logger.Warn("failed to check if lease is expired", "id", id, "error", err)
}
if expired {
return
}

for _, token := range tokens {
newIndex, err := cachememdb.Deserialize(token)
if err != nil {
errors = multierror.Append(errors, err)
continue
}
newIndex.RenewCtxInfo = c.createCtxInfo(nil)
if err := c.db.Set(newIndex); err != nil {
errors = multierror.Append(errors, err)
continue
if err := c.restoreLeaseRenewCtx(lease.index, leaseMaps.authLeases); err != nil {
mtx.Lock()
errs = multierror.Append(errs, err)
mtx.Unlock()
return
}
if err := c.db.Set(lease.index); err != nil {
mtx.Lock()
errs = multierror.Append(errs, err)
mtx.Unlock()
return
}
if ch != nil {
ch <- lease.index
}
c.logger.Trace("restored lease", "id", id, "path", lease.index.RequestPath)
}(id, lease)
}
c.logger.Trace("restored token", "id", newIndex.ID)
}

return errors.ErrorOrNil()
wg.Wait()
if ch != nil {
close(ch)
}

return errs.ErrorOrNil()
}

func (c *LeaseCache) restoreLeases(leases [][]byte) error {
func (c *LeaseCache) restoreTokens(tokens [][]byte) error {
var errors *multierror.Error

for _, lease := range leases {
newIndex, err := cachememdb.Deserialize(lease)
for _, token := range tokens {
newIndex, err := cachememdb.Deserialize(token)
if err != nil {
errors = multierror.Append(errors, err)
continue
}

// Check if this lease has already expired
expired, err := c.hasExpired(time.Now().UTC(), newIndex)
if err != nil {
c.logger.Warn("failed to check if lease is expired", "id", newIndex.ID, "error", err)
}
if expired {
continue
}

if err := c.restoreLeaseRenewCtx(newIndex); err != nil {
errors = multierror.Append(errors, err)
continue
}
newIndex.RenewCtxInfo = c.createCtxInfo(nil)
if err := c.db.Set(newIndex); err != nil {
errors = multierror.Append(errors, err)
continue
}
c.logger.Trace("restored lease", "id", newIndex.ID, "path", newIndex.RequestPath)
c.logger.Trace("restored token", "id", newIndex.ID)
}

return errors.ErrorOrNil()
}

// restoreLeaseRenewCtx re-creates a RenewCtx for an index object and starts
// the watcher go routine
func (c *LeaseCache) restoreLeaseRenewCtx(index *cachememdb.Index) error {
func (c *LeaseCache) restoreLeaseRenewCtx(index *cachememdb.Index, authLeases map[string]indexAndChannel) error {
if index.Response == nil {
return fmt.Errorf("cached response was nil for %s", index.ID)
}

@@ -1081,6 +1129,13 @@ func (c *LeaseCache) restoreLeaseRenewCtx(index *cachememdb.Index) error {
var renewCtxInfo *cachememdb.ContextInfo
switch {
case secret.LeaseID != "":
if parent, ok := authLeases[index.RequestTokenIndexID]; ok {
c.logger.Trace("waiting for parent token to restore", "id", index.RequestTokenIndexID)
select {
case <-parent.ch:
}
c.logger.Trace("parent token restored", "id", index.RequestTokenIndexID)
}
entry, err := c.db.Get(cachememdb.IndexNameToken, index.RequestToken)
if err != nil {
return err

@@ -1096,6 +1151,13 @@ func (c *LeaseCache) restoreLeaseRenewCtx(index *cachememdb.Index) error {
case secret.Auth != nil:
var parentCtx context.Context
if !secret.Auth.Orphan {
if parent, ok := authLeases[index.RequestTokenIndexID]; ok {
c.logger.Trace("waiting for parent token to restore", "id", index.RequestTokenIndexID)
select {
case <-parent.ch:
}
c.logger.Trace("parent token restored", "id", index.RequestTokenIndexID)
Review comment: Do we need to care here if the parent token was or was not restored based on expiry?
}
entry, err := c.db.Get(cachememdb.IndexNameToken, index.RequestToken)
if err != nil {
return err

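Distilled from the restore hunks above: every lease read from storage is paired with a `chan struct{}` that is closed once that lease has been restored, and a dependent lease looks its parent's index ID (`RequestTokenIndexID`) up in the auth-lease map and blocks on the parent's channel before restoring itself. The following is a minimal, self-contained sketch of that pattern with toy types and IDs, not the PR's code:

```go
package main

import (
	"fmt"
	"sync"
)

// leaseStub is a toy stand-in for cachememdb.Index; ParentID plays the role
// of RequestTokenIndexID (empty means no dependency).
type leaseStub struct {
	ID       string
	ParentID string
}

// indexAndChannel mirrors the struct added in the diff: the lease plus a
// channel that is closed once the lease has been restored.
type indexAndChannel struct {
	index leaseStub
	ch    chan struct{}
}

func main() {
	leases := map[string]indexAndChannel{
		"auth-1":   {index: leaseStub{ID: "auth-1"}, ch: make(chan struct{})},
		"secret-1": {index: leaseStub{ID: "secret-1", ParentID: "auth-1"}, ch: make(chan struct{})},
	}

	var wg sync.WaitGroup
	for id, lease := range leases {
		wg.Add(1)
		go func(id string, lease indexAndChannel) {
			defer wg.Done()
			defer close(lease.ch) // unblock any children waiting on this lease

			// Wait for the parent to be restored first, if it is part of this
			// restore run. The dependency graph must be acyclic, or this
			// would deadlock.
			if parent, ok := leases[lease.index.ParentID]; ok {
				<-parent.ch
			}
			fmt.Println("restored", id) // "auth-1" always prints before "secret-1"
		}(id, lease)
	}
	wg.Wait()
}
```

The real implementation additionally collects errors under a mutex and can forward each restored index on an optional channel for tests, which the sketch omits.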
Review comment: Will adding this field require a schema migration for the boltdb? Maybe not a schema migration per se, but I'm wondering how this would work restoring from a previous version that didn't populate RequestTokenIndexID, if that's something we're going to support.
Reply: Good question that I didn't really address in any comments. As you point out, we could end up restoring from a previous version that didn't populate this field. In that case, the algorithm basically reverts to the same behaviour as today: the child won't be able to find its parent in the leaseMaps, and so it won't restore in dependency order; it will just depend on luck whether the child is restored before or after the parent. We could try harder to populate existing leases in existing caches with this information (rather than the PR's current approach of just populating new leases), but I doubt it's worth the complexity given the typical lifetimes we would expect for the persistent cache file and the leases inside it.
Reply: To summarise the above, I think it's probably not worth adding any schema migration logic for this specific change, but it does deserve consideration.
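To make the compatibility point in this thread concrete, here is a tiny sketch with a hypothetical helper name (the real guard lives in restoreLeaseRenewCtx): an entry persisted by an older agent carries an empty RequestTokenIndexID, so the lookup into the auth-lease map misses and the lease is restored immediately, which is exactly the pre-PR behaviour.

```go
package main

import "fmt"

// mustWaitForParent is a hypothetical, simplified version of the guard added
// in restoreLeaseRenewCtx: a lease only waits if its parent's index ID is
// present in the map of auth leases being restored.
func mustWaitForParent(requestTokenIndexID string, authLeases map[string]chan struct{}) bool {
	_, ok := authLeases[requestTokenIndexID]
	return ok
}

func main() {
	authLeases := map[string]chan struct{}{
		"parent-index-id": make(chan struct{}),
	}

	// Entry written by an agent that includes this PR: the field is populated,
	// so restore waits for the parent token first.
	fmt.Println(mustWaitForParent("parent-index-id", authLeases)) // true

	// Entry written before this PR: the field is empty, the lookup misses,
	// and the lease restores in whatever order the goroutines happen to run.
	fmt.Println(mustWaitForParent("", authLeases)) // false
}
```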