Skip to content

Commit

Permalink
Add support for untyped metrics from Ops Agent (#668)
Browse files Browse the repository at this point in the history
* Add support for untyped metrics from Ops Agent

* Wrap double export in feature gate

* Move untyped metrics logic to GMP exporter

* Update tests

* Pass skip timestamp update to collector test

* more tests

* Disable cumulative normalization for untyped test so we only have to send 1 data point

* Copy starttimestamp to new data point and add to test fixture to appease GCM
  • Loading branch information
damemi committed Jul 14, 2023
1 parent ff27f89 commit 0c97ee5
Show file tree
Hide file tree
Showing 12 changed files with 3,428 additions and 4 deletions.
77 changes: 77 additions & 0 deletions exporter/collector/googlemanagedprometheus/extra_metrics.go
Expand Up @@ -17,10 +17,87 @@ package googlemanagedprometheus
import (
"time"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

const (
// Special attribute key used by Ops Agent prometheus receiver to denote untyped
// prometheus metric. Internal use only.
GCPOpsAgentUntypedMetricKey = "prometheus_untyped_metric"

gcpUntypedDoubleExportGateKey = "gcp.untyped_double_export"
)

var untypedDoubleExportFeatureGate = featuregate.GlobalRegistry().MustRegister(
gcpUntypedDoubleExportGateKey,
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.77.0"),
featuregate.WithRegisterDescription("Enable automatically exporting untyped Prometheus metrics as both gauge and cumulative to GCP."),
featuregate.WithRegisterReferenceURL("https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/pull/668"))

// AddUntypedMetrics looks for any Gauge data point with the special Ops Agent untyped metric
// attribute and duplicates that data point to a matching Sum.
func AddUntypedMetrics(m pmetric.Metrics) {
if !untypedDoubleExportFeatureGate.IsEnabled() {
return
}
rms := m.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
sms := rm.ScopeMetrics()
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
mes := sm.Metrics()

// only iterate up to the current length. new untyped sum metrics
// will be added to this slice inline.
mLen := mes.Len()
for k := 0; k < mLen; k++ {
metric := mes.At(k)

// only applies to gauges
if metric.Type() != pmetric.MetricTypeGauge {
continue
}

// attribute is set on the data point
gauge := metric.Gauge()
points := gauge.DataPoints()
for l := 0; l < points.Len(); l++ {
point := points.At(l)
val, ok := point.Attributes().Get(GCPOpsAgentUntypedMetricKey)
if !(ok && val.AsString() == "true") {
continue
}

// Add the new Sum metric and copy values over from this Gauge
newMetric := mes.AppendEmpty()
newMetric.SetName(metric.Name())
newMetric.SetDescription(metric.Description())
newMetric.SetUnit(metric.Unit())

newSum := newMetric.SetEmptySum()
newSum.SetIsMonotonic(true)
newSum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

newDataPoint := newSum.DataPoints().AppendEmpty()
point.Attributes().CopyTo(newDataPoint.Attributes())
if point.ValueType() == pmetric.NumberDataPointValueTypeInt {
newDataPoint.SetIntValue(point.IntValue())
} else if point.ValueType() == pmetric.NumberDataPointValueTypeDouble {
newDataPoint.SetDoubleValue(point.DoubleValue())
}
newDataPoint.SetFlags(point.Flags())
newDataPoint.SetTimestamp(point.Timestamp())
newDataPoint.SetStartTimestamp(point.StartTimestamp())
}
}
}
}
}

