
Use filestream input as default for hints autodiscover. #36950

1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -245,6 +245,7 @@ is collected by it.
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]
- Avoid unwanted publication of Okta entity records. {pull}36770[36770]
- Add support for Digest Authentication to CEL input. {issue}35514[35514] {pull}36932[36932]
- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950]

*Auditbeat*

23 changes: 19 additions & 4 deletions deploy/kubernetes/filebeat-kubernetes.yaml
@@ -112,9 +112,16 @@ metadata:
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
- type: filestream
Contributor:
@MichaelKatsoulis
Shouldn't an id be defined in the input here? Or will it be generated automatically in that case?

doc:

Each filestream input must have a unique ID. Omitting or changing the filestream ID may cause data duplication. Without a unique ID, filestream is unable to correctly track the state of files.

So for all files matching /var/log/containers/*.log we have one filestream with a unique id, correct? Do you know what this implies compared to autodiscover, where a dedicated filestream is created per container?

Contributor Author:

@tetianakravchenko Yes, an id will be generated automatically. When filebeat.inputs is used instead of autodiscover, there is one filestream stream looking at all files in the path. When autodiscover is used, there is one stream per discovered container, each looking at a single log file.

For the metadata in the first scenario, the add_kubernetes_metadata processor is used; it requires the logs_path matcher so it can extract the container id from the log file name and add that container's metadata.
In the autodiscover case, the metadata is enriched by the kubernetes provider.

So yes, we have one filestream with one id for all the log collection. Both options work just fine, but with the first approach we cannot enable hints.
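A condensed sketch of the two modes, abridged from the manifest in this PR (the parser and prospector settings are omitted here; see the full diff for those):

```yaml
# Mode 1: filebeat.inputs — one filestream, one id, all container logs,
# metadata added by the add_kubernetes_metadata processor via logs_path.
filebeat.inputs:
  - type: filestream
    paths:
      - /var/log/containers/*.log
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

# Mode 2: autodiscover — one filestream per discovered container,
# metadata enriched by the kubernetes provider; hints can be enabled.
filebeat.autodiscover:
  providers:
    - type: kubernetes
      node: ${NODE_NAME}
      hints.enabled: true
      hints.default_config:
        type: filestream
        id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id}
        paths:
          - /var/log/containers/*-${data.kubernetes.container.id}.log
```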

paths:
- /var/log/containers/*.log
parsers:
- container: ~
prospector:
scanner:
fingerprint.enabled: true
symlinks: true
file_identity.fingerprint: ~
rdner marked this conversation as resolved.
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
@@ -123,15 +130,23 @@ data:
logs_path: "/var/log/containers/"

# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
#filebeat.autodiscover:
# filebeat.autodiscover:
Member:
is this extra space intentional?

Contributor Author:
By default, when I commented the autodiscover block out and back in, this space got added. TBH it looks more readable.

# providers:
# - type: kubernetes
# node: ${NODE_NAME}
# hints.enabled: true
# hints.default_config:
# type: container
# type: filestream
# id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id}
# paths:
# - /var/log/containers/*${data.kubernetes.container.id}.log
# - /var/log/containers/*-${data.kubernetes.container.id}.log
# parsers:
# - container: ~
# prospector:
# scanner:
# fingerprint.enabled: true
# symlinks: true
# file_identity.fingerprint: ~

processors:
- add_cloud_metadata:
23 changes: 19 additions & 4 deletions deploy/kubernetes/filebeat/filebeat-configmap.yaml
@@ -8,9 +8,16 @@ metadata:
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
- type: filestream
paths:
- /var/log/containers/*.log
parsers:
- container: ~
prospector:
scanner:
fingerprint.enabled: true
symlinks: true
file_identity.fingerprint: ~
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
@@ -19,15 +26,23 @@ data:
logs_path: "/var/log/containers/"

# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
#filebeat.autodiscover:
# filebeat.autodiscover:
Member:
Same as above: is this extra space needed?

Contributor:

I would say to remove the spaces, because you can end up uncommenting this block and not having the correct spacing.

# providers:
# - type: kubernetes
# node: ${NODE_NAME}
# hints.enabled: true
# hints.default_config:
# type: container
# type: filestream
# id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id}
# paths:
# - /var/log/containers/*${data.kubernetes.container.id}.log
# - /var/log/containers/*-${data.kubernetes.container.id}.log
# parsers:
# - container: ~
# prospector:
# scanner:
# fingerprint.enabled: true
# symlinks: true
# file_identity.fingerprint: ~

processors:
- add_cloud_metadata:
28 changes: 22 additions & 6 deletions filebeat/autodiscover/builder/hints/config.go
@@ -17,7 +17,9 @@

package hints

import conf "github.com/elastic/elastic-agent-libs/config"
import (
conf "github.com/elastic/elastic-agent-libs/config"
)

type config struct {
Key string `config:"key"`
@@ -26,11 +28,25 @@ type config struct {

func defaultConfig() config {
defaultCfgRaw := map[string]interface{}{
"type": "container",
"type": "filestream",
"id": "kubernetes-container-logs-${data.kubernetes.container.id}",
"prospector": map[string]interface{}{
"scanner": map[string]interface{}{
"fingerprint.enabled": true,
"symlinks": true,
},
},
"file_identity.fingerprint": nil,
"parsers": []interface{}{
map[string]interface{}{
"container": map[string]interface{}{
"stream": "all",
"format": "auto",
},
},
},
"paths": []string{
// To be able to use this builder with CRI-O replace paths with:
// /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
"/var/lib/docker/containers/${data.container.id}/*-json.log",
"/var/log/containers/*-${data.kubernetes.container.id}.log",
},
}
defaultCfg, _ := conf.NewConfigFrom(defaultCfgRaw)
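Rendered as YAML, the Go map above corresponds roughly to this default hints config (a sketch derived directly from `defaultCfgRaw`):

```yaml
type: filestream
id: kubernetes-container-logs-${data.kubernetes.container.id}
prospector:
  scanner:
    fingerprint.enabled: true
    symlinks: true
file_identity.fingerprint: ~
parsers:
  - container:
      stream: all
      format: auto
paths:
  # To be able to use this builder with CRI-O replace paths with:
  # /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
  - /var/log/containers/*-${data.kubernetes.container.id}.log
```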
@@ -55,7 +71,7 @@ func (c *config) Unpack(from *conf.C) error {
if len(fields) == 1 && fields[0] == "enabled" {
// only enabling/disabling default config:
if err := c.DefaultConfig.Merge(config); err != nil {
return nil
return err
}
} else {
// full config provided, discard default. It must be a clone of the
52 changes: 40 additions & 12 deletions filebeat/autodiscover/builder/hints/logs.go
@@ -51,6 +51,8 @@ const (
processors = "processors"
json = "json"
pipeline = "pipeline"
ndjson = "ndjson"
parsers = "parsers"
)

// validModuleNames to sanitize user input
@@ -115,10 +117,20 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
continue
}

inputType, _ := config.String("type", -1)
tempCfg := mapstr.M{}
mline := l.getMultiline(h)
if len(mline) != 0 {
kubernetes.ShouldPut(tempCfg, multiline, mline, l.log)

if mline := l.getMultiline(h); len(mline) != 0 {
if inputType == harvester.FilestreamType {
// multiline options should be under multiline parser in filestream input
parsersTempCfg := []mapstr.M{}
mlineTempCfg := mapstr.M{}
kubernetes.ShouldPut(mlineTempCfg, multiline, mline, l.log)
parsersTempCfg = append(parsersTempCfg, mlineTempCfg)
kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log)
} else {
kubernetes.ShouldPut(tempCfg, multiline, mline, l.log)
}
}
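In other words, a `co.elastic.logs/multiline.*` hint renders differently per input type (a sketch; the pattern value is illustrative):

```yaml
# hint: co.elastic.logs/multiline.pattern: '^\s'

# container input: multiline options sit at the top level
multiline:
  pattern: '^\s'

# filestream input: multiline becomes a parser entry
parsers:
  - multiline:
      pattern: '^\s'
```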
if ilines := l.getIncludeLines(h); len(ilines) != 0 {
kubernetes.ShouldPut(tempCfg, includeLines, ilines, l.log)
@@ -136,15 +148,24 @@ }
}

if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 {
kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log)
if inputType == harvester.FilestreamType {
// json options should be under ndjson parser in filestream input
parsersTempCfg := []mapstr.M{}
ndjsonTempCfg := mapstr.M{}
Contributor (@gizas, Oct 31, 2023):

Should we check if this is empty before calling the next line?

Ignore this. I just realised that those are empty mapstr.M.
kubernetes.ShouldPut(ndjsonTempCfg, ndjson, jsonOpts, l.log)
parsersTempCfg = append(parsersTempCfg, ndjsonTempCfg)
kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log)
} else {
kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log)
}

}
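Analogously for `co.elastic.logs/json.*` hints (a sketch; the option shown is illustrative — the code moves the hint's json options under the `ndjson` parser wholesale):

```yaml
# hint: co.elastic.logs/json.add_error_key: "true"

# container input: options go under json
json:
  add_error_key: true

# filestream input: options go under the ndjson parser
parsers:
  - ndjson:
      add_error_key: true
```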
// Merge config template with the configs from the annotations
// AppendValues option is used to append arrays from annotations to existing arrays while merging
if err := config.MergeWithOpts(tempCfg, ucfg.AppendValues); err != nil {
logp.Debug("hints.builder", "config merge failed with error: %v", err)
l.log.Debugf("hints.builder", "config merge failed with error: %v", err)
continue
}

module := l.getModule(hints)
if module != "" {
moduleConf := map[string]interface{}{
@@ -154,24 +175,31 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
filesets := l.getFilesets(hints, module)
for fileset, cfg := range filesets {
filesetConf, _ := conf.NewConfigFrom(config)

if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType {
if inputType == harvester.ContainerType {
_ = filesetConf.SetString("stream", -1, cfg.Stream)
} else if inputType == harvester.FilestreamType {
filestreamContainerParser := map[string]interface{}{
"container": map[string]interface{}{
"stream": cfg.Stream,
"format": "auto",
},
}
parserCfg, _ := conf.NewConfigFrom(filestreamContainerParser)
_ = filesetConf.SetChild("parsers", 0, parserCfg)
} else {
_ = filesetConf.SetString("containers.stream", -1, cfg.Stream)
}
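For a module fileset with a filestream input, the branch above yields a fileset config along these lines (module and fileset names are hypothetical, for illustration only):

```yaml
module: nginx
access.enabled: true
access.input:
  type: filestream
  parsers:
    - container:
        stream: stdout
        format: auto
```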

moduleConf[fileset+".enabled"] = cfg.Enabled
moduleConf[fileset+".input"] = filesetConf

logp.Debug("hints.builder", "generated config %+v", moduleConf)
l.log.Debugf("hints.builder", "generated config %+v", moduleConf)
}
config, _ = conf.NewConfigFrom(moduleConf)
}
logp.Debug("hints.builder", "generated config %+v", config)
l.log.Debugf("hints.builder", "generated config %+v of logHints %+v", config, l)
configs = append(configs, config)
}

// Apply information in event to the template to generate the final config
return template.ApplyConfigTemplate(event, configs)
}
@@ -222,7 +250,7 @@ func (l *logHints) getFilesets(hints mapstr.M, module string) map[string]*filese

moduleFilesets, err := l.registry.ModuleAvailableFilesets(module)
if err != nil {
logp.Err("Error retrieving module filesets: %+v", err)
l.log.Errorf("Error retrieving module filesets: %+v", err)
return nil
}
