From b091942d5f8e994b53da9d2c854624b8582ea86b Mon Sep 17 00:00:00 2001
From: Constanca Manteigas
Date: Mon, 14 Jul 2025 17:30:26 +0200
Subject: [PATCH 1/4] Fix delay behavior

---
 processor/ratelimitprocessor/go.mod        |  2 +
 processor/ratelimitprocessor/go.sum        |  4 ++
 processor/ratelimitprocessor/gubernator.go | 47 +++++++++++++++++--
 .../ratelimitprocessor/gubernator_test.go  | 28 +++++++++++
 4 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/processor/ratelimitprocessor/go.mod b/processor/ratelimitprocessor/go.mod
index 18d6c5071..51867e63b 100644
--- a/processor/ratelimitprocessor/go.mod
+++ b/processor/ratelimitprocessor/go.mod
@@ -3,6 +3,7 @@ module github.com/elastic/opentelemetry-collector-components/processor/ratelimit
 go 1.23.8
 
 require (
+	github.com/elastic/go-freelru v0.16.0
 	github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent v0.0.0-20250220025958-386ba0c4bced
 	github.com/gubernator-io/gubernator/v2 v2.13.0
 	github.com/sirupsen/logrus v1.9.2
@@ -27,6 +28,7 @@ require (
 	go.opentelemetry.io/otel/metric v1.36.0
 	go.opentelemetry.io/otel/sdk/metric v1.36.0
 	go.opentelemetry.io/otel/trace v1.36.0
+	go.uber.org/atomic v1.9.0
 	go.uber.org/goleak v1.3.0
 	go.uber.org/zap v1.27.0
 	golang.org/x/time v0.11.0
diff --git a/processor/ratelimitprocessor/go.sum b/processor/ratelimitprocessor/go.sum
index 231c761fb..32e47cef5 100644
--- a/processor/ratelimitprocessor/go.sum
+++ b/processor/ratelimitprocessor/go.sum
@@ -98,6 +98,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs=
+github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -546,6 +548,8 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
 go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI=
 go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go
index 54d5e507d..139dc71b0 100644
--- a/processor/ratelimitprocessor/gubernator.go
+++ b/processor/ratelimitprocessor/gubernator.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/uptrace/opentelemetry-go-extra/otellogrus"
@@ -50,6 +51,9 @@ type gubernatorRateLimiter struct {
 	daemon     *gubernator.Daemon
 	client     gubernator.V1Client
 	clientConn *grpc.ClientConn
+
+	currentRequests   map[string]int
+	currentRequestsMx sync.RWMutex
 }
 
 func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, error) {
@@ -82,10 +86,11 @@ func newGubernatorRateLimiter(cfg *Config, set processor.Settings) (*gubernatorR
 	}
 
 	return &gubernatorRateLimiter{
-		cfg:       cfg,
-		set:       set,
-		behavior:  gubernator.Behavior(0), // BATCHING behavior
-		daemonCfg: daemonCfg,
+		cfg:             cfg,
+		set:             set,
+		behavior:        gubernator.Behavior(0), // BATCHING behavior
+		daemonCfg:       daemonCfg,
+		currentRequests: make(map[string]int),
 	}, nil
 }
 
@@ -121,6 +126,35 @@ func (r *gubernatorRateLimiter) Shutdown(_ context.Context) error {
 	return nil
 }
 
+func (r *gubernatorRateLimiter) addRequests(uniqueKey string, hits int) int {
+	r.currentRequestsMx.Lock()
+	current, exists := r.currentRequests[uniqueKey]
+	if !exists {
+		r.currentRequests[uniqueKey] = hits
+	} else {
+		r.currentRequests[uniqueKey] = current + hits
+	}
+	current = r.currentRequests[uniqueKey]
+	r.currentRequestsMx.Unlock()
+	return current
+}
+
+func (r *gubernatorRateLimiter) deleteRequests(uniqueKey string, hits int) error {
+	r.currentRequestsMx.Lock()
+	current, exists := r.currentRequests[uniqueKey]
+	if !exists {
+		return fmt.Errorf("unexpected: current requests entry does not exist for this unique key %s", uniqueKey)
+	} else {
+		r.currentRequests[uniqueKey] = current - 1
+		if r.currentRequests[uniqueKey] < 0 {
+			return fmt.Errorf("unexpected: current request for unique key %s reached a negative value", uniqueKey)
+		}
+	}
+	current = r.currentRequests[uniqueKey]
+	r.currentRequestsMx.Unlock()
+	return nil
+}
+
 func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 	uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)
 	cfg := resolveRateLimitSettings(r.cfg, uniqueKey)
