Skip to content

Commit

Permalink
Change default aggregation for Counter, Observer and Recorder (#41)
Browse files Browse the repository at this point in the history
* Change default aggregation for Counter, Observer and Recorder

* Fixes #25
* Fixes #37
* Change metric exporter version 0.2.0 -> v0.2.1

* Change to pass make precommit

* Add support for SumObserver and UpDownSumObserver.

* Add support for SumObserver and UpDownSumObserver
* Fix package name to call for consistency (metric -> apimetric)
* Fix version to use in the metric example

* Fix comments
  • Loading branch information
ymotongpoo committed Jun 11, 2020
1 parent ec6b73c commit 5fb47d1
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 59 deletions.
4 changes: 2 additions & 2 deletions example/metric/example.go
Expand Up @@ -106,9 +106,9 @@ func main() {
cv := 100 + r
counter.Add(ctx, cv, clabels...)

r2 := rand.Int63n(10)
r2 := rand.Int63n(100)
ov := 12.34 + float64(r2)/20.0
of.set(ov)
log.Printf("Submitted data: counter %v, observer %v", cv, ov)
log.Printf("Most recent data: counter %v, observer %v", cv, ov)
}
}
2 changes: 1 addition & 1 deletion example/metric/go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.14
replace github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric => ../../exporter/metric

require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.2.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.2.1
github.com/google/go-cmp v0.4.1 // indirect
go.opentelemetry.io/otel v0.6.0
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 // indirect
Expand Down
51 changes: 51 additions & 0 deletions exporter/metric/error.go
@@ -0,0 +1,51 @@
// Copyright 2020, Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
"errors"
"fmt"

apimetric "go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
)

var (
errBlankProjectID = errors.New("expecting a non-blank ProjectID")
)

type errUnsupportedAggregation struct {
agg export.Aggregator
}

func (e errUnsupportedAggregation) Error() string {
return fmt.Sprintf("currently the aggregator is not supported: %v", e.agg)
}

type errUnexpectedNumberKind struct {
kind apimetric.NumberKind
}

func (e errUnexpectedNumberKind) Error() string {
return fmt.Sprintf("the number kind is unexpected: %v", e.kind)
}

type errUnexpectedMetricKind struct {
kind apimetric.Kind
}

func (e errUnexpectedMetricKind) Error() string {
return fmt.Sprintf("the metric kind is unexpected: %v", e.kind)
}
97 changes: 43 additions & 54 deletions exporter/metric/metric.go
Expand Up @@ -16,7 +16,6 @@ package metric

