Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for untyped metrics from Ops Agent #668

Merged
merged 8 commits into from Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 76 additions & 0 deletions exporter/collector/googlemanagedprometheus/extra_metrics.go
Expand Up @@ -17,10 +17,86 @@ package googlemanagedprometheus
import (
"time"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

const (
// Special attribute key used by Ops Agent prometheus receiver to denote untyped
// prometheus metric. Internal use only.
GCPOpsAgentUntypedMetricKey = "prometheus_untyped_metric"

gcpUntypedDoubleExportGateKey = "gcp.untyped_double_export"
)

var untypedDoubleExportFeatureGate = featuregate.GlobalRegistry().MustRegister(
gcpUntypedDoubleExportGateKey,
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.77.0"),
featuregate.WithRegisterDescription("Enable automatically exporting untyped Prometheus metrics as both gauge and cumulative to GCP."),
featuregate.WithRegisterReferenceURL("https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/pull/668"))

// AddUntypedMetrics looks for any Gauge data point with the special Ops Agent untyped metric
// attribute and duplicates that data point to a matching Sum.
func AddUntypedMetrics(m pmetric.Metrics) {
if !untypedDoubleExportFeatureGate.IsEnabled() {
return
}
rms := m.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
sms := rm.ScopeMetrics()
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
mes := sm.Metrics()

// only iterate up to the current length. new untyped sum metrics
// will be added to this slice inline.
mLen := mes.Len()
for k := 0; k < mLen; k++ {
metric := mes.At(k)

// only applies to gauges
if metric.Type() != pmetric.MetricTypeGauge {
continue
}

// attribute is set on the data point
gauge := metric.Gauge()
points := gauge.DataPoints()
for l := 0; l < points.Len(); l++ {
point := points.At(l)
val, ok := point.Attributes().Get(GCPOpsAgentUntypedMetricKey)
if !(ok && val.AsString() == "true") {
continue
}

// Add the new Sum metric and copy values over from this Gauge
newMetric := mes.AppendEmpty()
newMetric.SetName(metric.Name())
newMetric.SetDescription(metric.Description())
newMetric.SetUnit(metric.Unit())

newSum := newMetric.SetEmptySum()
newSum.SetIsMonotonic(true)
newSum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

newDataPoint := newSum.DataPoints().AppendEmpty()
point.Attributes().CopyTo(newDataPoint.Attributes())
if point.ValueType() == pmetric.NumberDataPointValueTypeInt {
newDataPoint.SetIntValue(point.IntValue())
} else if point.ValueType() == pmetric.NumberDataPointValueTypeDouble {
newDataPoint.SetDoubleValue(point.DoubleValue())
}
newDataPoint.SetFlags(point.Flags())
newDataPoint.SetTimestamp(point.Timestamp())
}
}
}
}
}