@@ -169,13 +203,16 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 			)
 			return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
 		case ThrottleBehaviorDelay:
-			delay := time.Duration(resp.GetResetTime()-createdAt) * time.Millisecond
+			current := r.addRequests(uniqueKey, hits)
+			delay := time.Duration(resp.GetResetTime()-createdAt+int64(current)*cfg.ThrottleInterval.Milliseconds()) * time.Millisecond
 			timer := time.NewTimer(delay)
 			defer timer.Stop()
 			select {
 			case <-ctx.Done():
+				_ = r.deleteRequests(uniqueKey, hits)
 				return ctx.Err()
 			case <-timer.C:
+				return r.deleteRequests(uniqueKey, hits)
 			}
 		}
 	}
diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go
index 2150a9d5f..16d874c3f 100644
--- a/processor/ratelimitprocessor/gubernator_test.go
+++ b/processor/ratelimitprocessor/gubernator_test.go
@@ -19,6 +19,7 @@ package ratelimitprocessor
 
 import (
 	"context"
+	"sync"
 	"testing"
 	"time"
 
@@ -68,6 +69,8 @@ func newTestGubernatorRateLimiter(t *testing.T, cfg *Config) *gubernatorRateLimi
 		daemon:     daemons[0],
 		client:     cl,
 		clientConn: conn,
+
+		currentRequests: make(map[string]int),
 	}
 	require.NoError(t, err)
 
@@ -142,3 +145,28 @@ func TestGubernatorRateLimiter_RateLimit_MetadataKeys(t *testing.T) {
 	err = rateLimiter.RateLimit(clientContext2, 1)
 	assert.NoError(t, err)
 }
