Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OASIS-12] Add trace exporter to Datadog exporter #25759

Merged
merged 9 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions comp/otelcol/collector/impl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/serializerexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/processor/infraattributesprocessor"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/datatype"
traceagent "github.com/DataDog/datadog-agent/comp/trace/agent/def"
"github.com/DataDog/datadog-agent/pkg/serializer"
zapAgent "github.com/DataDog/datadog-agent/pkg/util/log/zap"
"github.com/DataDog/datadog-agent/pkg/util/optional"
Expand All @@ -55,6 +56,7 @@ type Requires struct {
Provider confmap.Converter
CollectorContrib collectorcontrib.Component
Serializer serializer.MetricSerializer
TraceAgent traceagent.Component
LogsAgent optional.Option[logsagentpipeline.Component]
SourceProvider serializerexporter.SourceProviderFunc
Tagger tagger.Component
Expand Down Expand Up @@ -97,9 +99,9 @@ func NewComponent(reqs Requires) (Provides, error) {
return otelcol.Factories{}, err
}
if v, ok := reqs.LogsAgent.Get(); ok {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.Serializer, v, reqs.SourceProvider)
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, v, reqs.SourceProvider)
} else {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.Serializer, nil, reqs.SourceProvider)
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, nil, reqs.SourceProvider)
}
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory(reqs.Tagger)
return factories, nil
Expand Down
2 changes: 1 addition & 1 deletion comp/otelcol/otlp/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message

