From ec99f8d393915ad47511665cc3e03688f69e3d56 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Fri, 15 Aug 2025 16:19:55 -0700 Subject: [PATCH] [processor/ratelimiter] Add metadatakeys to ratelimiter request size metrics --- processor/ratelimitprocessor/processor.go | 13 +++++++------ .../ratelimitprocessor/processor_test.go | 19 ++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/processor/ratelimitprocessor/processor.go b/processor/ratelimitprocessor/processor.go index af42501d2..6711d9299 100644 --- a/processor/ratelimitprocessor/processor.go +++ b/processor/ratelimitprocessor/processor.go @@ -262,15 +262,16 @@ func rateLimit(ctx context.Context, return err } -func recordRequestSize(ctx context.Context, tb *metadata.TelemetryBuilder, strategy Strategy, hits int) { +func recordRequestSize(ctx context.Context, tb *metadata.TelemetryBuilder, strategy Strategy, hits int, metadataKeys []string) { if tb != nil && strategy == StrategyRateLimitBytes { - tb.RatelimitRequestSize.Record(ctx, int64(hits)) + attrsCommon := getAttrsFromContext(ctx, metadataKeys) + tb.RatelimitRequestSize.Record(ctx, int64(hits), metric.WithAttributes(attrsCommon...)) } } func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { hits := r.count(ld) - recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits) + recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys) if err := rateLimit( ctx, @@ -289,7 +290,7 @@ func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { hits := r.count(md) - recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits) + recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys) if err := rateLimit( ctx, @@ -308,7 +309,7 @@ func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pme func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { hits := r.count(td) - recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits) + recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys) if err := rateLimit( ctx, @@ -327,7 +328,7 @@ func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrac func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error { hits := r.count(pd) - recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits) + recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys) if err := rateLimit( ctx, diff --git a/processor/ratelimitprocessor/processor_test.go b/processor/ratelimitprocessor/processor_test.go index d202fc60d..0c0acd5a7 100644 --- a/processor/ratelimitprocessor/processor_test.go +++ b/processor/ratelimitprocessor/processor_test.go @@ -411,15 +411,16 @@ func TestConcurrentRequestsTelemetry(t *testing.T) { } func testRequestSize(t *testing.T, tt *componenttest.Telemetry, count int, sum int) { - m, err := tt.GetMetric("otelcol_ratelimit.request_size") - require.NoError(t, err) - - hist, ok := m.Data.(metricdata.Histogram[int64]) - require.True(t, ok) - - require.Equal(t, 1, len(hist.DataPoints)) - require.Equal(t, uint64(count), hist.DataPoints[0].Count) - require.Equal(t, int64(sum), hist.DataPoints[0].Sum) + metadatatest.AssertEqualRatelimitRequestSize(t, tt, []metricdata.HistogramDataPoint[int64]{ + { + Count: uint64(count), + Sum: int64(sum), + Attributes: attribute.NewSet( + []attribute.KeyValue{ + attribute.String("x-tenant-id", "TestProjectID"), + }...), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } func testRateLimitTelemetry(t *testing.T, tel *componenttest.Telemetry) {