Skip to content

Commit

Permalink
Only registers callbacks if non-drop aggregation is used (#3408)
Browse files Browse the repository at this point in the history
* Do not return an error for Drop aggs

The async instruments currently return an error if and only if there are
no aggregators returned from a resolve. Returning no aggregators means
the instrument aggregation is drop. Do not include this in the error
reporting decision.

* Only registers callbacks if non-drop agg is used

The instruments passed to RegisterCallback need to have some aggregation
defined otherwise it is implied they have a Drop aggregation. Check that
at least one instrument passed has an aggregation other than Drop before
registering the callback with the pipelines.

Also, return an error if the user passed another API implementation of
an asynchronous instrument.

* Remove unneeded TODO from pipeline

* Add changes to changelog

* Test callback not called for all drop instruments

* Test RegisterCallback returns err for non-SDK inst

* Fail gracefully for non-SDK instruments

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
NullBrotasli and hanyuancheung authored Nov 11, 2022
1 parent 987ab55 commit 14d1d28
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
- `Aggregation`s from `go.opentelemetry.io/otel/sdk/metric` with no data are not exported. (#3394, #3436)
- Reenabled Attribute Filters in the Metric SDK. (#3396)
- Asynchronous callbacks are only called if they are registered with at least one instrument that does not use drop aggragation. (#3408)
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)
- Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440)

Expand Down
5 changes: 0 additions & 5 deletions sdk/metric/instrument_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"fmt"

"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
Expand Down Expand Up @@ -70,9 +68,6 @@ func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []i
}

aggs, err := p.resolve.Aggregators(key)
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[N]{aggregators: aggs}, err
}

Expand Down
25 changes: 25 additions & 0 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
for _, inst := range insts {
// Only register if at least one instrument has a non-drop aggregation.
// Otherwise, calling f during collection will be wasted computation.
switch t := inst.(type) {
case *instrumentImpl[int64]:
if len(t.aggregators) > 0 {
return m.registerCallback(f)
}
case *instrumentImpl[float64]:
if len(t.aggregators) > 0 {
return m.registerCallback(f)
}
default:
// Instrument external to the SDK. For example, an instrument from
// the "go.opentelemetry.io/otel/metric/internal/global" package.
//
// Fail gracefully here, assume a valid instrument.
return m.registerCallback(f)
}
}
// All insts use drop aggregation.
return nil
}

func (m *meter) registerCallback(f func(context.Context)) error {
m.pipes.registerCallback(f)
return nil
}
Expand Down
44 changes: 44 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/metric/view"
Expand Down Expand Up @@ -480,6 +481,49 @@ func TestMetersProvideScope(t *testing.T) {
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}

func TestRegisterCallbackDropAggregations(t *testing.T) {
aggFn := func(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}
r := NewManualReader(WithAggregationSelector(aggFn))
mp := NewMeterProvider(WithReader(r))
m := mp.Meter("testRegisterCallbackDropAggregations")

int64Counter, err := m.AsyncInt64().Counter("int64.counter")
require.NoError(t, err)

int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter")
require.NoError(t, err)

int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge")
require.NoError(t, err)

floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter")
require.NoError(t, err)

floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter")
require.NoError(t, err)

floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge")
require.NoError(t, err)

var called bool
require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{
int64Counter,
int64UpDownCounter,
int64Gauge,
floag64Counter,
floag64UpDownCounter,
floag64Gauge,
}, func(context.Context) { called = true }))

data, err := r.Collect(context.Background())
require.NoError(t, err)

assert.False(t, called, "callback called for all drop instruments")
assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments")
}

func TestAttributeFilter(t *testing.T) {
one := 1.0
two := 2.0
Expand Down
1 change: 0 additions & 1 deletion sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []view.View) p
return pipes
}

// TODO (#3053) Only register callbacks if any instrument matches in a view.
func (p pipelines) registerCallback(fn func(context.Context)) {
for _, pipe := range p {
pipe.addCallback(fn)
Expand Down

0 comments on commit 14d1d28

Please sign in to comment.