-
Notifications
You must be signed in to change notification settings - Fork 42
/
grpc_rate_limiter.go
76 lines (67 loc) · 2.74 KB
/
grpc_rate_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package inventory
import (
"context"
"time"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/googleapis/gax-go/v2"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// a gax.CallOption that defines a retry strategy which retries the request on ResourceExhausted error.
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 1 * time.Minute,
Multiplier: 1.2,
})
})
type AssetsInventoryRateLimiter struct {
methods map[string]*rate.Limiter
log *logp.Logger
}
// a map of asset inventory client methods and their quotas.
// see https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
// using per-project quota suffices for both single-account and organization-account, because it's more restrictive.
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
}
func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
return &AssetsInventoryRateLimiter{
log: log,
methods: methods,
}
}
// Limits the rate of the method calls defined in the methods map.
func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) {
limiter := rl.methods[method]
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err)
}
}
}
// Returns a grpc.DialOption that intercepts the unary RPCs and limits the rate of the method calls.
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
rl.Wait(ctx, method, req)
return invoker(ctx, method, req, reply, cc, opts...)
})
}