import (
"context"
"errors"
"fmt"
"log"
"strings"
Expand All @@ -27,7 +26,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"

monitoring "cloud.google.com/go/monitoring/apiv3"
Expand All @@ -39,29 +37,12 @@ import (
)

const (
version = "0.1.0"
version = "0.2.1"
cloudMonitoringMetricDescriptorNameFormat = "custom.googleapis.com/opentelemetry/%s"
)

var (
errBlankProjectID = errors.New("expecting a non-blank ProjectID")
)

type errUnsupportedAggregation struct {
agg export.Aggregator
}

func (e errUnsupportedAggregation) Error() string {
return fmt.Sprintf("currently the aggregator is not supported: %v", e.agg)
}

type errUnexpectedNumberKind struct {
kind apimetric.NumberKind
}

func (e errUnexpectedNumberKind) Error() string {
return fmt.Sprintf("the metric kind is unexpected: %v", e.kind)
}
// TODO: Remove when Count aggregation is used in the implementation
var _ = countToTypeValueAndTimestamp

// key is used to judge the uniqueness of the record descriptor.
type key struct {
Expand All @@ -86,6 +67,11 @@ type metricExporter struct {
mdCache map[key]*googlemetricpb.MetricDescriptor

client *monitoring.MetricClient

// startTime is the cache of start time shared among all CUMULATIVE values.
// c.f. https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricKind
// TODO: Remove this when OTel SDK provides start time for each record specifically for stateful batcher.
startTime time.Time
}

// newMetricExporter returns an exporter that uploads OTel metric data to Google Cloud Monitoring.
Expand All @@ -106,9 +92,10 @@ func newMetricExporter(o *options) (*metricExporter, error) {

cache := map[key]*googlemetricpb.MetricDescriptor{}
e := &metricExporter{
o: o,
mdCache: cache,
client: client,
o: o,
mdCache: cache,
client: client,
startTime: time.Now(),
}
return e, nil
}
Expand All @@ -126,7 +113,7 @@ func InstallNewPipeline(opts []Option, popts ...push.Option) (*push.Controller,
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(opts []Option, popts ...push.Option) (*push.Controller, error) {
selector := simple.NewWithExactDistribution()
selector := NewWithCloudMonitoringDistribution()
exporter, err := NewRawExporter(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -239,7 +226,7 @@ func (me *metricExporter) recordToTspb(r *export.Record, res *resource.Resource)
m := me.recordToMpb(r)
mr := me.resourceToMonitoredResourcepb(res)

tv, t, err := recordToTypedValueAndTimestamp(r)
tv, t, err := me.recordToTypedValueAndTimestamp(r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,7 +294,6 @@ func (me *metricExporter) resourceToMonitoredResourcepb(_ *resource.Resource) *m
// recordToMdpbKindType return the mapping from OTel's record descriptor to
// Cloud Monitoring's MetricKind and ValueType.
func recordToMdpbKindType(r *export.Record) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) {
// TODO: [ymotongpoo] Remove tentative implementation
mkind := r.Descriptor().MetricKind()
nkind := r.Descriptor().NumberKind()

Expand Down Expand Up @@ -370,54 +356,54 @@ func (me *metricExporter) recordToMpb(r *export.Record) *googlemetricpb.Metric {
// TODO: Apply appropriate TimeInterval based upon MetricKind. See details in the doc.
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Point
// See detils in #25.
func recordToTypedValueAndTimestamp(r *export.Record) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
func (me *metricExporter) recordToTypedValueAndTimestamp(r *export.Record) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
agg := r.Aggregator()
kind := r.Descriptor().NumberKind()
nkind := r.Descriptor().NumberKind()
mkind := r.Descriptor().MetricKind()
now := time.Now().Unix()

// TODO: Ignoring the case for Min, Max and Distribution to simply
// the first implementation.
//
// Currently the selector used in the integrator is `simple.NewWithExactDistribution`
// which should return array.New(), where it is ambiguous how the aggregator is treated inside.
// https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/aggregator/array?tab=doc#New
//
// Now this function only returns count for Counter and Observer as it is set as 1st condition,
// and float64 type obersever is trimmed to int64, unfortunately.
// If we'd like to handle all possible aggregations, we need to have intermediate result type,
// and return the specified aggrataion value, which is done in stdout exporter.
// https://github.com/open-telemetry/opentelemetry-go/blob/21d094af43/exporters/metric/stdout/stdout.go#L72-L84
// https://github.com/open-telemetry/opentelemetry-go/blob/21d094af43/exporters/metric/stdout/stdout.go#L167-L221
// NOTE: Currently the selector used in the integrator is our own implementation,
// because none of those in go.opentelemetry.io/otel/sdk/metric/selector/simple
// gives interfaces to fetch LastValue.
//
// Views API should provide better interface that does not require the complicated codition handling
// done in this function.
// https://github.com/open-telemetry/opentelemetry-specification/issues/466
// In OpenCensus, view interface provided the bundle of name, measure, labels and aggregation in one place,
// and it should return the appropriate value based on the aggregation type specified there.
if count, ok := agg.(aggregator.Count); ok {
return countToTypeValueAndTimestamp(count, kind, now)
} else if lv, ok := agg.(aggregator.LastValue); ok {
return lastValueToTypedValueAndTimestamp(lv, kind, now)
} else if sum, ok := agg.(aggregator.Sum); ok {
// TODO: Handle TimeInterval appropriately (#25)
return sumToTypedValueAndTimestamp(sum, kind, now, now+1)
}
return nil, nil, errUnsupportedAggregation{agg: agg}
switch mkind {
case apimetric.ValueObserverKind, apimetric.ValueRecorderKind:
if lv, ok := agg.(aggregator.LastValue); ok {
return lastValueToTypedValueAndTimestamp(lv, nkind)
}
return nil, nil, errUnsupportedAggregation{agg: agg}
case apimetric.CounterKind, apimetric.UpDownCounterKind, apimetric.SumObserverKind, apimetric.UpDownSumObserverKind:
// CUMULATIVE measurement should have the same start time and increasing end time.
// c.f. https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.metricDescriptors#MetricKind
if sum, ok := agg.(aggregator.Sum); ok {
return sumToTypedValueAndTimestamp(sum, nkind, me.startTime.Unix(), now)
}
return nil, nil, errUnsupportedAggregation{agg: agg}
}

return nil, nil, errUnexpectedMetricKind{kind: mkind}
}

func countToTypeValueAndTimestamp(count aggregator.Count, kind apimetric.NumberKind, seconds int64) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
func countToTypeValueAndTimestamp(count aggregator.Count, kind apimetric.NumberKind, start, end int64) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
value, err := count.Count()
if err != nil {
return nil, nil, err
}

// TODO: Consider the expression of TimeInterval (#25)
t := &monitoringpb.TimeInterval{
StartTime: &googlepb.Timestamp{
Seconds: seconds,
Seconds: start,
},
EndTime: &googlepb.Timestamp{
Seconds: seconds,
Seconds: end,
},
}
switch kind {
Expand All @@ -443,7 +429,7 @@ func countToTypeValueAndTimestamp(count aggregator.Count, kind apimetric.NumberK
return nil, nil, errUnexpectedNumberKind{kind: kind}
}

func lastValueToTypedValueAndTimestamp(lv aggregator.LastValue, kind apimetric.NumberKind, start int64) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
func lastValueToTypedValueAndTimestamp(lv aggregator.LastValue, kind apimetric.NumberKind) (*monitoringpb.TypedValue, *monitoringpb.TimeInterval, error) {
value, timestamp, err := lv.LastValue()
if err != nil {
return nil, nil, err
Expand All @@ -454,6 +440,9 @@ func lastValueToTypedValueAndTimestamp(lv aggregator.LastValue, kind apimetric.N
StartTime: &googlepb.Timestamp{
Seconds: timestamp.Unix(),
},
EndTime: &googlepb.Timestamp{
Seconds: timestamp.Unix(),
},
}

tv, err := aggToTypedValue(kind, value)
Expand Down
4 changes: 2 additions & 2 deletions exporter/metric/option.go
Expand Up @@ -69,7 +69,7 @@ type options struct {
// If no custom hook is set, errors are logged. Optional.
//
// TODO: This option should be replaced with OTel defining error handler.
// c.f. https://pkg.go.dev/go.opentelemetry.io/otel@v0.5.0/sdk/metric/controller/push?tab=doc#Config
// c.f. https://pkg.go.dev/go.opentelemetry.io/otel@v0.6.0/sdk/metric/controller/push?tab=doc#Config
onError func(error)
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func WithInterval(t time.Duration) func(o *options) {

// WithMetricDescriptorTypeFormatter sets the custom formatter for MetricDescriptor.
// Note that the format has to follow the convention defined in the official document.
// The default is "custom.googleapi.com/[metric name]".
// The default is "custom.googleapis.com/[metric name]".
// ref. https://cloud.google.com/monitoring/custom-metrics/creating-metrics#custom_metric_names
func WithMetricDescriptorTypeFormatter(f func(*apimetric.Descriptor) string) func(o *options) {
return func(o *options) {
Expand Down
54 changes: 54 additions & 0 deletions exporter/metric/selector.go
@@ -0,0 +1,54 @@
// Copyright 2020, Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
apimetric "go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)

type selectorCloudMonitoring struct{}

var _ export.AggregationSelector = selectorCloudMonitoring{}

// NewWithCloudMonitoringDistribution return a simple aggregation selector
// that uses lastvalue, counter, array, and aggregator for three kinds of metric.
//
// NOTE: this selector is just to ensure that LastValue is used for
// ValueObserverKind and ValueRecorderKind.
//
// TODO: Remove this once SDK implements such a
// selector, otherwise Views API gives flexibility to set aggregation type on
// configuring measurement.
// c.f. https://github.com/open-telemetry/oteps/pull/89
func NewWithCloudMonitoringDistribution() export.AggregationSelector {
return selectorCloudMonitoring{}
}

func (selectorCloudMonitoring) AggregatorFor(descriptor *apimetric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case apimetric.ValueObserverKind, apimetric.ValueRecorderKind:
return lastvalue.New()
// NOTE: `array` gives the option to use Sum, Count, Max, Min, Quantile and Points (most flexible),
// so chosen for future implementations rather than `sum`.
case apimetric.CounterKind, apimetric.UpDownCounterKind, apimetric.SumObserverKind, apimetric.UpDownSumObserverKind:
return array.New()
default:
return sum.New()
}
}

0 comments on commit 5fb47d1

Please sign in to comment.