+
+func TestGubernatorRateLimiter_MultipleRequests_Delay(t *testing.T) {
+	rl := newTestGubernatorRateLimiter(t, &Config{
+		RateLimitSettings: RateLimitSettings{
+			Rate:             1, // request per second
+			Burst:            1, // capacity only for one
+			ThrottleBehavior: ThrottleBehaviorDelay,
+			ThrottleInterval: 100 * time.Millisecond, // add 1 token after 100ms
+		},
+		MetadataKeys: []string{"metadata_key"},
+	})
+
+	// Simulate 2 requests hitting the rate limit simultaneously
+	requests := 3
+	var wg sync.WaitGroup
+	wg.Add(requests)
+	for i := 0; i < requests; i++ {
+		go func() {
+			defer wg.Done()
+			err := rl.RateLimit(context.Background(), 1)
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()
+}

From 76b8fa0250755640bfc1a46fe438ede3a1eabdcb Mon Sep 17 00:00:00 2001
From: Constanca Manteigas
Date: Mon, 14 Jul 2025 17:47:05 +0200
Subject: [PATCH 2/4] make commands

---
 processor/ratelimitprocessor/go.mod        | 2 --
 processor/ratelimitprocessor/go.sum        | 4 ----
 processor/ratelimitprocessor/gubernator.go | 3 +--
 3 files changed, 1 insertion(+), 8 deletions(-)

diff --git a/processor/ratelimitprocessor/go.mod b/processor/ratelimitprocessor/go.mod
index 51867e63b..18d6c5071 100644
--- a/processor/ratelimitprocessor/go.mod
+++ b/processor/ratelimitprocessor/go.mod
@@ -3,7 +3,6 @@ module github.com/elastic/opentelemetry-collector-components/processor/ratelimit
 go 1.23.8
 
 require (
-	github.com/elastic/go-freelru v0.16.0
 	github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent v0.0.0-20250220025958-386ba0c4bced
 	github.com/gubernator-io/gubernator/v2 v2.13.0
 	github.com/sirupsen/logrus v1.9.2
@@ -28,7 +27,6 @@ require (
 	go.opentelemetry.io/otel/metric v1.36.0
 	go.opentelemetry.io/otel/sdk/metric v1.36.0
 	go.opentelemetry.io/otel/trace v1.36.0
-	go.uber.org/atomic v1.9.0
 	go.uber.org/goleak v1.3.0
 	go.uber.org/zap v1.27.0
 	golang.org/x/time v0.11.0
diff --git a/processor/ratelimitprocessor/go.sum b/processor/ratelimitprocessor/go.sum
index 32e47cef5..231c761fb 100644
--- a/processor/ratelimitprocessor/go.sum
+++ b/processor/ratelimitprocessor/go.sum
@@ -98,8 +98,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs=
-github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -548,8 +546,6 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
 go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI=
 go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
-go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go
index 139dc71b0..03d4c944a 100644
--- a/processor/ratelimitprocessor/gubernator.go
+++ b/processor/ratelimitprocessor/gubernator.go
@@ -145,12 +145,11 @@ func (r *gubernatorRateLimiter) deleteRequests(uniqueKey string, hits int) error
 	if !exists {
 		return fmt.Errorf("unexpected: current requests entry does not exist for this unique key %s", uniqueKey)
 	} else {
-		r.currentRequests[uniqueKey] = current - 1
+		r.currentRequests[uniqueKey] = current - hits
 		if r.currentRequests[uniqueKey] < 0 {
 			return fmt.Errorf("unexpected: current request for unique key %s reached a negative value", uniqueKey)
 		}
 	}
-	current = r.currentRequests[uniqueKey]
 	r.currentRequestsMx.Unlock()
 	return nil
 }

From 26ae328686c58b3babea505933c20b669273f509 Mon Sep 17 00:00:00 2001
From: Constanca Manteigas
Date: Mon, 18 Aug 2025 14:26:25 +0200
Subject: [PATCH 3/4] Fix rate limiter with delay

---
 processor/ratelimitprocessor/gubernator.go | 123 ++++++++----
 .../ratelimitprocessor/gubernator_test.go  |  35 ++++-
 processor/ratelimitprocessor/local_test.go |  50 +++++++
 3 files changed, 130 insertions(+), 78 deletions(-)

diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go
index 5810c10bc..cca29240a 100644
--- a/processor/ratelimitprocessor/gubernator.go
+++ b/processor/ratelimitprocessor/gubernator.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/uptrace/opentelemetry-go-extra/otellogrus"
@@ -51,9 +50,6 @@ type gubernatorRateLimiter struct {
 	daemon     *gubernator.Daemon
 	client     gubernator.V1Client
 	clientConn *grpc.ClientConn
-
-	currentRequests   map[string]int
-	currentRequestsMx sync.RWMutex
 }
 
 func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, error) {
@@ -86,11 +82,10 @@ func newGubernatorRateLimiter(cfg *Config, set processor.Settings) (*gubernatorR
 	}
 
 	return &gubernatorRateLimiter{
-		cfg:             cfg,
-		set:             set,
-		behavior:        gubernator.Behavior(0), // BATCHING behavior
-		daemonCfg:       daemonCfg,
-		currentRequests: make(map[string]int),
+		cfg:       cfg,
+		set:       set,
+		behavior:  gubernator.Behavior(0), // BATCHING behavior
+		daemonCfg: daemonCfg,
 	}, nil
 }
 
@@ -126,66 +121,44 @@ func (r *gubernatorRateLimiter) Shutdown(_ context.Context) error {
 	return nil
 }
 
-func (r *gubernatorRateLimiter) addRequests(uniqueKey string, hits int) int {
-	r.currentRequestsMx.Lock()
-	current, exists := r.currentRequests[uniqueKey]
-	if !exists {
-		r.currentRequests[uniqueKey] = hits
-	} else {
-		r.currentRequests[uniqueKey] = current + hits
-	}
-	current = r.currentRequests[uniqueKey]
-	r.currentRequestsMx.Unlock()
-	return current
-}
-
-func (r *gubernatorRateLimiter) deleteRequests(uniqueKey string, hits int) error {
-	r.currentRequestsMx.Lock()
-	current, exists := r.currentRequests[uniqueKey]
-	if !exists {
-		return fmt.Errorf("unexpected: current requests entry does not exist for this unique key %s", uniqueKey)
-	} else {
-		r.currentRequests[uniqueKey] = current - hits
-		if r.currentRequests[uniqueKey] < 0 {
-			return fmt.Errorf("unexpected: current request for unique key %s reached a negative value", uniqueKey)
-		}
-	}
-	r.currentRequestsMx.Unlock()
-	return nil
-}
-
 func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 	uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)
 	cfg := resolveRateLimitSettings(r.cfg, uniqueKey)
-	createdAt := time.Now().UnixMilli()
-
-	getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
-		Requests: []*gubernator.RateLimitReq{
-			{
-				Name:      r.set.ID.String(),
-				UniqueKey: uniqueKey,
-				Hits:      int64(hits),
-				Behavior:  r.behavior,
-				Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
-				Limit:     int64(cfg.Rate), // rate is per second
-				Burst:     int64(cfg.Burst),
-				Duration:  cfg.ThrottleInterval.Milliseconds(), // duration is in milliseconds, i.e. 1s
-				CreatedAt: &createdAt,
+
+	var resp *gubernator.RateLimitResp
+	makeRateLimitRequest := func() error {
+		createdAt := time.Now().UnixMilli()
+		getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
+			Requests: []*gubernator.RateLimitReq{
+				{
+					Name:      r.set.ID.String(),
+					UniqueKey: uniqueKey,
+					Hits:      int64(hits),
+					Behavior:  r.behavior,
+					Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
+					Limit:     int64(cfg.Rate), // rate is per second
+					Burst:     int64(cfg.Burst),
+					Duration:  cfg.ThrottleInterval.Milliseconds(), // duration is in milliseconds, i.e. 1s
+					CreatedAt: &createdAt,
+				},
 			},
-		},
-	})
-	if err != nil {
-		return err
-	}
-
-	// Inside the gRPC response, we should have a single-item list of responses.
-	responses := getRateLimitsResp.GetResponses()
-	if n := len(responses); n != 1 {
-		return fmt.Errorf("expected 1 response from gubernator, got %d", n)
+		})
+		if err != nil {
+			return err
+		}
+		// Inside the gRPC response, we should have a single-item list of responses.
+		responses := getRateLimitsResp.GetResponses()
+		if n := len(responses); n != 1 {
+			return fmt.Errorf("expected 1 response from gubernator, got %d", n)
+		}
+		resp = responses[0]
+		if resp.GetError() != "" {
+			return errors.New(resp.GetError())
+		}
+		return nil
 	}
