From 354238c85d3c293720f70f78859dc295b07763b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Sat, 15 Nov 2025 18:41:38 +0100 Subject: [PATCH 1/8] Move ComponentsModifier to the component package --- internal/pkg/agent/application/application.go | 2 +- internal/pkg/agent/application/coordinator/coordinator.go | 8 ++------ .../pkg/agent/application/endpoint_component_modifier.go | 5 ++--- internal/pkg/agent/application/fleet_server_bootstrap.go | 4 ++-- .../agent/application/inject_proxy_component_modifier.go | 3 +-- pkg/component/component.go | 4 ++++ 6 files changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index c577e846202..668333fd642 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -171,7 +171,7 @@ func New( var configMgr coordinator.ConfigManager var managed *managedConfigManager - var compModifiers = []coordinator.ComponentsModifier{InjectAPMConfig} + var compModifiers = []component.ComponentsModifier{InjectAPMConfig} var composableManaged bool var isManaged bool var actionAcker acker.Acker diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 945163b1830..1753389d934 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -224,10 +224,6 @@ type VarsManager interface { Watch() <-chan []*transpiler.Vars } -// ComponentsModifier is a function that takes the computed components model and modifies it before -// passing it into the components runtime manager. -type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) - // managerShutdownTimeout is how long the coordinator will wait during shutdown // to receive termination states from its managers.
// Note: The current timeout (5s) is shorter than the default stop timeout for @@ -267,7 +263,7 @@ type Coordinator struct { otelCfg *confmap.Conf caps capabilities.Capabilities - modifiers []ComponentsModifier + modifiers []component.ComponentsModifier // The current state of the Coordinator. This value and its subfields are // safe to read directly from within the main Coordinator goroutine. @@ -439,7 +435,7 @@ func New( otelMgr OTelManager, fleetAcker acker.Acker, initialUpgradeDetails *details.Details, - modifiers ...ComponentsModifier, + modifiers ...component.ComponentsModifier, ) *Coordinator { var fleetState cproto.State var fleetMessage string diff --git a/internal/pkg/agent/application/endpoint_component_modifier.go b/internal/pkg/agent/application/endpoint_component_modifier.go index bc9b4b1db07..108008fd508 100644 --- a/internal/pkg/agent/application/endpoint_component_modifier.go +++ b/internal/pkg/agent/application/endpoint_component_modifier.go @@ -13,7 +13,6 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-libs/transport/tlscommon" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -50,7 +49,7 @@ func (tlsCache) MakeKey(keyPassPath, certPath, keyPath string) string { // "revision": 1, // "type": "endpoint" // } -func EndpointSignedComponentModifier() coordinator.ComponentsModifier { +func EndpointSignedComponentModifier() component.ComponentsModifier { return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { const signedKey = "signed" @@ -83,7 +82,7 @@ func EndpointSignedComponentModifier() coordinator.ComponentsModifier { // 'key_passphrase_path'. // It does so, ONLY for the client TLS configuration for mTLS used with // fleet-server. 
-func EndpointTLSComponentModifier(log *logger.Logger) coordinator.ComponentsModifier { +func EndpointTLSComponentModifier(log *logger.Logger) component.ComponentsModifier { return newEndpointTLSComponentModifier(log, &tlsCache{mu: &sync.Mutex{}}) } diff --git a/internal/pkg/agent/application/fleet_server_bootstrap.go b/internal/pkg/agent/application/fleet_server_bootstrap.go index 2484b98064d..2756a7b03dd 100644 --- a/internal/pkg/agent/application/fleet_server_bootstrap.go +++ b/internal/pkg/agent/application/fleet_server_bootstrap.go @@ -50,7 +50,7 @@ var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{ // FleetServerComponentModifier modifies the comps to inject extra information from the policy into // the Fleet Server component and units needed to run Fleet Server correctly. -func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier { +func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) component.ComponentsModifier { return func(comps []component.Component, _ map[string]interface{}) ([]component.Component, error) { for i, comp := range comps { if comp.InputSpec != nil && comp.InputSpec.InputType == fleetServer && comp.Err == nil { @@ -118,7 +118,7 @@ func addBootstrapCfg(dst map[string]interface{}, es *configuration.Elasticsearch // InjectFleetConfigComponentModifier The modifier that injects the fleet configuration for the components // that need to be able to connect to fleet server. 
-func InjectFleetConfigComponentModifier(fleetCfg *configuration.FleetAgentConfig, agentInfo info.Agent) coordinator.ComponentsModifier { +func InjectFleetConfigComponentModifier(fleetCfg *configuration.FleetAgentConfig, agentInfo info.Agent) component.ComponentsModifier { return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { hostsStr := fleetCfg.Client.GetHosts() fleetHosts := make([]interface{}, 0, len(hostsStr)) diff --git a/internal/pkg/agent/application/inject_proxy_component_modifier.go b/internal/pkg/agent/application/inject_proxy_component_modifier.go index 1290e8a342a..dc8b347b076 100644 --- a/internal/pkg/agent/application/inject_proxy_component_modifier.go +++ b/internal/pkg/agent/application/inject_proxy_component_modifier.go @@ -10,7 +10,6 @@ import ( "golang.org/x/net/http/httpproxy" "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/pkg/component" ) @@ -20,7 +19,7 @@ import ( // The URL used is the HTTPS_PROXY env var. If that's not set the HTTP_PROXY env var is used. // If there are no env vars set, or the unit's config has `proxy_disable: true`, nothing is injected // If the output config has `proxy_url: ""`, it will not be overwritten. 
-func InjectProxyEndpointModifier() coordinator.ComponentsModifier { +func InjectProxyEndpointModifier() component.ComponentsModifier { return func(comps []component.Component, _ map[string]interface{}) ([]component.Component, error) { for i, comp := range comps { if comp.InputSpec != nil && comp.InputSpec.InputType == endpoint { diff --git a/pkg/component/component.go b/pkg/component/component.go index b8515a9655b..e88773dc645 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -286,6 +286,10 @@ func (c *Component) RemoveWorkDir(parentDirPath string) error { return os.RemoveAll(c.WorkDirPath(parentDirPath)) } +// ComponentsModifier is a function that takes the computed components model and modifies it before +// passing it into the components runtime manager. +type ComponentsModifier func(comps []Component, cfg map[string]interface{}) ([]Component, error) + // Model is the components model with signed policy data // This replaces former top level []Components with the top Model that captures signed policy data. // The signed data is a part of the policy since 8.8.0 release and contains the signed policy fragments and the signature that can be validated. 
From 0084bae9fd70d31fd3b66412f87141583d672f24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Sat, 15 Nov 2025 19:53:45 +0100 Subject: [PATCH 2/8] Move Otel runtime determination to component modifier --- .../application/coordinator/coordinator.go | 19 +++++++++---------- .../endpoint_component_modifier_test.go | 3 +-- .../install/componentvalidation/validation.go | 2 +- internal/pkg/agent/install/uninstall.go | 2 +- pkg/component/component.go | 16 ++++++++++++++++ pkg/component/component_test.go | 6 +++--- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 1753389d934..d90d583dad2 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1808,7 +1808,6 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) { func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) { var otelComponents, runtimeComponents []component.Component for _, comp := range model.Components { - c.maybeOverrideRuntimeForComponent(&comp) switch comp.RuntimeManager { case component.OtelRuntimeManager: otelComponents = append(otelComponents, comp) @@ -1840,7 +1839,7 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime // Normally, we use the runtime set in the component itself via the configuration, but // we may also fall back to the process runtime if the otel runtime is unsupported for // some reason. One example is the output using unsupported config options. 
-func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component) { +func maybeOverrideRuntimeForComponent(logger *logger.Logger, comp *component.Component) { if comp.RuntimeManager == component.ProcessRuntimeManager { // do nothing, the process runtime can handle any component return @@ -1849,7 +1848,7 @@ func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component // check if the component is actually supported err := translate.VerifyComponentIsOtelSupported(comp) if err != nil { - c.logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err) + logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err) comp.RuntimeManager = component.ProcessRuntimeManager } } @@ -1939,8 +1938,15 @@ func (c *Coordinator) generateComponentModel() (err error) { existingCompState[comp.Component.ID] = comp.State.Pid } + otelRuntimeModifier := func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { + for i := range comps { + maybeOverrideRuntimeForComponent(c.logger, &comps[i]) + } + return comps, nil + } comps, err := c.specs.ToComponents( cfg, + append(c.modifiers, otelRuntimeModifier), configInjector, c.state.LogLevel, c.agentInfo, @@ -1953,13 +1959,6 @@ func (c *Coordinator) generateComponentModel() (err error) { // Filter any disallowed inputs/outputs from the components comps = c.filterByCapabilities(comps) - for _, modifier := range c.modifiers { - comps, err = modifier(comps, cfg) - if err != nil { - return fmt.Errorf("failed to modify components: %w", err) - } - } - // If we made it this far, update our internal derived values and // return with no error c.derivedConfig = cfg diff --git a/internal/pkg/agent/application/endpoint_component_modifier_test.go b/internal/pkg/agent/application/endpoint_component_modifier_test.go index 7ecae1cdd07..1f13b54f11f 100644 --- 
a/internal/pkg/agent/application/endpoint_component_modifier_test.go +++ b/internal/pkg/agent/application/endpoint_component_modifier_test.go @@ -20,7 +20,6 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/testing/certutil" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/testutils/fipsutils" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" @@ -44,7 +43,7 @@ func TestEndpointComponentModifier(t *testing.T) { tests := map[string][]struct { name string - compModifier coordinator.ComponentsModifier + compModifier component.ComponentsModifier comps []component.Component cfg map[string]interface{} wantComps []component.Component diff --git a/internal/pkg/agent/install/componentvalidation/validation.go b/internal/pkg/agent/install/componentvalidation/validation.go index 24cbc640951..47b19a448fe 100644 --- a/internal/pkg/agent/install/componentvalidation/validation.go +++ b/internal/pkg/agent/install/componentvalidation/validation.go @@ -101,7 +101,7 @@ func GetComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri } // Compute the components from the computed configuration. 
- comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo, map[string]uint64{}) + comps, err := specs.ToComponents(m, nil, monitorFn, lvl, agentInfo, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index fff121fd159..55ba9b403e7 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -422,7 +422,7 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi if err != nil { return nil, aerrors.New("failed to create a map from config", err) } - allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil, map[string]uint64{}) + allComps, err := specs.ToComponents(mm, nil, nil, logp.InfoLevel, nil, map[string]uint64{}) if err != nil { return nil, fmt.Errorf("failed to render components: %w", err) } diff --git a/pkg/component/component.go b/pkg/component/component.go index e88773dc645..f9e9d5c57c9 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -343,6 +343,7 @@ type Model struct { // the current runtime specification. 
func (r *RuntimeSpecs) ToComponents( policy map[string]interface{}, + modifiers []ComponentsModifier, monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider, @@ -353,6 +354,14 @@ func (r *RuntimeSpecs) ToComponents( return nil, err } + // Do this here so the monitoring injector has a more accurate view of what components are running + for _, modifier := range modifiers { + components, err = modifier(components, policy) + if err != nil { + return nil, err + } + } + if monitoringInjector != nil { monitoringCfg, err := monitoringInjector(policy, components, currentServiceCompInts) if err != nil { @@ -366,6 +375,13 @@ func (r *RuntimeSpecs) ToComponents( return nil, fmt.Errorf("failed to generate monitoring components: %w", err) } + for _, modifier := range modifiers { + monitoringComps, err = modifier(monitoringComps, policy) + if err != nil { + return nil, err + } + } + components = append(components, monitoringComps...) } } diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index fcd990aa3d4..5de3bcc323e 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2503,7 +2503,7 @@ func TestToComponents(t *testing.T) { runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck()) require.NoError(t, err) - result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers, map[string]uint64{}) + result, err := runtime.ToComponents(scenario.Policy, nil, nil, scenario.LogLevel, scenario.headers, map[string]uint64{}) if scenario.Err != "" { assert.Equal(t, scenario.Err, err.Error()) } else { @@ -2979,7 +2979,7 @@ func TestFlattenedDataStream(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) + result, err := runtime.ToComponents(policy, nil, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { 
t.Fatalf("cannot convert policy to component: %s", err) } @@ -3080,7 +3080,7 @@ func TestFlattenedDataStreamIsolatedUnits(t *testing.T) { t.Fatalf("cannot load runtime specs: %s", err) } - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil, map[string]uint64{}) + result, err := runtime.ToComponents(policy, nil, nil, logp.DebugLevel, nil, map[string]uint64{}) if err != nil { t.Fatalf("cannot convert policy to component: %s", err) } From 51eaca3d818a7d54aa4b79f453f78a9900fb2312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 17 Nov 2025 13:18:35 +0100 Subject: [PATCH 3/8] Check supported outputs in monitoring config generation --- .../application/coordinator/coordinator.go | 6 - .../testdata/monitoring_config_full_otel.yaml | 5 +- .../monitoring_config_full_process.yaml | 77 ++------- .../monitoring/component/v1_monitor.go | 131 +++++++++------ .../monitoring/component/v1_monitor_test.go | 123 +++++++++++++- .../monitoring/monitoringhelpers/endpoint.go | 14 ++ internal/pkg/otel/translate/otelconfig.go | 61 +++++-- .../pkg/otel/translate/otelconfig_test.go | 46 +++++ pkg/component/component.go | 158 ++++++++++-------- 9 files changed, 418 insertions(+), 203 deletions(-) create mode 100644 internal/pkg/agent/application/monitoring/monitoringhelpers/endpoint.go diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index d90d583dad2..5fb4bc31876 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1812,12 +1812,6 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime case component.OtelRuntimeManager: otelComponents = append(otelComponents, comp) case component.ProcessRuntimeManager: - // Hack to fix https://github.com/elastic/elastic-agent/issues/11169 - // TODO: Remove this after 
https://github.com/elastic/elastic-agent/issues/10220 is resolved - if comp.ID == "prometheus/metrics-monitoring" { - c.logger.Warnf("The Otel prometheus metrics monitoring input can't run in a beats process, skipping") - continue - } runtimeComponents = append(runtimeComponents, comp) default: // this should be impossible if we parse the configuration correctly diff --git a/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_otel.yaml b/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_otel.yaml index 71a77ca2b12..32f74cc5e7b 100644 --- a/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_otel.yaml +++ b/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_otel.yaml @@ -915,4 +915,7 @@ inputs: type: system/metrics use_output: monitoring outputs: - monitoring: {} + monitoring: + hosts: + - localhost:9200 + type: elasticsearch diff --git a/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_process.yaml b/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_process.yaml index dc1d3d346b2..41ea9a7b358 100644 --- a/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_process.yaml +++ b/internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_process.yaml @@ -4,7 +4,8 @@ agent: enabled: false metrics: true inputs: -- id: filestream-monitoring-agent +- _runtime_experimental: process + id: filestream-monitoring-agent name: filestream-monitoring-agent streams: - close: @@ -110,7 +111,8 @@ inputs: type: filestream type: filestream use_output: monitoring -- data_stream: +- _runtime_experimental: process + data_stream: namespace: default id: metrics-monitoring-beats name: metrics-monitoring-beats @@ -357,9 +359,18 @@ inputs: binary: metricbeat id: prometheus/metrics-monitoring target: component + - drop_fields: + fields: + - 
beat.stats.cgroup + - beat.stats.cpu + - beat.stats.handles + - beat.stats.memstats + - beat.stats.runtime + ignore_missing: true type: beat/metrics use_output: monitoring -- data_stream: +- _runtime_experimental: process + data_stream: namespace: default id: metrics-monitoring-agent name: metrics-monitoring-agent @@ -875,61 +886,6 @@ inputs: binary: metricbeat id: http/metrics-monitoring target: component - - data_stream: - dataset: elastic_agent.elastic_agent - namespace: default - type: metrics - failure_threshold: 5 - hosts: - - placeholder - id: metrics-monitoring-metricbeat-1 - index: metrics-elastic_agent.elastic_agent-default - metricsets: - - json - namespace: agent - path: /stats - period: 1m0s - processors: - - add_fields: - fields: - dataset: elastic_agent.elastic_agent - target: event - - add_fields: - fields: - id: "" - process: metricbeat - snapshot: false - version: placeholder - target: elastic_agent - - add_fields: - fields: - id: "" - target: agent - - copy_fields: - fail_on_error: false - fields: - - from: http.agent.beat.cpu - to: system.process.cpu - - from: http.agent.beat.memstats.memory_sys - to: system.process.memory.size - - from: http.agent.beat.handles - to: system.process.fd - - from: http.agent.beat.cgroup - to: system.process.cgroup - - from: http.agent.apm-server - to: apm-server - - from: http.filebeat_input - to: filebeat_input - ignore_missing: true - - drop_fields: - fields: - - http - ignore_missing: true - - add_fields: - fields: - binary: metricbeat - id: prometheus/metrics-monitoring - target: component type: http/metrics use_output: monitoring - _runtime_experimental: otel @@ -1099,4 +1055,7 @@ inputs: type: system/metrics use_output: monitoring outputs: - monitoring: {} + monitoring: + hosts: + - localhost:9200 + type: elasticsearch diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor.go b/internal/pkg/agent/application/monitoring/component/v1_monitor.go index 183ce191531..e7fe9ba0bea 100644 --- 
a/internal/pkg/agent/application/monitoring/component/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor.go @@ -23,6 +23,11 @@ import ( koanfmaps "github.com/knadh/koanf/maps" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/monitoringhelpers" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/utils" @@ -71,8 +76,9 @@ const ( fileBeatName = "filebeat" collectorName = "collector" - monitoringMetricsUnitID = "metrics-monitoring" - monitoringFilesUnitsID = "filestream-monitoring" + monitoringMetricsUnitID = "metrics-monitoring" + monitoringFilesUnitsID = "filestream-monitoring" + prometheusMonitoringComponentId = "prometheus/" + monitoringMetricsUnitID windowsOS = "windows" @@ -228,26 +234,33 @@ func (b *BeatsMonitor) MonitoringConfig( } } - componentInfos := b.getComponentInfos(components, componentIDPidMap) - - if err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName); err != nil && !errors.Is(err, errNoOutputPresent) { + outputCfg, err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName) + if err != nil && !errors.Is(err, errNoOutputPresent) { return nil, errors.New(err, "failed to inject monitoring output") } else if errors.Is(err, errNoOutputPresent) { // nothing to inject, no monitoring output return nil, nil } + outputOtelSupported := isOutputOtelSupported(outputCfg) + monitoringRuntime := component.RuntimeManager(b.config.C.RuntimeManager) + if !outputOtelSupported { + monitoringRuntime = monitoringCfg.ProcessRuntimeManager + } + componentInfos := b.getComponentInfos(components, monitoringRuntime, outputOtelSupported, componentIDPidMap) + // initializes inputs collection so injectors don't have to deal with it b.initInputs(cfg) if b.config.C.MonitorLogs { - if err := b.injectLogsInput(cfg, componentInfos, monitoringOutput); err 
!= nil { + if err := b.injectLogsInput(cfg, componentInfos, monitoringOutput, monitoringRuntime); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentInfos, metricsCollectionIntervalString, failureThreshold); err != nil { + if err := b.injectMetricsInput( + cfg, componentInfos, metricsCollectionIntervalString, failureThreshold, monitoringRuntime); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } @@ -287,7 +300,7 @@ func (b *BeatsMonitor) ComponentMonitoringConfig(unitID, binary string) map[stri } configMap := make(map[string]any) - endpoint := BeatsMonitoringEndpoint(unitID) + endpoint := monitoringhelpers.BeatsMonitoringEndpoint(unitID) if endpoint != "" { httpConfigMap := map[string]any{ "enabled": true, @@ -393,20 +406,27 @@ func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) { cfg[inputsKey] = inputsCollection } -func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}, monitoringOutputName string) error { +// injectMonitoringOutput injects the monitoring output into the configuration. It takes an existing output named +// `monitoringOutputName` and makes a copy of it named `monitoring`. It returns the output configuration. 
+func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}, monitoringOutputName string) (map[string]any, error) { outputsNode, found := source[outputsKey] if !found { - return errNoOutputPresent + return nil, errNoOutputPresent } outputs, ok := outputsNode.(map[string]interface{}) if !ok { - return fmt.Errorf("outputs not a map") + return nil, fmt.Errorf("outputs not a map") } outputNode, found := outputs[monitoringOutputName] if !found { - return fmt.Errorf("output %q used for monitoring not found", monitoringOutputName) + return nil, fmt.Errorf("output %q used for monitoring not found", monitoringOutputName) + } + + outputMap, ok := outputNode.(map[string]any) + if !ok { + return nil, fmt.Errorf("output %q used for monitoring not a map", monitoringOutputName) } monitoringOutputs := map[string]interface{}{ @@ -415,13 +435,18 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{ dest[outputsKey] = monitoringOutputs - return nil + return outputMap, nil } // getComponentInfos returns a slice of componentInfo structs based on the provided components. This slice contains // all the information needed to generate the monitoring configuration for these components, as well as configuration // for new components which are going to be doing the monitoring. 
-func (b *BeatsMonitor) getComponentInfos(components []component.Component, componentIDPidMap map[string]uint64) []componentInfo { +func (b *BeatsMonitor) getComponentInfos( + components []component.Component, + monitoringRuntime component.RuntimeManager, + outputOtelSupported bool, + componentIDPidMap map[string]uint64, +) []componentInfo { componentInfos := make([]componentInfo, 0, len(components)) for _, comp := range components { compInfo := componentInfo{ @@ -440,29 +465,29 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo componentInfo{ ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID), BinaryName: metricBeatName, - RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), + RuntimeManager: monitoringRuntime, }, componentInfo{ ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID), BinaryName: metricBeatName, - RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), + RuntimeManager: monitoringRuntime, }) } if b.config.C.MonitorLogs { componentInfos = append(componentInfos, componentInfo{ ID: monitoringFilesUnitsID, BinaryName: fileBeatName, - RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), + RuntimeManager: monitoringRuntime, }) } - // If any other component uses the Otel runtime, also add a component to monitor - // its telemetry. - if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) { + // If any other component uses the Otel runtime, also add a component to monitor its telemetry. + // This component only works in the Otel runtime, so we can't add it if the output doesn't support it. 
+ if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) && outputOtelSupported { componentInfos = append(componentInfos, componentInfo{ - ID: fmt.Sprintf("prometheus/%s", monitoringMetricsUnitID), + ID: prometheusMonitoringComponentId, BinaryName: metricBeatName, - RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), + RuntimeManager: component.OtelRuntimeManager, }) } // sort the components to ensure a consistent order of inputs in the configuration @@ -473,7 +498,12 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo } // injectLogsInput adds logging configs for component monitoring to the `cfg` map -func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfos []componentInfo, monitoringOutput string) error { +func (b *BeatsMonitor) injectLogsInput( + cfg map[string]interface{}, + componentInfos []componentInfo, + monitoringOutput string, + monitoringRuntime component.RuntimeManager, +) error { logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem)) streams := []any{b.getAgentFilestreamStream(logsDrop)} @@ -481,15 +511,12 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo streams = append(streams, b.getServiceComponentFilestreamStreams(componentInfos)...) 
input := map[string]interface{}{ - idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID), - "name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID), - "type": "filestream", - useOutputKey: monitoringOutput, - "streams": streams, - } - - if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager { - input["_runtime_experimental"] = b.config.C.RuntimeManager + idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID), + "name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID), + "type": "filestream", + useOutputKey: monitoringOutput, + "streams": streams, + "_runtime_experimental": monitoringRuntime, } inputs := []any{input} @@ -527,6 +554,7 @@ func (b *BeatsMonitor) injectMetricsInput( componentInfos []componentInfo, metricsCollectionIntervalString string, failureThreshold *uint, + monitoringRuntime component.RuntimeManager, ) error { if metricsCollectionIntervalString == "" { metricsCollectionIntervalString = defaultMetricsCollectionInterval.String() @@ -550,7 +578,8 @@ func (b *BeatsMonitor) injectMetricsInput( "data_stream": map[string]interface{}{ "namespace": monitoringNamespace, }, - "streams": beatsStreams, + "streams": beatsStreams, + "_runtime_experimental": monitoringRuntime, }, map[string]interface{}{ idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID), @@ -560,11 +589,17 @@ func (b *BeatsMonitor) injectMetricsInput( "data_stream": map[string]interface{}{ "namespace": monitoringNamespace, }, - "streams": httpStreams, + "streams": httpStreams, + "_runtime_experimental": monitoringRuntime, }, } - if usingOtelRuntime(componentInfos) { + // We only add this stream if the Otel manager is enabled and the respective component info exists. This is a + // special case where this input shouldn't exist if the output doesn't support otel, which we check while + // creating the component infos.
+ if usingOtelRuntime(componentInfos) && slices.ContainsFunc(componentInfos, func(ci componentInfo) bool { + return ci.ID == prometheusMonitoringComponentId + }) { prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString) inputs = append(inputs, map[string]interface{}{ idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID), @@ -581,16 +616,6 @@ func (b *BeatsMonitor) injectMetricsInput( }) } - // Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled - if b.config.C.RuntimeManager == monitoringCfg.OtelRuntimeManager { - for _, input := range inputs { - inputMap := input.(map[string]interface{}) - if _, found := inputMap["_runtime_experimental"]; !found { - inputMap["_runtime_experimental"] = b.config.C.RuntimeManager - } - } - } - // add system/process metrics for services that can't be monitored via json/beats metrics inputs = append(inputs, b.getServiceComponentProcessMetricInputs( componentInfos, metricsCollectionIntervalString)...) @@ -750,7 +775,7 @@ func (b *BeatsMonitor) getHttpStreams( continue } - endpoints := []interface{}{PrefixedEndpoint(BeatsMonitoringEndpoint(compInfo.ID))} + endpoints := []interface{}{PrefixedEndpoint(monitoringhelpers.BeatsMonitoringEndpoint(compInfo.ID))} name := sanitizeName(binaryName) // Do not create http streams if runtime-manager is otel and binary is of beat type @@ -1440,10 +1465,6 @@ func AgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string { return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path))) } -func BeatsMonitoringEndpoint(componentID string) string { - return utils.SocketURLWithFallback(componentID, paths.TempDir()) -} - func httpCopyRules() []interface{} { fromToMap := []interface{}{ // I should be able to see the CPU Usage on the running machine. Am using too much CPU? 
@@ -1504,6 +1525,16 @@ func isSupportedBeatsBinary(binaryName string) bool { return false } +func isOutputOtelSupported(outputCfg map[string]any) bool { + parsed, err := component.ParseOutput(monitoringOutput, outputCfg, logp.InfoLevel, nil) + if err != nil { + return false + } + + err = translate.VerifyOutputIsOtelSupported(parsed.OutputType, outputCfg) + return err == nil +} + func monitoringDrop(path string) (drop string) { defer func() { if drop != "" { diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go index f6637f72551..7999d4e408c 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go @@ -42,7 +42,10 @@ func TestMonitoringFull(t *testing.T) { }, }, "outputs": map[string]any{ - "default": map[string]any{}, + "default": map[string]any{ + "hosts": []string{"localhost:9200"}, + "type": "elasticsearch", + }, }, } @@ -991,7 +994,10 @@ func TestMonitoringWithOtelRuntime(t *testing.T) { }, }, "outputs": map[string]any{ - "default": map[string]any{}, + "default": map[string]any{ + "hosts": []string{"localhost:9200"}, + "type": "elasticsearch", + }, }, } @@ -1134,3 +1140,116 @@ agent.monitoring: assert.Equal(t, beatsMonitor.config.C.MonitorLogs, true) assert.Equal(t, beatsMonitor.config.C.MonitorMetrics, true) } + +func TestMonitoringConfigOtelOutputSupport(t *testing.T) { + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + testCases := []struct { + name string + outputConfig map[string]any + expectPrometheusMonitoring bool + monitoringRuntimeManager string + }{ + { + name: "kafka output - should NOT have prometheus monitoring", + outputConfig: map[string]any{ + "type": "kafka", + "hosts": []string{"localhost:9092"}, + }, + expectPrometheusMonitoring: false, + monitoringRuntimeManager: 
monitoringcfg.ProcessRuntimeManager, + }, + { + name: "elasticsearch output - should have prometheus monitoring", + outputConfig: map[string]any{ + "type": "elasticsearch", + "hosts": []string{"localhost:9200"}, + }, + expectPrometheusMonitoring: true, + monitoringRuntimeManager: monitoringcfg.OtelRuntimeManager, + }, + { + name: "elasticsearch with unsupported config - should NOT have prometheus monitoring", + outputConfig: map[string]any{ + "type": "elasticsearch", + "hosts": []string{"localhost:9200"}, + "indices": []any{}, + }, + expectPrometheusMonitoring: false, + monitoringRuntimeManager: monitoringcfg.ProcessRuntimeManager, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testMon := BeatsMonitor{ + enabled: true, + config: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + MonitorLogs: false, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + RuntimeManager: monitoringcfg.OtelRuntimeManager, + }, + }, + agentInfo: agentInfo, + } + + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": tc.outputConfig, + }, + } + + // Add a component that uses the OTel runtime to trigger prometheus monitoring + components := []component.Component{ + { + ID: "filestream-otel", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "filebeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Name: "filebeat", + }, + }, + }, + RuntimeManager: component.OtelRuntimeManager, + }, + } + + outCfg, err := testMon.MonitoringConfig(policy, components, map[string]uint64{}) + require.NoError(t, err) + + // Check for prometheus/metrics input + inputs := outCfg["inputs"].([]any) + foundPrometheusInput := false + for _, input := range inputs { + var inputStruct struct { + ID string `mapstructure:"id"` + Type string 
`mapstructure:"type"` + Runtime string `mapstructure:"_runtime_experimental"` + } + require.NoError(t, mapstructure.Decode(input, &inputStruct)) + foundPrometheusInput = foundPrometheusInput || inputStruct.Type == "prometheus/metrics" + assert.Equalf(t, tc.monitoringRuntimeManager, inputStruct.Runtime, + "expected monitoring runtime manager %s for input %s, got %s", + tc.monitoringRuntimeManager, inputStruct.ID, inputStruct.Runtime) + } + + assert.Equal(t, tc.expectPrometheusMonitoring, foundPrometheusInput, + "Prometheus monitoring presence mismatch for output type") + }) + } +} diff --git a/internal/pkg/agent/application/monitoring/monitoringhelpers/endpoint.go b/internal/pkg/agent/application/monitoring/monitoringhelpers/endpoint.go new file mode 100644 index 00000000000..29b8bb7cf38 --- /dev/null +++ b/internal/pkg/agent/application/monitoring/monitoringhelpers/endpoint.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package monitoringhelpers + +import ( + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/utils" +) + +func BeatsMonitoringEndpoint(componentID string) string { + return utils.SocketURLWithFallback(componentID, paths.TempDir()) +} diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 0472445a270..d1a2cbc231d 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -14,8 +14,9 @@ import ( "github.com/go-viper/mapstructure/v2" koanfmaps "github.com/knadh/koanf/maps" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/monitoringhelpers" + "github.com/elastic/elastic-agent-libs/logp" - componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component" otelcomponent "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -141,6 +142,30 @@ func VerifyComponentIsOtelSupported(comp *component.Component) error { return nil } +// VerifyOutputIsOtelSupported verifies that the given output can be converted into an Otel Collector exporter. It +// returns an error indicating what the problem is, if it can't. 
+func VerifyOutputIsOtelSupported(outputType string, outputCfg map[string]any) error { + if !slices.Contains(OtelSupportedOutputTypes, outputType) { + return fmt.Errorf("unsupported output type: %s", outputType) + } + exporterType, err := OutputTypeToExporterType(outputType) + if err != nil { + return err + } + + outputCfgC, err := config.NewConfigFrom(outputCfg) + if err != nil { + return err + } + + _, err = OutputConfigToExporterConfig(logp.NewNopLogger(), exporterType, outputCfgC) + if errors.Is(err, errors.ErrUnsupported) { + return fmt.Errorf("unsupported configuration for %s: %w", outputType, err) + } + + return nil +} + // getSupportedComponents returns components from the given model that can be run in an Otel Collector. func getSupportedComponents(logger *logp.Logger, model *component.Model) []*component.Component { var supportedComponents []*component.Component @@ -314,7 +339,7 @@ func getReceiversConfigForComponent( // agent self-monitoring is disabled monitoringConfig := beatMonitoringConfigGetter(comp.ID, beatName) if monitoringConfig == nil { - endpoint := componentmonitoring.BeatsMonitoringEndpoint(comp.ID) + endpoint := monitoringhelpers.BeatsMonitoringEndpoint(comp.ID) monitoringConfig = map[string]any{ "http": map[string]any{ "enabled": true, @@ -336,7 +361,7 @@ func getReceiversConfigForComponent( func getExportersConfigForComponent(comp *component.Component, logger *logp.Logger) (exporterCfg map[string]any, queueCfg map[string]any, extensionCfg map[string]any, err error) { exportersConfig := map[string]any{} extensionConfig := map[string]any{} - exporterType, err := getExporterTypeForComponent(comp) + exporterType, err := OutputTypeToExporterType(comp.OutputType) if err != nil { return nil, nil, nil, err } @@ -394,13 +419,13 @@ func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, } } -// getExporterTypeForComponent returns the exporter type for the given component. 
-func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { - switch comp.OutputType { +// OutputTypeToExporterType returns the exporter type for the given output type. +func OutputTypeToExporterType(outputType string) (otelcomponent.Type, error) { + switch outputType { case "elasticsearch": return otelcomponent.MustNewType("elasticsearch"), nil default: - return otelcomponent.Type{}, fmt.Errorf("unknown otel exporter type for output type: %s", comp.OutputType) + return otelcomponent.Type{}, fmt.Errorf("unknown otel exporter type for output type: %s", outputType) } } @@ -409,10 +434,7 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, if unit.Type == client.UnitTypeInput { return nil, nil, nil, fmt.Errorf("unit type is an input, expected output: %v", unit) } - configTranslationFunc, ok := configTranslationFuncForExporter[exporterType] - if !ok { - return nil, nil, nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) - } + // we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id // these will be deduplicated by the configuration merging process at the end outputName := strings.TrimPrefix(unit.ID, inputType+"-") // TODO: Use a more structured approach here @@ -432,7 +454,7 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, } // Config translation function can mutate queue settings defined under output config - exporterConfig, err := configTranslationFunc(outputCfgC, logger) + exporterConfig, err := OutputConfigToExporterConfig(logger, exporterType, outputCfgC) if err != nil { return nil, nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } @@ -521,6 +543,21 @@ func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamTyp return inputs, nil } +// OutputConfigToExporterConfig translates the output 
configuration to an exporter configuration. +func OutputConfigToExporterConfig(logger *logp.Logger, exporterType otelcomponent.Type, outputConfig *config.C) (map[string]any, error) { + configTranslationFunc, ok := configTranslationFuncForExporter[exporterType] + if !ok { + return nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) + } + + exporterConfig, err := configTranslationFunc(outputConfig, logger) + if err != nil { + return nil, err + } + + return exporterConfig, nil +} + // getDefaultDatastreamTypeForComponent returns the default datastream type for a given component. // This is needed to translate from the agent policy config format to the beats config format. func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) { diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index 874e1545e48..0479251963f 100644 --- a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -1358,6 +1358,52 @@ func TestGetBeatsAuthExtensionConfig(t *testing.T) { } } +func TestVerifyOutputIsOtelSupported(t *testing.T) { + tests := []struct { + name string + outputType string + outputCfg map[string]any + expectedError string + }{ + { + name: "supported output - elasticsearch", + outputType: "elasticsearch", + outputCfg: map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + }, + }, + { + name: "unsupported output type - kafka", + outputType: "kafka", + outputCfg: map[string]any{}, + expectedError: "unsupported output type: kafka", + }, + { + name: "unsupported configuration - indices field", + outputType: "elasticsearch", + outputCfg: map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "indices": []any{}, + }, + expectedError: "unsupported configuration for elasticsearch:", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := 
VerifyOutputIsOtelSupported(tt.outputType, tt.outputCfg) + if tt.expectedError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} + func TestUnitToExporterConfig(t *testing.T) { logger := logp.NewNopLogger() esExporterType := otelcomponent.MustNewType("elasticsearch") diff --git a/pkg/component/component.go b/pkg/component/component.go index f9e9d5c57c9..c0ed4e17591 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -46,6 +46,8 @@ const ( ProcessRuntimeManager = RuntimeManager("process") OtelRuntimeManager = RuntimeManager("otel") DefaultRuntimeManager RuntimeManager = ProcessRuntimeManager + enabledKey = "enabled" + typeKey = "type" ) // ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs. @@ -401,11 +403,11 @@ func unitForInput(input inputI, id string) Unit { } func unitForOutput(output outputI, id string) Unit { - cfg, cfgErr := ExpectedConfig(output.config) + cfg, cfgErr := ExpectedConfig(output.Config) return Unit{ ID: id, Type: client.UnitTypeOutput, - LogLevel: output.logLevel, + LogLevel: output.LogLevel, Config: cfg, Err: cfgErr, } @@ -425,14 +427,14 @@ func (r *RuntimeSpecs) componentsForInputType( // Treat as non isolated units component on error of reading the input spec if componentErr != nil || !inputSpec.Spec.IsolateUnits { - componentID := fmt.Sprintf("%s-%s", inputType, output.name) - if componentErr == nil && !containsStr(inputSpec.Spec.Outputs, output.outputType) { + componentID := fmt.Sprintf("%s-%s", inputType, output.Name) + if componentErr == nil && !containsStr(inputSpec.Spec.Outputs, output.OutputType) { // This output is unsupported. 
componentErr = ErrOutputNotSupported } unitsForRuntimeManager := make(map[RuntimeManager][]Unit) - for _, input := range output.inputs[inputType] { + for _, input := range output.Inputs[inputType] { if input.enabled { unitID := fmt.Sprintf("%s-%s", componentID, input.id) unitsForRuntimeManager[input.runtimeManager] = append( @@ -455,20 +457,20 @@ func (r *RuntimeSpecs) componentsForInputType( Err: componentErr, InputSpec: &inputSpec, InputType: inputType, - OutputType: output.outputType, + OutputType: output.OutputType, Units: units, RuntimeManager: runtimeManager, Features: featureFlags.AsProto(), Component: componentConfig.AsProto(), - OutputStatusReporting: extractStatusReporting(output.config), + OutputStatusReporting: extractStatusReporting(output.Config), }) } } } else { - for _, input := range output.inputs[inputType] { + for _, input := range output.Inputs[inputType] { // Units are being mapped to components, so we need a unique ID for each. - componentID := fmt.Sprintf("%s-%s-%s", inputType, output.name, input.id) - if componentErr == nil && !containsStr(inputSpec.Spec.Outputs, output.outputType) { + componentID := fmt.Sprintf("%s-%s-%s", inputType, output.Name, input.id) + if componentErr == nil && !containsStr(inputSpec.Spec.Outputs, output.OutputType) { // This output is unsupported. 
componentErr = ErrOutputNotSupported } @@ -485,12 +487,12 @@ func (r *RuntimeSpecs) componentsForInputType( Err: componentErr, InputSpec: &inputSpec, InputType: inputType, - OutputType: output.outputType, + OutputType: output.OutputType, Units: units, RuntimeManager: input.runtimeManager, Features: featureFlags.AsProto(), Component: componentConfig.AsProto(), - OutputStatusReporting: extractStatusReporting(output.config), + OutputStatusReporting: extractStatusReporting(output.Config), }) } } @@ -500,7 +502,7 @@ func (r *RuntimeSpecs) componentsForInputType( func (r *RuntimeSpecs) componentsForOutput(output outputI, featureFlags *features.Flags, componentConfig *ComponentConfig) []Component { var components []Component - for inputType := range output.inputs { + for inputType := range output.Inputs { // No need for error checking at this stage -- we are guaranteed // to get a Component/s back. If there is an error that prevents it/them // from running then it will be in the Component's Err field and @@ -557,7 +559,7 @@ func (r *RuntimeSpecs) PolicyToComponents( var components []Component for _, outputName := range outputKeys { output := outputsMap[outputName] - if output.enabled { + if output.Enabled { components = append(components, r.componentsForOutput(output, featureFlags, componentConfig)...) 
} @@ -625,60 +627,12 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin if !ok { return nil, fmt.Errorf("invalid 'outputs.%s', expected a map not a %T", name, outputRaw) } - typeRaw, ok := output[typeKey] - if !ok { - return nil, fmt.Errorf("invalid 'outputs.%s', 'type' missing", name) - } - t, ok := typeRaw.(string) - if !ok { - return nil, fmt.Errorf("invalid 'outputs.%s.type', expected a string not a %T", name, typeRaw) - } - enabled := true - if enabledRaw, ok := output[enabledKey]; ok { - enabledVal, ok := enabledRaw.(bool) - if !ok { - return nil, fmt.Errorf("invalid 'outputs.%s.enabled', expected a bool not a %T", name, enabledRaw) - } - enabled = enabledVal - delete(output, enabledKey) - } - logLevel, err := getLogLevel(output, ll) + parsedOutput, err := ParseOutput(name, output, ll, headers) if err != nil { - return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", name, err) - } - - // inject headers configured during enroll - if t == elasticsearchType && headers != nil { - // can be nil when called from install/uninstall - if agentHeaders := headers.Headers(); len(agentHeaders) > 0 { - headers := make(map[string]interface{}) - if existingHeadersRaw, found := output[headersKey]; found { - existingHeaders, ok := existingHeadersRaw.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("invalid 'outputs.headers', expected a map not a %T", outputRaw) - } - headers = existingHeaders - } - - for headerName, headerVal := range agentHeaders { - // only set headers for those that are not already set - if _, ok := headers[headerName]; !ok { - headers[headerName] = headerVal - } - } - - output[headersKey] = headers - } + return nil, err } - outputsMap[name] = outputI{ - name: name, - enabled: enabled, - logLevel: logLevel, - outputType: t, - config: output, - inputs: make(map[string][]inputI), - } + outputsMap[name] = *parsedOutput } // map the inputs to the outputs @@ -769,7 +723,7 @@ func toIntermediate(policy 
map[string]interface{}, aliasMapping map[string]strin // allows individual inputs (like endpoint) to detect policy changes more easily. injectInputPolicyID(policy, input) - output.inputs[t] = append(output.inputs[t], inputI{ + output.Inputs[t] = append(output.Inputs[t], inputI{ idx: idx, id: id, enabled: enabled, @@ -785,6 +739,64 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin return outputsMap, nil } +// ParseOutput parses the output configuration into an intermediate structured representation. +func ParseOutput(outputName string, outputConfig map[string]any, ll logp.Level, headers HeadersProvider) (*outputI, error) { + typeRaw, ok := outputConfig[typeKey] + if !ok { + return nil, fmt.Errorf("invalid 'outputs.%s', 'type' missing", outputName) + } + t, ok := typeRaw.(string) + if !ok { + return nil, fmt.Errorf("invalid 'outputs.%s.type', expected a string not a %T", outputName, typeRaw) + } + enabled := true + if enabledRaw, ok := outputConfig[enabledKey]; ok { + enabledVal, ok := enabledRaw.(bool) + if !ok { + return nil, fmt.Errorf("invalid 'outputs.%s.enabled', expected a bool not a %T", outputName, enabledRaw) + } + enabled = enabledVal + delete(outputConfig, enabledKey) + } + logLevel, err := getLogLevel(outputConfig, ll) + if err != nil { + return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", outputName, err) + } + + // inject headers configured during enroll + if t == elasticsearchType && headers != nil { + // can be nil when called from install/uninstall + if agentHeaders := headers.Headers(); len(agentHeaders) > 0 { + headers := make(map[string]interface{}) + if existingHeadersRaw, found := outputConfig[headersKey]; found { + existingHeaders, ok := existingHeadersRaw.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("invalid 'outputs.headers', expected a map not a %T", outputConfig) + } + headers = existingHeaders + } + + for headerName, headerVal := range agentHeaders { + // only set headers for 
those that are not already set + if _, ok := headers[headerName]; !ok { + headers[headerName] = headerVal + } + } + + outputConfig[headersKey] = headers + } + } + + return &outputI{ + Name: outputName, + Enabled: enabled, + LogLevel: logLevel, + OutputType: t, + Config: outputConfig, + Inputs: make(map[string][]inputI), + }, nil +} + type inputI struct { idx int id string @@ -800,20 +812,20 @@ type inputI struct { } type outputI struct { - name string - enabled bool - logLevel client.UnitLogLevel - outputType string + Name string + Enabled bool + LogLevel client.UnitLogLevel + OutputType string // The raw configuration for this output, with small cleanups: // - enabled key is removed // - log_level key is removed // - if outputType is "elasticsearch", headers key is extended by adding any // values in AgentInfo.esHeaders - config map[string]interface{} + Config map[string]interface{} // inputs directed at this output, keyed by canonical (non-alias) type. - inputs map[string][]inputI + Inputs map[string][]inputI } // varsForPlatform sets the runtime variables that are available in the @@ -873,7 +885,7 @@ func validateRuntimeChecks( func hasDuplicate(outputsMap map[string]outputI, id string) bool { for _, o := range outputsMap { - for _, i := range o.inputs { + for _, i := range o.Inputs { for _, j := range i { if j.id == id { return true From b05dbc8c312b22e368f862b21753269cbd0ecd15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Thu, 20 Nov 2025 15:56:44 +0100 Subject: [PATCH 4/8] Add changelog entry --- .../1763650536-self-monitoring-runtime.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1763650536-self-monitoring-runtime.yaml diff --git a/changelog/fragments/1763650536-self-monitoring-runtime.yaml b/changelog/fragments/1763650536-self-monitoring-runtime.yaml new file mode 100644 index 00000000000..bfcac4e9b74 --- /dev/null +++ 
b/changelog/fragments/1763650536-self-monitoring-runtime.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Ensure the self-monitoring configuration accounts for the runtime components actually run in. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+#issue: https://github.com/owner/repo/1234 From e4032c176f732e512de0d3d8fe5f67a4e65cc93a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Thu, 20 Nov 2025 22:11:09 +0100 Subject: [PATCH 5/8] Log warning about switching to process runtime for monitoring --- internal/pkg/agent/application/application.go | 1 + .../monitoring/component/v1_monitor.go | 42 ++++++++++++------- .../monitoring/component/v1_monitor_test.go | 14 ++++++- .../install/componentvalidation/validation.go | 8 +++- .../integration/ess/beat_receivers_test.go | 10 ++++- 5 files changed, 57 insertions(+), 18 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 668333fd642..c141815eef8 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -149,6 +149,7 @@ func New( cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, + log, ) runtime, err := runtime.NewManager( diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor.go b/internal/pkg/agent/application/monitoring/component/v1_monitor.go index e7fe9ba0bea..eb6b9d62070 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor.go @@ -104,6 +104,7 @@ type BeatsMonitor struct { config *monitoringConfig operatingSystem string agentInfo info.Agent + logger *logp.Logger } // componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use @@ -122,7 +123,13 @@ type monitoringConfig struct { } // New creates a new BeatsMonitor instance. 
-func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor { +func New( + enabled bool, + operatingSystem string, + cfg *monitoringCfg.MonitoringConfig, + agentInfo info.Agent, + logger *logp.Logger, +) *BeatsMonitor { return &BeatsMonitor{ enabled: enabled, config: &monitoringConfig{ @@ -130,6 +137,7 @@ func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConf }, operatingSystem: operatingSystem, agentInfo: agentInfo, + logger: logger, } } @@ -242,11 +250,13 @@ func (b *BeatsMonitor) MonitoringConfig( return nil, nil } - outputOtelSupported := isOutputOtelSupported(outputCfg) + outputOtelSupportedErr := verifyOutputOtelSupported(outputCfg) monitoringRuntime := component.RuntimeManager(b.config.C.RuntimeManager) - if !outputOtelSupported { + if outputOtelSupportedErr != nil { + b.logger.Warnf("otel runtime is not supported for monitoring output, switching to process runtime, reason: %v", outputOtelSupportedErr) monitoringRuntime = monitoringCfg.ProcessRuntimeManager } + outputOtelSupported := outputOtelSupportedErr == nil componentInfos := b.getComponentInfos(components, monitoringRuntime, outputOtelSupported, componentIDPidMap) // initializes inputs collection so injectors don't have to deal with it @@ -482,13 +492,18 @@ func (b *BeatsMonitor) getComponentInfos( } // If any other component uses the Otel runtime, also add a component to monitor its telemetry. // This component only works in the Otel runtime, so we can't add it if the output doesn't support it. 
- if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) && outputOtelSupported { - componentInfos = append(componentInfos, - componentInfo{ - ID: prometheusMonitoringComponentId, - BinaryName: metricBeatName, - RuntimeManager: component.OtelRuntimeManager, - }) + if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) { + if outputOtelSupported { + componentInfos = append(componentInfos, + componentInfo{ + ID: prometheusMonitoringComponentId, + BinaryName: metricBeatName, + RuntimeManager: component.OtelRuntimeManager, + }) + } else { + b.logger.Warn("The Otel prometheus metrics monitoring input can't run in a beats process, skipping") + } + } // sort the components to ensure a consistent order of inputs in the configuration slices.SortFunc(componentInfos, func(a, b componentInfo) int { @@ -1525,14 +1540,13 @@ func isSupportedBeatsBinary(binaryName string) bool { return false } -func isOutputOtelSupported(outputCfg map[string]any) bool { +func verifyOutputOtelSupported(outputCfg map[string]any) error { parsed, err := component.ParseOutput(monitoringOutput, outputCfg, logp.InfoLevel, nil) if err != nil { - return false + return err } - err = translate.VerifyOutputIsOtelSupported(parsed.OutputType, outputCfg) - return err == nil + return translate.VerifyOutputIsOtelSupported(parsed.OutputType, outputCfg) } func monitoringDrop(path string) (drop string) { diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go index 7999d4e408c..9155369ed22 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go @@ -17,6 +17,8 @@ import ( "github.com/go-viper/mapstructure/v2" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -126,6 +128,7 @@ func TestMonitoringFull(t 
*testing.T) { }, }, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } expectedConfigBytes, err := os.ReadFile(tc.ExpectedConfigPath) @@ -176,6 +179,7 @@ func TestMonitoringWithEndpoint(t *testing.T) { }, }, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } policy := map[string]any{ @@ -350,6 +354,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { config: tc.monitoringCfg, operatingSystem: runtime.GOOS, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } got, err := b.MonitoringConfig(tc.policy, components, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input assert.NoError(t, err) @@ -583,6 +588,7 @@ func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { config: tc.monitoringCfg, operatingSystem: runtime.GOOS, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } got, err := b.MonitoringConfig(tc.policy, components, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input assert.NoError(t, err) @@ -758,6 +764,7 @@ func TestErrorMonitoringConfigMetricsFailureThreshold(t *testing.T) { config: tc.monitoringCfg, operatingSystem: runtime.GOOS, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } _, err := b.MonitoringConfig(tc.policy, components, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input @@ -797,6 +804,7 @@ func TestMonitoringConfigComponentFields(t *testing.T) { enabled: true, config: cfg, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } components := []component.Component{ @@ -907,6 +915,7 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) { enabled: true, config: cfg, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } components := []component.Component{ @@ -1005,6 +1014,7 @@ func TestMonitoringWithOtelRuntime(t *testing.T) { enabled: true, config: cfg, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } components := []component.Component{ @@ 
-1090,6 +1100,7 @@ func TestEnrichArgs(t *testing.T) { b := &BeatsMonitor{ enabled: test.enabled, config: &test.config, + logger: logp.NewNopLogger(), } args := b.EnrichArgs(unitID, test.binaryName, nil) // replace socket path with placeholder, it's annoying to do cross-platform tests on these @@ -1123,7 +1134,7 @@ func TestMonitorReload(t *testing.T) { monitorcfg.MonitorLogs = false monitorcfg.MonitorMetrics = false - beatsMonitor := New(true, "", monitorcfg, nil) + beatsMonitor := New(true, "", monitorcfg, nil, logp.NewNopLogger()) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) @@ -1197,6 +1208,7 @@ func TestMonitoringConfigOtelOutputSupport(t *testing.T) { }, }, agentInfo: agentInfo, + logger: logp.NewNopLogger(), } policy := map[string]any{ diff --git a/internal/pkg/agent/install/componentvalidation/validation.go b/internal/pkg/agent/install/componentvalidation/validation.go index 47b19a448fe..63454e9dd55 100644 --- a/internal/pkg/agent/install/componentvalidation/validation.go +++ b/internal/pkg/agent/install/componentvalidation/validation.go @@ -125,7 +125,13 @@ func GetMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string] return nil, fmt.Errorf("could not load agent info: %w", err) } - monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo) + monitor := componentmonitoring.New( + agentCfg.Settings.V1MonitoringEnabled, + agentCfg.Settings.DownloadConfig.OS(), + agentCfg.Settings.MonitoringConfig, + agentInfo, + logger, + ) return monitor.MonitoringConfig, nil } diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 741dabb2933..71caba367c5 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -819,8 +819,9 @@ agent.monitoring.enabled: false // Log 
lines TestBeatsReceiverProcessRuntimeFallback checks for const ( - otelRuntimeUnsupportedLogLineStart = "otel runtime is not supported" - prometheusInputSkippedLogLine = "The Otel prometheus metrics monitoring input can't run in a beats process, skipping" + otelRuntimeUnsupportedLogLineStart = "otel runtime is not supported" + otelRuntimeMonitoringOutputUnsupportedLogLineStart = "otel runtime is not supported for monitoring output" + prometheusInputSkippedLogLine = "The Otel prometheus metrics monitoring input can't run in a beats process, skipping" ) // TestBeatsReceiverProcessRuntimeFallback verifies that we fall back to the process runtime if the otel runtime @@ -885,6 +886,7 @@ outputs: // verify we've logged a warning about using the process runtime var unsupportedLogRecords []map[string]any var prometheusUnsupportedLogRecord map[string]any + var monitoringOutputUnsupportedLogRecord map[string]any for _, line := range strings.Split(string(logsBytes), "\n") { line = strings.TrimSpace(line) if line == "" { @@ -902,6 +904,9 @@ outputs: if strings.HasPrefix(message, prometheusInputSkippedLogLine) { prometheusUnsupportedLogRecord = logRecord } + if strings.HasPrefix(message, otelRuntimeMonitoringOutputUnsupportedLogLineStart) { + monitoringOutputUnsupportedLogRecord = logRecord + } } } @@ -914,6 +919,7 @@ outputs: assert.Len(t, unsupportedLogRecords, 5, "one log line for each component we try to run") assert.NotEmpty(t, prometheusUnsupportedLogRecord, "should get a log line about Otel prometheus metrics input being skipped") + assert.NotEmpty(t, monitoringOutputUnsupportedLogRecord, "should get a log line about monitoring output not being supported") } // TestComponentWorkDir verifies that the component working directory is not deleted when moving the component from From b7965be55499410e1362c24f7ea33d2eb9eabc90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Fri, 21 Nov 2025 18:49:02 +0100 Subject: [PATCH 6/8] Fix monitoring config 
types --- .../agent/application/monitoring/component/v1_monitor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor.go b/internal/pkg/agent/application/monitoring/component/v1_monitor.go index eb6b9d62070..6eb8ad73394 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor.go @@ -531,7 +531,7 @@ func (b *BeatsMonitor) injectLogsInput( "type": "filestream", useOutputKey: monitoringOutput, "streams": streams, - "_runtime_experimental": monitoringRuntime, + "_runtime_experimental": string(monitoringRuntime), } inputs := []any{input} @@ -594,7 +594,7 @@ func (b *BeatsMonitor) injectMetricsInput( "namespace": monitoringNamespace, }, "streams": beatsStreams, - "_runtime_experimental": monitoringRuntime, + "_runtime_experimental": string(monitoringRuntime), }, map[string]interface{}{ idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID), @@ -605,7 +605,7 @@ func (b *BeatsMonitor) injectMetricsInput( "namespace": monitoringNamespace, }, "streams": httpStreams, - "_runtime_experimental": monitoringRuntime, + "_runtime_experimental": string(monitoringRuntime), }, } From c20cea541f651c014e6c4238e5c8e66aaa5eb2b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Fri, 21 Nov 2025 21:52:56 +0100 Subject: [PATCH 7/8] fix TestBeatsReceiverProcessRuntimeFallback --- .../integration/ess/beat_receivers_test.go | 60 +++++++++++++++---- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 71caba367c5..96f81ba4b70 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -819,7 +819,7 @@ agent.monitoring.enabled: false // Log lines TestBeatsReceiverProcessRuntimeFallback checks for const ( - 
otelRuntimeUnsupportedLogLineStart = "otel runtime is not supported" + otelRuntimeUnsupportedLogLineStart = "otel runtime is not supported for component" otelRuntimeMonitoringOutputUnsupportedLogLineStart = "otel runtime is not supported for monitoring output" prometheusInputSkippedLogLine = "The Otel prometheus metrics monitoring input can't run in a beats process, skipping" ) @@ -842,19 +842,31 @@ func TestBeatsReceiverProcessRuntimeFallback(t *testing.T) { config := `agent.logging.to_stderr: true agent.logging.to_files: false inputs: - # Collecting system metrics - type: system/metrics id: unique-system-metrics-input _runtime_experimental: otel streams: - metricsets: - cpu + - type: system/metrics + id: unique-system-metrics-input-2 + use_output: supported + _runtime_experimental: otel + streams: + - metricsets: + - cpu outputs: default: type: elasticsearch hosts: [http://localhost:9200] api_key: placeholder indices: [] # not supported by the elasticsearch exporter + supported: + type: elasticsearch + hosts: [http://localhost:9200] + api_key: placeholder + status_reporting: + enabled: false ` // this is the context for the whole test, with a global timeout defined @@ -877,8 +889,24 @@ outputs: var statusErr error status, statusErr := fixture.ExecStatus(ctx) assert.NoError(collect, statusErr) - // we should be running beats processes even though the otel runtime was requested - assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 4) + // we should be running beats processes for components with default output even though the otel runtime was requested + // agent should be healthy + assert.Equal(collect, int(cproto.State_HEALTHY), status.State) + assert.Equal(collect, 5, len(status.Components)) + + // all the components should be healthy, their units should be healthy, and they should identify + // themselves as running in the process runtime if they're using the default or monitoring outputs + for _, comp := range status.Components { + 
assert.Equal(collect, int(cproto.State_HEALTHY), comp.State) + expectedComponentVersionInfoName := componentVersionInfoNameForRuntime(component.OtelRuntimeManager) + if strings.HasSuffix(comp.ID, "default") || strings.HasSuffix(comp.ID, "monitoring") { + expectedComponentVersionInfoName = componentVersionInfoNameForRuntime(component.ProcessRuntimeManager) + } + assert.Equal(collect, expectedComponentVersionInfoName, comp.VersionInfo.Name) + for _, unit := range comp.Units { + assert.Equal(collect, int(cproto.State_HEALTHY), unit.State) + } + } }, 1*time.Minute, 1*time.Second) logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"}) require.NoError(t, err) @@ -917,7 +945,7 @@ outputs: } }) - assert.Len(t, unsupportedLogRecords, 5, "one log line for each component we try to run") + assert.Len(t, unsupportedLogRecords, 1, "one log line for each component we try to run") assert.NotEmpty(t, prometheusUnsupportedLogRecord, "should get a log line about Otel prometheus metrics input being skipped") assert.NotEmpty(t, monitoringOutputUnsupportedLogRecord, "should get a log line about monitoring output not being supported") } @@ -1068,13 +1096,8 @@ func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.Agent } func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager, componentCount int) { - var componentVersionInfoName string - switch runtime { - case "otel": - componentVersionInfoName = "beats-receiver" - default: - componentVersionInfoName = "beat-v2-client" - } + t.Helper() + componentVersionInfoName := componentVersionInfoNameForRuntime(runtime) // agent should be healthy assert.Equal(t, int(cproto.State_HEALTHY), status.State) @@ -1798,3 +1821,16 @@ func TestMonitoringNoDuplicates(t *testing.T) { combinedOutput, err = fut.Uninstall(ctx, &atesting.UninstallOpts{Force: true}) require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, 
combined output: %s", err, string(combinedOutput)) } + +func componentVersionInfoNameForRuntime(runtime component.RuntimeManager) string { + var componentVersionInfoName string + switch runtime { + case component.OtelRuntimeManager: + componentVersionInfoName = "beats-receiver" + case component.ProcessRuntimeManager: + componentVersionInfoName = "beat-v2-client" + default: + componentVersionInfoName = componentVersionInfoNameForRuntime(component.DefaultRuntimeManager) + } + return componentVersionInfoName +} From 375b928f269b0f93cce1a10548ab7811d368b7ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Tue, 25 Nov 2025 15:06:06 +0100 Subject: [PATCH 8/8] Add logstash output to test cases --- .../application/monitoring/component/v1_monitor_test.go | 9 +++++++++ internal/pkg/otel/translate/otelconfig_test.go | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go index 9155369ed22..eb71cc548d7 100644 --- a/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/component/v1_monitor_test.go @@ -1171,6 +1171,15 @@ func TestMonitoringConfigOtelOutputSupport(t *testing.T) { expectPrometheusMonitoring: false, monitoringRuntimeManager: monitoringcfg.ProcessRuntimeManager, }, + { + name: "logstash output - should NOT have prometheus monitoring", + outputConfig: map[string]any{ + "type": "logstash", + "hosts": []string{"localhost:9092"}, + }, + expectPrometheusMonitoring: false, + monitoringRuntimeManager: monitoringcfg.ProcessRuntimeManager, + }, { name: "elasticsearch output - should have prometheus monitoring", outputConfig: map[string]any{ diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index 0479251963f..3d8330fc82a 100644 --- 
a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -1379,6 +1379,12 @@ func TestVerifyOutputIsOtelSupported(t *testing.T) { outputCfg: map[string]any{}, expectedError: "unsupported output type: kafka", }, + { + name: "unsupported output type - logstash", + outputType: "logstash", + outputCfg: map[string]any{}, + expectedError: "unsupported output type: logstash", + }, { name: "unsupported configuration - indices field", outputType: "elasticsearch",