Skip to content

Commit

Permalink
Use filestream input as default for hints autodiscover. (#36950)
Browse files Browse the repository at this point in the history
* Use filestream input as default for hints autodiscover. Map co.elastic.logs/json* in hints to the ndjson parser of filestream

* Update filebeat-kubernetes.yaml

* Map co.elastic.logs/multiline.* hints to multiline parser of filestream input

* Update documentation


* Use file_identity.fingerprint as default way of file unique id creation
---------

Co-authored-by: Andrew Gizas <andreas.gkizas@elastic.co>
  • Loading branch information
MichaelKatsoulis and gizas committed Nov 1, 2023
1 parent c0a647a commit 41ab08c
Show file tree
Hide file tree
Showing 9 changed files with 525 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -254,6 +254,7 @@ is collected by it.
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]
- Avoid unwanted publication of Okta entity records. {pull}36770[36770]
- Add support for Digest Authentication to CEL input. {issue}35514[35514] {pull}36932[36932]
- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950]

*Auditbeat*

Expand Down
23 changes: 19 additions & 4 deletions deploy/kubernetes/filebeat-kubernetes.yaml
Expand Up @@ -112,9 +112,16 @@ metadata:
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
- type: filestream
paths:
- /var/log/containers/*.log
parsers:
- container: ~
prospector:
scanner:
fingerprint.enabled: true
symlinks: true
file_identity.fingerprint: ~
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
Expand All @@ -123,15 +130,23 @@ data:
logs_path: "/var/log/containers/"
# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
#filebeat.autodiscover:
# filebeat.autodiscover:
# providers:
# - type: kubernetes
# node: ${NODE_NAME}
# hints.enabled: true
# hints.default_config:
# type: container
# type: filestream
# id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id}
# paths:
# - /var/log/containers/*${data.kubernetes.container.id}.log
# - /var/log/containers/*-${data.kubernetes.container.id}.log
# parsers:
# - container: ~
# prospector:
# scanner:
# fingerprint.enabled: true
# symlinks: true
# file_identity.fingerprint: ~
processors:
- add_cloud_metadata:
Expand Down
23 changes: 19 additions & 4 deletions deploy/kubernetes/filebeat/filebeat-configmap.yaml
Expand Up @@ -8,9 +8,16 @@ metadata:
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
- type: filestream
paths:
- /var/log/containers/*.log
parsers:
- container: ~
prospector:
scanner:
fingerprint.enabled: true
symlinks: true
file_identity.fingerprint: ~
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
Expand All @@ -19,15 +26,23 @@ data:
logs_path: "/var/log/containers/"
# To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
#filebeat.autodiscover:
# filebeat.autodiscover:
# providers:
# - type: kubernetes
# node: ${NODE_NAME}
# hints.enabled: true
# hints.default_config:
# type: container
# type: filestream
# id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id}
# paths:
# - /var/log/containers/*${data.kubernetes.container.id}.log
# - /var/log/containers/*-${data.kubernetes.container.id}.log
# parsers:
# - container: ~
# prospector:
# scanner:
# fingerprint.enabled: true
# symlinks: true
# file_identity.fingerprint: ~
processors:
- add_cloud_metadata:
Expand Down
28 changes: 22 additions & 6 deletions filebeat/autodiscover/builder/hints/config.go
Expand Up @@ -17,7 +17,9 @@

package hints

import conf "github.com/elastic/elastic-agent-libs/config"
import (
conf "github.com/elastic/elastic-agent-libs/config"
)

type config struct {
Key string `config:"key"`
Expand All @@ -26,11 +28,25 @@ type config struct {

func defaultConfig() config {
defaultCfgRaw := map[string]interface{}{
"type": "container",
"type": "filestream",
"id": "kubernetes-container-logs-${data.kubernetes.container.id}",
"prospector": map[string]interface{}{
"scanner": map[string]interface{}{
"fingerprint.enabled": true,
"symlinks": true,
},
},
"file_identity.fingerprint": nil,
"parsers": []interface{}{
map[string]interface{}{
"container": map[string]interface{}{
"stream": "all",
"format": "auto",
},
},
},
"paths": []string{
// To be able to use this builder with CRI-O replace paths with:
// /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
"/var/lib/docker/containers/${data.container.id}/*-json.log",
"/var/log/containers/*-${data.kubernetes.container.id}.log",
},
}
defaultCfg, _ := conf.NewConfigFrom(defaultCfgRaw)
Expand All @@ -55,7 +71,7 @@ func (c *config) Unpack(from *conf.C) error {
if len(fields) == 1 && fields[0] == "enabled" {
// only enabling/disabling default config:
if err := c.DefaultConfig.Merge(config); err != nil {
return nil
return err
}
} else {
// full config provided, discard default. It must be a clone of the
Expand Down
52 changes: 40 additions & 12 deletions filebeat/autodiscover/builder/hints/logs.go
Expand Up @@ -51,6 +51,8 @@ const (
processors = "processors"
json = "json"
pipeline = "pipeline"
ndjson = "ndjson"
parsers = "parsers"
)

// validModuleNames to sanitize user input
Expand Down Expand Up @@ -115,10 +117,20 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
continue
}

inputType, _ := config.String("type", -1)
tempCfg := mapstr.M{}
mline := l.getMultiline(h)
if len(mline) != 0 {
kubernetes.ShouldPut(tempCfg, multiline, mline, l.log)

if mline := l.getMultiline(h); len(mline) != 0 {
if inputType == harvester.FilestreamType {
// multiline options should be under multiline parser in filestream input
parsersTempCfg := []mapstr.M{}
mlineTempCfg := mapstr.M{}
kubernetes.ShouldPut(mlineTempCfg, multiline, mline, l.log)
parsersTempCfg = append(parsersTempCfg, mlineTempCfg)
kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log)
} else {
kubernetes.ShouldPut(tempCfg, multiline, mline, l.log)
}
}
if ilines := l.getIncludeLines(h); len(ilines) != 0 {
kubernetes.ShouldPut(tempCfg, includeLines, ilines, l.log)
Expand All @@ -136,15 +148,24 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
}

if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 {
kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log)
if inputType == harvester.FilestreamType {
// json options should be under ndjson parser in filestream input
parsersTempCfg := []mapstr.M{}
ndjsonTempCfg := mapstr.M{}
kubernetes.ShouldPut(ndjsonTempCfg, ndjson, jsonOpts, l.log)
parsersTempCfg = append(parsersTempCfg, ndjsonTempCfg)
kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log)
} else {
kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log)
}

}
// Merge config template with the configs from the annotations
// AppendValues option is used to append arrays from annotations to existing arrays while merging
if err := config.MergeWithOpts(tempCfg, ucfg.AppendValues); err != nil {
logp.Debug("hints.builder", "config merge failed with error: %v", err)
l.log.Debugf("hints.builder", "config merge failed with error: %v", err)
continue
}

module := l.getModule(hints)
if module != "" {
moduleConf := map[string]interface{}{
Expand All @@ -154,24 +175,31 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
filesets := l.getFilesets(hints, module)
for fileset, cfg := range filesets {
filesetConf, _ := conf.NewConfigFrom(config)

if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType {
if inputType == harvester.ContainerType {
_ = filesetConf.SetString("stream", -1, cfg.Stream)
} else if inputType == harvester.FilestreamType {
filestreamContainerParser := map[string]interface{}{
"container": map[string]interface{}{
"stream": cfg.Stream,
"format": "auto",
},
}
parserCfg, _ := conf.NewConfigFrom(filestreamContainerParser)
_ = filesetConf.SetChild("parsers", 0, parserCfg)
} else {
_ = filesetConf.SetString("containers.stream", -1, cfg.Stream)
}

moduleConf[fileset+".enabled"] = cfg.Enabled
moduleConf[fileset+".input"] = filesetConf

logp.Debug("hints.builder", "generated config %+v", moduleConf)
l.log.Debugf("hints.builder", "generated config %+v", moduleConf)
}
config, _ = conf.NewConfigFrom(moduleConf)
}
logp.Debug("hints.builder", "generated config %+v", config)
l.log.Debugf("hints.builder", "generated config %+v of logHints %+v", config, l)
configs = append(configs, config)
}

// Apply information in event to the template to generate the final config
return template.ApplyConfigTemplate(event, configs)
}
Expand Down Expand Up @@ -222,7 +250,7 @@ func (l *logHints) getFilesets(hints mapstr.M, module string) map[string]*filese

moduleFilesets, err := l.registry.ModuleAvailableFilesets(module)
if err != nil {
logp.Err("Error retrieving module filesets: %+v", err)
l.log.Errorf("Error retrieving module filesets: %+v", err)
return nil
}

Expand Down

0 comments on commit 41ab08c

Please sign in to comment.