-	resp := responses[0]
-	if resp.GetError() != "" {
-		return errors.New(resp.GetError())
+	if err := makeRateLimitRequest(); err != nil {
+		return err
 	}
 
 	if resp.GetStatus() == gubernator.Status_OVER_LIMIT {
@@ -194,16 +167,24 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 		case ThrottleBehaviorError:
 			return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
 		case ThrottleBehaviorDelay:
-			current := r.addRequests(uniqueKey, hits)
-			delay := time.Duration(resp.GetResetTime()-createdAt+int64(current)*cfg.ThrottleInterval.Milliseconds()) * time.Millisecond
+			delay := time.Duration(resp.GetResetTime()-time.Now().UnixMilli()) * time.Millisecond
 			timer := time.NewTimer(delay)
 			defer timer.Stop()
-			select {
-			case <-ctx.Done():
-				_ = r.deleteRequests(uniqueKey, hits)
-				return ctx.Err()
-			case <-timer.C:
-				return r.deleteRequests(uniqueKey, hits)
+		retry:
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-timer.C:
+					if err := makeRateLimitRequest(); err != nil {
+						return err
+					}
+					if resp.GetStatus() == gubernator.Status_UNDER_LIMIT {
+						break retry
+					}
+					delay = time.Duration(resp.GetResetTime()-time.Now().UnixMilli()) * time.Millisecond
+					timer.Reset(delay)
+				}
 			}
 		}
 	}
