
add rate limit to asset inventory #2055

Merged · 6 commits · May 7, 2024

Conversation

@orouz (Collaborator) commented Mar 24, 2024

Summary of your changes

  1. adds rate limiting to the asset inventory client. our current usage is only for ListAssets
  2. retries requests made by the ListAssets client whenever they fail due to rate limiting
  3. adds a cache to the policies fetcher (using a new cache utility, as we already have 3 caches in this file)

for ListAssets, we only use the per-project quota, which is 100 per minute per consumer project. we do this because:

  1. the consumer project (the one consuming the quota) is set by the user when they run gcloud config set project <project_id> before deploying the agent. (verified with gcloud config get billing/quota_project).
  2. in both of our use cases (single-account and organization-account), we always use a single quota project; we never re-define it for the user
  3. in any case, the org quotas are 800 per minute per org and 650,000 per day per org. the per-project quota is more restrictive than both, so we shouldn't exceed those either.
  4. we don't account for multiple cloudbeat instances running together without being synced on the quotas each of them consume (this is assumed to be an unlikely edge case we accept for now)

Screenshot/Data

test script
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	asset "cloud.google.com/go/asset/apiv1"
	"cloud.google.com/go/asset/apiv1/assetpb"
	"github.com/googleapis/gax-go"
	"golang.org/x/time/rate"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

type Iterator interface {
	Next() (*assetpb.Asset, error)
}

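// RetryOnResourceExhausted retries calls that fail with RESOURCE_EXHAUSTED,
// backing off between attempts (1s initial, capped at 10s, 1.2x multiplier).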
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
	return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
		Initial:    1 * time.Second,
		Max:        10 * time.Second,
		Multiplier: 1.2,
	})
})

type AssetsInventoryRateLimiter struct {
	methods map[string]*rate.Limiter
}

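// per-project ListAssets quota: 100 requests per minute per consumer project
// (see https://cloud.google.com/asset-inventory/docs/quota)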
const projectQuota = 100

func NewAssetsInventoryRateLimiter() *AssetsInventoryRateLimiter {
	return &AssetsInventoryRateLimiter{
		methods: map[string]*rate.Limiter{
			"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/projectQuota), 1),
		},
	}
}

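// GetInterceptorDialOption returns a gRPC dial option whose unary interceptor
// waits on the method's rate limiter (if one is configured) before invoking the
// call. Methods without a configured limiter pass through untouched.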
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
	return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		limiter := rl.methods[method]
		if limiter != nil {
			// block until the limiter allows the call (or the context is cancelled)
			if err := limiter.Wait(ctx); err != nil {
				return err
			}
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	})
}

func main() {
	ctx := context.Background()
	limiter := NewAssetsInventoryRateLimiter()
	clientA, err := asset.NewClient(ctx, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))

	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	var totalGot int
	var totalLost int
	start := time.Now()
	// simulate 10x the project per-minute quota by requesting some assets multiple times
	var assets []*assetpb.Asset
	for i := 0; i < projectQuota*10; i++ {
		log.Printf("Iteration: %d \n", i)
		resp := getAllAssets(clientA.ListAssets(ctx, &assetpb.ListAssetsRequest{
			Parent:      fmt.Sprintf("organizations/%s", "693506308612"),
			AssetTypes:  []string{"logging.googleapis.com/LogBucket"},
			ContentType: assetpb.ContentType_RESOURCE,
		}, RetryOnResourceExhausted))
		if resp == nil {
			totalLost++
		} else {
			totalGot++
			assets = append(assets, resp...)
		}
	}
	end := time.Now()
	log.Println("-----------------------------------------")
	log.Printf("time: %v \n", end.Sub(start))
	log.Printf("assets: %d \n", len(assets))
	log.Printf("requests lost: %d \n", totalLost)
	log.Printf("requests got: %d \n", totalGot)
}

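// getAllAssets drains the iterator into a slice; it returns nil if any page fails.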
func getAllAssets(it Iterator) []*assetpb.Asset {
	results := make([]*assetpb.Asset, 0)
	for {
		response, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil
		}
		results = append(results, response)
	}
	return results
}
  1. this is roughly the same code from the assets inventory provider that calls ListAssets
  2. you can change the Parent param to a different organization. make sure to re-run gcloud auth login and gcloud auth application-default login
  3. the script limits requests to 100 per minute and retries failed ones. (you can adjust the quota here)
  4. it simulates consumption of 10x the quota
  5. eventually (after ~10 minutes) it returns requests got: 1000, which means it doesn't lose any requests.
  6. before this PR, the same usage would error after 100 requests and then just log the error and continue. you can remove the rl.Wait() and RetryOnResourceExhausted to verify this

Related Issues

@orouz orouz added the gcp label Mar 24, 2024
@mergify mergify bot assigned orouz Mar 24, 2024

mergify bot commented Mar 24, 2024

This pull request does not have a backport label. Could you fix it @orouz? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v\d.\d.\d is the label to automatically backport to the 8.\d branch (\d is the digit)

NOTE: backport-skip has been added to this pull request.


mergify bot commented Mar 24, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b gcp_assets_inventory_rl upstream/gcp_assets_inventory_rl
git merge upstream/main
git push upstream gcp_assets_inventory_rl

@orouz orouz force-pushed the gcp_assets_inventory_rl branch 4 times, most recently from b91d3c1 to 9502f81 on March 31, 2024 07:38

github-actions bot commented Mar 31, 2024

📊 Allure Report - 💚 No failures were reported.

Result     | Count
🟥 Failed  | 0
🟩 Passed  | 359
⬜ Skipped | 33

@orouz orouz force-pushed the gcp_assets_inventory_rl branch 6 times, most recently from 68ff83e to e90006b on April 2, 2024 14:53
@orouz orouz marked this pull request as ready for review April 2, 2024 16:21
@orouz orouz requested a review from a team as a code owner April 2, 2024 16:21
@@ -49,6 +49,7 @@ func (g *GCP) NewBenchmark(ctx context.Context, log *logp.Logger, cfg *config.Co

return builder.New(
builder.WithBenchmarkDataProvider(bdp),
builder.WithManagerTimeout(cfg.Period),
Collaborator Author

i'm not sure why we need the manager timeout at all instead of just letting it work for as long as the cycle lasts. but after this PR the GCP fetchers will be slower, going at a rate of 100 requests per minute, so 1000 requests would take 10 minutes. that could conflict with the manager timeout, which has a default of 10m, as the context would be cancelled. given that, i've changed the manager timeout for GCP to be the same as the CSPM cycle period, which is 24h.

Contributor

It makes sense.
Once we update the rest of the cloud providers to have a rate limiter, we should consider removing the manager timeout option and configuring all of them to be limited to the interval period (24h).

@jeniawhite (Contributor) commented Apr 16, 2024

I think that we should still consider a scale scenario where the resources combined with the rate limiting exceed the cycle time, and make sure that we perform the work up until the end instead of sending partial cycles. But we will still need some sort of upper bound limit in order to make sure we avoid "infinite cycles" (not part of this PR).

Member

@jeniawhite I agree, but this seems to be a new feature we need to create. The upper bound limit could be (without significant effort) something like this: a cycle still running could postpone a maximum of N new cycles and then get canceled.

However, we should implement it as a new feature and consider the edge scenarios.

Collaborator Author

good point. i've opened an issue to handle this scenario - #2180

@@ -405,19 +400,21 @@ func getAssetsByProject[T any](assets []*ExtendedGcpAsset, log *logp.Logger, f T
return enrichedAssets
}

func getAllAssets(log *logp.Logger, it Iterator) []*assetpb.Asset {
func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset {
Collaborator Author

i've changed this function to be a method on the provider so we can use it for all fetching and log the details of the request we're about to make in one place, instead of sprinkling a bunch of p.log.Infof(...) calls every time we call getAllAssets

log.Errorf("Error fetching GCP Asset: %s", err)
return nil
p.log.Errorf("Error fetching GCP Asset: %s", err)
return results
Collaborator Author

this isn't part of the rate limiting bug fix, but it's still a fix - we used to return nil whenever we got an error, which comes from a request for the next page. but if, for example, we got page 1 and already populated results with data, then got an error on page 2, we still returned nil instead of the results we already had. so now we return what we already have after getting an error.
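
a minimal sketch of the adjusted pagination loop (the p.inventory field name and the exact log messages are assumptions based on the diff above, not the PR's exact code):

func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset {
	p.log.Infof("Listing assets: parent=%s, types=%v", request.Parent, request.AssetTypes)
	results := make([]*assetpb.Asset, 0)
	it := p.inventory.ListAssets(ctx, request)
	for {
		response, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// return the pages collected so far instead of dropping them
			p.log.Errorf("Error fetching GCP Asset: %s", err)
			return results
		}
		results = append(results, response)
	}
	return results
}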

Contributor

That's great, and I can see that eventually this is being appended into assets. We just need to make sure that there is no hidden logic that differentiates between nil and actually getting results, because we do not want to act as if everything succeeded when we have a partial response.
From what I saw in the code we act in an opportunistic sort of way, but in the future I would consider recognizing when we have partial results and potentially acting on it instead of skipping the whole cycle (not in the scope of this PR).

Collaborator Author

we still log the error like we did before, and the cycle continues without interruption like before - it just has a few more findings to report. overall i think this change is safe, as returning nil or an empty []*assetpb.Asset is the same in the sense that we never differentiate between the two; we just don't iterate on an empty value.

Comment on lines 484 to 485
func getAncestorsAssets(ctx context.Context, ancestorsPolicies map[string][]*ExtendedGcpAsset, p *Provider, ancestors []string) []*ExtendedGcpAsset {
return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset {
if ancestorsPolicies[parent] != nil {
return ancestorsPolicies[parent]
}
Collaborator Author

this function is called during an iteration over a user's projects, and it fetches policies for each project's ancestors. i've added a cache because ancestors tend to be identical between different projects (for example, the last ancestor item - organizations/123 - will always be the same). i've tested this locally and got a lot of cache hits for organizations/folders. this will reduce the number of api calls the policies fetcher makes.
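
for illustration, roughly how the cache-miss path could look (the fetchPoliciesFor helper is a hypothetical name, not the PR's exact code):

func getAncestorsAssets(ctx context.Context, ancestorsPolicies map[string][]*ExtendedGcpAsset, p *Provider, ancestors []string) []*ExtendedGcpAsset {
	return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset {
		// ancestors like organizations/123 repeat across projects, so serve them from the cache
		if ancestorsPolicies[parent] != nil {
			return ancestorsPolicies[parent]
		}
		policies := p.fetchPoliciesFor(ctx, parent) // hypothetical fetch of policies for this ancestor
		ancestorsPolicies[parent] = policies
		return policies
	}))
}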


// a map of asset inventory client methods and their quotas.
// see https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
Collaborator Author

if at some point we use more methods from the assets inventory, we can add them here.

it might be better to add the rate limiting directly to the ListAssets method instead of adding it to the whole assets inventory client and only limiting methods we pre-define, but i didn't find a way to do this. (the grpc.CallOption interface does not export the relevant types)

Contributor

so what happens if we call a method that is not ListAssets? is the interceptor still active?

Contributor

the interceptor will be called but we'll not wait, right?

Collaborator Author

the interceptor will be called but we'll not wait, right?

yeah the interceptor will just be a pass-through function

@orouz orouz force-pushed the gcp_assets_inventory_rl branch 3 times, most recently from 85e60a8 to 7625e87 on April 7, 2024 10:28
@uri-weisman (Contributor) left a comment

Looks great! 💪
Left some questions...

internal/resources/providers/gcplib/inventory/provider.go (outdated, resolved)
@elastic elastic deleted a comment from github-actions bot Apr 7, 2024
Collaborator Author

we already have a cache utility, but it is used for single values and is cycle-aware.

this cache just abstracts the repetitive read/write that using a plain map would require: it takes a function to get a value, which is used for the initial read and assignment.
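
a minimal sketch of what such a map cache could look like (the MapCache name comes from the diff; the exact implementation and locking behavior here are assumptions):

// MapCache caches values by string key; on a miss it computes the value with
// the provided function and stores it for later lookups.
type MapCache[T any] struct {
	entries map[string]T
}

func NewMapCache[T any]() *MapCache[T] {
	return &MapCache[T]{entries: make(map[string]T)}
}

// Get returns the cached value for key, computing and storing it on first use.
// note: this sketch is not safe for concurrent use.
func (c *MapCache[T]) Get(compute func() T, key string) T {
	if value, ok := c.entries[key]; ok {
		return value
	}
	value := compute()
	c.entries[key] = value
	return value
}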


projectName = crm.getProjectDisplayName(ctx, keys.parentProject)
// some assets are not associated with a project
if projectId != "" {
projectName = p.crm.getProjectDisplayName(ctx, fmt.Sprintf("projects/%s", projectId))
@jeniawhite (Contributor) commented Apr 16, 2024

Notice that this changes the behavior.

Previously we returned an empty string and printed a log. Now we do not print any log and we do not set the projectName value (which stays an empty string due to the initial declaration).

Another side effect is that we do not push this value and key to the cache (wondering if that affects any of the flows).

Collaborator Author

this does change the behavior we had before, but i think it's ok. we used to try to fetch project names using an empty project id, which resulted in an empty project name. we did this multiple times, so sometimes we got the empty project name from the cache. in any case, after #2085 was merged, we don't send empty project names anyway:

insertIfNotEmpty(cloudAccountNameField, strings.FirstNonEmpty(resMetadata.AccountName, a.accountName), event),

so the outcome of this change is ultimately just not sending redundant api calls to fetch empty project names.

internal/resources/providers/gcplib/inventory/provider.go (outdated, resolved)
log.Errorf("Error fetching GCP Asset: %s", err)
return nil
p.log.Errorf("Error fetching GCP Asset: %s", err)
return results
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great and I can see that eventually this is being appended into assets, we just need to make sure that there is no hidden logic that differentiates between nil and actually getting results.
Because we do not want to act is if everything succeeded when we have a partial response.
From what I saw in the code we act in a opportunistic sort of way, but I would consider in the future to recognize when we have partial results and potentially act on it instead of skipping the whole cycle (not in the scope of this PR).

log: log,
inventory: assetsInventoryWrapper,
crm: crmServiceWrapper,
cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](),
@moukoublen (Member) commented Apr 17, 2024

Is the Provider lifecycle per-cycle or long-lived? I think it is long-lived - it's initialized once at the startup of cloudbeat - but perhaps I am mistaken here.

If it is, then shouldn't we be extra careful about what we cache once and only once? Is cloud account metadata safe to retrieve only once per lifetime? If not, we should use one of the many in-memory cache libraries that implement a global TTL or a per-key TTL.

(Alternatively, a cache-per-cycle could also be a safe choice.)
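
As a rough illustration of the per-key TTL idea (a hypothetical wrapper, not a specific library and not the PR's code):

// TTLCache caches values per key and recomputes them after they expire.
type TTLCache[T any] struct {
	ttl     time.Duration
	entries map[string]ttlEntry[T]
}

type ttlEntry[T any] struct {
	value   T
	expires time.Time
}

func NewTTLCache[T any](ttl time.Duration) *TTLCache[T] {
	return &TTLCache[T]{ttl: ttl, entries: make(map[string]ttlEntry[T])}
}

// Get returns the cached value unless it has expired, in which case it recomputes and re-stores it.
func (c *TTLCache[T]) Get(key string, compute func() T) T {
	if e, ok := c.entries[key]; ok && time.Now().Before(e.expires) {
		return e.value
	}
	value := compute()
	c.entries[key] = ttlEntry[T]{value: value, expires: time.Now().Add(c.ttl)}
	return value
}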

Collaborator Author

before this PR, the cache was a plain map:

- crmCache map[string]*fetching.CloudAccountMetadata
+ cloudAccountMetadataCache *MapCache[*fetching.CloudAccountMetadata]

so it's still the same behavior as before, just a little less repetitive now, as MapCache takes away the operations we'd need to do to cache values using a plain map[string]

in general though, i agree this behaviour is probably not correct - even though project/org names rarely change, we'd probably still want to get fresh values. i've opened an issue to address this in a separate PR (as the behaviour itself didn't change from before and is unrelated to this PR)

@orouz orouz merged commit b8ffed9 into elastic:main May 7, 2024
24 checks passed
mergify bot pushed a commit that referenced this pull request May 7, 2024

Successfully merging this pull request may close these issues.

Account for rate limiting in GCP fetchers
4 participants