Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions internal/pkg/otel/translate/otelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ import (

// This is a prefix we add to all names of Otel entities in the configuration. Its purpose is to avoid collisions with
// user-provided configuration
const OtelNamePrefix = "_agent-component/"
const (
OtelNamePrefix = "_agent-component/"
BeatsAuthExtensionType = "beatsauth"
outputOtelOverrideFieldName = "otel"
outputOtelOverrideExporterFieldName = "exporter"
outputOtelOverrideExtensionsFieldName = "extensions"
)

// BeatMonitoringConfigGetter is a function that returns the monitoring configuration for a beat receiver.
type (
Expand Down Expand Up @@ -183,7 +189,7 @@ func getExporterID(exporterType otelcomponent.Type, outputName string) otelcompo
// outputName here is name of the output defined in elastic-agent.yml. For ex: default, monitoring
func getBeatsAuthExtensionID(outputName string) otelcomponent.ID {
extensionName := fmt.Sprintf("%s%s", OtelNamePrefix, outputName)
return otelcomponent.NewIDWithName(otelcomponent.MustNewType("beatsauth"), extensionName)
return otelcomponent.NewIDWithName(otelcomponent.MustNewType(BeatsAuthExtensionType), extensionName)
}

// getCollectorConfigForComponent returns the Otel collector config required to run the given component.
Expand Down Expand Up @@ -426,6 +432,12 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type,
return nil, nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
}

// if there's an otel override config, extract it, we'll apply it after the conversion
otelOverrideCfgC, err := extractOutputOtelOverrideConfig(outputCfgC)
if err != nil {
return nil, nil, nil, err
}

// Config translation function can mutate queue settings defined under output config
exporterConfig, err := configTranslationFunc(outputCfgC, logger)
if err != nil {
Expand All @@ -443,6 +455,19 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type,
}
}

// if there's an otel override section for the exporter, we should apply it
exporterOverrideCfg, err := getOutputOtelOverrideExporterConfig(otelOverrideCfgC)
if err != nil {
return nil, nil, nil, err
}
koanfmaps.Merge(exporterOverrideCfg, exporterConfig)

// if there's an otel override section for extensions, extract it and apply it to individual extension configs
extensionsOverrideCfg, err := getOutputOtelOverrideExtensionsConfig(otelOverrideCfgC)
if err != nil {
return nil, nil, nil, err
}

// beatsauth extension is not tested with output other than elasticsearch
if exporterType.String() == "elasticsearch" {
// get extension ID
Expand All @@ -452,6 +477,10 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type,
return nil, nil, nil, fmt.Errorf("error supporting http parameters for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
}

if beatsauthOverrideCfg, found := extensionsOverrideCfg[BeatsAuthExtensionType]; found {
koanfmaps.Merge(beatsauthOverrideCfg, extensionConfig)
}

// sets extensionCfg
extensionCfg = map[string]any{
extensionID.String(): extensionConfig,
Expand Down Expand Up @@ -519,16 +548,74 @@ func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string
if err != nil {
return nil, err
}
// dynamic indexing works by default

// we also want to use dynamic log ids
// we want to use dynamic log ids
esConfig["logs_dynamic_id"] = map[string]any{"enabled": true}

esConfig["include_source_on_error"] = true

return esConfig, nil
}

// extractOutputOtelOverrideConfig removes the configuration under the otel override key from the provided configuration
// and returns it.
func extractOutputOtelOverrideConfig(cfg *config.C) (*config.C, error) {
if !cfg.HasField(outputOtelOverrideFieldName) {
return nil, nil
}
otelCfg, err := cfg.Child(outputOtelOverrideFieldName, -1)
if err != nil {
return nil, err
}
_, err = cfg.Remove(outputOtelOverrideFieldName, -1)
if err != nil {
return nil, err
}
return otelCfg, nil
}

// getOutputOtelOverrideExporterConfig returns the exporter override configuration from the given otel override
// configuration as a map[string]any. It does not modify the input.
func getOutputOtelOverrideExporterConfig(otelOverrideCfg *config.C) (map[string]any, error) {
if otelOverrideCfg == nil {
return nil, nil
}
if !otelOverrideCfg.HasField(outputOtelOverrideExporterFieldName) {
return nil, nil
}
exporterCfgC, err := otelOverrideCfg.Child(outputOtelOverrideExporterFieldName, -1)
if err != nil {
return nil, err
}
exporterCfgMap := make(map[string]any)
err = exporterCfgC.Unpack(&exporterCfgMap)
if err != nil {
return nil, err
}
return exporterCfgMap, nil
}

// getOutputOtelOverrideExporterConfig returns the override configuration for extensions from the given otel override
// configuration. The return value is a map keyed by extension types, with configuration overrides as values.
func getOutputOtelOverrideExtensionsConfig(otelOverrideCfg *config.C) (map[string]map[string]any, error) {
if otelOverrideCfg == nil {
return nil, nil
}
if !otelOverrideCfg.HasField(outputOtelOverrideExtensionsFieldName) {
return nil, nil
}
extensionsCfgC, err := otelOverrideCfg.Child(outputOtelOverrideExtensionsFieldName, -1)
if err != nil {
return nil, err
}
extensionsCfgMap := make(map[string]map[string]any)
err = extensionsCfgC.Unpack(&extensionsCfgMap)
if err != nil {
return nil, err
}
return extensionsCfgMap, nil
}

func BeatDataPath(componentId string) string {
return filepath.Join(paths.Run(), componentId)
}
Expand Down
Loading