[connector/spanmetrics] Fix memory leak
# Why

The `spanmetrics` connector has a memory leak that occurs when cumulative
temporality is used: the `connectorImp#resourceMetrics` map is only ever
cleaned up when delta temporality is used, so entries accumulate without
bound.

# What

Turn `connectorImp#resourceMetrics` into an LRU cache with a maximum size.
To correctly handle metric resets, we also introduce a start timestamp per
`resourceMetrics` instance.
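
As a rough illustration of the approach (a simplified sketch, not the
connector's actual code, which uses its internal `cache` package), the
unbounded map is replaced by a size-bounded cache whose entries each carry
their own start timestamp:

```go
package main

import (
	"fmt"
	"time"
)

// resourceEntry stands in for the connector's per-resource metrics bucket.
// Besides the aggregated data, it carries its own start timestamp, so a
// resource that is evicted and later seen again starts a fresh series
// instead of continuing the old one.
type resourceEntry struct {
	callCount      int64
	startTimestamp time.Time
}

// boundedCache is a deliberately tiny LRU-style cache: once maxSize is
// reached, the least recently used key is evicted. The real connector uses
// its internal cache package; this only illustrates the bounded-size idea.
type boundedCache struct {
	maxSize int
	order   []string // least recently used first
	entries map[string]*resourceEntry
}

func newBoundedCache(maxSize int) *boundedCache {
	return &boundedCache{maxSize: maxSize, entries: map[string]*resourceEntry{}}
}

func (c *boundedCache) getOrCreate(key string) *resourceEntry {
	if e, ok := c.entries[key]; ok {
		c.touch(key)
		return e
	}
	if len(c.order) >= c.maxSize {
		// Evict the least recently used entry so memory stays bounded.
		oldest := c.order[0]
		c.order = c.order[1:]
		delete(c.entries, oldest)
	}
	e := &resourceEntry{startTimestamp: time.Now()}
	c.entries[key] = e
	c.order = append(c.order, key)
	return e
}

// touch moves key to the most recently used position.
func (c *boundedCache) touch(key string) {
	for i, k := range c.order {
		if k == key {
			c.order = append(c.order[:i], c.order[i+1:]...)
			c.order = append(c.order, key)
			return
		}
	}
}

func main() {
	c := newBoundedCache(2)
	for _, svc := range []string{"svc-a", "svc-b", "svc-c", "svc-a"} {
		c.getOrCreate(svc).callCount++
	}
	// "svc-a" was evicted when "svc-c" arrived, so its second appearance
	// created a new entry with a fresh start timestamp and callCount == 1.
	fmt.Println(len(c.entries)) // always <= maxSize, i.e. 2
}
```

As the connector.go diff below shows, evicted entries are additionally kept
until `resetState` calls `RemoveEvictedItems`, so data recorded just before
an eviction is still exported on the next flush.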

# References

Fixes open-telemetry#27654

Co-authored-by: Jared Tan <jian.tan@daocloud.io>
bripkens and JaredTan95 committed Dec 5, 2023
1 parent 7222cc9 commit 7ff7b54
Showing 9 changed files with 138 additions and 55 deletions.
11 changes: 11 additions & 0 deletions .chloggen/bripkens-spanmetrics-memory-leak.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak when the cumulative temporality is used.

# One or more tracking issues related to the change
issues: [27654]
2 changes: 2 additions & 0 deletions connector/spanmetricsconnector/README.md
@@ -109,6 +109,8 @@ The following settings can be optionally configured:
If no `default` is provided, this dimension will be **omitted** from the metric.
- `exclude_dimensions`: the list of dimensions to be excluded from the default set of dimensions. Use to exclude unneeded data from metrics.
- `dimensions_cache_size` (default: `1000`): the size of the cache for storing dimensions, used to limit the collector's memory usage. Must be a positive number.
- `resource_metrics_cache_size` (default: `1000`): the size of the cache holding metrics for a service. This is mostly relevant for
cumulative temporality, to avoid memory leaks and to ensure correct metric timestamp resets.
- `aggregation_temporality` (default: `AGGREGATION_TEMPORALITY_CUMULATIVE`): Defines the aggregation temporality of the generated metrics.
One of either `AGGREGATION_TEMPORALITY_CUMULATIVE` or `AGGREGATION_TEMPORALITY_DELTA`.
- `namespace`: Defines the namespace of the generated metrics. If `namespace` is provided, the generated metric names are prefixed with `namespace.`.
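
For illustration, a minimal collector configuration fragment using the new option might look like the following (the values are examples only; `resource_metrics_cache_size` defaults to `1000`):

```yaml
connectors:
  spanmetrics:
    aggregation_temporality: AGGREGATION_TEMPORALITY_CUMULATIVE
    dimensions_cache_size: 1000
    resource_metrics_cache_size: 1600
```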
5 changes: 5 additions & 0 deletions connector/spanmetricsconnector/config.go
@@ -46,6 +46,11 @@ type Config struct {
// Optional. See defaultDimensionsCacheSize in connector.go for the default value.
DimensionsCacheSize int `mapstructure:"dimensions_cache_size"`

// ResourceMetricsCacheSize defines the size of the cache holding metrics for a service. This is mostly relevant for
// cumulative temporality, to avoid memory leaks and to ensure correct metric timestamp resets.
// Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`

AggregationTemporality string `mapstructure:"aggregation_temporality"`

Histogram HistogramConfig `mapstructure:"histogram"`
23 changes: 13 additions & 10 deletions connector/spanmetricsconnector/config_test.go
@@ -47,8 +47,9 @@ func TestLoadConfig(t *testing.T) {
{Name: "http.method", Default: &defaultMethod},
{Name: "http.status_code", Default: (*string)(nil)},
},
DimensionsCacheSize: 1500,
MetricsFlushInterval: 30 * time.Second,
DimensionsCacheSize: 1500,
ResourceMetricsCacheSize: 1600,
MetricsFlushInterval: 30 * time.Second,
Exemplars: ExemplarsConfig{
Enabled: true,
},
@@ -66,9 +67,10 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "exponential_histogram"),
expected: &Config{
AggregationTemporality: cumulative,
DimensionsCacheSize: 1000,
MetricsFlushInterval: 15 * time.Second,
AggregationTemporality: cumulative,
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{
Unit: metrics.Milliseconds,
Exponential: &ExponentialHistogramConfig{
@@ -88,11 +90,12 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "exemplars_enabled"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
Exemplars: ExemplarsConfig{Enabled: true},
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
Exemplars: ExemplarsConfig{Enabled: true},
},
},
}
50 changes: 28 additions & 22 deletions connector/spanmetricsconnector/connector.go
@@ -32,7 +32,8 @@ const (
statusCodeKey = "status.code" // OpenTelemetry non-standard constant.
metricKeySeparator = string(byte(0))

defaultDimensionsCacheSize = 1000
defaultDimensionsCacheSize = 1000
defaultResourceMetricsCacheSize = 1000

metricNameDuration = "duration"
metricNameCalls = "calls"
@@ -51,10 +52,7 @@ type connectorImp struct {
// Additional dimensions to add to metrics.
dimensions []dimension

// The starting time of the data points.
startTimestamp pcommon.Timestamp

resourceMetrics map[resourceKey]*resourceMetrics
resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]

keyBuf *bytes.Buffer

@@ -79,6 +77,8 @@ type resourceMetrics struct {
sums metrics.SumMetrics
events metrics.SumMetrics
attributes pcommon.Map
// startTimestamp captures when the first data points for this resource are recorded.
startTimestamp pcommon.Timestamp
}

type dimension struct {
@@ -110,11 +110,15 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
return nil, err
}

resourceMetricsCache, err := cache.NewCache[resourceKey, *resourceMetrics](cfg.ResourceMetricsCacheSize)
if err != nil {
return nil, err
}

return &connectorImp{
logger: logger,
config: *cfg,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
resourceMetrics: make(map[resourceKey]*resourceMetrics),
resourceMetrics: resourceMetricsCache,
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
@@ -236,7 +240,8 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
for _, rawMetrics := range p.resourceMetrics {

p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
rm := m.ResourceMetrics().AppendEmpty()
rawMetrics.attributes.CopyTo(rm.Resource().Attributes())

@@ -246,42 +251,42 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
}
}
})

return m
}

func (p *connectorImp) resetState() {
// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.resourceMetrics = make(map[resourceKey]*resourceMetrics)
p.resourceMetrics.Purge()
p.metricKeyToDimensions.Purge()
p.startTimestamp = pcommon.NewTimestampFromTime(time.Now())
} else {
p.resourceMetrics.RemoveEvictedItems()
p.metricKeyToDimensions.RemoveEvictedItems()

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if p.config.Histogram.Disable {
return
}
for _, m := range p.resourceMetrics {
p.resourceMetrics.ForEach(func(_ resourceKey, m *resourceMetrics) {
m.histograms.Reset(true)
}
})

}
}
@@ -387,15 +392,16 @@ type resourceKey [16]byte

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
key := resourceKey(pdatautil.MapHash(attr))
v, ok := p.resourceMetrics[key]
v, ok := p.resourceMetrics.Get(key)
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(),
events: metrics.NewSumMetrics(),
attributes: attr,
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(),
events: metrics.NewSumMetrics(),
attributes: attr,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
}
p.resourceMetrics[key] = v
p.resourceMetrics.Add(key, v)
}
return v
}
78 changes: 59 additions & 19 deletions connector/spanmetricsconnector/connector_test.go
@@ -32,18 +32,19 @@ import (
)

const (
stringAttrName = "stringAttrName"
intAttrName = "intAttrName"
doubleAttrName = "doubleAttrName"
boolAttrName = "boolAttrName"
nullAttrName = "nullAttrName"
mapAttrName = "mapAttrName"
arrayAttrName = "arrayAttrName"
notInSpanAttrName0 = "shouldBeInMetric"
notInSpanAttrName1 = "shouldNotBeInMetric"
regionResourceAttrName = "region"
exceptionTypeAttrName = "exception.type"
DimensionsCacheSize = 2
stringAttrName = "stringAttrName"
intAttrName = "intAttrName"
doubleAttrName = "doubleAttrName"
boolAttrName = "boolAttrName"
nullAttrName = "nullAttrName"
mapAttrName = "mapAttrName"
arrayAttrName = "arrayAttrName"
notInSpanAttrName0 = "shouldBeInMetric"
notInSpanAttrName1 = "shouldNotBeInMetric"
regionResourceAttrName = "region"
exceptionTypeAttrName = "exception.type"
dimensionsCacheSize = 2
resourceMetricsCacheSize = 5

sampleRegion = "us-east-1"
sampleDuration = float64(11)
@@ -881,7 +882,7 @@ func TestMetricKeyCache(t *testing.T) {
require.NoError(t, err)
// 2 keys were cached, 1 key was evicted and cleaned up after processing
assert.Eventually(t, func() bool {
return p.metricKeyToDimensions.Len() == DimensionsCacheSize
return p.metricKeyToDimensions.Len() == dimensionsCacheSize
}, 10*time.Second, time.Millisecond*100)

// consume another batch of traces
@@ -890,10 +891,48 @@

// 2 keys were cached, the other keys were evicted and cleaned up after processing
assert.Eventually(t, func() bool {
return p.metricKeyToDimensions.Len() == DimensionsCacheSize
return p.metricKeyToDimensions.Len() == dimensionsCacheSize
}, 10*time.Second, time.Millisecond*100)
}

func TestResourceMetricsCache(t *testing.T) {
mcon := consumertest.NewNop()

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)

// 0 resources in the beginning
assert.Zero(t, p.resourceMetrics.Len())

err := p.ConsumeTraces(ctx, buildSampleTrace())
// Validate
require.NoError(t, err)
assert.Equal(t, 2, p.resourceMetrics.Len())

// consume another batch of traces for the same resources
err = p.ConsumeTraces(ctx, buildSampleTrace())
require.NoError(t, err)
assert.Equal(t, 2, p.resourceMetrics.Len())

// consume more batches for new resources. The max size is exceeded, causing old resource entries to be discarded
for i := 0; i < resourceMetricsCacheSize; i++ {
traces := buildSampleTrace()

// add resource attributes to simulate additional resources providing data
for j := 0; j < traces.ResourceSpans().Len(); j++ {
traces.ResourceSpans().At(j).Resource().Attributes().PutStr("dummy", fmt.Sprintf("%d", i))
}

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)
}

// validate that the cache doesn't grow past its limit
assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
}

func BenchmarkConnectorConsumeTraces(b *testing.B) {
// Prepare
mcon := consumertest.NewNop()
@@ -964,11 +1003,12 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {

cfg := &Config{
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Exemplars: exemplarsConfig(),
ExcludeDimensions: excludedDimensions,
DimensionsCacheSize: DimensionsCacheSize,
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Exemplars: exemplarsConfig(),
ExcludeDimensions: excludedDimensions,
DimensionsCacheSize: dimensionsCacheSize,
ResourceMetricsCacheSize: resourceMetricsCacheSize,
Dimensions: []Dimension{
// Set nil defaults to force a lookup for the attribute in the span.
{stringAttrName, nil},
9 changes: 5 additions & 4 deletions connector/spanmetricsconnector/factory.go
@@ -28,10 +28,11 @@ func NewFactory() connector.Factory {

func createDefaultConfig() component.Config {
return &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
}
}

14 changes: 14 additions & 0 deletions connector/spanmetricsconnector/internal/cache/cache.go
@@ -74,3 +74,17 @@ func (c *Cache[K, V]) Purge() {
c.lru.Purge()
c.RemoveEvictedItems()
}

// ForEach iterates over all the items within the cache, as well as the evicted items (if any).
func (c *Cache[K, V]) ForEach(fn func(k K, v V)) {
for _, k := range c.lru.Keys() {
v, ok := c.lru.Get(k)
if ok {
fn(k.(K), v.(V))
}
}

for k, v := range c.evictedItems {
fn(k, v)
}
}
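
For context, an illustrative (hypothetical, not part of this change) test inside the internal `cache` package could show how `ForEach` visits both live and evicted entries; this assumes that an item evicted by `Add` stays in the evicted-items set until `RemoveEvictedItems` is called, which is how the connector's `resetState` uses it:

```go
package cache

import "testing"

// TestForEachVisitsLiveAndEvicted is a hypothetical sketch showing that
// ForEach visits the live LRU entries as well as items that were evicted
// but not yet dropped via RemoveEvictedItems.
func TestForEachVisitsLiveAndEvicted(t *testing.T) {
	c, err := NewCache[string, int](2)
	if err != nil {
		t.Fatal(err)
	}
	c.Add("a", 1)
	c.Add("b", 2)
	c.Add("c", 3) // evicts "a", which is retained as an evicted item for now

	seen := map[string]int{}
	c.ForEach(func(k string, v int) { seen[k] = v })
	if len(seen) != 3 { // live "b" and "c" plus evicted "a"
		t.Fatalf("expected 3 items, got %d", len(seen))
	}

	c.RemoveEvictedItems()
	seen = map[string]int{}
	c.ForEach(func(k string, v int) { seen[k] = v })
	if len(seen) != 2 { // only the live entries remain
		t.Fatalf("expected 2 items, got %d", len(seen))
	}
}
```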
1 change: 1 addition & 0 deletions connector/spanmetricsconnector/testdata/config.yaml
@@ -15,6 +15,7 @@ spanmetrics/full:
exemplars:
enabled: true
dimensions_cache_size: 1500
resource_metrics_cache_size: 1600

# Additional list of dimensions on top of:
# - service.name
