Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support performance presets in the Elasticsearch output #37259

Merged
merged 17 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type elasticsearchConfig struct {
Backoff Backoff `config:"backoff"`
NonIndexablePolicy *config.Namespace `config:"non_indexable_policy"`
AllowOlderVersion bool `config:"allow_older_versions"`
Queue config.Namespace `config:"queue"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
130 changes: 130 additions & 0 deletions libbeat/outputs/elasticsearch/config_presets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"fmt"
"strings"
"time"

"github.com/elastic/elastic-agent-libs/config"
)

// Names of the performance presets. Each one is a key in presetConfigs.
const (
// presetNone is the empty name; it maps to an empty override config.
presetNone = ""
// presetCustom also maps to an empty override config.
presetCustom = "custom"
// The remaining presets each map to a non-empty set of overrides.
presetBalanced = "balanced"
presetThroughput = "throughput"
presetScale = "scale"
presetLatency = "latency"
)

// presetConfigs maps each preset name to the config that applyPreset merges
// on top of the user's output config.
//
// presetNone and presetCustom map to empty configs. Every other preset sets
// the same seven keys (bulk_max_size, worker, queue.mem.events,
// queue.mem.flush.min_events, queue.mem.flush.timeout, compression_level and
// idle_connection_timeout) and differs only in the values.
var presetConfigs = map[string]*config.C{
presetNone: config.MustNewConfigFrom(map[string]interface{}{}),
presetCustom: config.MustNewConfigFrom(map[string]interface{}{}),
presetBalanced: config.MustNewConfigFrom(map[string]interface{}{
"bulk_max_size": 1600,
"worker": 1,
"queue.mem.events": 3200,
"queue.mem.flush.min_events": 1600,
"queue.mem.flush.timeout": 10 * time.Second,
"compression_level": 1,
"idle_connection_timeout": 3 * time.Second,
}),
// The only preset in this table with more than one worker.
presetThroughput: config.MustNewConfigFrom(map[string]interface{}{
"bulk_max_size": 1600,
"worker": 4,
"queue.mem.events": 12800,
"queue.mem.flush.min_events": 1600,
"queue.mem.flush.timeout": 5 * time.Second,
"compression_level": 1,
"idle_connection_timeout": 15 * time.Second,
}),
// Same sizes as presetBalanced, with a longer flush timeout and a shorter
// idle connection timeout.
presetScale: config.MustNewConfigFrom(map[string]interface{}{
"bulk_max_size": 1600,
"worker": 1,
"queue.mem.events": 3200,
"queue.mem.flush.min_events": 1600,
"queue.mem.flush.timeout": 20 * time.Second,
"compression_level": 1,
"idle_connection_timeout": 1 * time.Second,
}),
// The only preset in this table with a bulk_max_size other than 1600.
presetLatency: config.MustNewConfigFrom(map[string]interface{}{
"bulk_max_size": 50,
"worker": 1,
"queue.mem.events": 4100,
"queue.mem.flush.min_events": 2050,
"queue.mem.flush.timeout": 1 * time.Second,
"compression_level": 1,
"idle_connection_timeout": 60 * time.Second,
}),
}

// applyPreset looks up the named preset in presetConfigs and merges its
// settings on top of userConfig, modifying userConfig in place.
//
// If the preset sets any "queue." key, the user's whole "queue" subtree is
// removed before the merge, so every user-provided queue key is reported as
// overridden.
//
// It returns the flattened user keys that the preset replaces, or an error if
// the preset name is unknown or the merge fails.
func applyPreset(preset string, userConfig *config.C) ([]string, error) {
presetConfig := presetConfigs[preset]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to do presetConfig, ok := to check if the preset exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what the presetConfig == nil check on the next line is for, the ok check wouldn't catch anything that nil doesn't already

// A name that is not a key of presetConfigs yields a nil *config.C here.
if presetConfig == nil {
return nil, fmt.Errorf("unknown preset value %v", preset)
}

// Check for any user-provided fields that overlap with the preset.
// Queue parameters have special handling since they must be applied
// as a group so all queue parameters conflict with each other.
userKeys := userConfig.FlattenedKeys()
presetKeys := presetConfig.FlattenedKeys()
presetConfiguresQueue := listContainsPrefix(presetKeys, "queue.")
overridden := []string{}
for _, key := range userKeys {
if strings.HasPrefix(key, "queue.") && presetConfiguresQueue {
overridden = append(overridden, key)
} else if listContainsStr(presetKeys, key) {
faec marked this conversation as resolved.
Show resolved Hide resolved
overridden = append(overridden, key)
}
}
// Remove the queue parameters if needed, then merge the preset
// config on top of the user config.
if presetConfiguresQueue {
_, _ = userConfig.Remove("queue", -1)
}
err := userConfig.Merge(presetConfig)
if err != nil {
return nil, err
}
return overridden, nil
}

// listContainsStr reports whether str is equal to any element of list.
//
// TODO: Replace this with slices.Contains once we hit Go 1.21.
func listContainsStr(list []string, str string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == str {
			return true
		}
	}
	return false
}

// listContainsPrefix reports whether at least one element of list starts
// with prefix.
func listContainsPrefix(list []string, prefix string) bool {
	found := false
	for i := 0; i < len(list) && !found; i++ {
		found = strings.HasPrefix(list[i], prefix)
	}
	return found
}
201 changes: 201 additions & 0 deletions libbeat/outputs/elasticsearch/config_presets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"
)

// TestApplyPresetNoConflicts applies the "throughput" preset to a config that
// only contains settings the presets do not touch. It expects no reported
// conflicts, unchanged user settings, and preset values for everything else.
func TestApplyPresetNoConflicts(t *testing.T) {
	const testHost = "http://elastic-host:9200"

	// Only settings that no performance preset defines.
	userSettings := map[string]interface{}{
		"hosts":       []string{testHost},
		"max_retries": 5,
		"loadbalance": true,
	}
	cfg := config.MustNewConfigFrom(userSettings)

	// Nothing the user set overlaps with the preset, so nothing is overridden.
	overridden, err := applyPreset(presetThroughput, cfg)
	require.NoError(t, err, "Valid preset must apply successfully")
	assert.Equal(t, 0, len(overridden), "applyPreset should report no conflicts from non-preset fields")

	// The merged config must unpack into the output's config struct.
	var unpacked elasticsearchConfig
	require.NoError(t, cfg.Unpack(&unpacked), "Config should unpack successfully")

	// User-provided values survive.
	assert.Equal(t, 5, unpacked.MaxRetries, "Non-preset fields should be unchanged by applyPreset")
	assert.Equal(t, true, unpacked.LoadBalance, "Non-preset fields should be unchanged by applyPreset")

	// Top-level preset values are present.
	assert.Equal(t, 1600, unpacked.BulkMaxSize, "Preset fields should be set by applyPreset")
	assert.Equal(t, 1, unpacked.CompressionLevel, "Preset fields should be set by applyPreset")
	assert.Equal(t, 15*time.Second, unpacked.Transport.IdleConnTimeout, "Preset fields should be set by applyPreset")

	// Queue values come from the preset as well.
	queueCfg := struct {
		Events         int           `config:"events"`
		FlushMinEvents int           `config:"flush.min_events"`
		FlushTimeout   time.Duration `config:"flush.timeout"`
	}{}
	require.Equal(t, "mem", unpacked.Queue.Name(), "applyPreset should configure the memory queue")
	assert.NoError(t, unpacked.Queue.Config().Unpack(&queueCfg), "applyPreset should set valid memory queue config")

	assert.Equal(t, 12800, queueCfg.Events, "Queue fields should match preset definition")
	assert.Equal(t, 1600, queueCfg.FlushMinEvents, "Queue fields should match preset definition")
	assert.Equal(t, 5*time.Second, queueCfg.FlushTimeout, "Queue fields should match preset definition")

	// ReadHostList repeats each host once per worker; presetThroughput
	// configures 4 workers.
	hostList, err := outputs.ReadHostList(cfg)
	require.NoError(t, err, "ReadHostList should succeed")
	assert.Equal(t, 4, len(hostList), "'throughput' preset should create 4 workers per host")
	for i := range hostList {
		assert.Equal(t, testHost, hostList[i], "Computed hosts should match user config")
	}
}

// TestApplyPresetWithConflicts applies the "balanced" preset to a config that
// sets every preset-controlled field to a different value. It expects each of
// those fields to be reported as overridden and to end up with the preset's
// value.
func TestApplyPresetWithConflicts(t *testing.T) {
	const testHost = "http://elastic-host:9200"

	// Every preset-controlled key, with values chosen to differ from all
	// preset definitions so an override is always observable.
	userSettings := map[string]interface{}{
		"hosts": []string{testHost},

		"bulk_max_size":              100,
		"worker":                     10,
		"queue.mem.events":           1000,
		"queue.mem.flush.min_events": 100,
		"queue.mem.flush.timeout":    100 * time.Second,
		"compression_level":          5,
		"idle_connection_timeout":    100 * time.Second,
	}
	cfg := config.MustNewConfigFrom(userSettings)

	// All preset-controlled keys must come back as overridden.
	wantOverridden := []string{
		"bulk_max_size",
		"worker",
		"queue.mem.events",
		"queue.mem.flush.min_events",
		"queue.mem.flush.timeout",
		"compression_level",
		"idle_connection_timeout",
	}
	overridden, err := applyPreset(presetBalanced, cfg)
	require.NoError(t, err, "Valid preset must apply successfully")
	assert.ElementsMatch(t, wantOverridden, overridden, "All preset fields should be reported as overridden")

	// The merged config must unpack into the output's config struct.
	var unpacked elasticsearchConfig
	require.NoError(t, cfg.Unpack(&unpacked), "Valid config tree must unpack successfully")

	// Top-level values now come from the preset.
	assert.Equal(t, 1600, unpacked.BulkMaxSize, "Preset fields should be set by applyPreset")
	assert.Equal(t, 1, unpacked.CompressionLevel, "Preset fields should be set by applyPreset")
	assert.Equal(t, 3*time.Second, unpacked.Transport.IdleConnTimeout, "Preset fields should be set by applyPreset")

	// So do the queue values.
	queueCfg := struct {
		Events         int           `config:"events"`
		FlushMinEvents int           `config:"flush.min_events"`
		FlushTimeout   time.Duration `config:"flush.timeout"`
	}{}
	require.Equal(t, "mem", unpacked.Queue.Name(), "applyPreset should configure the memory queue")
	assert.NoError(t, unpacked.Queue.Config().Unpack(&queueCfg), "applyPreset should set valid memory queue config")

	assert.Equal(t, 3200, queueCfg.Events, "Queue fields should match preset definition")
	assert.Equal(t, 1600, queueCfg.FlushMinEvents, "Queue fields should match preset definition")
	assert.Equal(t, 10*time.Second, queueCfg.FlushTimeout, "Queue fields should match preset definition")

	// ReadHostList repeats each host once per worker; presetBalanced
	// configures a single worker.
	hostList, err := outputs.ReadHostList(cfg)
	require.NoError(t, err, "ReadHostList should succeed")
	require.Equal(t, 1, len(hostList), "'balanced' preset should create 1 worker per host")
	assert.Equal(t, testHost, hostList[0])
}

// TestApplyPresetCustom applies the "custom" preset to a config that sets
// every preset-controlled field. Because the custom preset defines no
// overrides, it expects no reported conflicts and every user value to be
// preserved.
func TestApplyPresetCustom(t *testing.T) {
	const testHost = "http://elastic-host:9200"
	cfg := config.MustNewConfigFrom(map[string]interface{}{
		"hosts": []string{testHost},

		// Set parameters contained in the performance presets, with
		// arbitrary numbers that do not match the preset values so we can
		// make sure nothing is overridden.
		"bulk_max_size":              100,
		"worker":                     2,
		"queue.mem.events":           1000,
		"queue.mem.flush.min_events": 100,
		"queue.mem.flush.timeout":    100 * time.Second,
		"compression_level":          5,
		"idle_connection_timeout":    100 * time.Second,
	})
	// Apply the preset and make sure no conflicts are reported.
	conflicts, err := applyPreset(presetCustom, cfg)
	require.NoError(t, err, "Custom preset must apply successfully")
	assert.Equal(t, 0, len(conflicts), "applyPreset should report no conflicts when preset is 'custom'")

	// Unpack the final config into elasticsearchConfig and verify that the
	// user-provided fields were left untouched.
	esConfig := elasticsearchConfig{}
	err = cfg.Unpack(&esConfig)
	require.NoError(t, err, "Config should unpack successfully")

	// Check basic user params. The messages describe the failure a reader
	// would see: a user value was changed even though the preset is 'custom'.
	assert.Equal(t, 100, esConfig.BulkMaxSize, "User fields should be unchanged by the 'custom' preset")
	assert.Equal(t, 5, esConfig.CompressionLevel, "User fields should be unchanged by the 'custom' preset")
	assert.Equal(t, 100*time.Second, esConfig.Transport.IdleConnTimeout, "User fields should be unchanged by the 'custom' preset")

	// Check user queue params
	var memQueueConfig struct {
		Events         int           `config:"events"`
		FlushMinEvents int           `config:"flush.min_events"`
		FlushTimeout   time.Duration `config:"flush.timeout"`
	}
	require.Equal(t, "mem", esConfig.Queue.Name(), "applyPreset with custom preset should preserve user queue settings")
	err = esConfig.Queue.Config().Unpack(&memQueueConfig)
	assert.NoError(t, err, "Queue settings should unpack successfully")

	assert.Equal(t, 1000, memQueueConfig.Events, "Queue fields should match user config")
	assert.Equal(t, 100, memQueueConfig.FlushMinEvents, "Queue fields should match user config")
	assert.Equal(t, 100*time.Second, memQueueConfig.FlushTimeout, "Queue fields should match user config")

	// Check calculated hosts, which should contain one copy of the user config
	// hosts for each configured worker (which in this case means 2).
	hosts, err := outputs.ReadHostList(cfg)
	require.NoError(t, err, "ReadHostList should succeed")
	assert.Equal(t, 2, len(hosts), "'custom' preset should leave worker count unchanged")
	for _, host := range hosts {
		assert.Equal(t, testHost, host, "Computed hosts should match user config")
	}
}
16 changes: 16 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ func makeES(
}

esConfig := defaultConfig
preset, err := cfg.String("preset", -1)
if err == nil && preset != "" {
// Performance preset is present, apply it and log any fields that
// were overridden
overriddenFields, err := applyPreset(preset, cfg)
if err != nil {
return outputs.Fail(err)
}
for _, field := range overriddenFields {
log.Warnf("Setting '%v' is ignored because of performance preset '%v'",
faec marked this conversation as resolved.
Show resolved Hide resolved
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
field, preset)
}
}

// Unpack the full config, including any performance preset overrides,
// into the config struct.
if err := cfg.Unpack(&esConfig); err != nil {
return outputs.Fail(err)
}
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package outputs
import "github.com/elastic/elastic-agent-libs/config"

// ReadHostList reads a list of hosts to connect to from a configuration
// object. If the `workers` settings is > 1, each host is duplicated in the final
// host list by the number of `workers`.
// object. If the `worker` setting is > 1, each host is duplicated in the final
// host list `worker` times.
func ReadHostList(cfg *config.C) ([]string, error) {
config := struct {
Hosts []string `config:"hosts" validate:"required"`
Expand All @@ -40,7 +40,7 @@ func ReadHostList(cfg *config.C) ([]string, error) {
return lst, nil
}

// duplicate entries config.Workers times
// duplicate entries config.Worker times
hosts := make([]string, 0, len(lst)*config.Worker)
for _, entry := range lst {
for i := 0; i < config.Worker; i++ {
Expand Down