Skip to content

Commit

Permalink
Bulk API Keys update (#1779)
Browse files Browse the repository at this point in the history
Bulk API Keys update (#1779)
  • Loading branch information
michalpristas committed Sep 20, 2022
1 parent 4a69b63 commit 46ac14b
Show file tree
Hide file tree
Showing 15 changed files with 617 additions and 41 deletions.
64 changes: 62 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -349,15 +350,49 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

if agent.DefaultAPIKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true)
if err != nil {
zlog.Error().
Err(err).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Msg("Failed to read API Key roles")
} else {
clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors)
if err != nil {
zlog.Error().
Err(err).
RawJSON("roles", res.RoleDescriptors).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", agent.PolicyOutputPermissionsHash).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Msg("Updating agent record to pick up reduced roles.")
}
}
}
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID {
// already updated
continue
}
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}

Expand All @@ -376,7 +411,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
bulk.WithRetryOnConflict(3),
)

zlog.Info().Err(err).
zlog.Err(err).
Str(LogPolicyID, agent.PolicyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Expand All @@ -385,6 +420,31 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return errors.Wrap(err, "handlePolicyChange update")
}

func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {
rr := smap.Map{}
if err := json.Unmarshal(roles, &rr); err != nil {
return nil, 0, errors.Wrap(err, "failed to unmarshal provided roles")
}

keys := make([]string, 0, len(rr))
for k := range rr {
if strings.HasSuffix(k, "-rdstale") {
keys = append(keys, k)
}
}

if len(keys) == 0 {
return roles, 0, nil
}

for _, k := range keys {
delete(rr, k)
}

r, err := json.Marshal(rr)
return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition")
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
if len(apiKeys) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func invalidateAPIKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk
LOOP:
for {

_, err := bulker.APIKeyRead(ctx, apikeyID)
_, err := bulker.APIKeyRead(ctx, apikeyID, false)

switch {
case err == nil:
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/apikey/apikey_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
}

// Get the key and verify that metadata was saved correctly
aKeyMeta, err := Read(ctx, es, akey.ID)
aKeyMeta, err := Read(ctx, es, akey.ID, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -80,7 +80,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
}

// Try to get the key that doesn't exists, expect ErrApiKeyNotFound
_, err = Read(ctx, es, "0000000000000")
_, err = Read(ctx, es, "0000000000000", false)
if !errors.Is(err, ErrAPIKeyNotFound) {
t.Errorf("Unexpected error type: %v", err)
}
Expand Down
20 changes: 13 additions & 7 deletions internal/pkg/apikey/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ import (

// APIKetMetadata tracks Metadata associated with an APIKey.
type APIKeyMetadata struct {
ID string
Metadata Metadata
ID string
Metadata Metadata
RoleDescriptors json.RawMessage
}

// Read gathers APIKeyMetadata from Elasticsearch using the given client.
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) {
func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) {

opts := []func(*esapi.SecurityGetAPIKeyRequest){
client.Security.GetAPIKey.WithContext(ctx),
client.Security.GetAPIKey.WithID(id),
}
if withOwner {
opts = append(opts, client.Security.GetAPIKey.WithOwner(true))
}

res, err := client.Security.GetAPIKey(
opts...,
Expand All @@ -42,8 +46,9 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey
}

type APIKeyResponse struct {
ID string `json:"id"`
Metadata Metadata `json:"metadata"`
ID string `json:"id"`
Metadata Metadata `json:"metadata"`
RoleDescriptors json.RawMessage `json:"role_descriptors"`
}
type GetAPIKeyResponse struct {
APIKeys []APIKeyResponse `json:"api_keys"`
Expand All @@ -62,7 +67,8 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey
first := resp.APIKeys[0]

return &APIKeyMetadata{
ID: first.ID,
Metadata: first.Metadata,
ID: first.ID,
Metadata: first.Metadata,
RoleDescriptors: first.RoleDescriptors,
}, nil
}
2 changes: 2 additions & 0 deletions internal/pkg/bulk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
ActionDelete
ActionIndex
ActionUpdate
ActionUpdateAPIKey
ActionRead
ActionSearch
ActionFleetSearch
Expand All @@ -53,6 +54,7 @@ var actionStrings = []string{
"delete",
"index",
"update",
"update_api_key",
"read",
"search",
"fleet_search",
Expand Down
8 changes: 7 additions & 1 deletion internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type Bulk interface {

// APIKey operations
APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error)
APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error)
APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error)
APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error)
APIKeyInvalidate(ctx context.Context, ids ...string) error
APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error

// Accessor used to talk to elastic search direcly bypassing bulk engine
Client() *elasticsearch.Client
Expand All @@ -81,6 +82,7 @@ const (
defaultMaxPending = 32
defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast
defaultAPIKeyMaxParallel = 32
defaultApikeyMaxReqSize = 100 * 1024 * 1024
)

func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
Expand Down Expand Up @@ -136,6 +138,8 @@ func blkToQueueType(blk *bulkT) queueType {
} else {
queueIdx = kQueueRead
}
case ActionUpdateAPIKey:
queueIdx = kQueueAPIKeyUpdate
default:
if forceRefresh {
queueIdx = kQueueRefreshBulk
Expand Down Expand Up @@ -288,6 +292,8 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
err = b.flushRead(ctx, queue)
case kQueueSearch, kQueueFleetSearch:
err = b.flushSearch(ctx, queue)
case kQueueAPIKeyUpdate:
err = b.flushUpdateAPIKey(ctx, queue)
default:
err = b.flushBulk(ctx, queue)
}
Expand Down

0 comments on commit 46ac14b

Please sign in to comment.