diff --git a/processor/ratelimitprocessor/gubernator_test.go b/processor/ratelimitprocessor/gubernator_test.go
index 16d874c3f..0e035bdaf 100644
--- a/processor/ratelimitprocessor/gubernator_test.go
+++ b/processor/ratelimitprocessor/gubernator_test.go
@@ -19,6 +19,7 @@ package ratelimitprocessor
 
 import (
 	"context"
+	"slices"
 	"sync"
 	"testing"
 	"time"
@@ -69,8 +70,6 @@ func newTestGubernatorRateLimiter(t *testing.T, cfg *Config) *gubernatorRateLimi
 		daemon:     daemons[0],
 		client:     cl,
 		clientConn: conn,
-
-		currentRequests: make(map[string]int),
 	}
 	require.NoError(t, err)
 
@@ -147,26 +146,48 @@ func TestGubernatorRateLimiter_RateLimit_MetadataKeys(t *testing.T) {
 }
 
 func TestGubernatorRateLimiter_MultipleRequests_Delay(t *testing.T) {
+	throttleInterval := 100 * time.Millisecond
 	rl := newTestGubernatorRateLimiter(t, &Config{
 		RateLimitSettings: RateLimitSettings{
 			Rate:             1, // request per second
 			Burst:            1, // capacity only for one
 			ThrottleBehavior: ThrottleBehaviorDelay,
-			ThrottleInterval: 100 * time.Millisecond, // add 1 token after 100ms
+			ThrottleInterval: throttleInterval, // add 1 token after 100ms
 		},
 		MetadataKeys: []string{"metadata_key"},
 	})
 
-	// Simulate 2 requests hitting the rate limit simultaneously
-	requests := 3
+	// Simulate 4 requests hitting the rate limit simultaneously.
+	// The first request passes, and the next ones hit it simultaneously.
+	requests := 5
+	endingTimes := make([]time.Time, requests)
 	var wg sync.WaitGroup
 	wg.Add(requests)
+
 	for i := 0; i < requests; i++ {
-		go func() {
+		go func(i int) {
 			defer wg.Done()
 			err := rl.RateLimit(context.Background(), 1)
 			require.NoError(t, err)
-		}()
+			endingTimes[i] = time.Now()
+		}(i)
 	}
 	wg.Wait()
+
+	// Make sure all ending times have a difference of at least 100ms, as tokens are
+	// added at that rate. We need to sort them first.
+	slices.SortFunc(endingTimes, func(a, b time.Time) int {
+		if a.Before(b) {
+			return -1
+		}
+		return 1
+	})
+
+	for i := 1; i < requests; i++ {
+		diff := endingTimes[i].Sub(endingTimes[i-1]).Milliseconds()
+		minExpected := throttleInterval - 5*time.Millisecond // allow small tolerance
+		if diff < minExpected.Milliseconds() {
+			t.Fatalf("difference is %dms, requests were sent before tokens were added", diff)
+		}
+	}
 }
diff --git a/processor/ratelimitprocessor/local_test.go b/processor/ratelimitprocessor/local_test.go
index 052d18ada..8db2377e2 100644
--- a/processor/ratelimitprocessor/local_test.go
+++ b/processor/ratelimitprocessor/local_test.go
@@ -19,6 +19,8 @@ package ratelimitprocessor
 
 import (
 	"context"
+	"slices"
+	"sync"
 	"testing"
 	"time"
 
@@ -126,3 +128,51 @@ func TestLocalRateLimiter_RateLimit_MetadataKeys(t *testing.T) {
 		assert.NoError(t, err)
 	}
 }
