Skip to content

Commit

Permalink
[OASIS-12] Add trace exporter to Datadog exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
songy23 committed Jun 11, 2024
1 parent 30764a6 commit f3f83ae
Show file tree
Hide file tree
Showing 22 changed files with 789 additions and 158 deletions.
21 changes: 21 additions & 0 deletions cmd/otel-agent/config/agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,29 @@ func NewConfigComponent(ctx context.Context, uris []string) (config.Component, e
pkgconfig.Set("logs_enabled", true, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("logs_config.use_compression", true, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("log_level", sc.Telemetry.Logs.Level, pkgconfigmodel.SourceLocalConfigProcess)

// APM & OTel trace configs
pkgconfig.Set("apm_config.enabled", true, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.apm_non_local_traffic", true, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("otlp_config.traces.span_name_as_resource_name", ddc.Traces.SpanNameAsResourceName, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("otlp_config.traces.span_name_remappings", ddc.Traces.SpanNameRemappings, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.receiver_port", 0, pkgconfigmodel.SourceLocalConfigProcess) // disable HTTP receiver
pkgconfig.Set("apm_config.ignore_resources", ddc.Traces.IgnoreResources, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.skip_ssl_validation", ddc.ClientConfig.TLSSetting.InsecureSkipVerify, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.peer_tags_aggregation", ddc.Traces.PeerTagsAggregation, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.peer_service_aggregation", ddc.Traces.PeerServiceAggregation, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.peer_tags", ddc.Traces.PeerTags, pkgconfigmodel.SourceLocalConfigProcess)
pkgconfig.Set("apm_config.compute_stats_by_span_kind", ddc.Traces.ComputeStatsBySpanKind, pkgconfigmodel.SourceLocalConfigProcess)
if v := ddc.Traces.TraceBuffer; v > 0 {
pkgconfig.Set("apm_config.trace_buffer", v, pkgconfigmodel.SourceLocalConfigProcess)
}
if addr := ddc.Traces.Endpoint; addr != "" {
pkgconfig.Set("apm_config.apm_dd_url", addr, pkgconfigmodel.SourceLocalConfigProcess)
}
if ddc.Traces.ComputeTopLevelBySpanKind {
pkgconfig.Set("apm_config.features", []string{"enable_otlp_compute_top_level_by_span_kind"}, pkgconfigmodel.SourceLocalConfigProcess)
}

return pkgconfig, nil
}

Expand Down
12 changes: 12 additions & 0 deletions cmd/otel-agent/config/agent_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ func TestAgentConfig(t *testing.T) {
assert.Equal(t, "DATADOG_API_KEY", c.Get("api_key"))
assert.Equal(t, "datadoghq.eu", c.Get("site"))
assert.Equal(t, "debug", c.Get("log_level"))
assert.Equal(t, "https://trace.agent.datadoghq.eu", c.Get("apm_config.apm_dd_url"))
assert.Equal(t, true, c.Get("otlp_config.traces.span_name_as_resource_name"))
assert.Equal(t, map[string]string{"io.opentelemetry.javaagent.spring.client": "spring.client"}, c.Get("otlp_config.traces.span_name_remappings"))
assert.Equal(t, []string{"(GET|POST) /healthcheck"}, c.Get("apm_config.ignore_resources"))
assert.Equal(t, 0, c.Get("apm_config.receiver_port"))
assert.Equal(t, true, c.Get("apm_config.peer_tags_aggregation"))
assert.Equal(t, true, c.Get("apm_config.compute_stats_by_span_kind"))
assert.Equal(t, []string{"tag1"}, c.Get("apm_config.peer_tags"))
assert.Equal(t, 10, c.Get("apm_config.trace_buffer"))
assert.Equal(t, []string{"enable_otlp_compute_top_level_by_span_kind"}, c.Get("apm_config.features"))
}

func TestAgentConfigDefaults(t *testing.T) {
Expand All @@ -31,6 +41,8 @@ func TestAgentConfigDefaults(t *testing.T) {
}
assert.Equal(t, "DATADOG_API_KEY", c.Get("api_key"))
assert.Equal(t, "datadoghq.com", c.Get("site"))
assert.Equal(t, "https://trace.agent.datadoghq.com", c.Get("apm_config.apm_dd_url"))
assert.Equal(t, 0, c.Get("apm_config.receiver_port"))
}

func TestNoDDExporter(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions cmd/otel-agent/config/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ exporters:
api:
key: DATADOG_API_KEY
site: datadoghq.eu
traces:
span_name_as_resource_name: true
span_name_remappings:
io.opentelemetry.javaagent.spring.client: spring.client
ignore_resources: ["(GET|POST) /healthcheck"]
compute_stats_by_span_kind: true
compute_top_level_by_span_kind: true
peer_tags_aggregation: true
peer_tags: ["tag1"]
trace_buffer: 10
processors:
batch:
timeout: 10s
Expand Down
6 changes: 4 additions & 2 deletions comp/otelcol/collector/impl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/datadogexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/processor/infraattributesprocessor"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/datatype"
traceagent "github.com/DataDog/datadog-agent/comp/trace/agent/def"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/util/optional"
)
Expand All @@ -54,6 +55,7 @@ type Requires struct {
Provider confmap.Converter
CollectorContrib collectorcontrib.Component
Serializer serializer.MetricSerializer
TraceAgent traceagent.Component
LogsAgent optional.Option[logsagentpipeline.Component]
HostName hostname.Component
Tagger tagger.Component
Expand Down Expand Up @@ -90,9 +92,9 @@ func NewComponent(reqs Requires) (Provides, error) {
return otelcol.Factories{}, err
}
if v, ok := reqs.LogsAgent.Get(); ok {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.Serializer, v, reqs.HostName)
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, v, reqs.HostName)
} else {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.Serializer, nil, reqs.HostName)
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, nil, reqs.HostName)
}
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory(reqs.Tagger)
return factories, nil
Expand Down
2 changes: 1 addition & 1 deletion comp/otelcol/otlp/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message

