diff --git a/changelog/fragments/1764016303-allow-host-to-be-a-string-for-otel-configuration-translation.yaml b/changelog/fragments/1764016303-allow-host-to-be-a-string-for-otel-configuration-translation.yaml new file mode 100644 index 00000000000..a046b112df1 --- /dev/null +++ b/changelog/fragments/1764016303-allow-host-to-be-a-string-for-otel-configuration-translation.yaml @@ -0,0 +1,46 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: allow host to be a string for otel configuration translation + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/11394 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/11352 + diff --git a/internal/pkg/otel/translate/output_elasticsearch.go b/internal/pkg/otel/translate/output_elasticsearch.go index 36e0c4ea708..eb403b142d6 100644 --- a/internal/pkg/otel/translate/output_elasticsearch.go +++ b/internal/pkg/otel/translate/output_elasticsearch.go @@ -10,6 +10,7 @@ import ( "fmt" "net/url" "reflect" + "slices" "strings" "time" @@ -27,7 +28,6 @@ import ( type esToOTelOptions struct { elasticsearch.ElasticsearchConfig `config:",inline"` - outputs.HostWorkerCfg `config:",inline"` Index string `config:"index"` Preset string `config:"preset"` @@ -38,9 +38,6 @@ var defaultOptions = esToOTelOptions{ Index: "", // Dynamic routing is disabled if index is set Preset: "custom", // default is custom if not set - HostWorkerCfg: outputs.HostWorkerCfg{ - Workers: 1, - }, } // ToOTelConfig converts a Beat config into OTel elasticsearch exporter config @@ -97,13 +94,19 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) } // Create url using host name, protocol and path + outputHosts, err := outputs.ReadHostList(output) + if err != nil { + return nil, fmt.Errorf("error reading host list: %w", err) + } hosts := []string{} - for _, h := range escfg.Hosts { + for _, h := range outputHosts { esURL, err := common.MakeURL(escfg.Protocol, escfg.Path, h, 9200) if err != nil { return nil, fmt.Errorf("cannot generate ES URL from host %w", err) } - hosts = append(hosts, esURL) + if !slices.Contains(hosts, esURL) { + hosts = append(hosts, esURL) + } } otelYAMLCfg := map[string]any{ @@ -114,7 +117,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) // where it could spin as many goroutines as it liked. // Given that batcher implementation can change and it has a history of such changes, // let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable. - "max_conns_per_host": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true + "max_conns_per_host": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true // Retry "retry": map[string]any{ @@ -135,7 +138,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) "queue_size": getQueueSize(logger, output), "block_on_overflow": true, "wait_for_result": true, - "num_consumers": getTotalNumWorkers(escfg), // num_workers * len(hosts) if loadbalance is true + "num_consumers": getTotalNumWorkers(output), // num_workers * len(hosts) if loadbalance is true }, "mapping": map[string]any{ @@ -171,13 +174,14 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error) return otelYAMLCfg, nil } -func getTotalNumWorkers(escfg esToOTelOptions) int { - // calculate total workers - totalWorkers := escfg.NumWorkers() - if escfg.LoadBalance && len(escfg.Hosts) > 1 { - totalWorkers = (escfg.NumWorkers() * len(escfg.Hosts)) +// getTotalNumWorkers returns the number of hosts that beats would +// have used taking into account hosts, loadbalance and worker +func getTotalNumWorkers(cfg *config.C) int { + hostList, err := outputs.ReadHostList(cfg) + if err != nil { + return 1 } - return totalWorkers + return len(hostList) } // log warning for unsupported config diff --git a/internal/pkg/otel/translate/output_elasticsearch_test.go b/internal/pkg/otel/translate/output_elasticsearch_test.go index 992a2669181..b151176c219 100644 --- a/internal/pkg/otel/translate/output_elasticsearch_test.go +++ b/internal/pkg/otel/translate/output_elasticsearch_test.go @@ -82,7 +82,6 @@ compression_params: require.NoError(t, err, "error translating elasticsearch output to ES exporter config") expOutput := newFromYamlString(t, OTelCfg) compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) - }) t.Run("test api key is encoded before mapping to es-exporter", func(t *testing.T) { @@ -128,7 +127,50 @@ compression_params: require.NoError(t, err, "error translating elasticsearch output to ES exporter config ") expOutput := newFromYamlString(t, OTelCfg) compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) + }) + + t.Run("test hosts can be a string", func(t *testing.T) { + beatCfg := ` +hosts: "localhost:9200" +index: "some-index" +api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA" +` + OTelCfg := ` +endpoints: + - http://localhost:9200 +logs_index: some-index +logs_dynamic_pipeline: + enabled: true +retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 +sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true +mapping: + mode: bodymap +max_conns_per_host: 1 +api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ== +compression: gzip +compression_params: + level: 1 + ` + cfg := config.MustNewConfigFrom(beatCfg) + got, err := ToOTelConfig(cfg, logger) + require.NoError(t, err, "error translating elasticsearch output to ES exporter config ") + expOutput := newFromYamlString(t, OTelCfg) + compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) }) // when preset is configured, we only test worker, bulk_max_size @@ -281,9 +323,7 @@ sending_queue: compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) }) } - }) - } func TestCompressionConfig(t *testing.T) { @@ -345,7 +385,6 @@ compression: none compareAndAssert(t, expOutput, confmap.NewFromStringMap(got)) }) } - } func newFromYamlString(t *testing.T, input string) *confmap.Conf {