Filebeat: Error when using autodiscover + hints + templates with filebeat 8.6 #34388

Closed
Laffs2k5 opened this issue Jan 25, 2023 · 9 comments · Fixed by #35645
Labels
Team:Cloudnative-Monitoring Label for the Cloud Native Monitoring team Team:Elastic-Agent Label for the Agent team

Comments

@Laffs2k5

FYI: Not a confirmed bug, no response received on discuss.elastic.co.

Requested information:

After upgrading our filebeat kubernetes daemonset from 7.17.8 to 8.6.0 I can't seem to get rid of errors of this type:

log.level: error
log.logger: autodiscover
log.origin.file.line: 109
log.origin.file.name: cfgfile/list.go

message:
Error creating runner from config: failed to create input: Can only start an input when all related states are finished: {Id: ea745ab688be85a9-native::3884203-2049, Finished: false, Fileinfo: &{frontend-86c8579b5b-mhnpg_helpdesk-frontend_frontend-mgmt-1cc73434a92abe9b93d9a3d971cfc4182e8ce64ac0e03f0c6e395875236fd514.log 374 416 {204820038 63804978609 0x56347552d700} {2049 3884203 1 33184 0 0 0 0 374 4096 8 {1669381808 728813408} {1669381809 204820038} {1669381809 204820038} [0 0 0]}}, Source: /var/log/containers/frontend-86c8579b5b-mhnpg_helpdesk-frontend_frontend-mgmt-1cc73434a92abe9b93d9a3d971cfc4182e8ce64ac0e03f0c6e395875236fd514.log, Offset: 0, Timestamp: 2023-01-19 13:38:27.166489276 +0000 UTC m=+58865.698641043, TTL: -1ns, Type: container, Meta: map[stream:stdout], FileStateOS: 3884203-2049}

The number of errors varies depending on the number of pods deployed. In our current production cluster I'm observing roughly 60k messages per 24h.

Filebeat is currently deployed as a daemonset using the official helm chart version 8.5.1 and running in Azure AKS, Kubernetes version 1.25.4 (recently upgraded; the errors were also appearing on Kubernetes 1.24.6).

This is the relevant part of our current filebeat configuration (I've excluded output.* and setup.*):

        logging:
          level: warning
          metrics.enabled: false
          json: true
        processors:
          - # disable logs from select sources
            drop_event.when.or:
              - equals.kubernetes.labels.app: "secrets-store-csi-driver"
              - equals.kubernetes.labels.app: "secrets-store-provider-azure"
              - equals.kubernetes.labels.app: "konnectivity-agent"
        filebeat.autodiscover:
          providers:
            - type: kubernetes
              node: ${NODE_NAME}
              cleanup_timeout: 2m
              hints.enabled: true
              hints.default_config:
                type: container
                paths:
                  - /var/log/containers/*-${data.kubernetes.container.id}.log
              templates:
                - # nginx logs: configure the filebeat nginx module
                  condition.equals:
                    # This pod annotation must be set on the app during deployment for this template to be applied
                    # See available fields for matching here:
                    #   https://www.elastic.co/guide/en/beats/filebeat/current/configuration-autodiscover.html#_kubernetes
                    kubernetes.annotations.no.dsb-norge.filebeat/autodiscover-template: nginx
                  config:
                    - module: nginx
                      ingress_controller:
                        enabled: false
                      access:
                        enabled: true
                        input:
                          type: container
                          stream: stdout
                          paths:
                            - /var/log/containers/*-${data.kubernetes.container.id}.log
                      error:
                        enabled: true
                        input:
                          type: container
                          stream: stderr
                          paths:
                            - /var/log/containers/*-${data.kubernetes.container.id}.log

I'm able to avoid the error either by removing the templates: config or by disabling the hints.default_config. Neither of these is a suitable solution for us, as they result in missing logs or logs not being parsed correctly.

The error messages all refer to log files from our nginx pods. We have multiple other types of deployments for which filebeat reports no issues. Since we apply the nginx module conditionally for these pods, this leads me to think there's some kind of race condition when the nginx module is applied via the templates config in combination with the default hints configuration.

We were able to use autodiscover with hints (including default_config) and templates with filebeat 7.17.8 without any errors. This was running on the same Kubernetes version and deployed with the official helm chart version 7.17.3.

First I thought maybe I was experiencing #11834. But after reading I saw that this was fixed in filebeat 7.9.0. Reading a bit further I saw that there were a couple more issues resulting in this error, but those had also been fixed:

I have verified that we are not missing log entries, and therefore I suspect that my issue is also a "recoverable error" and that it should possibly not be logged at error level 🤷‍♀️

I also saw #33653 , we are not using agent/fleet and with my limited knowledge I cannot determine if it's related or not.

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jan 25, 2023
@gizas gizas added the Team:Cloudnative-Monitoring Label for the Cloud Native Monitoring team label Feb 14, 2023
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 14, 2023
@gizas
Contributor

gizas commented Feb 15, 2023

Sorry for the delay on this! We have planned the story and will come back once we have any update.

@Laffs2k5
Author

Not a problem. Just let me know when/if you need any more information :-)

@BBQigniter
Contributor

Spotted the same errors with our ECK (2.6.1) managed Filebeat (8.6.1) instances.

@rdner
Member

rdner commented Feb 22, 2023

Looks like the above-mentioned error comes from the log input (used under the hood of the container input):

// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
    return &common.ErrInputNotFinished{State: state.String()}
}

This might indicate a race condition between stopping the log input harvester for a certain state (file):

defer func() {
    // Channel to stop internal harvester routines
    h.stop()

    // Makes sure file is properly closed when the harvester is stopped
    h.cleanup()

    harvesterRunning.Add(-1)

    // Marks harvester stopping completed
    h.stopWg.Done()
}()

func (h *Harvester) cleanup() {
    // Mark harvester as finished
    h.state.Finished = true

And starting a new log input with a re-used context containing the same states.

@belimawr might give more context since he's investigated a similar issue here #33653

In the near future (#34393) we're going to migrate the container input to use filestream instead of log as its implementation; this might fix the issue.

@cmacknz cmacknz added the Team:Elastic-Agent Label for the Agent team label Feb 22, 2023
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@cmacknz
Member

cmacknz commented Feb 22, 2023

Once we solve #34393, we'll come back and verify whether that was enough to solve this problem.

@belimawr
Contributor

I gave this a quick read; this issue and #33653 might be related or even the same.

Even though #33653 mentions the Elastic-Agent, the bits of code that cause the issue are related to starting/stopping inputs. Kubernetes autodiscover probably uses the same code path/logic when discovering new pods, hence I strongly believe it's the same issue.

I need to do some investigation/testing to be 100% sure.

@belimawr
Contributor

Overview

The log input keeps a global state for all files it harvests; there are also a number of abstractions around an input that need to be correctly shut down when an input is shut down.

At a high level, the abstractions are:
RunnerList -> Runner -> Input -> Harvester -> Reader
RunnerList is the outermost abstraction and is responsible for starting/stopping inputs. The main entry point for reloading configuration (starting/stopping inputs) is its Reload function:

func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    var errs multierror.Errors

    startList := map[uint64]*reload.ConfigWithMeta{}
    stopList := r.copyRunnerList()

    r.logger.Debugf("Starting reload procedure, current runners: %d", len(stopList))

    // diff current & desired state, create action lists
    for _, config := range configs {
        hash, err := HashConfig(config.Config)
        if err != nil {
            r.logger.Errorf("Unable to hash given config: %s", err)
            errs = append(errs, errors.Wrap(err, "Unable to hash given config"))
            continue
        }

        if _, ok := r.runners[hash]; ok {
            delete(stopList, hash)
        } else {
            startList[hash] = config
        }
    }

    r.logger.Debugf("Start list: %d, Stop list: %d", len(startList), len(stopList))

    wg := sync.WaitGroup{}
    // Stop removed runners
    for hash, runner := range stopList {
        wg.Add(1)
        r.logger.Debugf("Stopping runner: %s", runner)
        delete(r.runners, hash)
        go func(runner Runner) {
            defer wg.Done()
            runner.Stop()
            r.logger.Debugf("Runner: '%s' has stopped", runner)
        }(runner)
        moduleStops.Add(1)
    }

    // Wait for all runners to stop before starting new ones
    wg.Wait()

    // Start new runners
    for hash, config := range startList {
        runner, err := createRunner(r.factory, r.pipeline, config)
        if err != nil {
            if _, ok := err.(*common.ErrInputNotFinished); ok {
                // error is related to state, we should not log at error level
                r.logger.Debugf("Error creating runner from config: %s", err)
            } else {
                r.logger.Errorf("Error creating runner from config: %s", err)
            }
            errs = append(errs, errors.Wrap(err, "Error creating runner from config"))
            continue
        }

        r.logger.Debugf("Starting runner: %s", runner)
        r.runners[hash] = runner
        runner.Start()
        moduleStarts.Add(1)
    }

    // NOTE: This metric tracks the number of modules in the list. The true
    // number of modules in the running state may differ because modules can
    // stop on their own (i.e. on errors) and also when this stops a module
    // above it is done asynchronously.
    moduleRunning.Set(int64(len(r.runners)))

    return errs.Err()
}

To map a runner to a config, it hashes the config (the YAML fields of an input); based on this hash it creates a list of runners (which wrap inputs) to be stopped and started.
The first thing it does is create a goroutine to stop each runner, then it waits for all of them to finish, and finally it moves on to start the new runners (inputs). It is possible that a file that was harvested by a runner that has just stopped will be harvested by a runner that is starting. There are at least two common cases for this: Kubernetes autodiscovery and Elastic-Agent policy updates.
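
This is not the actual Beats code, just a minimal sketch of that hash-based diff, with a simplified, hypothetical config shape (cfg and hashConfig below are stand-ins for the real types): two configs that read the same file but differ in any other field hash differently, so one runner ends up on the stop list while another is started for the same path, which is exactly the hints default_config vs. nginx template situation described earlier.

package main

import (
    "fmt"
    "hash/fnv"
)

// cfg is a hypothetical, reduced stand-in for an input configuration.
type cfg struct {
    Module string // "" for the hints default_config, "nginx" for the template
    Path   string // the log file the input reads
}

// hashConfig mimics the idea of HashConfig: any field change yields a new hash.
func hashConfig(c cfg) uint64 {
    h := fnv.New64a()
    fmt.Fprintf(h, "%s|%s", c.Module, c.Path)
    return h.Sum64()
}

func main() {
    path := "/var/log/containers/frontend-abc123.log"

    // Currently running "runner": the hints default_config for this file.
    running := map[uint64]cfg{}
    old := cfg{Module: "", Path: path}
    running[hashConfig(old)] = old

    // Desired state after a reload: same file, now configured via the nginx template.
    desired := []cfg{{Module: "nginx", Path: path}}

    startList := map[uint64]cfg{}
    stopList := map[uint64]cfg{}
    for h, c := range running {
        stopList[h] = c
    }
    for _, c := range desired {
        h := hashConfig(c)
        if _, ok := running[h]; ok {
            delete(stopList, h) // unchanged config: keep it running
        } else {
            startList[h] = c // new hash: start a new runner
        }
    }

    // Both lists are non-empty and both entries point at the same file:
    // the old runner is stopped while a new one is started for the same path.
    fmt.Println("stop:", stopList)
    fmt.Println("start:", startList)
}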

The state of a file is represented by the State struct:

// State is used to communicate the reading state of a file
type State struct {
Id string `json:"id" struct:"id"`
PrevId string `json:"prev_id" struct:"prev_id"`
Finished bool `json:"-" struct:"-"` // harvester state
Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info
Source string `json:"source" struct:"source"`
Offset int64 `json:"offset" struct:"offset"`
Timestamp time.Time `json:"timestamp" struct:"timestamp"`
TTL time.Duration `json:"ttl" struct:"ttl"`
Type string `json:"type" struct:"type"`
Meta map[string]string `json:"meta" struct:"meta,omitempty"`
FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"`
IdentifierName string `json:"identifier_name" struct:"identifier_name"`
}

One of its attributes is the Finished flag; as the documentation states, it represents the "harvester state", in other words whether the file is actively being read or not.

There is a global Registrar that keeps the state for every file harvested by any instance of the log input. The registrar has a Run loop that runs on an independent goroutine and is responsible for reading a slice of states from a channel and updating them.
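
This is not the actual Registrar code, only a minimal sketch of the pattern described above, with file.State reduced to the fields relevant here: a run loop on its own goroutine owns the state map and applies batches of updates received over a channel.

package main

import "fmt"

// state is a reduced, hypothetical version of file.State.
type state struct {
    Source   string
    Offset   int64
    Finished bool
}

// registrar owns the state map; only its run loop mutates it.
type registrar struct {
    states  map[string]state
    channel chan []state
    done    chan struct{}
}

// run applies every batch of state updates received until the channel is closed.
func (r *registrar) run() {
    for batch := range r.channel {
        for _, st := range batch {
            r.states[st.Source] = st
        }
    }
    close(r.done)
}

func main() {
    r := &registrar{
        states:  map[string]state{},
        channel: make(chan []state, 1), // buffered channel of length 1, as in Filebeat
        done:    make(chan struct{}),
    }
    go r.run()

    // Updates arrive as batches, e.g. from the acker or from harvester cleanup.
    r.channel <- []state{{Source: "/var/log/containers/foo.log", Offset: 128, Finished: false}}
    r.channel <- []state{{Source: "/var/log/containers/foo.log", Offset: 256, Finished: true}}

    close(r.channel) // no more updates
    <-r.done         // wait for the run loop to drain and exit
    fmt.Println(r.states)
}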

A copy of the harvester state is propagated with each event published by the input; the harvester also keeps its own copy of this struct. Whenever an event (or a batch of events) is acked, their states are sent by the acker to the registrar. The acker code responsible for those state updates is:

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer {
    log := logp.NewLogger("acker")

    return acker.EventPrivateReporter(func(_ int, data []interface{}) {
        stateless := 0
        states := make([]file.State, 0, len(data))
        for _, datum := range data {
            if datum == nil {
                stateless++
                continue
            }

            st, ok := datum.(file.State)
            if !ok {
                stateless++
                continue
            }

            states = append(states, st)
        }

        if len(states) > 0 {
            log.Debugw("stateful ack", "count", len(states))
            statefulOut.Published(states)
        }

        if stateless > 0 {
            log.Debugw("stateless ack", "count", stateless)
            statelessOut.Published(stateless)
        }
    })
}

The log input uses "stateful events", so they're sent to statefulOut.Published. They're sent through the registrarLogger that shares a channel with the global instance of the registrar.

One very important thing to notice is that the Registrar's channel for updates is a buffered channel of length 1:

Channel: make(chan []file.State, 1),

This means that the acker call to publish states, as well as any other call to update states, is non-blocking.
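
A small, self-contained illustration of that buffered-channel behaviour (plain Go, not Filebeat code): a send on a channel with capacity 1 returns immediately while the buffer has room, and only blocks once the buffer is full.

package main

import "fmt"

func main() {
    // Conceptually mirrors `Channel: make(chan []file.State, 1)`: capacity 1.
    updates := make(chan []string, 1)

    // The first send does not block even though nobody is receiving yet,
    // because the value fits in the buffer. This is why publishing a state
    // update returns immediately instead of waiting for the registrar.
    updates <- []string{"state update #1"}
    fmt.Println("first send returned immediately")

    // A second plain send would block until a receiver drains the buffer;
    // the select with default makes that visible without blocking.
    select {
    case updates <- []string{"state update #2"}:
        fmt.Println("second send succeeded")
    default:
        fmt.Println("buffer full: a plain send would block now")
    }

    fmt.Println(<-updates) // drain the first update
}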

When the harvester stops (its Run method returns), there is a call to h.cleanup() that, among other things, sets its state to finished and publishes it.

func (h *Harvester) cleanup() {
    // Mark harvester as finished
    h.state.Finished = true

    h.logger.Debugf("Stopping harvester.")
    defer h.logger.Debugf("harvester cleanup finished.")

    // Make sure file is closed as soon as harvester exits
    // If file was never opened, it can't be closed
    if h.source != nil {
        // close file handler
        h.source.Close()

        h.logger.Debugf("Closing file")
        harvesterOpenFiles.Add(-1)

        // On completion, push offset so we can continue where we left off if we relaunch on the same file
        // Only send offset if file object was created successfully
        h.SendStateUpdate()
    } else {
        h.logger.Warn("Stopping harvester, NOT closing file as file info not available.")
    }

    harvesterClosed.Add(1)
}

It's interesting to notice that h.cleanup() sends the state update twice.
The first one uses the same asynchronous path as the acker:

h.publishState(h.state)

The harvester publishState is defined here:
func(state file.State) bool {
    return p.stateOutlet.OnEvent(beat.Event{Private: state})
},

The second one uses a synchronous path and calls directly into the States update method.

h.states.Update(h.state)

Even with this synchronous call to update the state of a file, there is a chance that some events on the publishing queue will be acknowledged and set the harvester's state back to "not finished" before the new input starts, which leads to the (in)famous error:

Error creating runner from config: failed to create input: Can only start an input when all related states are finished: 
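
To make that ordering concrete, here is a deterministic sketch (not Beats code; the names and the check are simplified stand-ins) of the sequence that produces the error: harvester cleanup marks the state as finished, a late ACK then delivers a stale copy with Finished still false, and the check performed when the new input starts fails.

package main

import (
    "errors"
    "fmt"
)

// state is a reduced, hypothetical version of file.State.
type state struct {
    Source   string
    Finished bool
}

// canStartInput mirrors the check quoted earlier: an input for a file can only
// start when the file's state is marked as finished.
func canStartInput(states map[string]state, source string) error {
    if st, ok := states[source]; ok && !st.Finished {
        return errors.New("Can only start an input when all related states are finished")
    }
    return nil
}

func main() {
    const source = "/var/log/containers/frontend-abc123.log"
    states := map[string]state{}

    // 1. While the old input is running, an event is published; it carries a
    //    *copy* of the state at publish time (Finished=false) in its Private field.
    pendingAck := state{Source: source, Finished: false}

    // 2. The old input stops; harvester cleanup synchronously marks the state
    //    as finished (the h.states.Update(h.state) path).
    states[source] = state{Source: source, Finished: true}

    // 3. The event from step 1 is acknowledged *after* cleanup, and its stale
    //    copy overwrites the finished state via the registrar update path.
    states[source] = pendingAck

    // 4. The new input started by autodiscover for the same file now fails.
    if err := canStartInput(states, source); err != nil {
        fmt.Println("Error creating runner from config: failed to create input:", err)
    }
}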

The current state of things

The Kubernetes autodiscovery keeps retrying to start inputs that failed; this retrying should eventually succeed in starting the input for a file once its harvester state is finally set to "harvester finished". However, the bigger the load on Filebeat, the harder it might be to get back to a consistent state. Another problem is that the file might be deleted in the meantime, so some data can be lost.

The Kubernetes autodiscovery (with hints enabled) calls RunnerList.Reload quite often and, for a reason I did not investigate, this leads to the inputs for some files being stopped/started even though the container is still alive.

Other issues I found

When a runner is created, a CheckConfig method is called:

func (r *RunnerFactory) CheckConfig(cfg *conf.C) error {
    runner, err := r.Create(pipeline.NewNilPipeline(), cfg)
    if _, ok := err.(*common.ErrInputNotFinished); ok {
        // error is related to state, and hence config can be considered valid
        return nil
    }
    if err != nil {
        return err
    }

    runner.Stop()
    return nil
}

This method instantiates a runner and stops it right away to ensure the configuration is correct. It ignores common.ErrInputNotFinished (the input-not-finished error). At least a few times while Filebeat was starting I saw the runner's "stopped" log line appear before the input reported that it had stopped, which is definitely the wrong order. However, in my tests this did not cause the error this issue is about; it still needs to be investigated. Interestingly enough, this did not seem to happen when the Reload method from RunnerList was called.

Testing setup

For my tests I used a VM running Minikube with the none driver, so I could run Filebeat outside of Kubernetes while still having access to the log files, which made it easy to attach Delve (the debugger) to it. However, it should be easily reproducible running Filebeat in a container.

Two other important things about my setup:

  • I modified Filebeat to log the timestamp with nanosecond precision because I was shipping all logs to Elastic Cloud.
  • I had to change the index templates and Data views to support the timestamp in nanoseconds and on a field different from @timestamp.

Below is the filebeat.yml I used. Filebeat logs can get quite verbose; filtering for a specific pod helps a lot.

filebeat.autodiscover:
  providers:
    - type: kubernetes
      node: ${NODE_NAME}
      hints.enabled: true
      hints.default_config:
        type: container
        id: "runner-${data.kubernetes.container.id}"
        paths:
          - /var/log/containers/*-${data.kubernetes.container.id}.log
        fields:
          role: kubernetes

logging:
  level: debug
  selectors:
    - acker
    - autodiscover
    - harvester
    - harvester_registry
    - input
    - input.harvester
    - input_runner
    - kubernetes
    - registrar
output.elasticsearch:
  hosts:
    - https://foo.bar.cloud.elastic.co:443
  protocol: https
  username: foo
  password: bar

Filebeat monitoring config:

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  enabled: true
  paths:
    - /home/vagrant/work/logs/*
  fields:
    role: monitoring
  parsers:
    - ndjson:
        overwrite_keys: true
        expand_keys: true
        add_error_key: true
        ignore_decoding_error: true
  processors:
    - timestamp:
        layouts:
          - "2006-01-02T15:04:05.999999999Z07:00"
        test:
          - "2023-03-23T12:06:36.981660099+01:00"
        field: zap-nano-timestamp

setup.template.name:
setup.template.pattern:
output.elasticsearch:
  hosts:
    - https://foo.bar.cloud.elastic.co:443
  protocol: https
  username: foo
  password: bar
  index: "filebeat-logs-rfc3339-nano-%{[agent.version]}"

setup.template.name: "filebeat-logs-rfc3339-nano-%{[agent.version]}"
setup.template.pattern: "filebeat-logs-rfc3339-nano-%{[agent.version]}"

The code I used

@belimawr
Contributor

belimawr commented May 4, 2023

This seems to be the same issue but with Docker: #13607.
I believe all autodiscover implementations can face this issue.