// AddTargetInfoMetric inserts target_info for each resource.
// First, it extracts the target_info metric from each ResourceMetric associated with the input pmetric.Metrics
// and inserts it into a new ScopeMetric for that resource, as specified in
Expand Down
160 changes: 160 additions & 0 deletions exporter/collector/googlemanagedprometheus/extra_metrics_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)
Expand Down Expand Up @@ -291,6 +292,165 @@ func TestAddExtraMetrics(t *testing.T) {
return metrics
}(),
},
{
name: "add untyped Sum metric from Gauge",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()

dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

metric := metrics.At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()
metric.SetName("baz-metric")
metric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(2112)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.Sum().DataPoints().At(0).SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
metric.Sum().DataPoints().At(0).Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

return metrics
}(),
},
{
name: "add untyped Sum metric from Gauge (double value)",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
SetDoubleValue(123.5)
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()

dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
dataPoint.SetDoubleValue(123.5)

metric := metrics.At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()
metric.SetName("baz-metric")
metric.SetEmptySum().DataPoints().AppendEmpty().SetDoubleValue(123.5)
metric.Sum().SetIsMonotonic(true)
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.Sum().DataPoints().At(0).SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
metric.Sum().DataPoints().At(0).Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")

return metrics
}(),
},
{
name: "untyped Gauge does nothing if feature gate is disabled",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
},
{
name: "untyped Gauge does nothing if feature gate is enabled and key!=true",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0).
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "foo")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
dataPoint := metrics.At(0).
ScopeMetrics().At(0).
Metrics().At(0).
Gauge().DataPoints().At(0)
dataPoint.Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "foo")
return metrics
}(),
},
{
name: "untyped non-Gauge does nothing if feature gate is enabled",
testFunc: func(m pmetric.Metrics) pmetric.ResourceMetricsSlice {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
AddUntypedMetrics(m)
return m.ResourceMetrics()
},
input: func() pmetric.Metrics {
metrics := testMetric(timestamp)
metrics.ResourceMetrics().At(0).
ScopeMetrics().At(0).
Metrics().AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
expected: func() pmetric.ResourceMetricsSlice {
metrics := testMetric(timestamp).ResourceMetrics()
metrics.At(0).
ScopeMetrics().At(0).
Metrics().AppendEmpty().SetEmptyHistogram().DataPoints().AppendEmpty().
Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
return metrics
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
rms := tc.testFunc(tc.input)
Expand Down
27 changes: 25 additions & 2 deletions exporter/collector/googlemanagedprometheus/naming.go
Expand Up @@ -28,9 +28,9 @@ func GetMetricName(baseName string, metric pmetric.Metric) (string, error) {
// Second, ad the GMP-specific suffix
switch metric.Type() {
case pmetric.MetricTypeSum:
return compliantName + "/counter", nil
return compliantName + getUnknownMetricSuffix(metric.Sum().DataPoints(), "/counter", "counter"), nil
case pmetric.MetricTypeGauge:
return compliantName + "/gauge", nil
return compliantName + getUnknownMetricSuffix(metric.Gauge().DataPoints(), "/gauge", ""), nil
case pmetric.MetricTypeSummary:
// summaries are sent as the following series:
// * Sum: prometheus.googleapis.com/<baseName>_sum/summary:counter
Expand All @@ -49,3 +49,26 @@ func GetMetricName(baseName string, metric pmetric.Metric) (string, error) {
return "", fmt.Errorf("unsupported metric datatype: %v", metric.Type())
}
}

// getUnknownMetricSuffix will set the metric suffix for untyped metrics to
// "/unknown" (eg, for Gauge) or "/unknown:{secondarySuffix}" (eg, "/unknown:counter" for Sum).
// It is based on the untyped_prometheus_metric data point attribute, and behind a feature gate.
func getUnknownMetricSuffix(points pmetric.NumberDataPointSlice, suffix, secondarySuffix string) string {
if !untypedDoubleExportFeatureGate.IsEnabled() {
return suffix
}
newSuffix := suffix
for i := 0; i < points.Len(); i++ {
point := points.At(i)
if val, ok := point.Attributes().Get(GCPOpsAgentUntypedMetricKey); ok && val.AsString() == "true" {
// delete the special Ops Agent untyped attribute
point.Attributes().Remove(GCPOpsAgentUntypedMetricKey)
newSuffix = "/unknown"
if len(secondarySuffix) > 0 {
newSuffix = newSuffix + ":" + secondarySuffix
}
// even though we have the suffix, keep looping to remove the attribute from other points, if any
}
}
return newSuffix
}
48 changes: 48 additions & 0 deletions exporter/collector/googlemanagedprometheus/naming_test.go
Expand Up @@ -130,6 +130,54 @@ func TestGetMetricName(t *testing.T) {
},
expectErr: true,
},
{
desc: "untyped gauge with feature gate disabled does nothing",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
m.SetName("bar")
m.SetEmptyGauge()
m.Gauge().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/gauge",
},
{
desc: "untyped sum with feature gate disabled does nothing",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, false)
m.SetName("bar")
m.SetEmptySum()
m.Sum().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/counter",
},
{
desc: "untyped gauge with feature gate enabled returns unknown",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
m.SetName("bar")
m.SetEmptyGauge()
m.Gauge().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/unknown",
},
{
desc: "untyped sum with feature gate enabled returns unknown:counter",
baseName: "bar",
metric: func(m pmetric.Metric) {
//nolint:errcheck
featuregate.GlobalRegistry().Set(gcpUntypedDoubleExportGateKey, true)
m.SetName("bar")
m.SetEmptySum()
m.Sum().DataPoints().AppendEmpty().Attributes().PutStr(GCPOpsAgentUntypedMetricKey, "true")
},
expected: "bar/unknown:counter",
},
} {
t.Run(tc.desc, func(t *testing.T) {
assert.True(t, gate.IsEnabled())
Expand Down
Expand Up @@ -151,7 +151,7 @@ func (fr fixtureRecorder) recordMetrics(ctx context.Context, t *FakeTesting, sta
require.NoError(t, fr.checkDuplicate(test))

func() {
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime)
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime, test.SkipTimestampUpdate)
testServerExporter := integrationtest.NewMetricTestExporter(ctx, t, testServer, test.CreateCollectorMetricConfig())
inMemoryOCExporter, err := integrationtest.NewInMemoryOCViewExporter()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion exporter/collector/integrationtest/go.mod
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/collector v0.78.0
go.opentelemetry.io/collector/component v0.78.0
go.opentelemetry.io/collector/exporter v0.78.0
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
Expand Down Expand Up @@ -87,7 +88,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/collector/confmap v0.78.0 // indirect
go.opentelemetry.io/collector/consumer v0.78.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect
go.opentelemetry.io/collector/receiver v0.78.0 // indirect
go.opentelemetry.io/collector/semconv v0.78.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.16.1 // indirect
Expand Down
Expand Up @@ -64,7 +64,7 @@ func TestIntegrationCollectorMetrics(t *testing.T) {

t.Run(test.Name, func(t *testing.T) {
test.SkipIfNeeded(t)
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime)
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime, false)
setSecondProjectInMetrics(t, metrics)
exporter := createMetricsExporter(ctx, t, &test)
defer func() { require.NoError(t, exporter.Shutdown(ctx)) }()
Expand All @@ -88,7 +88,7 @@ func TestIntegrationSDKMetrics(t *testing.T) {

t.Run(test.Name, func(t *testing.T) {
test.SkipIfNeededForSDK(t)
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime)
metrics := test.LoadOTLPMetricsInput(t, startTime, endTime, false)
setSecondProjectInMetrics(t, metrics)
exporter, err := metric.New(
metric.WithProjectID(os.Getenv("PROJECT_ID")),
Expand Down