Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debounce input reload on autodiscover #35645

Merged
merged 15 commits into from
Jun 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]


*Filebeat*
- Fix input reload on autodiscover {issue}34388[34388] {pull}35645[35645]


*Heartbeat*
Expand Down
6 changes: 6 additions & 0 deletions dev-tools/mage/gotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ func BuildSystemTestGoBinary(binArgs TestBinaryArgs) error {
"test", "-c",
"-o", binArgs.Name + ".test",
}

if DevBuild {
// Disable optimizations (-N) and inlining (-l) for debugging.
args = append(args, `-gcflags=all=-N -l`)
}

if TestCoverage {
args = append(args, "-coverpkg", "./...")
}
Expand Down
18 changes: 17 additions & 1 deletion filebeat/tests/system/test_autodiscover.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,23 @@ def _test(self, container):
with open(os.path.join(self.working_dir, f'{container.name}.log'), 'wb') as f:
f.write(b'Busybox output 1\n')

self.wait_until(lambda: self.log_contains('Starting runner: input'))
docker_client = docker.from_env()

def wait_container_start():
for i, c in enumerate(docker_client.containers.list()):
if c.name == container.name:
return True

# Ensure the container is running before checkging
# if the input is running
self.wait_until(
wait_container_start,
name="wait for test container",
err_msg="the test container is not running yet")

self.wait_until(lambda: self.log_contains('Starting runner: input'),
name="wait for input to start",
err_msg="did not find 'Starting runner: input' in the logs")
self.wait_until(lambda: self.output_has(lines=1))

output = self.read_output_json()
Expand Down
79 changes: 43 additions & 36 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/autodiscover/meta"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
Expand All @@ -35,8 +33,8 @@ import (
)

const (
// If a config reload fails after a new event, a new reload will be run after this period
retryPeriod = 10 * time.Second
// defaultDebouncePeriod is the time autodiscover will wait before reloading inputs
defaultDebouncePeriod = time.Second
)

// EventConfigurer is used to configure the creation of configuration objects
Expand Down Expand Up @@ -65,10 +63,7 @@ type Autodiscover struct {
meta *meta.Map
listener bus.Listener
logger *logp.Logger

// workDone is a channel used for testing purpouses, to know when the worker has
// done some work.
workDone chan struct{}
debouncePeriod time.Duration
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
}

// NewAutodiscover instantiates and returns a new Autodiscover manager
Expand All @@ -90,7 +85,7 @@ func NewAutodiscover(
for _, providerCfg := range c.Providers {
provider, err := Registry.BuildProvider(name, bus, providerCfg, keystore)
if err != nil {
return nil, errors.Wrap(err, "error in autodiscover provider settings")
return nil, fmt.Errorf("error in autodiscover provider settings: %w", err)
}
logger.Debugf("Configured autodiscover provider: %s", provider)
providers = append(providers, provider)
Expand All @@ -102,10 +97,11 @@ func NewAutodiscover(
factory: factory,
configurer: configurer,
configs: map[string]map[uint64]*reload.ConfigWithMeta{},
runners: cfgfile.NewRunnerList("autodiscover", factory, pipeline),
runners: cfgfile.NewRunnerList("autodiscover.cfgfile", factory, pipeline),
providers: providers,
meta: meta.NewMap(),
logger: logger,
debouncePeriod: defaultDebouncePeriod,
}, nil
}

Expand All @@ -132,6 +128,7 @@ func (a *Autodiscover) Start() {

func (a *Autodiscover) worker() {
var updated, retry bool
t := time.NewTimer(defaultDebouncePeriod)

for {
select {
Expand All @@ -142,38 +139,48 @@ func (a *Autodiscover) worker() {
}

if _, ok := event["start"]; ok {
updated = a.handleStart(event)
// if updated is true, we don't want to set it back to false
if a.handleStart(event) {
updated = true
}
}
if _, ok := event["stop"]; ok {
updated = a.handleStop(event)
// if updated is true, we don't want to set it back to false
if a.handleStop(event) {
updated = true
}
}

case <-time.After(retryPeriod):
}
case <-t.C:
if updated || retry {
a.logger.Debugf("Reloading autodiscover configs reason: updated: %t, retry: %t", updated, retry)

if updated || retry {
if retry {
a.logger.Debug("Reloading existing autodiscover configs after error")
}
configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
}
}

configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
a.logger.Debugf("calling reload with %d config(s)", len(configs))
err := a.runners.Reload(configs)

// reset updated status
updated = false

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
if retry {
// The recoverable errors that can lead to retry are related
// to the harvester state, so we need to give the publishing
// pipeline some time to finish flushing the events from that
// file. Hence we wait for 10x the normal debounce period.
t.Reset(10 * a.debouncePeriod)
continue
}
}

err := a.runners.Reload(configs)

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
// reset updated status
updated = false
}

// For testing purpouses.
if a.workDone != nil {
a.workDone <- struct{}{}
t.Reset(a.debouncePeriod)
}
}
}
Expand Down Expand Up @@ -233,9 +240,9 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {

err = a.factory.CheckConfig(config)
if err != nil {
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s', won't start runner",
conf.DebugString(config, true))))
a.logger.Errorf(
"Auto discover config check failed for config '%s', won't start runner, err: %s",
conf.DebugString(config, true), err)
continue
}
newCfg[hash] = &reload.ConfigWithMeta{
Expand Down
Loading
Loading