exporterFactories := []exporter.Factory{
otlpexporter.NewFactory(),
serializerexporter.NewFactory(s, &tagEnricher{cardinality: types.LowCardinality}, hostname.Get, nil),
serializerexporter.NewFactory(s, &tagEnricher{cardinality: types.LowCardinality}, hostname.Get, nil, nil),
loggingexporter.NewFactory(),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ type TracesConfig struct {
// NOTE: For stats computed from OTel traces, only top-level spans are considered when this option is off.
ComputeStatsBySpanKind bool `mapstructure:"compute_stats_by_span_kind"`

// If set to true, root spans and spans with a server or consumer `span.kind` will be marked as top-level.
// Additionally, spans with a client or producer `span.kind` will have stats computed.
// Enabling this config option may increase the number of spans that generate trace metrics, and may change which spans appear as top-level in Datadog.
// ComputeTopLevelBySpanKind needs to be enabled in both the Datadog connector and Datadog exporter configs if both components are being used.
// The default value is `false`.
ComputeTopLevelBySpanKind bool `mapstructure:"compute_top_level_by_span_kind"`

// If set to true, enables `peer.service` aggregation in the exporter. If disabled, aggregated trace stats will not include `peer.service` as a dimension.
// For the best experience with `peer.service`, it is recommended to also enable `compute_stats_by_span_kind`.
// If enabling both causes the datadog exporter to consume too many resources, try disabling `compute_stats_by_span_kind` first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestValidate(t *testing.T) {
}

func TestUnmarshal(t *testing.T) {
cfgWithHTTPConfigs := NewFactory(nil, nil, nil).CreateDefaultConfig().(*Config)
cfgWithHTTPConfigs := NewFactory(nil, nil, nil, nil).CreateDefaultConfig().(*Config)
idleConnTimeout := 30 * time.Second
maxIdleConn := 300
maxIdleConnPerHost := 150
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestUnmarshal(t *testing.T) {
},
}

f := NewFactory(nil, nil, nil)
f := NewFactory(nil, nil, nil, nil)
for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
cfg := f.CreateDefaultConfig().(*Config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSendAggregations(t *testing.T) {

for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
f := NewFactory(nil, nil, nil)
f := NewFactory(nil, nil, nil, nil)
cfg := f.CreateDefaultConfig().(*Config)
err := component.UnmarshalConfig(testInstance.cfgMap, cfg)
if err != nil || testInstance.err != "" {
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestPeerTags(t *testing.T) {

for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
f := NewFactory(nil, nil, nil)
f := NewFactory(nil, nil, nil, nil)
cfg := f.CreateDefaultConfig().(*Config)
err := component.UnmarshalConfig(testInstance.cfgMap, cfg)
if err != nil || testInstance.err != "" {
Expand Down
96 changes: 69 additions & 27 deletions comp/otelcol/otlp/components/exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/logsagentexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/serializerexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/metricsclient"
traceagent "github.com/DataDog/datadog-agent/comp/trace/agent/def"
"github.com/DataDog/datadog-agent/pkg/logs/message"
tracepb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/serializer"
Expand All @@ -36,29 +38,43 @@ import (
)

type factory struct {
onceAttributesTranslator sync.Once
attributesTranslator *attributes.Translator
attributesErr error

registry *featuregate.Registry
s serializer.MetricSerializer
logsAgent logsagentpipeline.Component
h hostnameinterface.Component
attributesErr error
onceSetupTraceAgentCmp sync.Once

registry *featuregate.Registry
s serializer.MetricSerializer
logsAgent logsagentpipeline.Component
h hostnameinterface.Component
traceagentcmp traceagent.Component
}

func (f *factory) AttributesTranslator(set component.TelemetrySettings) (*attributes.Translator, error) {
f.onceAttributesTranslator.Do(func() {
f.attributesTranslator, f.attributesErr = attributes.NewTranslator(set)
func (f *factory) setupTraceAgentCmp(set component.TelemetrySettings) error {
f.onceSetupTraceAgentCmp.Do(func() {
var attributesTranslator *attributes.Translator
attributesTranslator, f.attributesErr = attributes.NewTranslator(set)
if f.attributesErr != nil {
return
}
mclient := metricsclient.InitializeMetricClient(set.MeterProvider, metricsclient.ExporterSourceTag)
f.traceagentcmp.SetOTelAttributeTranslator(attributesTranslator)
f.traceagentcmp.SetStatsdClient(mclient)
})
return f.attributesTranslator, f.attributesErr
return f.attributesErr
}

func newFactoryWithRegistry(registry *featuregate.Registry, s serializer.MetricSerializer, logsagent logsagentpipeline.Component, h hostnameinterface.Component) exporter.Factory {
func newFactoryWithRegistry(
registry *featuregate.Registry,
traceagentcmp traceagent.Component,
s serializer.MetricSerializer,
logsagent logsagentpipeline.Component,
h hostnameinterface.Component,
) exporter.Factory {
f := &factory{
registry: registry,
s: s,
logsAgent: logsagent,
h: h,
registry: registry,
s: s,
logsAgent: logsagent,
traceagentcmp: traceagentcmp,
h: h,
}

return exporter.NewFactory(
Expand All @@ -85,8 +101,13 @@ func (t *tagEnricher) Enrich(_ context.Context, extraTags []string, dimensions *
}

// NewFactory creates a Datadog exporter factory
func NewFactory(s serializer.MetricSerializer, logsAgent logsagentpipeline.Component, h hostnameinterface.Component) exporter.Factory {
return newFactoryWithRegistry(featuregate.GlobalRegistry(), s, logsAgent, h)
func NewFactory(
traceagentcmp traceagent.Component,
s serializer.MetricSerializer,
logsAgent logsagentpipeline.Component,
h hostnameinterface.Component,
) exporter.Factory {
return newFactoryWithRegistry(featuregate.GlobalRegistry(), traceagentcmp, s, logsAgent, h)
}

func defaultClientConfig() confighttp.ClientConfig {
Expand Down Expand Up @@ -159,12 +180,34 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config {

// createTracesExporter creates a trace exporter based on this config.
func (f *factory) createTracesExporter(
_ context.Context,
_ exporter.CreateSettings,
_ component.Config,
ctx context.Context,
set exporter.CreateSettings,
c component.Config,
) (exporter.Traces, error) {
// TODO implement
return nil, nil
cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)

err := f.setupTraceAgentCmp(set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("failed to build attributes translator: %w", err)
}

if cfg.OnlyMetadata {
set.Logger.Error("datadog::only_metadata should not be set in OTel Agent")
}

tracex := newTracesExporter(ctx, set, cfg, f.traceagentcmp)

return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
tracex.consumeTraces,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
// We don't do retries on traces because of deduping concerns on APM Events.
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

// createTracesExporter creates a trace exporter based on this config.
Expand All @@ -178,7 +221,7 @@ func (f *factory) createMetricsExporter(
statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, &wg, statsIn, statsv, fmt.Sprintf("datadogexporter-%s-%s", set.BuildInfo.Command, set.BuildInfo.Version), set.Logger)
sf := serializerexporter.NewFactory(f.s, &tagEnricher{}, f.h.Get, statsIn)
sf := serializerexporter.NewFactory(f.s, &tagEnricher{}, f.h.Get, statsIn, &wg)
ex := &serializerexporter.ExporterConfig{
Metrics: cfg.Metrics,
TimeoutSettings: exporterhelper.TimeoutSettings{
Expand Down Expand Up @@ -213,8 +256,7 @@ func (f *factory) consumeStatsPayload(ctx context.Context, wg *sync.WaitGroup, s
}
// The DD Connector doesn't set the agent version, so we'll set it here
sp.AgentVersion = agentVersion

// TODO(OASIS-12): send StatsPayload with trace agent
f.traceagentcmp.SendStatsPayload(sp)
}
}
}()
Expand Down
Loading

0 comments on commit f3f83ae

Please sign in to comment.