Skip to content

Commit

Permalink
add filtered reservoir to eliminate unneccessary time.Now calls
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jun 27, 2024
1 parent f3a2d96 commit 0274645
Show file tree
Hide file tree
Showing 18 changed files with 262 additions and 231 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/resource`. (#5490)
- Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/trace`. (#5490)
- Use non-generic functions in the `Start` method of `"go.opentelemetry.io/otel/sdk/trace".Trace` to reduce memory allocation. (#5497)
- Improve performance of metric instruments in `go.opentelemetry.io/otel/sdk/metric` by removing unnecessary calls to time.Now. (#5545)

### Fixed

Expand Down
92 changes: 44 additions & 48 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,63 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir {
bounds := cp
return exemplar.Histogram(bounds)
}
}

var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}

return func() exemplar.Reservoir {
return exemplar.FixedSize(n)
}
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

var filter exemplar.Filter

switch os.Getenv(filterEnvKey) {
case "always_on":
return resF()
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir {
return exemplar.SampledFilter(newR())
filter = exemplar.SampledFilter
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
}
}

var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}

return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
}
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir
ReservoirFunc func() exemplar.FilteredReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,7 +50,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() exemplar.Reservoir {
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() exemplar.Reservoir {
return exemplar.Drop()
func dropExemplars[N int64 | float64]() exemplar.FilteredReservoir[N] {
return exemplar.Drop[N]()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
10 changes: 4 additions & 6 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]

count uint64
min N
Expand Down Expand Up @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand All @@ -319,8 +319,6 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
return
}

t := now()

e.valuesMu.Lock()
defer e.valuesMu.Unlock()

Expand All @@ -333,7 +331,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
v.res.Offer(ctx, value, droppedAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand Down
20 changes: 10 additions & 10 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(9),
Time: y2kPlus(2),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -832,8 +832,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(10),
Time: y2kPlus(24),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand All @@ -850,8 +850,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
},
{
Attributes: overflowSet,
StartTime: y2kPlus(10),
Time: y2kPlus(24),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -905,7 +905,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(9),
Time: y2kPlus(2),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -938,7 +938,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(13),
Time: y2kPlus(3),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -967,7 +967,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(14),
Time: y2kPlus(4),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand Down Expand Up @@ -1004,7 +1004,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(21),
Time: y2kPlus(5),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
Expand All @@ -1022,7 +1022,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(21),
Time: y2kPlus(5),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Expand Down
12 changes: 5 additions & 7 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]

counts []uint64
count uint64
Expand Down Expand Up @@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand All @@ -80,8 +80,6 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))

t := now()

s.valuesMu.Lock()
defer s.valuesMu.Unlock()

Expand All @@ -106,12 +104,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
b.res.Offer(ctx, value, droppedAttr)
}

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down
32 changes: 16 additions & 16 deletions sdk/metric/internal/aggregate/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(7)),
c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(7)),
c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(2)),
c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(2)),
},
},
},
Expand All @@ -96,8 +96,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 10, 1, y2kPlus(7), y2kPlus(10)),
c.hPt(fltrBob, 3, 1, y2kPlus(7), y2kPlus(10)),
c.hPt(fltrAlice, 10, 1, y2kPlus(2), y2kPlus(3)),
c.hPt(fltrBob, 3, 1, y2kPlus(2), y2kPlus(3)),
},
},
},
Expand Down Expand Up @@ -126,9 +126,9 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 1, 1, y2kPlus(11), y2kPlus(16)),
c.hPt(fltrBob, 1, 1, y2kPlus(11), y2kPlus(16)),
c.hPt(overflowSet, 1, 2, y2kPlus(11), y2kPlus(16)),
c.hPt(fltrAlice, 1, 1, y2kPlus(4), y2kPlus(5)),
c.hPt(fltrBob, 1, 1, y2kPlus(4), y2kPlus(5)),
c.hPt(overflowSet, 1, 2, y2kPlus(4), y2kPlus(5)),
},
},
},
Expand Down Expand Up @@ -167,8 +167,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(7)),
c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(7)),
c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(2)),
c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(2)),
},
},
},
Expand All @@ -183,8 +183,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(10)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(10)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(3)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(3)),
},
},
},
Expand All @@ -196,8 +196,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(11)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(11)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(4)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(4)),
},
},
},
Expand All @@ -213,9 +213,9 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(14)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(14)),
c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(14)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(5)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(5)),
c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(5)),
},
},
},
Expand Down
Loading

0 comments on commit 0274645

Please sign in to comment.