diff --git a/.chloggen/signalfx-exp-otlp.yaml b/.chloggen/signalfx-exp-otlp.yaml new file mode 100755 index 0000000000000..0071f390e285d --- /dev/null +++ b/.chloggen/signalfx-exp-otlp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/signalfx + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Send histograms in OTLP format with the new `send_otlp_histograms` config option + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26298] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/signalfx-recv-otlp.yaml b/.chloggen/signalfx-recv-otlp.yaml index c95e78a093ae9..75d20c483d11f 100755 --- a/.chloggen/signalfx-recv-otlp.yaml +++ b/.chloggen/signalfx-recv-otlp.yaml @@ -10,7 +10,7 @@ component: receiver/signalfx note: Accept otlp protobuf requests when content-type is "application/x-protobuf;format=otlp" # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [31052] +issues: [26298] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. diff --git a/exporter/signalfxexporter/README.md b/exporter/signalfxexporter/README.md index 468f9741aef40..819ca3241e069 100644 --- a/exporter/signalfxexporter/README.md +++ b/exporter/signalfxexporter/README.md @@ -145,7 +145,8 @@ will be replaced with a `_`. api_tls: ca_file: "/etc/opt/certs/ca.pem" ``` -- `drop_histogram_buckets`: (default = `false`) if set to true, histogram buckets will not be translated into datapoints with `_bucket` suffix but will be dropped instead, only datapoints with `_sum`, `_count`, `_min` (optional) and `_max` (optional) suffixes will be sent. +- `drop_histogram_buckets`: (default = `false`) if set to true, histogram buckets will not be translated into datapoints with `_bucket` suffix but will be dropped instead; only datapoints with `_sum`, `_count`, `_min` (optional) and `_max` (optional) suffixes will be sent. Please note that this option does not apply to histograms sent in OTLP format with `send_otlp_histograms` enabled. +- `send_otlp_histograms`: (default = `false`) if set to true, any histogram metrics received by the exporter will be sent to the Splunk Observability backend in OTLP format without conversion to SignalFx format. This can only be enabled if the Splunk Observability environment (realm) has the new Histograms feature rolled out.
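  A minimal sketch of an exporter configuration enabling this option might look like the following (the `access_token` and `realm` values are placeholders):
  ```yaml
  exporters:
    signalfx:
      access_token: "${SFX_ACCESS_TOKEN}"
      realm: us1
      send_otlp_histograms: true
  ```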
Please note that the exporter configuration options `include_metrics` and `exclude_metrics` do not apply to histograms sent in OTLP format. In addition, this exporter offers queued retry which is enabled by default. Information about queued retry configuration parameters can be found [here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md). diff --git a/exporter/signalfxexporter/config.go b/exporter/signalfxexporter/config.go index a9f3fb0091f79..aed4f7c82d9a2 100644 --- a/exporter/signalfxexporter/config.go +++ b/exporter/signalfxexporter/config.go @@ -112,7 +112,7 @@ type Config struct { // ExcludeMetrics defines dpfilter.MetricFilters that will determine metrics to be // excluded from sending to SignalFx backend. If translations enabled with - // TranslationRules options, the exclusion will be applie on translated metrics. + // TranslationRules options, the exclusion will be applied on translated metrics. ExcludeMetrics []dpfilters.MetricFilter `mapstructure:"exclude_metrics"` // IncludeMetrics defines dpfilter.MetricFilters to override exclusion any of metric. @@ -134,6 +134,10 @@ type Config struct { // Whether to drop histogram bucket metrics dispatched to Splunk Observability. // Default value is set to false. DropHistogramBuckets bool `mapstructure:"drop_histogram_buckets"` + + // Whether to send histogram metrics in OTLP format to Splunk Observability. + // Default value is set to false. + SendOTLPHistograms bool `mapstructure:"send_otlp_histograms"` } type DimensionClientConfig struct { diff --git a/exporter/signalfxexporter/config_test.go b/exporter/signalfxexporter/config_test.go index a9d359eed0234..5f1c4bdc6bc77 100644 --- a/exporter/signalfxexporter/config_test.go +++ b/exporter/signalfxexporter/config_test.go @@ -104,6 +104,7 @@ func TestLoadConfig(t *testing.T) { }, }, NonAlphanumericDimensionChars: "_-.", + SendOTLPHistograms: false, }, }, { @@ -263,6 +264,7 @@ func TestLoadConfig(t *testing.T) { }, }, NonAlphanumericDimensionChars: "_-.", + SendOTLPHistograms: true, }, }, } diff --git a/exporter/signalfxexporter/dpclient.go b/exporter/signalfxexporter/dpclient.go index 350c943459c06..82748947f4be6 100644 --- a/exporter/signalfxexporter/dpclient.go +++ b/exporter/signalfxexporter/dpclient.go @@ -17,12 +17,20 @@ import ( sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/utils" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" ) +const ( + contentEncodingHeader = "Content-Encoding" + contentTypeHeader = "Content-Type" + otlpProtobufContentType = "application/x-protobuf;format=otlp" +) + type sfxClientBase struct { ingestURL *url.URL headers map[string]string @@ -58,6 +66,7 @@ type sfxDPClient struct { logger *zap.Logger accessTokenPassthrough bool converter *translation.MetricsConverter + sendOTLPHistograms bool } func (s *sfxDPClient) pushMetricsData( @@ -81,48 +90,55 @@ func (s *sfxDPClient) pushMetricsData( // All metrics in the pmetric.Metrics will have the same access token because of the BatchPerResourceMetrics.
metricToken := s.retrieveAccessToken(rms.At(0)) + // export in SFx format sfxDataPoints := s.converter.MetricsToSignalFxV2(md) - if s.logDataPoints { - for _, dp := range sfxDataPoints { - s.logger.Debug("Dispatching SFx datapoint", zap.Stringer("dp", dp)) + if len(sfxDataPoints) > 0 { + droppedCount, err := s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken) + if err != nil { + return droppedCount, err } } - return s.pushMetricsDataForToken(ctx, sfxDataPoints, metricToken) -} -func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints []*sfxpb.DataPoint, accessToken string) (int, error) { - body, compressed, err := s.encodeBody(sfxDataPoints) - if err != nil { - return len(sfxDataPoints), consumererror.NewPermanent(err) + // export any histograms in OTLP format if sendOTLPHistograms is true + if s.sendOTLPHistograms { + histogramData, metricCount := utils.GetHistograms(md) + if metricCount > 0 { + droppedCount, err := s.pushOTLPMetricsDataForToken(ctx, histogramData, metricToken) + if err != nil { + return droppedCount, err + } + } } + return 0, nil + +} + +func (s *sfxDPClient) postData(ctx context.Context, body io.Reader, headers map[string]string) error { datapointURL := *s.ingestURL if !strings.HasSuffix(datapointURL.Path, "v2/datapoint") { datapointURL.Path = path.Join(datapointURL.Path, "v2/datapoint") } req, err := http.NewRequestWithContext(ctx, "POST", datapointURL.String(), body) if err != nil { - return len(sfxDataPoints), consumererror.NewPermanent(err) + return consumererror.NewPermanent(err) } + // Set the headers configured in sfxDPClient for k, v := range s.headers { req.Header.Set(k, v) } - // Override access token in headers map if it's non empty. - if accessToken != "" { - req.Header.Set(splunk.SFxAccessTokenHeader, accessToken) - } - - if compressed { - req.Header.Set("Content-Encoding", "gzip") + // Set any extra headers passed by the caller + for k, v := range headers { + req.Header.Set(k, v) } // TODO: Mark errors as partial errors wherever applicable when, partial // error for metrics is available. resp, err := s.client.Do(req) if err != nil { - return len(sfxDataPoints), err + return err } defer func() { @@ -132,7 +148,39 @@ func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints err = splunk.HandleHTTPCode(resp) if err != nil { - return len(sfxDataPoints), err + return err + } + return nil +} + +func (s *sfxDPClient) pushMetricsDataForToken(ctx context.Context, sfxDataPoints []*sfxpb.DataPoint, accessToken string) (int, error) { + + if s.logDataPoints { + for _, dp := range sfxDataPoints { + s.logger.Debug("Dispatching SFx datapoint", zap.Stringer("dp", dp)) + } + } + + body, compressed, err := s.encodeBody(sfxDataPoints) + dataPointCount := len(sfxDataPoints) + if err != nil { + return dataPointCount, consumererror.NewPermanent(err) + } + + headers := make(map[string]string) + + // Override access token in headers map if it's non-empty.
+ if accessToken != "" { + headers[splunk.SFxAccessTokenHeader] = accessToken + } + + if compressed { + headers[contentEncodingHeader] = "gzip" + } + + err = s.postData(ctx, body, headers) + if err != nil { + return dataPointCount, err } return 0, nil } @@ -160,3 +208,61 @@ func (s *sfxDPClient) retrieveAccessToken(md pmetric.ResourceMetrics) string { } return "" } + +func (s *sfxDPClient) pushOTLPMetricsDataForToken(ctx context.Context, mh pmetric.Metrics, accessToken string) (int, error) { + + dataPointCount := mh.DataPointCount() + if s.logDataPoints { + s.logger.Debug("Count of metrics to send in OTLP format", + zap.Int("resource metrics", mh.ResourceMetrics().Len()), + zap.Int("metrics", mh.MetricCount()), + zap.Int("data points", dataPointCount)) + buf, err := metricsMarshaler.MarshalMetrics(mh) + if err != nil { + s.logger.Error("Failed to marshal metrics for logging otlp histograms", zap.Error(err)) + } else { + s.logger.Debug("Dispatching OTLP metrics", zap.String("pmetrics", string(buf))) + } + } + + body, compressed, err := s.encodeOTLPBody(mh) + if err != nil { + return dataPointCount, consumererror.NewPermanent(err) + } + + headers := make(map[string]string) + + // Set otlp content-type header + headers[contentTypeHeader] = otlpProtobufContentType + + // Override access token in headers map if it's non-empty. + if accessToken != "" { + headers[splunk.SFxAccessTokenHeader] = accessToken + } + + if compressed { + headers[contentEncodingHeader] = "gzip" + } + + s.logger.Debug("Sending metrics in OTLP format") + + err = s.postData(ctx, body, headers) + + if err != nil { + return dataPointCount, consumererror.NewMetrics(err, mh) + } + + return 0, nil +} + +func (s *sfxDPClient) encodeOTLPBody(md pmetric.Metrics) (bodyReader io.Reader, compressed bool, err error) { + + tr := pmetricotlp.NewExportRequestFromMetrics(md) + + body, err := tr.MarshalProto() + + if err != nil { + return nil, false, err + } + return s.getReader(body) +} diff --git a/exporter/signalfxexporter/exporter.go b/exporter/signalfxexporter/exporter.go index becef412f2039..be733ec800f9b 100644 --- a/exporter/signalfxexporter/exporter.go +++ b/exporter/signalfxexporter/exporter.go @@ -84,6 +84,7 @@ func newSignalFxExporter( config.IncludeMetrics, config.NonAlphanumericDimensionChars, config.DropHistogramBuckets, + !config.SendOTLPHistograms, // if SendOTLPHistograms is true, do not process histograms when converting to SFx ) if err != nil { return nil, fmt.Errorf("failed to create metric converter: %w", err) @@ -121,6 +122,7 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err logger: se.logger, accessTokenPassthrough: se.config.AccessTokenPassthrough, converter: se.converter, + sendOTLPHistograms: se.config.SendOTLPHistograms, } apiTLSCfg, err := se.config.APITLSSettings.LoadTLSConfig() diff --git a/exporter/signalfxexporter/exporter_test.go b/exporter/signalfxexporter/exporter_test.go index a985b15331d25..1aee964074713 100644 --- a/exporter/signalfxexporter/exporter_test.go +++ b/exporter/signalfxexporter/exporter_test.go @@ -39,6 +39,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/dimensions" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/translation/dpfilters" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/utils" 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" ) @@ -189,7 +190,7 @@ func TestConsumeMetrics(t *testing.T) { client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings) require.NoError(t, err) - c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false) + c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false, true) require.NoError(t, err) require.NotNil(t, c) dpClient := &sfxDPClient{ @@ -237,7 +238,7 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { fromHeaders := "AccessTokenFromClientHeaders" fromLabels := []string{"AccessTokenFromLabel0", "AccessTokenFromLabel1"} - validMetricsWithToken := func(includeToken bool, token string) pmetric.Metrics { + validMetricsWithToken := func(includeToken bool, token string, histogram bool) pmetric.Metrics { out := pmetric.NewMetrics() rm := out.ResourceMetrics().AppendEmpty() @@ -248,12 +249,17 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { ilm := rm.ScopeMetrics().AppendEmpty() m := ilm.Metrics().AppendEmpty() - m.SetName("test_gauge") + if histogram { + buildHistogram(m, "test_histogram", pcommon.Timestamp(100000000), 1) + } else { + m.SetName("test_gauge") + + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("k0", "v0") + dp.Attributes().PutStr("k1", "v1") + dp.SetDoubleValue(123) + } - dp := m.SetEmptyGauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("k0", "v0") - dp.Attributes().PutStr("k1", "v1") - dp.SetDoubleValue(123) return out } @@ -263,44 +269,93 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { metrics pmetric.Metrics additionalHeaders map[string]string pushedTokens []string + sendOTLPHistograms bool }{ { name: "passthrough access token and included in md", accessTokenPassthrough: true, - metrics: validMetricsWithToken(true, fromLabels[0]), + metrics: validMetricsWithToken(true, fromLabels[0], false), + pushedTokens: []string{fromLabels[0]}, + sendOTLPHistograms: false, + }, + { + name: "passthrough access token and included in md with OTLP histogram", + accessTokenPassthrough: true, + metrics: validMetricsWithToken(true, fromLabels[0], true), pushedTokens: []string{fromLabels[0]}, + sendOTLPHistograms: true, }, { name: "passthrough access token and not included in md", accessTokenPassthrough: true, - metrics: validMetricsWithToken(false, fromLabels[0]), + metrics: validMetricsWithToken(false, fromLabels[0], false), + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: false, + }, + { + name: "passthrough access token and not included in md with OTLP histogram", + accessTokenPassthrough: true, + metrics: validMetricsWithToken(false, fromLabels[0], true), pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: true, }, { name: "don't passthrough access token and included in md", accessTokenPassthrough: false, metrics: func() pmetric.Metrics { - forFirstToken := validMetricsWithToken(true, fromLabels[0]) + forFirstToken := validMetricsWithToken(true, fromLabels[0], false) + tgt := forFirstToken.ResourceMetrics().AppendEmpty() + validMetricsWithToken(true, fromLabels[1], false).ResourceMetrics().At(0).CopyTo(tgt) + return forFirstToken + }(), + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: false, + }, + { + name: "don't passthrough access token and included in md with OTLP histogram", + 
accessTokenPassthrough: false, + metrics: func() pmetric.Metrics { + forFirstToken := validMetricsWithToken(true, fromLabels[0], true) tgt := forFirstToken.ResourceMetrics().AppendEmpty() - validMetricsWithToken(true, fromLabels[1]).ResourceMetrics().At(0).CopyTo(tgt) + validMetricsWithToken(true, fromLabels[1], true).ResourceMetrics().At(0).CopyTo(tgt) return forFirstToken }(), - pushedTokens: []string{fromHeaders}, + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: true, }, { name: "don't passthrough access token and not included in md", accessTokenPassthrough: false, - metrics: validMetricsWithToken(false, fromLabels[0]), + metrics: validMetricsWithToken(false, fromLabels[0], false), + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: false, + }, + { + name: "don't passthrough access token and not included in md with OTLP histogram", + accessTokenPassthrough: false, + metrics: validMetricsWithToken(false, fromLabels[0], true), pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: true, }, { name: "override user-specified token-like header", accessTokenPassthrough: true, - metrics: validMetricsWithToken(true, fromLabels[0]), + metrics: validMetricsWithToken(true, fromLabels[0], false), + additionalHeaders: map[string]string{ + "x-sf-token": "user-specified", + }, + pushedTokens: []string{fromLabels[0]}, + sendOTLPHistograms: false, + }, + { + name: "override user-specified token-like header with OTLP histogram", + accessTokenPassthrough: true, + metrics: validMetricsWithToken(true, fromLabels[0], true), additionalHeaders: map[string]string{ "x-sf-token": "user-specified", }, - pushedTokens: []string{fromLabels[0]}, + pushedTokens: []string{fromLabels[0]}, + sendOTLPHistograms: true, }, { name: "use token from header when resource is nil", @@ -319,28 +374,75 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { return out }(), - pushedTokens: []string{fromHeaders}, + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: false, + }, + { + name: "use token from header when resource is nil with OTLP histogram", + accessTokenPassthrough: true, + metrics: func() pmetric.Metrics { + out := pmetric.NewMetrics() + rm := out.ResourceMetrics().AppendEmpty() + ilm := rm.ScopeMetrics().AppendEmpty() + m := ilm.Metrics().AppendEmpty() + buildHistogram(m, "test_histogram", pcommon.Timestamp(1000), 1) + return out + }(), + pushedTokens: []string{fromHeaders}, + sendOTLPHistograms: true, }, { name: "multiple tokens passed through", accessTokenPassthrough: true, metrics: func() pmetric.Metrics { - forFirstToken := validMetricsWithToken(true, fromLabels[0]) - forSecondToken := validMetricsWithToken(true, fromLabels[1]) + forFirstToken := validMetricsWithToken(true, fromLabels[0], false) + forSecondToken := validMetricsWithToken(true, fromLabels[1], false) + forSecondToken.ResourceMetrics().EnsureCapacity(2) + forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + + return forSecondToken + }(), + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: false, + }, + { + name: "multiple tokens passed through with OTLP histogram", + accessTokenPassthrough: true, + metrics: func() pmetric.Metrics { + forFirstToken := validMetricsWithToken(true, fromLabels[0], true) + forSecondToken := validMetricsWithToken(true, fromLabels[1], true) forSecondToken.ResourceMetrics().EnsureCapacity(2) forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) return forSecondToken }(), - 
pushedTokens: []string{fromLabels[0], fromLabels[1]}, + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: true, }, { name: "multiple tokens passed through - multiple md with same token", accessTokenPassthrough: true, metrics: func() pmetric.Metrics { - forFirstToken := validMetricsWithToken(true, fromLabels[1]) - forSecondToken := validMetricsWithToken(true, fromLabels[0]) - moreForSecondToken := validMetricsWithToken(true, fromLabels[1]) + forFirstToken := validMetricsWithToken(true, fromLabels[1], false) + forSecondToken := validMetricsWithToken(true, fromLabels[0], false) + moreForSecondToken := validMetricsWithToken(true, fromLabels[1], false) + + forSecondToken.ResourceMetrics().EnsureCapacity(3) + forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + + return forSecondToken + }(), + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: false, + }, + { + name: "multiple tokens passed through - multiple md with same token with OTLP histogram", + accessTokenPassthrough: true, + metrics: func() pmetric.Metrics { + forFirstToken := validMetricsWithToken(true, fromLabels[1], true) + forSecondToken := validMetricsWithToken(true, fromLabels[0], true) + moreForSecondToken := validMetricsWithToken(true, fromLabels[1], true) forSecondToken.ResourceMetrics().EnsureCapacity(3) forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) @@ -348,15 +450,16 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { return forSecondToken }(), - pushedTokens: []string{fromLabels[0], fromLabels[1]}, + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: true, }, { name: "multiple tokens passed through - multiple md with same token grouped together", accessTokenPassthrough: true, metrics: func() pmetric.Metrics { - forFirstToken := validMetricsWithToken(true, fromLabels[0]) - forSecondToken := validMetricsWithToken(true, fromLabels[1]) - moreForSecondToken := validMetricsWithToken(true, fromLabels[1]) + forFirstToken := validMetricsWithToken(true, fromLabels[0], false) + forSecondToken := validMetricsWithToken(true, fromLabels[1], false) + moreForSecondToken := validMetricsWithToken(true, fromLabels[1], false) forSecondToken.ResourceMetrics().EnsureCapacity(3) moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) @@ -364,19 +467,51 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { return forSecondToken }(), - pushedTokens: []string{fromLabels[0], fromLabels[1]}, + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: false, + }, + { + name: "multiple tokens passed through - multiple md with same token grouped together in OTLP histogram", + accessTokenPassthrough: true, + metrics: func() pmetric.Metrics { + forFirstToken := validMetricsWithToken(true, fromLabels[0], true) + forSecondToken := validMetricsWithToken(true, fromLabels[1], true) + moreForSecondToken := validMetricsWithToken(true, fromLabels[1], true) + + forSecondToken.ResourceMetrics().EnsureCapacity(3) + moreForSecondToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + + return forSecondToken + }(), + pushedTokens: []string{fromLabels[0], fromLabels[1]}, + sendOTLPHistograms: true, 
}, { name: "multiple tokens passed through - one corrupted", accessTokenPassthrough: true, metrics: func() pmetric.Metrics { - forFirstToken := validMetricsWithToken(true, fromLabels[0]) - forSecondToken := validMetricsWithToken(false, fromLabels[1]) + forFirstToken := validMetricsWithToken(true, fromLabels[0], false) + forSecondToken := validMetricsWithToken(false, fromLabels[1], false) + forSecondToken.ResourceMetrics().EnsureCapacity(2) + forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) + return forSecondToken + }(), + pushedTokens: []string{fromLabels[0], fromHeaders}, + sendOTLPHistograms: false, + }, + { + name: "multiple tokens passed through - one corrupted in OTLP histogram", + accessTokenPassthrough: true, + metrics: func() pmetric.Metrics { + forFirstToken := validMetricsWithToken(true, fromLabels[0], true) + forSecondToken := validMetricsWithToken(false, fromLabels[1], true) forSecondToken.ResourceMetrics().EnsureCapacity(2) forFirstToken.ResourceMetrics().At(0).CopyTo(forSecondToken.ResourceMetrics().AppendEmpty()) return forSecondToken }(), - pushedTokens: []string{fromLabels[0], fromHeaders}, + pushedTokens: []string{fromLabels[0], fromHeaders}, + sendOTLPHistograms: true, }, } for _, tt := range tests { @@ -409,6 +544,7 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) { cfg.ClientConfig.Headers["test_header_"] = configopaque.String(tt.name) cfg.AccessToken = configopaque.String(fromHeaders) cfg.AccessTokenPassthrough = tt.accessTokenPassthrough + cfg.SendOTLPHistograms = tt.sendOTLPHistograms sfxExp, err := NewFactory().CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) require.NoError(t, err) require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost())) @@ -730,6 +866,7 @@ func TestConsumeMetadata(t *testing.T) { cfg.IncludeMetrics, cfg.NonAlphanumericDimensionChars, false, + true, ) require.NoError(t, err) type args struct { @@ -1077,7 +1214,7 @@ func TestConsumeMetadata(t *testing.T) { func BenchmarkExporterConsumeData(b *testing.B) { batchSize := 1000 metrics := pmetric.NewMetrics() - tmd := testMetricsData() + tmd := testMetricsData(false) for i := 0; i < batchSize; i++ { tmd.ResourceMetrics().At(0).CopyTo(metrics.ResourceMetrics().AppendEmpty()) } @@ -1089,7 +1226,7 @@ func BenchmarkExporterConsumeData(b *testing.B) { serverURL, err := url.Parse(server.URL) assert.NoError(b, err) - c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false) + c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false, true) require.NoError(b, err) require.NotNil(b, c) dpClient := &sfxDPClient{ @@ -1282,7 +1419,7 @@ func TestTLSIngestConnection(t *testing.T) { func TestDefaultSystemCPUTimeExcludedAndTranslated(t *testing.T) { translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600) require.NoError(t, err) - converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false) + converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false, true) require.NoError(t, err) md := pmetric.NewMetrics() @@ -1325,7 +1462,8 @@ func TestTLSAPIConnection(t *testing.T) { cfg.ExcludeMetrics, cfg.IncludeMetrics, cfg.NonAlphanumericDimensionChars, - false) + false, + true) require.NoError(t, err) metadata := []*metadata.MetadataUpdate{ @@ -1437,3 +1575,315 @@ func newLocalHTTPSTestServer(handler http.Handler) 
(*httptest.Server, error) { ts.StartTLS() return ts, nil } + +func BenchmarkExporterConsumeDataWithOTLPHistograms(b *testing.B) { + batchSize := 1000 + metrics := pmetric.NewMetrics() + tmd := testMetricsData(true) + for i := 0; i < batchSize; i++ { + tmd.ResourceMetrics().At(0).CopyTo(metrics.ResourceMetrics().AppendEmpty()) + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + })) + defer server.Close() + serverURL, err := url.Parse(server.URL) + assert.NoError(b, err) + + c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false, false) + require.NoError(b, err) + require.NotNil(b, c) + dpClient := &sfxDPClient{ + sfxClientBase: sfxClientBase{ + ingestURL: serverURL, + client: &http.Client{ + Timeout: 1 * time.Second, + }, + zippers: sync.Pool{New: func() any { + return gzip.NewWriter(nil) + }}, + }, + logger: zap.NewNop(), + converter: c, + } + + for i := 0; i < b.N; i++ { + numDroppedTimeSeries, err := dpClient.pushMetricsData(context.Background(), metrics) + assert.NoError(b, err) + assert.Equal(b, 0, numDroppedTimeSeries) + } +} + +func TestConsumeMixedMetrics(t *testing.T) { + ts := pcommon.NewTimestampFromTime(time.Now()) + smallBatch := pmetric.NewMetrics() + rm := smallBatch.ResourceMetrics().AppendEmpty() + res := rm.Resource() + res.Attributes().PutStr("kr0", "vr0") + ilms := rm.ScopeMetrics() + ilms.EnsureCapacity(2) + ilm := ilms.AppendEmpty() + ilm.Scope().Attributes().PutStr("ks0", "vs0") + ilm.Metrics().EnsureCapacity(2) + ilm.Metrics().AppendEmpty() + buildHistogram(ilm.Metrics().At(0), "test_histogram", ts, 2) + ilm.Metrics().AppendEmpty() + m1 := ilm.Metrics().At(1) + m1.SetName("test_gauge") + dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.Attributes().PutStr("k0", "v0") + dp1.SetDoubleValue(123) + + smallBatchHistogramOnly := pmetric.NewMetrics() + rmh := smallBatchHistogramOnly.ResourceMetrics().AppendEmpty() + resh := rmh.Resource() + resh.Attributes().PutStr("kr0", "vr0") + ilmsh := rmh.ScopeMetrics() + ilmsh.EnsureCapacity(2) + ilmh := ilmsh.AppendEmpty() + ilmh.Scope().Attributes().PutStr("ks0", "vs0") + ilmh.Metrics().EnsureCapacity(2) + ilmh.Metrics().AppendEmpty() + buildHistogram(ilmh.Metrics().At(0), "test_histogram", ts, 2) + + tests := []struct { + name string + md pmetric.Metrics + sfxHTTPResponseCode int + otlpHTTPResponseCode int + retryAfter int + numDroppedTimeSeries int + wantErr bool + wantPermanentErr bool + wantThrottleErr bool + expectedErrorMsg string + wantPartialMetricsErr bool + }{ + { + name: "happy_path", + md: smallBatch, + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusAccepted, + }, + { + name: "happy_path_otlp", + md: smallBatchHistogramOnly, + otlpHTTPResponseCode: http.StatusAccepted, + }, + { + name: "response_forbidden_sfx", + md: smallBatch, + sfxHTTPResponseCode: http.StatusForbidden, + numDroppedTimeSeries: 1, + wantErr: true, + expectedErrorMsg: "HTTP 403 \"Forbidden\"", + }, + { + name: "response_forbidden_otlp", + md: smallBatchHistogramOnly, + otlpHTTPResponseCode: http.StatusForbidden, + numDroppedTimeSeries: 2, + wantErr: true, + expectedErrorMsg: "HTTP 403 \"Forbidden\"", + }, + { + name: "response_forbidden_mixed", + md: smallBatch, + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusForbidden, + numDroppedTimeSeries: 2, + wantErr: true, + expectedErrorMsg: "HTTP 403 \"Forbidden\"", + }, + { + name: "response_bad_request_sfx", + md: smallBatch, + 
sfxHTTPResponseCode: http.StatusBadRequest, + numDroppedTimeSeries: 1, + wantPermanentErr: true, + expectedErrorMsg: "Permanent error: \"HTTP/1.1 400 Bad Request", + }, + { + name: "response_bad_request_otlp", + md: smallBatchHistogramOnly, + otlpHTTPResponseCode: http.StatusBadRequest, + numDroppedTimeSeries: 2, + wantPermanentErr: true, + expectedErrorMsg: "Permanent error: \"HTTP/1.1 400 Bad Request", + }, + { + name: "response_bad_request_mixed", + md: smallBatch, + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusBadRequest, + numDroppedTimeSeries: 2, + wantPermanentErr: true, + expectedErrorMsg: "Permanent error: \"HTTP/1.1 400 Bad Request", + }, + { + name: "response_throttle_sfx", + md: smallBatch, + sfxHTTPResponseCode: http.StatusTooManyRequests, + numDroppedTimeSeries: 1, + wantThrottleErr: true, + }, + { + name: "response_throttle_mixed", + md: smallBatch, + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusTooManyRequests, + numDroppedTimeSeries: 2, + wantThrottleErr: true, + wantPartialMetricsErr: true, + }, + { + name: "response_throttle_otlp", + md: smallBatchHistogramOnly, + otlpHTTPResponseCode: http.StatusTooManyRequests, + numDroppedTimeSeries: 2, + wantThrottleErr: true, + wantPartialMetricsErr: true, + }, + { + name: "response_throttle_with_header_sfx", + md: smallBatch, + retryAfter: 123, + sfxHTTPResponseCode: http.StatusServiceUnavailable, + numDroppedTimeSeries: 1, + wantThrottleErr: true, + }, + { + name: "response_throttle_with_header_otlp", + md: smallBatchHistogramOnly, + retryAfter: 123, + otlpHTTPResponseCode: http.StatusServiceUnavailable, + numDroppedTimeSeries: 2, + wantThrottleErr: true, + wantPartialMetricsErr: true, + }, + { + name: "response_throttle_with_header_mixed", + md: smallBatch, + retryAfter: 123, + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusServiceUnavailable, + numDroppedTimeSeries: 2, + wantThrottleErr: true, + wantPartialMetricsErr: true, + }, + { + name: "large_batch", + md: generateLargeMixedDPBatch(), + sfxHTTPResponseCode: http.StatusAccepted, + otlpHTTPResponseCode: http.StatusAccepted, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "test", r.Header.Get("test_header_")) + var respCode int + if r.Header.Get("Content-Type") == otlpProtobufContentType { + respCode = tt.otlpHTTPResponseCode + } else { + respCode = tt.sfxHTTPResponseCode + } + if (respCode == http.StatusTooManyRequests || + respCode == http.StatusServiceUnavailable) && tt.retryAfter != 0 { + w.Header().Add(splunk.HeaderRetryAfter, strconv.Itoa(tt.retryAfter)) + } + w.WriteHeader(respCode) + _, _ = w.Write([]byte("response content")) + })) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) + + cfg := &Config{ + ClientConfig: confighttp.ClientConfig{ + Timeout: 1 * time.Second, + Headers: map[string]configopaque.String{"test_header_": "test"}, + }, + } + + client, err := cfg.ToClient(componenttest.NewNopHost(), exportertest.NewNopCreateSettings().TelemetrySettings) + require.NoError(t, err) + + c, err := translation.NewMetricsConverter(zap.NewNop(), nil, nil, nil, "", false, false) + require.NoError(t, err) + require.NotNil(t, c) + sfxClient := &sfxDPClient{ + sfxClientBase: sfxClientBase{ + ingestURL: serverURL, + client: client, + zippers: sync.Pool{New: func() any { + return gzip.NewWriter(nil) + }}, + 
}, + logger: zap.NewNop(), + converter: c, + sendOTLPHistograms: true, + } + + numDroppedTimeSeries, err := sfxClient.pushMetricsData(context.Background(), tt.md) + assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) + + if tt.wantErr { + assert.Error(t, err) + assert.EqualError(t, err, tt.expectedErrorMsg) + return + } + + if tt.wantPermanentErr { + assert.Error(t, err) + assert.True(t, consumererror.IsPermanent(err)) + assert.True(t, strings.HasPrefix(err.Error(), tt.expectedErrorMsg)) + assert.Contains(t, err.Error(), "response content") + return + } + + if tt.wantThrottleErr { + if tt.wantPartialMetricsErr { + partialMetrics, _ := utils.GetHistograms(smallBatch) + throttleErr := fmt.Errorf("HTTP %d %q", tt.otlpHTTPResponseCode, http.StatusText(tt.otlpHTTPResponseCode)) + throttleErr = exporterhelper.NewThrottleRetry(throttleErr, time.Duration(tt.retryAfter)*time.Second) + testErr := consumererror.NewMetrics(throttleErr, partialMetrics) + assert.EqualValues(t, testErr, err) + return + } + + expected := fmt.Errorf("HTTP %d %q", tt.sfxHTTPResponseCode, http.StatusText(tt.sfxHTTPResponseCode)) + expected = exporterhelper.NewThrottleRetry(expected, time.Duration(tt.retryAfter)*time.Second) + assert.EqualValues(t, expected, err) + return + } + + assert.NoError(t, err) + }) + } +} + +func generateLargeMixedDPBatch() pmetric.Metrics { + md := pmetric.NewMetrics() + md.ResourceMetrics().EnsureCapacity(7500) + + ts := pcommon.NewTimestampFromTime(time.Now()) + for i := 0; i < 7500; i++ { + rm := md.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("kr0", "vr0") + ilm := rm.ScopeMetrics().AppendEmpty() + ilm.Metrics().EnsureCapacity(2) + m1 := ilm.Metrics().AppendEmpty() + m1.SetName("test_" + strconv.Itoa(i)) + dp := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + dp.Attributes().PutStr("k0", "v0") + dp.Attributes().PutStr("k1", "v1") + dp.SetIntValue(int64(i)) + m2 := ilm.Metrics().AppendEmpty() + buildHistogram(m2, "histogram_"+strconv.Itoa(i), ts, 1) + } + return md +} diff --git a/exporter/signalfxexporter/factory_test.go b/exporter/signalfxexporter/factory_test.go index 5d811319529da..e276af79ba58a 100644 --- a/exporter/signalfxexporter/factory_test.go +++ b/exporter/signalfxexporter/factory_test.go @@ -121,9 +121,9 @@ func TestDefaultTranslationRules(t *testing.T) { require.NotNil(t, rules, "rules are nil") tr, err := translation.NewMetricTranslator(rules, 1) require.NoError(t, err) - data := testMetricsData() + data := testMetricsData(false) - c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false) + c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true) require.NoError(t, err) translated := c.MetricsToSignalFxV2(data) require.NotNil(t, translated) @@ -210,7 +210,7 @@ func requireDimension(t *testing.T, dims []*sfxpb.Dimension, key, val string) { require.True(t, found, `missing dimension: %s`, key) } -func testMetricsData() pmetric.Metrics { +func testMetricsData(addHistogram bool) pmetric.Metrics { md := pmetric.NewMetrics() m1 := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() @@ -232,6 +232,10 @@ func testMetricsData() pmetric.Metrics { dp12.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp12.SetIntValue(6e9) + if addHistogram { + buildHistogram(m1, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + sm2 := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics() m2 := 
sm2.AppendEmpty() m2.SetName("system.disk.io") @@ -263,6 +267,10 @@ func testMetricsData() pmetric.Metrics { dp24.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp24.SetIntValue(8e9) + if addHistogram { + buildHistogram(m2, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + m3 := sm2.AppendEmpty() m3.SetName("system.disk.operations") m3.SetDescription("Disk operations count.") @@ -294,6 +302,10 @@ func testMetricsData() pmetric.Metrics { dp34.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp34.SetIntValue(5e3) + if addHistogram { + buildHistogram(m3, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + m4 := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() m4.SetName("system.disk.operations") m4.SetDescription("Disk operations count.") @@ -325,6 +337,10 @@ func testMetricsData() pmetric.Metrics { dp44.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0))) dp44.SetIntValue(7e3) + if addHistogram { + buildHistogram(m4, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + sm5 := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics() m5 := sm5.AppendEmpty() m5.SetName("system.network.io") @@ -347,6 +363,10 @@ func testMetricsData() pmetric.Metrics { dp52.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp52.SetIntValue(6e9) + if addHistogram { + buildHistogram(m5, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + m6 := sm5.AppendEmpty() m6.SetName("system.network.packets") m6.SetDescription("The number of packets transferred") @@ -367,6 +387,10 @@ func testMetricsData() pmetric.Metrics { dp62.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp62.SetIntValue(150) + if addHistogram { + buildHistogram(m6, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + sm7 := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics() m7 := sm7.AppendEmpty() m7.SetName("container.memory.working_set") @@ -378,6 +402,10 @@ func testMetricsData() pmetric.Metrics { dp71.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp71.SetIntValue(1000) + if addHistogram { + buildHistogram(m7, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + m8 := sm7.AppendEmpty() m8.SetName("container.memory.page_faults") dp81 := m8.SetEmptyGauge().DataPoints().AppendEmpty() @@ -387,6 +415,10 @@ func testMetricsData() pmetric.Metrics { dp81.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp81.SetIntValue(1000) + if addHistogram { + buildHistogram(m8, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + m9 := sm7.AppendEmpty() m9.SetName("container.memory.major_page_faults") dp91 := m9.SetEmptyGauge().DataPoints().AppendEmpty() @@ -396,6 +428,10 @@ func testMetricsData() pmetric.Metrics { dp91.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp91.SetIntValue(1000) + if addHistogram { + buildHistogram(m9, "histogram", pcommon.NewTimestampFromTime(time.Unix(1596000000, 0)), 5) + } + return md } @@ -493,7 +529,7 @@ func TestHostmetricsCPUTranslations(t *testing.T) { f := NewFactory() cfg := f.CreateDefaultConfig().(*Config) require.NoError(t, setDefaultExcludes(cfg)) - converter, err := translation.NewMetricsConverter(zap.NewNop(), testGetTranslator(t), cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false) + converter, err := 
translation.NewMetricsConverter(zap.NewNop(), testGetTranslator(t), cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false, true) require.NoError(t, err) md1, err := golden.ReadMetrics(filepath.Join("testdata", "hostmetrics_system_cpu_time_1.yaml")) @@ -534,7 +570,7 @@ func TestDefaultExcludesTranslated(t *testing.T) { cfg := f.CreateDefaultConfig().(*Config) require.NoError(t, setDefaultExcludes(cfg)) - converter, err := translation.NewMetricsConverter(zap.NewNop(), testGetTranslator(t), cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false) + converter, err := translation.NewMetricsConverter(zap.NewNop(), testGetTranslator(t), cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false, true) require.NoError(t, err) var metrics []map[string]string @@ -557,7 +593,7 @@ func TestDefaultExcludes_not_translated(t *testing.T) { cfg := f.CreateDefaultConfig().(*Config) require.NoError(t, setDefaultExcludes(cfg)) - converter, err := translation.NewMetricsConverter(zap.NewNop(), nil, cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false) + converter, err := translation.NewMetricsConverter(zap.NewNop(), nil, cfg.ExcludeMetrics, cfg.IncludeMetrics, "", false, true) require.NoError(t, err) var metrics []map[string]string @@ -577,7 +613,7 @@ func BenchmarkMetricConversion(b *testing.B) { tr, err := translation.NewMetricTranslator(rules, 1) require.NoError(b, err) - c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false) + c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true) require.NoError(b, err) bytes, err := os.ReadFile("testdata/json/hostmetrics.json") @@ -622,3 +658,29 @@ func testReadJSON(f string, v any) error { } return json.Unmarshal(bytes, &v) } + +func buildHistogramDP(dp pmetric.HistogramDataPoint, timestamp pcommon.Timestamp) { + dp.SetStartTimestamp(timestamp) + dp.SetTimestamp(timestamp) + dp.SetMin(1.0) + dp.SetMax(2) + dp.SetCount(5) + dp.SetSum(7.0) + dp.BucketCounts().FromRaw([]uint64{3, 2}) + dp.ExplicitBounds().FromRaw([]float64{1, 2}) + dp.Attributes().PutStr("k1", "v1") +} + +func buildHistogram(im pmetric.Metric, name string, timestamp pcommon.Timestamp, dpCount int) { + im.SetName(name) + im.SetDescription("Histogram") + im.SetUnit("1") + im.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + idps := im.Histogram().DataPoints() + idps.EnsureCapacity(dpCount) + + for i := 0; i < dpCount; i++ { + dp := idps.AppendEmpty() + buildHistogramDP(dp, timestamp) + } +} diff --git a/exporter/signalfxexporter/internal/dimensions/metadata_test.go b/exporter/signalfxexporter/internal/dimensions/metadata_test.go index fc938a481a4b5..4fa352dc8f2b4 100644 --- a/exporter/signalfxexporter/internal/dimensions/metadata_test.go +++ b/exporter/signalfxexporter/internal/dimensions/metadata_test.go @@ -205,6 +205,7 @@ func TestGetDimensionUpdateFromMetadata(t *testing.T) { nil, "-_.", false, + true, ) require.NoError(t, err) assert.Equal(t, tt.want, getDimensionUpdateFromMetadata(tt.args.metadata, *converter)) diff --git a/exporter/signalfxexporter/internal/translation/converter.go b/exporter/signalfxexporter/internal/translation/converter.go index 7f704d9f88939..9a63c8815e120 100644 --- a/exporter/signalfxexporter/internal/translation/converter.go +++ b/exporter/signalfxexporter/internal/translation/converter.go @@ -36,6 +36,7 @@ type MetricsConverter struct { datapointValidator *datapointValidator translator *signalfx.FromTranslator dropHistogramBuckets bool + processHistograms bool } // NewMetricsConverter creates a 
MetricsConverter from the passed in logger and @@ -47,7 +48,8 @@ func NewMetricsConverter( excludes []dpfilters.MetricFilter, includes []dpfilters.MetricFilter, nonAlphanumericDimChars string, - dropHistogramBuckets bool) (*MetricsConverter, error) { + dropHistogramBuckets bool, + processHistograms bool) (*MetricsConverter, error) { fs, err := dpfilters.NewFilterSet(excludes, includes) if err != nil { return nil, err @@ -59,11 +61,13 @@ func NewMetricsConverter( datapointValidator: newDatapointValidator(logger, nonAlphanumericDimChars), translator: &signalfx.FromTranslator{}, dropHistogramBuckets: dropHistogramBuckets, + processHistograms: processHistograms, }, nil } -// MetricsToSignalFxV2 converts the passed in MetricsData to SFx datapoints, -// returning those datapoints and the number of time series that had to be +// MetricsToSignalFxV2 converts the passed in MetricsData to SFx datapoints. +// Histogram metrics are converted only when processHistograms is set; otherwise they are skipped. +// It returns those datapoints and the number of time series that had to be // dropped because of errors or warnings. func (c *MetricsConverter) MetricsToSignalFxV2(md pmetric.Metrics) []*sfxpb.DataPoint { var sfxDataPoints []*sfxpb.DataPoint @@ -77,7 +81,7 @@ func (c *MetricsConverter) MetricsToSignalFxV2(md pmetric.Metrics) []*sfxpb.Data var initialDps []*sfxpb.DataPoint for k := 0; k < ilm.Metrics().Len(); k++ { currentMetric := ilm.Metrics().At(k) - dps := c.translator.FromMetric(currentMetric, extraDimensions, c.dropHistogramBuckets) + dps := c.translator.FromMetric(currentMetric, extraDimensions, c.dropHistogramBuckets, c.processHistograms) initialDps = append(initialDps, dps...) } diff --git a/exporter/signalfxexporter/internal/translation/converter_test.go b/exporter/signalfxexporter/internal/translation/converter_test.go index f2f8457c74575..13c5fc711462c 100644 --- a/exporter/signalfxexporter/internal/translation/converter_test.go +++ b/exporter/signalfxexporter/internal/translation/converter_test.go @@ -603,7 +603,7 @@ func Test_MetricDataToSignalFxV2(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", true) + c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", true, true) require.NoError(t, err) md := tt.metricsFn() gotSfxDataPoints := c.MetricsToSignalFxV2(md) @@ -832,7 +832,7 @@ func Test_MetricDataToSignalFxV2WithHistogramBuckets(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", false) + c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", false, true) require.NoError(t, err) md := tt.metricsFn() gotSfxDataPoints := c.MetricsToSignalFxV2(md) @@ -982,7 +982,7 @@ func Test_MetricDataToSignalFxV2WithHistogramBuckets(t *testing.T) { for _, tt := range testsWithDropHistogramBuckets { t.Run(tt.name, func(t *testing.T) { - c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", true) + c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", true, true) require.NoError(t, err) md := tt.metricsFn() gotSfxDataPoints := c.MetricsToSignalFxV2(md) @@ -994,6 +994,110 @@ func Test_MetricDataToSignalFxV2WithHistogramBuckets(t *testing.T) { assert.Equal(t, tt.wantSfxDataPoints, gotSfxDataPoints) }) } + + testsWithProcessHistogramsFalse := []struct { + name string + 
metricsFn func() pmetric.Metrics + excludeMetrics []dpfilters.MetricFilter + includeMetrics []dpfilters.MetricFilter + wantCount int + wantSfxDataPoints []*sfxpb.DataPoint + }{ + { + name: "no_histograms", + metricsFn: func() pmetric.Metrics { + out := pmetric.NewMetrics() + ilm := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + { + m := ilm.Metrics().AppendEmpty() + m.SetName("gauge_int_with_dims") + initInt64PtWithLabels(m.SetEmptyGauge().DataPoints().AppendEmpty()) + } + { + m := ilm.Metrics().AppendEmpty() + m.SetName("cumulative_double_with_dims") + m.SetEmptySum().SetIsMonotonic(true) + initDoublePtWithLabels(m.Sum().DataPoints().AppendEmpty()) + } + return out + }, + wantCount: 2, + wantSfxDataPoints: []*sfxpb.DataPoint{ + int64SFxDataPoint("gauge_int_with_dims", &sfxMetricTypeGauge, labelMap), + doubleSFxDataPoint("cumulative_double_with_dims", &sfxMetricTypeCumulativeCounter, labelMap), + }, + }, + { + name: "only_histograms", + metricsFn: func() pmetric.Metrics { + out := pmetric.NewMetrics() + ilm := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + { + m := ilm.Metrics().AppendEmpty() + m.SetName("histo_with_buckets") + initHistDP(m.SetEmptyHistogram().DataPoints().AppendEmpty()) + } + { + m := ilm.Metrics().AppendEmpty() + m.SetName("histo_with_buckets_2") + initHistDP(m.SetEmptyHistogram().DataPoints().AppendEmpty()) + } + return out + }, + wantCount: 0, + wantSfxDataPoints: []*sfxpb.DataPoint(nil), + }, + { + name: "mixed_with_histograms", + metricsFn: func() pmetric.Metrics { + out := pmetric.NewMetrics() + ilm := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + { + m := ilm.Metrics().AppendEmpty() + m.SetName("gauge_int_with_dims") + initInt64PtWithLabels(m.SetEmptyGauge().DataPoints().AppendEmpty()) + } + { + m := ilm.Metrics().AppendEmpty() + m.SetName("cumulative_double_with_dims") + m.SetEmptySum().SetIsMonotonic(true) + initDoublePtWithLabels(m.Sum().DataPoints().AppendEmpty()) + } + { + m := ilm.Metrics().AppendEmpty() + m.SetName("histo_with_no_buckets") + initHistDPNoBuckets(m.SetEmptyHistogram().DataPoints().AppendEmpty()) + } + { + m := ilm.Metrics().AppendEmpty() + m.SetName("histo_with_buckets") + initHistDP(m.SetEmptyHistogram().DataPoints().AppendEmpty()) + } + return out + }, + wantCount: 2, + wantSfxDataPoints: []*sfxpb.DataPoint{ + int64SFxDataPoint("gauge_int_with_dims", &sfxMetricTypeGauge, labelMap), + doubleSFxDataPoint("cumulative_double_with_dims", &sfxMetricTypeCumulativeCounter, labelMap), + }, + }, + } + + for _, tt := range testsWithProcessHistogramsFalse { + t.Run(tt.name, func(t *testing.T) { + c, err := NewMetricsConverter(logger, nil, tt.excludeMetrics, tt.includeMetrics, "", true, false) + require.NoError(t, err) + md := tt.metricsFn() + gotSfxDataPoints := c.MetricsToSignalFxV2(md) + + // Sort SFx dimensions since they are built from maps and the order + // of those is not deterministic. 
+ sortDimensions(tt.wantSfxDataPoints) + sortDimensions(gotSfxDataPoints) + assert.Equal(t, tt.wantCount, len(gotSfxDataPoints)) + assert.Equal(t, tt.wantSfxDataPoints, gotSfxDataPoints) + }) + } } func TestMetricDataToSignalFxV2WithTranslation(t *testing.T) { @@ -1030,7 +1134,7 @@ func TestMetricDataToSignalFxV2WithTranslation(t *testing.T) { }, }, } - c, err := NewMetricsConverter(zap.NewNop(), translator, nil, nil, "", false) + c, err := NewMetricsConverter(zap.NewNop(), translator, nil, nil, "", false, true) require.NoError(t, err) assert.EqualValues(t, expected, c.MetricsToSignalFxV2(md)) } @@ -1069,7 +1173,7 @@ func TestDimensionKeyCharsWithPeriod(t *testing.T) { }, }, } - c, err := NewMetricsConverter(zap.NewNop(), translator, nil, nil, "_-.", false) + c, err := NewMetricsConverter(zap.NewNop(), translator, nil, nil, "_-.", false, true) require.NoError(t, err) assert.EqualValues(t, expected, c.MetricsToSignalFxV2(md)) @@ -1087,7 +1191,7 @@ func TestInvalidNumberOfDimensions(t *testing.T) { for i := 0; i < 10; i++ { dp.Attributes().PutStr(fmt.Sprint("dim_key_", i), fmt.Sprint("dim_val_", i)) } - c, err := NewMetricsConverter(logger, nil, nil, nil, "_-.", false) + c, err := NewMetricsConverter(logger, nil, nil, nil, "_-.", false, true) require.NoError(t, err) assert.EqualValues(t, 1, len(c.MetricsToSignalFxV2(md))) // No log message should be printed @@ -1193,7 +1297,7 @@ func TestNewMetricsConverter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewMetricsConverter(zap.NewNop(), nil, tt.excludes, nil, "", false) + got, err := NewMetricsConverter(zap.NewNop(), nil, tt.excludes, nil, "", false, true) if tt.wantErr { assert.Error(t, err) return @@ -1253,7 +1357,7 @@ func TestMetricsConverter_ConvertDimension(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, err := NewMetricsConverter(zap.NewNop(), tt.fields.metricTranslator, nil, nil, tt.fields.nonAlphanumericDimChars, false) + c, err := NewMetricsConverter(zap.NewNop(), tt.fields.metricTranslator, nil, nil, tt.fields.nonAlphanumericDimChars, false, true) require.NoError(t, err) if got := c.ConvertDimension(tt.args.dim); got != tt.want { t.Errorf("ConvertDimension() = %v, want %v", got, tt.want) diff --git a/exporter/signalfxexporter/internal/translation/translator_test.go b/exporter/signalfxexporter/internal/translation/translator_test.go index 288f2d500336b..10f8c0281dd7a 100644 --- a/exporter/signalfxexporter/internal/translation/translator_test.go +++ b/exporter/signalfxexporter/internal/translation/translator_test.go @@ -2960,7 +2960,7 @@ func testConverter(t *testing.T, mapping map[string]string) *MetricsConverter { tr, err := NewMetricTranslator(rules, 1) require.NoError(t, err) - c, err := NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false) + c, err := NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true) require.NoError(t, err) return c } diff --git a/exporter/signalfxexporter/internal/utils/histogram_utils.go b/exporter/signalfxexporter/internal/utils/histogram_utils.go new file mode 100644 index 0000000000000..16bfd495d07b6 --- /dev/null +++ b/exporter/signalfxexporter/internal/utils/histogram_utils.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package utils // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/utils" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + 
"go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" +) + +// removeAccessToken removes the SFX access token label if found in the give resource metric as a resource attribute +func removeAccessToken(dest pmetric.ResourceMetrics) { + dest.Resource().Attributes().RemoveIf(func(k string, val pcommon.Value) bool { + return k == splunk.SFxAccessTokenLabel + }) +} + +// matchedHistogramResourceMetrics returns a map with the keys set to the index of resource Metrics containing +// Histogram metric type. +// The value is another map consisting of the ScopeMetric indices in the RM which contain Histogram metric type as keys +// and the value as an int slice with indices of the Histogram metric within the given scope. +// Example output {1: {1: [0,2]}}. +// The above output can be interpreted as Resource at index 1 contains Histogram metrics. +// Within this resource specifically the scope metric at index 1 contain Histograms. +// Lastly, the scope metric at index 1 has two Histogram type metric which can be found at index 0 and 2. +func matchedHistogramResourceMetrics(md pmetric.Metrics) (matchedRMIdx map[int]map[int][]int) { + rms := md.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + matchedSMIdx := matchedHistogramScopeMetrics(rm) + if len(matchedSMIdx) > 0 { + if matchedRMIdx == nil { + matchedRMIdx = map[int]map[int][]int{} + } + matchedRMIdx[i] = matchedSMIdx + } + } + return +} + +// matchedHistogramScopeMetrics returns a map with keys equal to the ScopeMetric indices in the input resource metric +// which contain Histogram metric type. +// And the value is an int slice with indices of the Histogram metric within the keyed scope metric. +// Example output {1: [0,2]}. +// The above output can be interpreted as scope metrics at index 1 contains Histogram metrics. +// And that the scope metric at index 1 has two Histogram type metric which can be found at index 0 and 2. +func matchedHistogramScopeMetrics(rm pmetric.ResourceMetrics) (matchedSMIdx map[int][]int) { + ilms := rm.ScopeMetrics() + for i := 0; i < ilms.Len(); i++ { + ilm := ilms.At(i) + matchedMetricsIdx := matchedHistogramMetrics(ilm) + if len(matchedMetricsIdx) > 0 { + if matchedSMIdx == nil { + matchedSMIdx = map[int][]int{} + } + matchedSMIdx[i] = matchedMetricsIdx + } + } + return +} + +// matchedHistogramMetrics returns an int slice with indices of metrics which are of Histogram type +// within the input scope metric. +// Example output [0,2]. +// The above output can be interpreted as input scope metric has Histogram type metric at index 0 and 2. 
+func matchedHistogramMetrics(ilm pmetric.ScopeMetrics) (matchedMetricsIdx []int) {
+	ms := ilm.Metrics()
+	for i := 0; i < ms.Len(); i++ {
+		metric := ms.At(i)
+		if metric.Type() == pmetric.MetricTypeHistogram {
+			matchedMetricsIdx = append(matchedMetricsIdx, i)
+		}
+	}
+	return
+}
+
+// GetHistograms returns a new Metrics instance containing only the Histogram metrics found in the input,
+// along with the count of histogram metrics found.
+func GetHistograms(md pmetric.Metrics) (pmetric.Metrics, int) {
+	matchedMetricsIdxes := matchedHistogramResourceMetrics(md)
+	matchedRmCount := len(matchedMetricsIdxes)
+	if matchedRmCount == 0 {
+		return pmetric.Metrics{}, 0
+	}
+
+	metricCount := 0
+	srcRms := md.ResourceMetrics()
+	dest := pmetric.NewMetrics()
+	dstRms := dest.ResourceMetrics()
+	dstRms.EnsureCapacity(matchedRmCount)
+
+	// Iterate over those ResourceMetrics which were found to contain histograms
+	for rmIdx, ilmMap := range matchedMetricsIdxes {
+		srcRm := srcRms.At(rmIdx)
+
+		// Copy resource metric properties to dest
+		destRm := dstRms.AppendEmpty()
+		srcRm.Resource().CopyTo(destRm.Resource())
+		destRm.SetSchemaUrl(srcRm.SchemaUrl())
+
+		// Remove the SFx access token
+		removeAccessToken(destRm)
+
+		matchedIlmCount := len(ilmMap)
+		destIlms := destRm.ScopeMetrics()
+		destIlms.EnsureCapacity(matchedIlmCount)
+		srcIlms := srcRm.ScopeMetrics()
+
+		// Iterate over ScopeMetrics which were found to contain histograms
+		for ilmIdx, metricIdxes := range ilmMap {
+			srcIlm := srcIlms.At(ilmIdx)
+
+			// Copy scope properties to dest
+			destIlm := destIlms.AppendEmpty()
+			srcIlm.Scope().CopyTo(destIlm.Scope())
+			destIlm.SetSchemaUrl(srcIlm.SchemaUrl())
+
+			matchedMetricCount := len(metricIdxes)
+			destMs := destIlm.Metrics()
+			destMs.EnsureCapacity(matchedMetricCount)
+			srcMs := srcIlm.Metrics()
+
+			// Iterate over Metrics which contain histograms
+			for _, srcIdx := range metricIdxes {
+				srcMetric := srcMs.At(srcIdx)
+
+				// Copy metric properties to dest
+				destMetric := destMs.AppendEmpty()
+				srcMetric.CopyTo(destMetric)
+				metricCount++
+			}
+		}
+	}
+
+	return dest, metricCount
+}
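The index-map helpers above only surface through `GetHistograms`, so a compact usage sketch may help anchor the contract. The Example test below is illustrative and not part of this PR: the metric names are invented, and it relies only on the behavior documented above (the returned copy holds just the histograms, the access-token resource attribute is stripped, and the input is not mutated).

```go
package utils

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// ExampleGetHistograms splits a mixed payload into its histogram-only part.
func ExampleGetHistograms() {
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	// This resource attribute is removed from the returned copy.
	rm.Resource().Attributes().PutStr("com.splunk.signalfx.access_token", "secret")
	sm := rm.ScopeMetrics().AppendEmpty()

	g := sm.Metrics().AppendEmpty()
	g.SetName("cpu.utilization") // hypothetical metric name
	g.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(0.42)

	h := sm.Metrics().AppendEmpty()
	h.SetName("http.server.duration") // hypothetical metric name
	h.SetEmptyHistogram().DataPoints().AppendEmpty().SetCount(3)

	histOnly, count := GetHistograms(md)
	fmt.Println(count, histOnly.MetricCount(), md.MetricCount())
	// Output: 1 1 2
}
```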
diff --git a/exporter/signalfxexporter/internal/utils/histogram_utils_test.go b/exporter/signalfxexporter/internal/utils/histogram_utils_test.go
new file mode 100644
index 0000000000000..e47a1f55b4d11
--- /dev/null
+++ b/exporter/signalfxexporter/internal/utils/histogram_utils_test.go
@@ -0,0 +1,310 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package utils
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+func initMetric(m pmetric.Metric, name string, ty pmetric.MetricType) {
+	m.SetName(name)
+	m.SetDescription("")
+	m.SetUnit("1")
+	switch ty {
+	case pmetric.MetricTypeGauge:
+		m.SetEmptyGauge()
+	case pmetric.MetricTypeSum:
+		sum := m.SetEmptySum()
+		sum.SetIsMonotonic(true)
+		sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+	case pmetric.MetricTypeHistogram:
+		histo := m.SetEmptyHistogram()
+		histo.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+	case pmetric.MetricTypeExponentialHistogram:
+		histo := m.SetEmptyExponentialHistogram()
+		histo.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+	case pmetric.MetricTypeSummary:
+		m.SetEmptySummary()
+	}
+}
+
+func buildHistogramDP(dp pmetric.HistogramDataPoint, timestamp pcommon.Timestamp) {
+	dp.SetStartTimestamp(timestamp)
+	dp.SetTimestamp(timestamp)
+	dp.SetMin(1.0)
+	dp.SetMax(2.0)
+	dp.SetCount(5)
+	dp.SetSum(7.0)
+	dp.BucketCounts().FromRaw([]uint64{3, 2})
+	dp.ExplicitBounds().FromRaw([]float64{1, 2})
+	dp.Attributes().PutStr("k1", "v1")
+}
+
+func buildHistogram(im pmetric.Metric, name string, timestamp pcommon.Timestamp, dpCount int) {
+	initMetric(im, name, pmetric.MetricTypeHistogram)
+	idps := im.Histogram().DataPoints()
+	idps.EnsureCapacity(dpCount)
+
+	for i := 0; i < dpCount; i++ {
+		dp := idps.AppendEmpty()
+		buildHistogramDP(dp, timestamp)
+	}
+}
+
+func buildGauge(im pmetric.Metric, name string, timestamp pcommon.Timestamp, dpCount int) {
+	initMetric(im, name, pmetric.MetricTypeGauge)
+	idps := im.Gauge().DataPoints()
+	idps.EnsureCapacity(dpCount)
+
+	for i := 0; i < dpCount; i++ {
+		dp := idps.AppendEmpty()
+		dp.SetTimestamp(timestamp)
+		dp.SetDoubleValue(1000)
+		dp.Attributes().PutStr("k1", "v1")
+	}
+}
+
+func buildSum(im pmetric.Metric, name string, timestamp pcommon.Timestamp, dpCount int) {
+	initMetric(im, name, pmetric.MetricTypeSum)
+	idps := im.Sum().DataPoints()
+	idps.EnsureCapacity(dpCount)
+
+	for i := 0; i < dpCount; i++ {
+		dp := idps.AppendEmpty()
+		dp.SetStartTimestamp(timestamp)
+		dp.SetTimestamp(timestamp)
+		dp.SetIntValue(123)
+		dp.Attributes().PutStr("k1", "v1")
+	}
+}
+
+func TestHistogramsAreRetrieved(t *testing.T) {
+	ts := pcommon.NewTimestampFromTime(time.Date(2024, 2, 9, 20, 26, 13, 789, time.UTC))
+	tests := []struct {
+		name            string
+		inMetricsFunc   func() pmetric.Metrics
+		wantMetricCount int
+		wantMetrics     func() pmetric.Metrics
+	}{
+		{
+			name: "no_histograms",
+			inMetricsFunc: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				ilm := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
+				ilm.Metrics().EnsureCapacity(2)
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildGauge(m, "gauge", ts, 1)
+				}
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildSum(m, "sum", ts, 1)
+				}
+				return out
+			},
+			wantMetricCount: 0,
+			wantMetrics:     func() pmetric.Metrics { return pmetric.Metrics{} },
+		},
+		{
+			name: "only_histograms",
+			inMetricsFunc: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				ilms := rm.ScopeMetrics()
+				ilms.EnsureCapacity(3)
+				ilm := ilms.AppendEmpty()
+				ilm.SetSchemaUrl("Scope SchemaUrl")
+				ilm.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm.Scope().SetName("Scope name")
+				ilm.Scope().SetVersion("Scope version")
+				ilm.Metrics().EnsureCapacity(2)
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildHistogram(m, "histogram_1", ts, 5)
+				}
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildHistogram(m, "histogram_2", ts, 1)
+				}
+				return out
+			},
+			wantMetricCount: 2,
+			wantMetrics: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				ilm := rm.ScopeMetrics().AppendEmpty()
+				ilm.SetSchemaUrl("Scope SchemaUrl")
+				ilm.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm.Scope().SetName("Scope name")
+				ilm.Scope().SetVersion("Scope version")
+				ilm.Metrics().EnsureCapacity(2)
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildHistogram(m, "histogram_1", ts, 5)
+				}
+				{
+					m := ilm.Metrics().AppendEmpty()
+					buildHistogram(m, "histogram_2", ts, 1)
+				}
+				return out
+			},
+		},
+		{
+			name: "mixed_type_multiple_scopes",
+			inMetricsFunc: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				rm.ScopeMetrics().AppendEmpty()
+				ilm0 := rm.ScopeMetrics().At(0)
+				ilm0.SetSchemaUrl("Scope SchemaUrl")
+				ilm0.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm0.Scope().SetName("Scope name")
+				ilm0.Scope().SetVersion("Scope version")
+				ilm0.Metrics().EnsureCapacity(2)
+				ilm0.Metrics().AppendEmpty()
+				buildHistogram(ilm0.Metrics().At(0), "histogram_1", ts, 1)
+				ilm0.Metrics().AppendEmpty()
+				buildGauge(ilm0.Metrics().At(1), "gauge", ts, 2)
+
+				rm.ScopeMetrics().AppendEmpty()
+				ilm1 := rm.ScopeMetrics().At(1)
+				ilm1.Metrics().AppendEmpty()
+				buildSum(ilm1.Metrics().At(0), "sum", ts, 2)
+
+				rm.ScopeMetrics().AppendEmpty()
+				ilm2 := rm.ScopeMetrics().At(2)
+				ilm2.SetSchemaUrl("Scope SchemaUrl")
+				ilm2.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm2.Metrics().EnsureCapacity(2)
+				ilm2.Metrics().AppendEmpty()
+				buildHistogram(ilm2.Metrics().At(0), "histogram_1", ts, 1)
+				ilm2.Metrics().AppendEmpty()
+				buildHistogram(ilm2.Metrics().At(1), "histogram_2", ts, 2)
+				return out
+			},
+			wantMetricCount: 3,
+			wantMetrics: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				rm.ScopeMetrics().AppendEmpty()
+				ilm0 := rm.ScopeMetrics().At(0)
+				ilm0.SetSchemaUrl("Scope SchemaUrl")
+				ilm0.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm0.Scope().SetName("Scope name")
+				ilm0.Scope().SetVersion("Scope version")
+				buildHistogram(ilm0.Metrics().AppendEmpty(), "histogram_1", ts, 1)
+
+				rm.ScopeMetrics().AppendEmpty()
+				ilm1 := rm.ScopeMetrics().At(1)
+				ilm1.SetSchemaUrl("Scope SchemaUrl")
+				ilm1.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm1.Metrics().EnsureCapacity(2)
+				ilm1.Metrics().AppendEmpty()
+				buildHistogram(ilm1.Metrics().At(0), "histogram_1", ts, 1)
+				ilm1.Metrics().AppendEmpty()
+				buildHistogram(ilm1.Metrics().At(1), "histogram_2", ts, 2)
+				return out
+			}},
+		{
+			name: "mixed_type_multiple_resources",
+			inMetricsFunc: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				out.ResourceMetrics().EnsureCapacity(2)
+				out.ResourceMetrics().AppendEmpty()
+				rm0 := out.ResourceMetrics().At(0)
+				rm0.SetSchemaUrl("Resource SchemaUrl")
+				rm0.Resource().Attributes().PutStr("kr0", "vr0")
+				rm0.ScopeMetrics().AppendEmpty()
+				ilm0r0 := rm0.ScopeMetrics().At(0)
+				ilm0r0.SetSchemaUrl("Scope SchemaUrl")
+				ilm0r0.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm0r0.Metrics().EnsureCapacity(2)
+				ilm0r0.Metrics().AppendEmpty()
+				buildHistogram(ilm0r0.Metrics().At(0), "histogram_1", ts, 1)
+				ilm0r0.Metrics().AppendEmpty()
+				buildGauge(ilm0r0.Metrics().At(1), "gauge", ts, 1)
+				rm0.ScopeMetrics().AppendEmpty()
+				ilm1r0 := rm0.ScopeMetrics().At(1)
+				ilm1r0.Metrics().AppendEmpty()
+				buildGauge(ilm1r0.Metrics().At(0), "gauge", ts, 1)
+
+				out.ResourceMetrics().AppendEmpty()
+				rm1 := out.ResourceMetrics().At(1)
+				rm1.Resource().Attributes().PutStr("kr1", "vr1")
+				ilm0r1 := rm1.ScopeMetrics().AppendEmpty()
+				ilm0r1.SetSchemaUrl("Scope SchemaUrl")
+				ilm0r1.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm0r1.Metrics().AppendEmpty()
+				buildGauge(ilm0r1.Metrics().At(0), "gauge", ts, 1)
+
+				return out
+			},
+			wantMetricCount: 1,
+			wantMetrics: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				out.ResourceMetrics().AppendEmpty()
+				rm := out.ResourceMetrics().At(0)
+				rm.SetSchemaUrl("Resource SchemaUrl")
+				rm.Resource().Attributes().PutStr("kr0", "vr0")
+				rm.ScopeMetrics().AppendEmpty()
+				ilm0 := rm.ScopeMetrics().At(0)
+				ilm0.SetSchemaUrl("Scope SchemaUrl")
+				ilm0.Scope().Attributes().PutStr("ks0", "vs0")
+				ilm0.Metrics().EnsureCapacity(1)
+				ilm0.Metrics().AppendEmpty()
+				buildHistogram(ilm0.Metrics().At(0), "histogram_1", ts, 1)
+				return out
+			}},
+		{
+			name: "remove_access_token",
+			inMetricsFunc: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				res.Attributes().PutStr("com.splunk.signalfx.access_token", "abcd")
+				ilms := rm.ScopeMetrics()
+				ilms.EnsureCapacity(3)
+				ilm := ilms.AppendEmpty()
+				buildHistogram(ilm.Metrics().AppendEmpty(), "histogram_1", ts, 1)
+				return out
+			},
+			wantMetricCount: 1,
+			wantMetrics: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				rm := out.ResourceMetrics().AppendEmpty()
+				res := rm.Resource()
+				res.Attributes().PutStr("kr0", "vr0")
+				ilms := rm.ScopeMetrics()
+				ilms.EnsureCapacity(3)
+				ilm := ilms.AppendEmpty()
+				buildHistogram(ilm.Metrics().AppendEmpty(), "histogram_1", ts, 1)
+				return out
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			md := tt.inMetricsFunc()
+			gotMetrics, gotCount := GetHistograms(md)
+
+			assert.Equal(t, tt.wantMetricCount, gotCount)
+			assert.Equal(t, tt.wantMetrics(), gotMetrics)
+		})
+	}
+}
diff --git a/exporter/signalfxexporter/testdata/config.yaml b/exporter/signalfxexporter/testdata/config.yaml
index d82534b51563d..edc8f0d9765b7 100644
--- a/exporter/signalfxexporter/testdata/config.yaml
+++ b/exporter/signalfxexporter/testdata/config.yaml
@@ -79,3 +79,4 @@ signalfx/allsettings:
       property_value: '!globbed*value'
       dimension_name: globbed*
       dimension_value: '!globbed*value'
+  send_otlp_histograms: true
diff --git a/pkg/translator/signalfx/from_metrics.go b/pkg/translator/signalfx/from_metrics.go
index 8c0c7f89edaac..0d316db4947a3 100644
--- a/pkg/translator/signalfx/from_metrics.go
+++ b/pkg/translator/signalfx/from_metrics.go
@@ -36,7 +36,7 @@ const (
 type FromTranslator struct{}
 
 // FromMetrics converts pmetric.Metrics to SignalFx proto data points.
-func (ft *FromTranslator) FromMetrics(md pmetric.Metrics, dropHistogramBuckets bool) ([]*sfxpb.DataPoint, error) {
+func (ft *FromTranslator) FromMetrics(md pmetric.Metrics, dropHistogramBuckets bool, processHistograms bool) ([]*sfxpb.DataPoint, error) {
 	var sfxDataPoints []*sfxpb.DataPoint
 
 	rms := md.ResourceMetrics()
@@ -47,7 +47,7 @@ func (ft *FromTranslator) FromMetrics(md pmetric.Metrics, dropHistogramBuckets b
 		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
 			ilm := rm.ScopeMetrics().At(j)
 			for k := 0; k < ilm.Metrics().Len(); k++ {
-				sfxDataPoints = append(sfxDataPoints, ft.FromMetric(ilm.Metrics().At(k), extraDimensions, dropHistogramBuckets)...)
+				sfxDataPoints = append(sfxDataPoints, ft.FromMetric(ilm.Metrics().At(k), extraDimensions, dropHistogramBuckets, processHistograms)...)
 			}
 		}
 	}
@@ -57,7 +57,7 @@ func (ft *FromTranslator) FromMetrics(md pmetric.Metrics, dropHistogramBuckets b
 
 // FromMetric converts pmetric.Metric to SignalFx proto data points.
 // TODO: Remove this and change signalfxexporter to use FromMetrics.
-func (ft *FromTranslator) FromMetric(m pmetric.Metric, extraDimensions []*sfxpb.Dimension, dropHistogramBuckets bool) []*sfxpb.DataPoint {
+func (ft *FromTranslator) FromMetric(m pmetric.Metric, extraDimensions []*sfxpb.Dimension, dropHistogramBuckets bool, processHistograms bool) []*sfxpb.DataPoint {
 	var dps []*sfxpb.DataPoint
 
 	mt := fromMetricTypeToMetricType(m)
@@ -68,7 +68,9 @@ func (ft *FromTranslator) FromMetric(m pmetric.Metric, extraDimensions []*sfxpb.
 	case pmetric.MetricTypeSum:
 		dps = convertNumberDataPoints(m.Sum().DataPoints(), m.Name(), mt, extraDimensions)
 	case pmetric.MetricTypeHistogram:
-		dps = convertHistogram(m.Histogram().DataPoints(), m.Name(), mt, extraDimensions, dropHistogramBuckets)
+		if processHistograms {
+			dps = convertHistogram(m.Histogram().DataPoints(), m.Name(), mt, extraDimensions, dropHistogramBuckets)
+		}
 	case pmetric.MetricTypeSummary:
 		dps = convertSummaryDataPoints(m.Summary().DataPoints(), m.Name(), extraDimensions)
 	case pmetric.MetricTypeExponentialHistogram:
diff --git a/pkg/translator/signalfx/from_metrics_test.go b/pkg/translator/signalfx/from_metrics_test.go
index 5f3b0997989b3..70d091ac40806 100644
--- a/pkg/translator/signalfx/from_metrics_test.go
+++ b/pkg/translator/signalfx/from_metrics_test.go
@@ -384,7 +384,7 @@ func Test_FromMetrics(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			from := &FromTranslator{}
-			gotSfxDataPoints, err := from.FromMetrics(tt.metricsFn(), false)
+			gotSfxDataPoints, err := from.FromMetrics(tt.metricsFn(), false, true)
 			require.NoError(t, err)
 			// Sort SFx dimensions since they are built from maps and the order
 			// of those is not deterministic.
@@ -473,7 +473,44 @@ func Test_FromMetrics(t *testing.T) {
 	for _, tt := range testsWithDropHistogramBuckets {
 		t.Run(tt.name, func(t *testing.T) {
 			from := &FromTranslator{}
-			gotSfxDataPoints, err := from.FromMetrics(tt.metricsFn(), true)
+			gotSfxDataPoints, err := from.FromMetrics(tt.metricsFn(), true, true)
+			require.NoError(t, err)
+			// Sort SFx dimensions since they are built from maps and the order
+			// of those is not deterministic.
+			sortDimensions(tt.wantSfxDataPoints)
+			sortDimensions(gotSfxDataPoints)
+			assert.EqualValues(t, tt.wantSfxDataPoints, gotSfxDataPoints)
+		})
+	}
+
+	testsWithIgnoreHistograms := []struct {
+		name              string
+		metricsFn         func() pmetric.Metrics
+		wantSfxDataPoints []*sfxpb.DataPoint
+	}{
+		{
+			name: "ignore_histogram",
+			metricsFn: func() pmetric.Metrics {
+				out := pmetric.NewMetrics()
+				ilm := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
+				m1 := ilm.Metrics().AppendEmpty()
+				m1.SetName("histogram")
+				m1.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+				initHistDP(m1.Histogram().DataPoints().AppendEmpty())
+				m2 := ilm.Metrics().AppendEmpty()
+				m2.SetName("gauge_double_with_dims")
+				initDoublePt(m2.SetEmptyGauge().DataPoints().AppendEmpty())
+				return out
+			},
+			wantSfxDataPoints: []*sfxpb.DataPoint{
+				doubleSFxDataPoint("gauge_double_with_dims", &sfxMetricTypeGauge, nil, doubleVal),
+			},
+		},
+	}
+	for _, tt := range testsWithIgnoreHistograms {
+		t.Run(tt.name, func(t *testing.T) {
+			from := &FromTranslator{}
+			gotSfxDataPoints, err := from.FromMetrics(tt.metricsFn(), true, false)
 			require.NoError(t, err)
 			// Sort SFx dimensions since they are built from maps and the order
 			// of those is not deterministic.
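Read together, the pieces in this diff suggest the intended flow: when `send_otlp_histograms` is enabled, histograms are split out with `GetHistograms` and shipped as OTLP, while `FromMetrics` is called with `processHistograms` set to false so the SignalFx conversion skips them and nothing is sent twice. The sketch below is hypothetical wiring, not the exporter's actual code; only the `GetHistograms` and `FromMetrics` signatures and the OTLP content type come from this diff.

```go
package sketch

import (
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/utils"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx"
)

// push is a hypothetical helper showing how the new flag and GetHistograms
// complement each other so every histogram leaves through exactly one path.
func push(md pmetric.Metrics, sendOTLPHistograms bool) error {
	if sendOTLPHistograms {
		if histograms, count := utils.GetHistograms(md); count > 0 {
			// Ship `histograms` as an OTLP protobuf request, using the
			// "application/x-protobuf;format=otlp" content type declared in dpclient.go.
			_ = histograms
		}
	}

	// Convert the remaining metrics to SignalFx v2 datapoints. processHistograms
	// is the negation of sendOTLPHistograms, so histograms are skipped here
	// whenever they were already shipped as OTLP above.
	ft := &signalfx.FromTranslator{}
	dps, err := ft.FromMetrics(md, false /* dropHistogramBuckets */, !sendOTLPHistograms)
	if err != nil {
		return err
	}
	_ = dps // encode and POST to the v2 datapoint endpoint
	return nil
}
```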