From 8dc2eac078dad072f09ae8bc6b680121689c17ac Mon Sep 17 00:00:00 2001
From: Ben Blackmore
Date: Wed, 1 Nov 2023 13:05:15 +0100
Subject: [PATCH] [connector/spanmetrics] Fix memory leak

# Why

The `spanmetrics` connector has a memory leak that occurs when cumulative
temporality is used: the `connectorImp#resourceMetrics` map is only ever
cleaned up in delta temporality, so under cumulative temporality entries for
new resources accumulate without bound.

# What

Turn `connectorImp#resourceMetrics` into an LRU cache with a maximum size.
To correctly handle metric resets, we also introduce a start timestamp per
`resourceMetrics` instance.

# References

Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27654
---
 connector/spanmetricsconnector/config.go      |  5 ++
 connector/spanmetricsconnector/config_test.go | 23 +++---
 connector/spanmetricsconnector/connector.go   | 50 +++++++------
 .../spanmetricsconnector/connector_test.go    | 74 ++++++++++++++-----
 connector/spanmetricsconnector/factory.go     |  9 ++-
 .../internal/cache/cache.go                   | 14 ++++
 .../spanmetricsconnector/testdata/config.yaml |  1 +
 7 files changed, 123 insertions(+), 53 deletions(-)

diff --git a/connector/spanmetricsconnector/config.go b/connector/spanmetricsconnector/config.go
index 26d36d785c60..fad7fbfd4128 100644
--- a/connector/spanmetricsconnector/config.go
+++ b/connector/spanmetricsconnector/config.go
@@ -46,6 +46,11 @@ type Config struct {
 	// Optional. See defaultDimensionsCacheSize in connector.go for the default value.
 	DimensionsCacheSize int `mapstructure:"dimensions_cache_size"`
 
+	// ResourceMetricsCacheSize defines the size of the cache holding metrics for a service. This is mostly relevant for
+	// cumulative temporality, to avoid memory leaks and to ensure correct metric timestamp resets.
+	// Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
+	ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`
+
 	AggregationTemporality string `mapstructure:"aggregation_temporality"`
 
 	Histogram HistogramConfig `mapstructure:"histogram"`
diff --git a/connector/spanmetricsconnector/config_test.go b/connector/spanmetricsconnector/config_test.go
index 2530c481d27b..227f9a846223 100644
--- a/connector/spanmetricsconnector/config_test.go
+++ b/connector/spanmetricsconnector/config_test.go
@@ -47,8 +47,9 @@ func TestLoadConfig(t *testing.T) {
 					{Name: "http.method", Default: &defaultMethod},
 					{Name: "http.status_code", Default: (*string)(nil)},
 				},
-				DimensionsCacheSize:  1500,
-				MetricsFlushInterval: 30 * time.Second,
+				DimensionsCacheSize:      1500,
+				ResourceMetricsCacheSize: 1600,
+				MetricsFlushInterval:     30 * time.Second,
 				Exemplars: ExemplarsConfig{
 					Enabled: true,
 				},
@@ -66,9 +67,10 @@ func TestLoadConfig(t *testing.T) {
 		{
 			id: component.NewIDWithName(metadata.Type, "exponential_histogram"),
 			expected: &Config{
-				AggregationTemporality: cumulative,
-				DimensionsCacheSize:    1000,
-				MetricsFlushInterval:   15 * time.Second,
+				AggregationTemporality:   cumulative,
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     15 * time.Second,
 				Histogram: HistogramConfig{
 					Unit: metrics.Milliseconds,
 					Exponential: &ExponentialHistogramConfig{
@@ -88,11 +90,12 @@ func TestLoadConfig(t *testing.T) {
 		{
 			id: component.NewIDWithName(metadata.Type, "exemplars_enabled"),
 			expected: &Config{
-				AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
-				DimensionsCacheSize:    defaultDimensionsCacheSize,
-				MetricsFlushInterval:   15 * time.Second,
-				Histogram:              HistogramConfig{Disable: false, Unit: defaultUnit},
-				Exemplars:              ExemplarsConfig{Enabled: true},
+				AggregationTemporality:   "AGGREGATION_TEMPORALITY_CUMULATIVE",
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     15 * time.Second,
+				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+				Exemplars:                ExemplarsConfig{Enabled: true},
 			},
 		},
 	}
diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go
index 9e0189f1f921..17b79b4cee76 100644
--- a/connector/spanmetricsconnector/connector.go
+++ b/connector/spanmetricsconnector/connector.go
@@ -32,7 +32,8 @@ const (
 	statusCodeKey      = "status.code" // OpenTelemetry non-standard constant.
 	metricKeySeparator = string(byte(0))
 
-	defaultDimensionsCacheSize = 1000
+	defaultDimensionsCacheSize      = 1000
+	defaultResourceMetricsCacheSize = 1000
 
 	metricNameDuration = "duration"
 	metricNameCalls    = "calls"
@@ -51,10 +52,7 @@ type connectorImp struct {
 	// Additional dimensions to add to metrics.
 	dimensions []dimension
 
-	// The starting time of the data points.
-	startTimestamp pcommon.Timestamp
-
-	resourceMetrics map[resourceKey]*resourceMetrics
+	resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]
 
 	keyBuf *bytes.Buffer
 
@@ -79,6 +77,8 @@ type resourceMetrics struct {
 	sums       metrics.SumMetrics
 	events     metrics.SumMetrics
 	attributes pcommon.Map
+	// startTimestamp captures when the first data points for this resource are recorded.
+	startTimestamp pcommon.Timestamp
 }
 
 type dimension struct {
@@ -110,11 +110,15 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
 		return nil, err
 	}
 
+	resourceMetricsCache, err := cache.NewCache[resourceKey, *resourceMetrics](cfg.ResourceMetricsCacheSize)
+	if err != nil {
+		return nil, err
+	}
+
 	return &connectorImp{
 		logger:                logger,
 		config:                *cfg,
-		startTimestamp:        pcommon.NewTimestampFromTime(time.Now()),
-		resourceMetrics:       make(map[resourceKey]*resourceMetrics),
+		resourceMetrics:       resourceMetricsCache,
 		dimensions:            newDimensions(cfg.Dimensions),
 		keyBuf:                bytes.NewBuffer(make([]byte, 0, 1024)),
 		metricKeyToDimensions: metricKeyToDimensionsCache,
@@ -236,7 +240,8 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
 // buildMetrics collects the computed raw metrics data and builds OTLP metrics.
 func (p *connectorImp) buildMetrics() pmetric.Metrics {
 	m := pmetric.NewMetrics()
-	for _, rawMetrics := range p.resourceMetrics {
+
+	p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
 		rm := m.ResourceMetrics().AppendEmpty()
 		rawMetrics.attributes.CopyTo(rm.Resource().Attributes())
 
@@ -246,22 +251,22 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
 		sums := rawMetrics.sums
 		metric := sm.Metrics().AppendEmpty()
 		metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
-		sums.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+		sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
 
 		if !p.config.Histogram.Disable {
 			histograms := rawMetrics.histograms
 			metric = sm.Metrics().AppendEmpty()
 			metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
 			metric.SetUnit(p.config.Histogram.Unit.String())
-			histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+			histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
 		}
 
 		events := rawMetrics.events
 		if p.events.Enabled {
 			metric = sm.Metrics().AppendEmpty()
 			metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
-			events.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+			events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
 		}
-	}
+	})
 
 	return m
 }
@@ -269,19 +274,19 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
 func (p *connectorImp) resetState() {
 	// If delta metrics, reset accumulated data
 	if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
-		p.resourceMetrics = make(map[resourceKey]*resourceMetrics)
+		p.resourceMetrics.Purge()
 		p.metricKeyToDimensions.Purge()
-		p.startTimestamp = pcommon.NewTimestampFromTime(time.Now())
 	} else {
+		p.resourceMetrics.RemoveEvictedItems()
 		p.metricKeyToDimensions.RemoveEvictedItems()
 
 		// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
 		if p.config.Histogram.Disable {
 			return
 		}
-		for _, m := range p.resourceMetrics {
+		p.resourceMetrics.ForEach(func(_ resourceKey, m *resourceMetrics) {
 			m.histograms.Reset(true)
-		}
+		})
 	}
 }
 
@@ -381,15 +386,16 @@ type resourceKey [16]byte
 func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
 	key := resourceKey(pdatautil.MapHash(attr))
-	v, ok := p.resourceMetrics[key]
+	v, ok := p.resourceMetrics.Get(key)
 	if !ok {
 		v = &resourceMetrics{
-			histograms: initHistogramMetrics(p.config),
-			sums:       metrics.NewSumMetrics(),
-			events:     metrics.NewSumMetrics(),
-			attributes: attr,
+			histograms:     initHistogramMetrics(p.config),
+			sums:           metrics.NewSumMetrics(),
+			events:         metrics.NewSumMetrics(),
+			attributes:     attr,
+			startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
 		}
-		p.resourceMetrics[key] = v
+		p.resourceMetrics.Add(key, v)
 	}
 	return v
 }
diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go
index 2e3e3c34c7cb..ed010f3c0503 100644
--- a/connector/spanmetricsconnector/connector_test.go
+++ b/connector/spanmetricsconnector/connector_test.go
@@ -32,18 +32,19 @@ import (
 )
 
 const (
-	stringAttrName         = "stringAttrName"
-	intAttrName            = "intAttrName"
-	doubleAttrName         = "doubleAttrName"
-	boolAttrName           = "boolAttrName"
-	nullAttrName           = "nullAttrName"
-	mapAttrName            = "mapAttrName"
-	arrayAttrName          = "arrayAttrName"
-	notInSpanAttrName0     = "shouldBeInMetric"
-	notInSpanAttrName1     = "shouldNotBeInMetric"
-	regionResourceAttrName = "region"
-	exceptionTypeAttrName  = "exception.type"
-	DimensionsCacheSize    = 2
+	stringAttrName           = "stringAttrName"
+	intAttrName              = "intAttrName"
+	doubleAttrName           = "doubleAttrName"
+	boolAttrName             = "boolAttrName"
+	nullAttrName             = "nullAttrName"
+	mapAttrName              = "mapAttrName"
+	arrayAttrName            = "arrayAttrName"
+	notInSpanAttrName0       = "shouldBeInMetric"
+	notInSpanAttrName1       = "shouldNotBeInMetric"
+	regionResourceAttrName   = "region"
+	exceptionTypeAttrName    = "exception.type"
+	DimensionsCacheSize      = 2
+	ResourceMetricsCacheSize = 5
 
 	sampleRegion   = "us-east-1"
 	sampleDuration = float64(11)
@@ -881,6 +882,44 @@ func TestMetricKeyCache(t *testing.T) {
 	}, 10*time.Second, time.Millisecond*100)
 }
 
+func TestResourceMetricsCache(t *testing.T) {
+	mcon := consumertest.NewNop()
+
+	p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, cumulative, zaptest.NewLogger(t), nil)
+
+	// Test
+	ctx := metadata.NewIncomingContext(context.Background(), nil)
+
+	// 0 resources in the beginning
+	assert.Zero(t, p.resourceMetrics.Len())
+
+	err := p.ConsumeTraces(ctx, buildSampleTrace())
+	// Validate
+	require.NoError(t, err)
+	assert.Equal(t, 2, p.resourceMetrics.Len())
+
+	// consume another batch of traces for the same resources
+	err = p.ConsumeTraces(ctx, buildSampleTrace())
+	require.NoError(t, err)
+	assert.Equal(t, 2, p.resourceMetrics.Len())
+
+	// consume more batches for new resources. Max size is exceeded causing old resource entries to be discarded
+	for i := 0; i < ResourceMetricsCacheSize; i++ {
+		traces := buildSampleTrace()
+
+		// add resource attributes to simulate additional resources providing data
+		for j := 0; j < traces.ResourceSpans().Len(); j++ {
+			traces.ResourceSpans().At(j).Resource().Attributes().PutStr("dummy", fmt.Sprintf("%d", i))
+		}
+
+		err = p.ConsumeTraces(ctx, traces)
+		require.NoError(t, err)
+	}
+
+	// validate that the cache doesn't grow past its limit
+	assert.Equal(t, ResourceMetricsCacheSize, p.resourceMetrics.Len())
+}
+
 func BenchmarkConnectorConsumeTraces(b *testing.B) {
 	// Prepare
 	mcon := consumertest.NewNop()
@@ -951,11 +990,12 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
 
 func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
 	cfg := &Config{
-		AggregationTemporality: temporality,
-		Histogram:              histogramConfig(),
-		Exemplars:              exemplarsConfig(),
-		ExcludeDimensions:      excludedDimensions,
-		DimensionsCacheSize:    DimensionsCacheSize,
+		AggregationTemporality:   temporality,
+		Histogram:                histogramConfig(),
+		Exemplars:                exemplarsConfig(),
+		ExcludeDimensions:        excludedDimensions,
+		DimensionsCacheSize:      DimensionsCacheSize,
+		ResourceMetricsCacheSize: ResourceMetricsCacheSize,
 		Dimensions: []Dimension{
 			// Set nil defaults to force a lookup for the attribute in the span.
 			{stringAttrName, nil},
diff --git a/connector/spanmetricsconnector/factory.go b/connector/spanmetricsconnector/factory.go
index 792b47935b7e..a7ff39047529 100644
--- a/connector/spanmetricsconnector/factory.go
+++ b/connector/spanmetricsconnector/factory.go
@@ -28,10 +28,11 @@ func NewFactory() connector.Factory {
 
 func createDefaultConfig() component.Config {
 	return &Config{
-		AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
-		DimensionsCacheSize:    defaultDimensionsCacheSize,
-		MetricsFlushInterval:   15 * time.Second,
-		Histogram:              HistogramConfig{Disable: false, Unit: defaultUnit},
+		AggregationTemporality:   "AGGREGATION_TEMPORALITY_CUMULATIVE",
+		DimensionsCacheSize:      defaultDimensionsCacheSize,
+		ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+		MetricsFlushInterval:     15 * time.Second,
+		Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
 	}
 }
 
diff --git a/connector/spanmetricsconnector/internal/cache/cache.go b/connector/spanmetricsconnector/internal/cache/cache.go
index 8b35f77fbc29..23513298339c 100644
--- a/connector/spanmetricsconnector/internal/cache/cache.go
+++ b/connector/spanmetricsconnector/internal/cache/cache.go
@@ -74,3 +74,17 @@ func (c *Cache[K, V]) Purge() {
 	c.lru.Purge()
 	c.RemoveEvictedItems()
 }
+
+// ForEach iterates over all the items within the cache, as well as the evicted items (if any).
+func (c *Cache[K, V]) ForEach(fn func(k K, v V)) {
+	for _, k := range c.lru.Keys() {
+		v, ok := c.lru.Get(k)
+		if ok {
+			fn(k.(K), v.(V))
+		}
+	}
+
+	for k, v := range c.evictedItems {
+		fn(k, v)
+	}
+}
diff --git a/connector/spanmetricsconnector/testdata/config.yaml b/connector/spanmetricsconnector/testdata/config.yaml
index b952c8d38120..5827403af4a6 100644
--- a/connector/spanmetricsconnector/testdata/config.yaml
+++ b/connector/spanmetricsconnector/testdata/config.yaml
@@ -15,6 +15,7 @@ spanmetrics/full:
   exemplars:
     enabled: true
   dimensions_cache_size: 1500
+  resource_metrics_cache_size: 1600
   # Additional list of dimensions on top of:
   # - service.name
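
# Usage example

A minimal collector configuration exercising the new option is sketched below. Only `resource_metrics_cache_size` is introduced by this patch; the receiver and exporter wiring, and the `1600` value (taken from the test config above), are illustrative assumptions.

```yaml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  debug:

connectors:
  spanmetrics:
    aggregation_temporality: AGGREGATION_TEMPORALITY_CUMULATIVE
    # Bounds the per-resource metrics cache so that cumulative temporality
    # cannot accumulate entries for an unbounded number of resources.
    # The value is an illustrative assumption, not a recommendation.
    resource_metrics_cache_size: 1600

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [spanmetrics]
    metrics:
      receivers: [spanmetrics]
      exporters: [debug]
```

When the cache is full, the least recently used resource entry is evicted; if that resource appears again later, it gets a fresh entry with its own `startTimestamp`, so its cumulative series restarts cleanly instead of continuing from stale data.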