+
+func TestLocalRateLimiter_MultipleRequests_Delay(t *testing.T) {
+	throttleInterval := 100 * time.Millisecond
+	rl := newTestLocalRateLimiter(t, &Config{
+		Type: LocalRateLimiter,
+		RateLimitSettings: RateLimitSettings{
+			Rate:             1, // request per second
+			Burst:            1, // capacity only for one
+			ThrottleBehavior: ThrottleBehaviorDelay,
+			ThrottleInterval: throttleInterval, // add 1 token after 100ms
+		},
+		MetadataKeys: []string{"metadata_key"},
+	})
+
+	// Simulate 4 requests hitting the rate limit simultaneously.
+	// The first request passes, and the next ones hit it simultaneously.
+	requests := 5
+	endingTimes := make([]time.Time, requests)
+	var wg sync.WaitGroup
+	wg.Add(requests)
+
+	for i := 0; i < requests; i++ {
+		go func(i int) {
+			defer wg.Done()
+			err := rl.RateLimit(context.Background(), 1)
+			require.NoError(t, err)
+			endingTimes[i] = time.Now()
+		}(i)
+	}
+	wg.Wait()
+
+	// Make sure all ending times have a difference of at least 100ms, as tokens are
+	// added at that rate. We need to sort them first.
+	slices.SortFunc(endingTimes, func(a, b time.Time) int {
+		if a.Before(b) {
+			return -1
+		}
+		return 1
+	})
+
+	for i := 1; i < requests; i++ {
+		diff := endingTimes[i].Sub(endingTimes[i-1]).Milliseconds()
+		minExpected := throttleInterval - 5*time.Millisecond // allow small tolerance
+		if diff < minExpected.Milliseconds() {
+			t.Fatalf("difference is %dms, requests were sent before tokens were added", diff)
+		}
+	}
+}

From 0a748caf23a3a52d5cb198e4b78c7c845d18bc1d Mon Sep 17 00:00:00 2001
From: Constanca Manteigas
Date: Tue, 26 Aug 2025 05:44:10 +0200
Subject: [PATCH 4/4] Address comments

---
 processor/ratelimitprocessor/gubernator.go | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/processor/ratelimitprocessor/gubernator.go b/processor/ratelimitprocessor/gubernator.go
index cca29240a..39d04cb45 100644
--- a/processor/ratelimitprocessor/gubernator.go
+++ b/processor/ratelimitprocessor/gubernator.go
@@ -125,8 +125,7 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 	uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)
 	cfg := resolveRateLimitSettings(r.cfg, uniqueKey)
 
-	var resp *gubernator.RateLimitResp
-	makeRateLimitRequest := func() error {
+	makeRateLimitRequest := func() (*gubernator.RateLimitResp, error) {
 		createdAt := time.Now().UnixMilli()
 		getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
 			Requests: []*gubernator.RateLimitReq{
@@ -144,20 +143,21 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 			},
 		})
 		if err != nil {
-			return err
+			return nil, err
 		}
 		// Inside the gRPC response, we should have a single-item list of responses.
 		responses := getRateLimitsResp.GetResponses()
 		if n := len(responses); n != 1 {
-			return fmt.Errorf("expected 1 response from gubernator, got %d", n)
+			return nil, fmt.Errorf("expected 1 response from gubernator, got %d", n)
 		}
-		resp = responses[0]
+		resp := responses[0]
 		if resp.GetError() != "" {
-			return errors.New(resp.GetError())
+			return nil, errors.New(resp.GetError())
 		}
-		return nil
+		return resp, nil
 	}
-	if err := makeRateLimitRequest(); err != nil {
+	resp, err := makeRateLimitRequest()
+	if err != nil {
 		return err
 	}
 
@@ -176,7 +176,8 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
 				case <-ctx.Done():
 					return ctx.Err()
 				case <-timer.C:
-					if err := makeRateLimitRequest(); err != nil {
+					resp, err = makeRateLimitRequest()
+					if err != nil {
 						return err
 					}
 					if resp.GetStatus() == gubernator.Status_UNDER_LIMIT {