// AddTargetInfoMetric inserts target_info for each resource.
// First, it extracts the target_info metric from each ResourceMetric associated with the input pmetric.Metrics
// and inserts it into a new ScopeMetric for that resource, as specified in
Expand Down
160 changes: 160 additions & 0 deletions exporter/collector/googlemanagedprometheus/extra_metrics_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)
Expand Down Expand Up @@ -291,6 +292,165 @@ func TestAddExtraMetrics(t *testing.T) {
return metrics
}(),
},
{
name: "add untyped Sum metric from Gauge",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()

dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

metric := metrics.At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()
metric.SetName("baz-metric")
metric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(2112)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.Sum().DataPoints().At(0).SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
metric.Sum().DataPoints().At(0).Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

return metrics
}(),
},
{
name: "add untyped Sum metric from Gauge (double value)",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
SetDoubleValue(123.5)
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()

dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
dataPoint.SetDoubleValue(123.5)

metric := metrics.At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()
metric.SetName("baz-metric")
metric.SetEmptySum().DataPoints().AppendEmpty().SetDoubleValue(123.5)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.Sum().DataPoints().At(0).SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
metric.Sum().DataPoints().At(0).Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

return metrics
}(),
},
{
name: "untyped Gauge does nothing if feature gate is disabled",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
},
{
name: "untyped Gauge does nothing if feature gate is enabled and key!=true",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "foo")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "foo")
return metrics
}(),
},
{
name: "untyped non-Gauge does nothing if feature gate is enabled",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
metrics.At(0).
ScopeMetrics().At(0).
Metrics().AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
rms := tc.testFunc(tc.input)
Expand Down
27 changes: 25 additions & 2 deletions exporter/collector/googlemanagedprometheus/naming.go
Expand Up @@ -28,9 +28,9 @@ func GetMetricName(baseName string, metric pmetric.Metric) (string, error) {
// Second, ad the GMP-specific suffix
switch metric.Type() {
case pmetric.MetricTypeSum:
return compliantName + "/counter", nil
return compliantName + getUnknownMetricSuffix(metric.Sum().DataPoints(), "/counter", "counter"), nil
case pmetric.MetricTypeGauge:
return compliantName + "/gauge", nil
return compliantName + getUnknownMetricSuffix(metric.Gauge().DataPoints(), "/gauge", ""), nil
case pmetric.MetricTypeSummary:
// summaries are sent as the following series:
// * Sum: prometheus.googleapis.com/<baseName>_sum/summary:counter
Expand All @@ -49,3 +49,26 @@ func GetMetricName(baseName string, metric pmetric.Metric) (string, error) {
return "", fmt.Errorf("unsupported metric datatype: %v", metric.Type())
}
}

// getUnknownMetricSuffix will set the metric suffix for untyped metrics to
// "/unknown" (eg, for Gauge) or "/unknown:{secondarySuffix}" (eg, "/unknown:counter" for Sum).
// It is based on the untyped_prometheus_metric data point attribute, and behind a feature gate.
func getUnknownMetricSuffix(points pmetric.NumberDataPointSlice, suffix, secondarySuffix string) string {
if !untypedDoubleExportFeatureGate.IsEnabled() {
return suffix
}
newSuffix := suffix
for i := 0; i < points.Len(); i++ {
point := points.At(i)
if val, ok := point.Attributes().Get(GCPOpsAgentUntypedMetricKey); ok && val.AsString() == "true" {
// delete the special Ops Agent untyped attribute
point.Attributes().Remove(GCPOpsAgentUntypedMetricKey)
newSuffix = "/unknown"
if len(secondarySuffix) > 0 {
newSuffix = newSuffix + ":" + secondarySuffix
}
// even though we have the suffix, keep looping to remove the attribute from other points, if any
}
}
return newSuffix
}
48 changes: 48 additions & 0 deletions exporter/collector/googlemanagedprometheus/naming_test.go
Expand Up @@ -130,6 +130,54 @@ func TestGetMetricName(t *testing.T) {
},
expectErr: true,
},
{
desc: "untyped gauge with feature gate disabled does nothing",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
m.SetName("bar")
m.SetEmptyGauge()
m.Gauge().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/gauge",
},
{
desc: "untyped sum with feature gate disabled does nothing",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
m.SetName("bar")
m.SetEmptySum()
m.Sum().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/counter",
},
{
desc: "untyped gauge with feature gate enabled returns unknown",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
m.SetName("bar")
m.SetEmptyGauge()
m.Gauge().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/unknown",
},
{
desc: "untyped sum with feature gate enabled returns unknown:counter",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
m.SetName("bar")
m.SetEmptySum()
m.Sum().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/unknown:counter",
},
} {
t.Run(tc.desc, func(t *testing.T) {
assert.True(t, gate.IsEnabled())
Expand Down
2 changes: 1 addition & 1 deletion exporter/collector/integrationtest/go.mod
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/collector v0.78.0
go.opentelemetry.io/collector/component v0.78.0
go.opentelemetry.io/collector/exporter v0.78.0
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
Expand Down Expand Up @@ -87,7 +88,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/collector/confmap v0.78.0 // indirect
go.opentelemetry.io/collector/consumer v0.78.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect
go.opentelemetry.io/collector/receiver v0.78.0 // indirect
go.opentelemetry.io/collector/semconv v0.78.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.16.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions exporter/collector/integrationtest/testcases/testcase.go
Expand Up @@ -271,6 +271,8 @@ func NormalizeLogFixture(t testing.TB, fixture *protos.LogExpectFixture) {

// Load OTLP metric fixture, test expectation fixtures and modify them so they're suitable for
// testing. Currently, this just updates the timestamps.
//
//nolint:revive
func (tc *TestCase) LoadOTLPMetricsInput(
t testing.TB,
startTime time.Time,
Expand Down

0 comments on commit 0c97ee5

Please sign in to comment.