From 16445b30a61eb0cdfed4aaab8c6c5b96d5a2bf52 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Mon, 5 Feb 2024 11:39:07 -0500 Subject: [PATCH] [datadog/connector] Create a simplified Trace to Trace connector (#31026) **Description:** Datadog Connector is creating two instances of Trace Agent, one in the trace-to-metrics pipeline and another in the traces-to-traces pipeline. The PR separates the trace-to-trace connector, simplifying the logic, this avoid un-necessary serialization. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30828 https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30487 **Testing:** **Documentation:** --- .../dinesh.gurumurthy_fix-connector.yaml | 22 +++++++ connector/datadogconnector/connector.go | 36 +++++------- connector/datadogconnector/connector_test.go | 14 +++-- connector/datadogconnector/factory.go | 10 +--- .../datadogconnector/traces_connector.go | 58 +++++++++++++++++++ internal/datadog/agent.go | 7 --- internal/datadog/agent_test.go | 23 -------- 7 files changed, 108 insertions(+), 62 deletions(-) create mode 100755 .chloggen/dinesh.gurumurthy_fix-connector.yaml create mode 100644 connector/datadogconnector/traces_connector.go diff --git a/.chloggen/dinesh.gurumurthy_fix-connector.yaml b/.chloggen/dinesh.gurumurthy_fix-connector.yaml new file mode 100755 index 0000000000000..33a8f2abd3337 --- /dev/null +++ b/.chloggen/dinesh.gurumurthy_fix-connector.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadog/connector +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Create a separate connector in the Datadog connector for the trace-to-metrics and trace-to-trace pipelines. It should reduce the number of conversions we do and help with Datadog connector performance. +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30828] +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Simplify datadog/connector with two separate connectors in trace-to-metrics pipeline and trace-to-trace pipeline. +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/connector/datadogconnector/connector.go b/connector/datadogconnector/connector.go index fc3d168c54757..550423f0c7041 100644 --- a/connector/datadogconnector/connector.go +++ b/connector/datadogconnector/connector.go @@ -21,10 +21,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog" ) -// connectorImp is the schema for connector -type connectorImp struct { +// traceToMetricConnector is the schema for connector +type traceToMetricConnector struct { metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector - tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector logger *zap.Logger // agent specifies the agent used to ingest traces and output APM Stats. @@ -43,12 +42,11 @@ type connectorImp struct { exit chan struct{} } -var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface +var _ component.Component = (*traceToMetricConnector)(nil) // testing that the connectorImp properly implements the type Component interface // function to create a new connector -func newConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, tracesConsumer consumer.Traces) (*connectorImp, error) { - set.Logger.Info("Building datadog connector") - +func newTraceToMetricConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics) (*traceToMetricConnector, error) { + set.Logger.Info("Building datadog connector for traces to metrics") in := make(chan *pb.StatsPayload, 100) set.MeterProvider = noop.NewMeterProvider() // disable metrics for the connector attributesTranslator, err := attributes.NewTranslator(set) @@ -61,13 +59,12 @@ func newConnector(set component.TelemetrySettings, cfg component.Config, metrics } ctx := context.Background() - return &connectorImp{ + return &traceToMetricConnector{ logger: set.Logger, agent: datadog.NewAgentWithConfig(ctx, getTraceAgentCfg(cfg.(*Config).Traces), in), translator: trans, in: in, metricsConsumer: metricsConsumer, - tracesConsumer: tracesConsumer, exit: make(chan struct{}), }, nil } @@ -86,18 +83,18 @@ func getTraceAgentCfg(cfg TracesConfig) *traceconfig.AgentConfig { } // Start implements the component.Component interface. -func (c *connectorImp) Start(_ context.Context, _ component.Host) error { +func (c *traceToMetricConnector) Start(_ context.Context, _ component.Host) error { c.logger.Info("Starting datadogconnector") c.agent.Start() - if c.metricsConsumer != nil { - go c.run() - } + go c.run() return nil } // Shutdown implements the component.Component interface. -func (c *connectorImp) Shutdown(context.Context) error { +func (c *traceToMetricConnector) Shutdown(context.Context) error { c.logger.Info("Shutting down datadog connector") + c.logger.Info("Stopping datadog agent") + // stop the agent and wait for the run loop to exit c.agent.Stop() c.exit <- struct{}{} // signal exit <-c.exit // wait for close @@ -106,21 +103,18 @@ func (c *connectorImp) Shutdown(context.Context) error { // Capabilities implements the consumer interface. // tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data -func (c *connectorImp) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} // ConsumeTraces puts a new attribute _dd.stats_computed +func (c *traceToMetricConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} } -func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { +func (c *traceToMetricConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { c.agent.Ingest(ctx, traces) - if c.tracesConsumer != nil { - return c.tracesConsumer.ConsumeTraces(ctx, traces) - } return nil } // run awaits incoming stats resulting from the agent's ingestion, converts them // to metrics and flushes them using the configured metrics exporter. -func (c *connectorImp) run() { +func (c *traceToMetricConnector) run() { defer close(c.exit) for { select { diff --git a/connector/datadogconnector/connector_test.go b/connector/datadogconnector/connector_test.go index d5410720be717..ad3655c56a403 100644 --- a/connector/datadogconnector/connector_test.go +++ b/connector/datadogconnector/connector_test.go @@ -13,11 +13,10 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" ) -var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface +var _ component.Component = (*traceToMetricConnector)(nil) // testing that the connectorImp properly implements the type Component interface // create test to create a connector, check that basic code compiles func TestNewConnector(t *testing.T) { - factory := NewFactory() creationParams := connectortest.NewNopCreateSettings() @@ -26,12 +25,19 @@ func TestNewConnector(t *testing.T) { traceToMetricsConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop()) assert.NoError(t, err) - _, ok := traceToMetricsConnector.(*connectorImp) + _, ok := traceToMetricsConnector.(*traceToMetricConnector) assert.True(t, ok) // checks if the created connector implements the connectorImp struct +} + +func TestTraceToTraceConnector(t *testing.T) { + factory := NewFactory() + + creationParams := connectortest.NewNopCreateSettings() + cfg := factory.CreateDefaultConfig().(*Config) traceToTracesConnector, err := factory.CreateTracesToTraces(context.Background(), creationParams, cfg, consumertest.NewNop()) assert.NoError(t, err) - _, ok = traceToTracesConnector.(*connectorImp) + _, ok := traceToTracesConnector.(*traceToTraceConnector) assert.True(t, ok) // checks if the created connector implements the connectorImp struct } diff --git a/connector/datadogconnector/factory.go b/connector/datadogconnector/factory.go index 90934fb81e9d1..782014d99e6f4 100644 --- a/connector/datadogconnector/factory.go +++ b/connector/datadogconnector/factory.go @@ -37,17 +37,13 @@ func createDefaultConfig() component.Config { // defines the consumer type of the connector // we want to consume traces and export metrics therefore define nextConsumer as metrics, consumer is the next component in the pipeline func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { - c, err := newConnector(params.TelemetrySettings, cfg, nextConsumer, nil) + c, err := newTraceToMetricConnector(params.TelemetrySettings, cfg, nextConsumer) if err != nil { return nil, err } return c, nil } -func createTracesToTracesConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (connector.Traces, error) { - c, err := newConnector(params.TelemetrySettings, cfg, nil, nextConsumer) - if err != nil { - return nil, err - } - return c, nil +func createTracesToTracesConnector(_ context.Context, params connector.CreateSettings, _ component.Config, nextConsumer consumer.Traces) (connector.Traces, error) { + return newTraceToTraceConnector(params.Logger, nextConsumer), nil } diff --git a/connector/datadogconnector/traces_connector.go b/connector/datadogconnector/traces_connector.go new file mode 100644 index 0000000000000..dfa32f7b94b25 --- /dev/null +++ b/connector/datadogconnector/traces_connector.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +// keyStatsComputed specifies the resource attribute key which indicates if stats have been +// computed for the resource spans. +const keyStatsComputed = "_dd.stats_computed" + +type traceToTraceConnector struct { + logger *zap.Logger + tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector +} + +func newTraceToTraceConnector(logger *zap.Logger, nextConsumer consumer.Traces) *traceToTraceConnector { + logger.Info("Building datadog connector for trace to trace") + return &traceToTraceConnector{ + logger: logger, + tracesConsumer: nextConsumer, + } +} + +// Start implements the component interface. +func (c *traceToTraceConnector) Start(_ context.Context, _ component.Host) error { + return nil +} + +// Shutdown implements the component interface. +func (c *traceToTraceConnector) Shutdown(_ context.Context) error { + return nil +} + +// Capabilities implements the consumer interface. +// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data +func (c *traceToTraceConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} // ConsumeTraces puts a new attribute _dd.stats_computed +} + +// ConsumeTraces implements the consumer interface. +func (c *traceToTraceConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { + for i := 0; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + // Stats will be computed for p. Mark the original resource spans to ensure that they don't + // get computed twice in case these spans pass through here again. + rs.Resource().Attributes().PutBool(keyStatsComputed, true) + + } + return c.tracesConsumer.ConsumeTraces(ctx, traces) +} diff --git a/internal/datadog/agent.go b/internal/datadog/agent.go index 044cddad0f85a..b9c80029265c3 100644 --- a/internal/datadog/agent.go +++ b/internal/datadog/agent.go @@ -21,10 +21,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -// keyStatsComputed specifies the resource attribute key which indicates if stats have been -// computed for the resource spans. -const keyStatsComputed = "_dd.stats_computed" - // TraceAgent specifies a minimal trace agent instance that is able to process traces and output stats. type TraceAgent struct { *agent.Agent @@ -148,9 +144,6 @@ func (p *TraceAgent) Ingest(ctx context.Context, traces ptrace.Traces) { // ...the call transforms the OTLP Spans into a Datadog payload and sends the result // down the p.pchan channel - // Stats will be computed for p. Mark the original resource spans to ensure that they don't - // get computed twice in case these spans pass through here again. - rspans.Resource().Attributes().PutBool(keyStatsComputed, true) } } diff --git a/internal/datadog/agent_test.go b/internal/datadog/agent_test.go index a4a06823f55e5..e2fddac093dc1 100644 --- a/internal/datadog/agent_test.go +++ b/internal/datadog/agent_test.go @@ -13,7 +13,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/trace/testutil" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" ) func TestTraceAgentConfig(t *testing.T) { @@ -79,26 +78,4 @@ loop: // of groups require.Len(t, stats.Stats[0].Stats[0].Stats, traces.SpanCount()) require.Len(t, a.TraceWriter.In, 0) // the trace writer channel should've been drained - - // Check that the payload is labeled - val, ok := traces.ResourceSpans().At(0).Resource().Attributes().Get(keyStatsComputed) - require.True(t, ok) - require.Equal(t, pcommon.ValueTypeBool, val.Type()) - require.True(t, val.Bool()) - - // Ingest again - a.Ingest(ctx, traces) - timeout = time.After(500 * time.Millisecond) -loop2: - for { - select { - case stats = <-out: - if len(stats.Stats) != 0 { - t.Fatal("got payload when none was expected") - } - case <-timeout: - // We got no stats (expected), thus we end the test - break loop2 - } - } }