exporterFactories := []exporter.Factory{
otlpexporter.NewFactory(),
serializerexporter.NewFactory(s, &tagEnricher{cardinality: types.LowCardinality}, hostname.Get, nil),
serializerexporter.NewFactory(s, &tagEnricher{cardinality: types.LowCardinality}, hostname.Get, nil, nil),
loggingexporter.NewFactory(),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestValidate(t *testing.T) {
}

func TestUnmarshal(t *testing.T) {
cfgWithHTTPConfigs := NewFactory(nil, nil, nil).CreateDefaultConfig().(*Config)
cfgWithHTTPConfigs := NewFactory(nil, nil, nil, nil).CreateDefaultConfig().(*Config)
idleConnTimeout := 30 * time.Second
maxIdleConn := 300
maxIdleConnPerHost := 150
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestUnmarshal(t *testing.T) {
},
}

f := NewFactory(nil, nil, nil)
f := NewFactory(nil, nil, nil, nil)
for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
cfg := f.CreateDefaultConfig().(*Config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSendAggregations(t *testing.T) {

for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
f := NewFactory(nil, nil, nil)
f := NewFactory(nil, nil, nil, nil)
cfg := f.CreateDefaultConfig().(*Config)
err := component.UnmarshalConfig(testInstance.cfgMap, cfg)
if err != nil || testInstance.err != "" {
Expand Down
104 changes: 76 additions & 28 deletions comp/otelcol/otlp/components/exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/logsagentexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/serializerexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/metricsclient"
traceagent "github.com/DataDog/datadog-agent/comp/trace/agent/def"
"github.com/DataDog/datadog-agent/pkg/logs/message"
tracepb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/serializer"
Expand All @@ -35,29 +37,46 @@ import (
)

type factory struct {
onceAttributesTranslator sync.Once
attributesTranslator *attributes.Translator
attributesErr error

registry *featuregate.Registry
s serializer.MetricSerializer
logsAgent logsagentpipeline.Component
h serializerexporter.SourceProviderFunc
attributesErr error
onceSetupTraceAgentCmp sync.Once

registry *featuregate.Registry
s serializer.MetricSerializer
logsAgent logsagentpipeline.Component
h serializerexporter.SourceProviderFunc
traceagentcmp traceagent.Component
}

func (f *factory) AttributesTranslator(set component.TelemetrySettings) (*attributes.Translator, error) {
f.onceAttributesTranslator.Do(func() {
f.attributesTranslator, f.attributesErr = attributes.NewTranslator(set)
// setupTraceAgentCmp sets up the trace agent component.
// It is needed in trace exporter to send trace and in metrics exporter to send apm stats.
// The set up happens only once, subsequent calls are no-op.
func (f *factory) setupTraceAgentCmp(set component.TelemetrySettings) error {
f.onceSetupTraceAgentCmp.Do(func() {
var attributesTranslator *attributes.Translator
attributesTranslator, f.attributesErr = attributes.NewTranslator(set)
if f.attributesErr != nil {
return
}
f.traceagentcmp.SetOTelAttributeTranslator(attributesTranslator)
// TODO(OASIS-12): use this statsd client in trace agent
_ = metricsclient.InitializeMetricClient(set.MeterProvider, metricsclient.ExporterSourceTag)
})
return f.attributesTranslator, f.attributesErr
return f.attributesErr
}

func newFactoryWithRegistry(registry *featuregate.Registry, s serializer.MetricSerializer, logsagent logsagentpipeline.Component, h serializerexporter.SourceProviderFunc) exporter.Factory {
func newFactoryWithRegistry(
registry *featuregate.Registry,
traceagentcmp traceagent.Component,
s serializer.MetricSerializer,
logsagent logsagentpipeline.Component,
h serializerexporter.SourceProviderFunc,
) exporter.Factory {
f := &factory{
registry: registry,
s: s,
logsAgent: logsagent,
h: h,
registry: registry,
s: s,
logsAgent: logsagent,
traceagentcmp: traceagentcmp,
h: h,
}

return exporter.NewFactory(
Expand All @@ -84,8 +103,13 @@ func (t *tagEnricher) Enrich(_ context.Context, extraTags []string, dimensions *
}

// NewFactory creates a Datadog exporter factory
func NewFactory(s serializer.MetricSerializer, logsAgent logsagentpipeline.Component, h serializerexporter.SourceProviderFunc) exporter.Factory {
return newFactoryWithRegistry(featuregate.GlobalRegistry(), s, logsAgent, h)
func NewFactory(
traceagentcmp traceagent.Component,
s serializer.MetricSerializer,
logsAgent logsagentpipeline.Component,
h serializerexporter.SourceProviderFunc,
) exporter.Factory {
return newFactoryWithRegistry(featuregate.GlobalRegistry(), traceagentcmp, s, logsAgent, h)
}

func defaultClientConfig() confighttp.ClientConfig {
Expand Down Expand Up @@ -160,26 +184,51 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config {

// createTracesExporter creates a trace exporter based on this config.
func (f *factory) createTracesExporter(
_ context.Context,
_ exporter.CreateSettings,
_ component.Config,
ctx context.Context,
set exporter.CreateSettings,
c component.Config,
) (exporter.Traces, error) {
// TODO implement
return nil, nil
cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)

err := f.setupTraceAgentCmp(set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("failed to set up trace agent component: %w", err)
}

if cfg.OnlyMetadata {
return nil, fmt.Errorf("datadog::only_metadata should not be set in OTel Agent")
}

tracex := newTracesExporter(ctx, set, cfg, f.traceagentcmp)

return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
tracex.consumeTraces,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
// We don't do retries on traces because of deduping concerns on APM Events.
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

// createTracesExporter creates a trace exporter based on this config.
// createMetricsExporter creates a metrics exporter based on this config.
func (f *factory) createMetricsExporter(
ctx context.Context,
set exporter.CreateSettings,
c component.Config,
) (exporter.Metrics, error) {
cfg := checkAndCastConfig(c, set.Logger)
if err := f.setupTraceAgentCmp(set.TelemetrySettings); err != nil {
return nil, fmt.Errorf("failed to set up trace agent component: %w", err)
}
var wg sync.WaitGroup // waits for consumeStatsPayload to exit
statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, &wg, statsIn, statsv, fmt.Sprintf("datadogexporter-%s-%s", set.BuildInfo.Command, set.BuildInfo.Version), set.Logger)
sf := serializerexporter.NewFactory(f.s, &tagEnricher{}, f.h, statsIn)
sf := serializerexporter.NewFactory(f.s, &tagEnricher{}, f.h, statsIn, &wg)
ex := &serializerexporter.ExporterConfig{
Metrics: cfg.Metrics,
TimeoutSettings: exporterhelper.TimeoutSettings{
Expand Down Expand Up @@ -214,8 +263,7 @@ func (f *factory) consumeStatsPayload(ctx context.Context, wg *sync.WaitGroup, s
}
// The DD Connector doesn't set the agent version, so we'll set it here
sp.AgentVersion = agentVersion

// TODO(OASIS-12): send StatsPayload with trace agent
f.traceagentcmp.SendStatsPayload(sp)
}
}
}()
Expand Down
Loading
Loading