From 41ab08cd6aa9e2c367b0c4b670f05f786a2862db Mon Sep 17 00:00:00 2001 From: Michael Katsoulis Date: Wed, 1 Nov 2023 15:58:01 +0200 Subject: [PATCH] Use filestream input as default for hints autodiscover. (#36950) * Use filestream input as default for hints autodiscover. Map co.elastic.logs/json* in hints to the ndjson parser of filestream * Update filebeat-kubernetes.yaml * Map co.elastic.logs/multiline.* hints to multiline parser of filestream input * Update documentation * Use file_identity.fingerprint as default way of file unique id creation --------- Co-authored-by: Andrew Gizas --- CHANGELOG.next.asciidoc | 1 + deploy/kubernetes/filebeat-kubernetes.yaml | 23 +- .../filebeat/filebeat-configmap.yaml | 23 +- filebeat/autodiscover/builder/hints/config.go | 28 +- filebeat/autodiscover/builder/hints/logs.go | 52 +- .../autodiscover/builder/hints/logs_test.go | 444 +++++++++++++++--- filebeat/docs/autodiscover-hints.asciidoc | 31 +- .../input-filestream-reader-options.asciidoc | 1 + filebeat/harvester/util.go | 13 +- 9 files changed, 525 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3fc9493640b..7d84cb5885f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -254,6 +254,7 @@ is collected by it. - Avoid unwanted publication of Azure entity records. {pull}36753[36753] - Avoid unwanted publication of Okta entity records. {pull}36770[36770] - Add support for Digest Authentication to CEL input. {issue}35514[35514] {pull}36932[36932] +- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950] *Auditbeat* diff --git a/deploy/kubernetes/filebeat-kubernetes.yaml b/deploy/kubernetes/filebeat-kubernetes.yaml index d1f8fd975e0..497010cef90 100644 --- a/deploy/kubernetes/filebeat-kubernetes.yaml +++ b/deploy/kubernetes/filebeat-kubernetes.yaml @@ -112,9 +112,16 @@ metadata: data: filebeat.yml: |- filebeat.inputs: - - type: container + - type: filestream paths: - /var/log/containers/*.log + parsers: + - container: ~ + prospector: + scanner: + fingerprint.enabled: true + symlinks: true + file_identity.fingerprint: ~ processors: - add_kubernetes_metadata: host: ${NODE_NAME} @@ -123,15 +130,23 @@ data: logs_path: "/var/log/containers/" # To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this: - #filebeat.autodiscover: + # filebeat.autodiscover: # providers: # - type: kubernetes # node: ${NODE_NAME} # hints.enabled: true # hints.default_config: - # type: container + # type: filestream + # id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id} # paths: - # - /var/log/containers/*${data.kubernetes.container.id}.log + # - /var/log/containers/*-${data.kubernetes.container.id}.log + # parsers: + # - container: ~ + # prospector: + # scanner: + # fingerprint.enabled: true + # symlinks: true + # file_identity.fingerprint: ~ processors: - add_cloud_metadata: diff --git a/deploy/kubernetes/filebeat/filebeat-configmap.yaml b/deploy/kubernetes/filebeat/filebeat-configmap.yaml index 6cbd6ad8b1a..f2614e8c035 100644 --- a/deploy/kubernetes/filebeat/filebeat-configmap.yaml +++ b/deploy/kubernetes/filebeat/filebeat-configmap.yaml @@ -8,9 +8,16 @@ metadata: data: filebeat.yml: |- filebeat.inputs: - - type: container + - type: filestream paths: - /var/log/containers/*.log + parsers: + - container: ~ + prospector: + scanner: + fingerprint.enabled: true + symlinks: true + file_identity.fingerprint: ~ processors: - add_kubernetes_metadata: host: ${NODE_NAME} @@ -19,15 +26,23 @@ data: logs_path: "/var/log/containers/" # To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this: - #filebeat.autodiscover: + # filebeat.autodiscover: # providers: # - type: kubernetes # node: ${NODE_NAME} # hints.enabled: true # hints.default_config: - # type: container + # type: filestream + # id: kubernetes-container-logs-${data.kubernetes.pod.name}-${data.kubernetes.container.id} # paths: - # - /var/log/containers/*${data.kubernetes.container.id}.log + # - /var/log/containers/*-${data.kubernetes.container.id}.log + # parsers: + # - container: ~ + # prospector: + # scanner: + # fingerprint.enabled: true + # symlinks: true + # file_identity.fingerprint: ~ processors: - add_cloud_metadata: diff --git a/filebeat/autodiscover/builder/hints/config.go b/filebeat/autodiscover/builder/hints/config.go index dd1c2f75900..1161eb30e3f 100644 --- a/filebeat/autodiscover/builder/hints/config.go +++ b/filebeat/autodiscover/builder/hints/config.go @@ -17,7 +17,9 @@ package hints -import conf "github.com/elastic/elastic-agent-libs/config" +import ( + conf "github.com/elastic/elastic-agent-libs/config" +) type config struct { Key string `config:"key"` @@ -26,11 +28,25 @@ type config struct { func defaultConfig() config { defaultCfgRaw := map[string]interface{}{ - "type": "container", + "type": "filestream", + "id": "kubernetes-container-logs-${data.kubernetes.container.id}", + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "fingerprint.enabled": true, + "symlinks": true, + }, + }, + "file_identity.fingerprint": nil, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "all", + "format": "auto", + }, + }, + }, "paths": []string{ - // To be able to use this builder with CRI-O replace paths with: - // /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log - "/var/lib/docker/containers/${data.container.id}/*-json.log", + "/var/log/containers/*-${data.kubernetes.container.id}.log", }, } defaultCfg, _ := conf.NewConfigFrom(defaultCfgRaw) @@ -55,7 +71,7 @@ func (c *config) Unpack(from *conf.C) error { if len(fields) == 1 && fields[0] == "enabled" { // only enabling/disabling default config: if err := c.DefaultConfig.Merge(config); err != nil { - return nil + return err } } else { // full config provided, discard default. It must be a clone of the diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 52f8c91672a..c39bfd53353 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -51,6 +51,8 @@ const ( processors = "processors" json = "json" pipeline = "pipeline" + ndjson = "ndjson" + parsers = "parsers" ) // validModuleNames to sanitize user input @@ -115,10 +117,20 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf continue } + inputType, _ := config.String("type", -1) tempCfg := mapstr.M{} - mline := l.getMultiline(h) - if len(mline) != 0 { - kubernetes.ShouldPut(tempCfg, multiline, mline, l.log) + + if mline := l.getMultiline(h); len(mline) != 0 { + if inputType == harvester.FilestreamType { + // multiline options should be under multiline parser in filestream input + parsersTempCfg := []mapstr.M{} + mlineTempCfg := mapstr.M{} + kubernetes.ShouldPut(mlineTempCfg, multiline, mline, l.log) + parsersTempCfg = append(parsersTempCfg, mlineTempCfg) + kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log) + } else { + kubernetes.ShouldPut(tempCfg, multiline, mline, l.log) + } } if ilines := l.getIncludeLines(h); len(ilines) != 0 { kubernetes.ShouldPut(tempCfg, includeLines, ilines, l.log) @@ -136,15 +148,24 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf } if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 { - kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log) + if inputType == harvester.FilestreamType { + // json options should be under ndjson parser in filestream input + parsersTempCfg := []mapstr.M{} + ndjsonTempCfg := mapstr.M{} + kubernetes.ShouldPut(ndjsonTempCfg, ndjson, jsonOpts, l.log) + parsersTempCfg = append(parsersTempCfg, ndjsonTempCfg) + kubernetes.ShouldPut(tempCfg, parsers, parsersTempCfg, l.log) + } else { + kubernetes.ShouldPut(tempCfg, json, jsonOpts, l.log) + } + } // Merge config template with the configs from the annotations // AppendValues option is used to append arrays from annotations to existing arrays while merging if err := config.MergeWithOpts(tempCfg, ucfg.AppendValues); err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) + l.log.Debugf("hints.builder", "config merge failed with error: %v", err) continue } - module := l.getModule(hints) if module != "" { moduleConf := map[string]interface{}{ @@ -154,9 +175,17 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf filesets := l.getFilesets(hints, module) for fileset, cfg := range filesets { filesetConf, _ := conf.NewConfigFrom(config) - - if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { + if inputType == harvester.ContainerType { _ = filesetConf.SetString("stream", -1, cfg.Stream) + } else if inputType == harvester.FilestreamType { + filestreamContainerParser := map[string]interface{}{ + "container": map[string]interface{}{ + "stream": cfg.Stream, + "format": "auto", + }, + } + parserCfg, _ := conf.NewConfigFrom(filestreamContainerParser) + _ = filesetConf.SetChild("parsers", 0, parserCfg) } else { _ = filesetConf.SetString("containers.stream", -1, cfg.Stream) } @@ -164,14 +193,13 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf moduleConf[fileset+".enabled"] = cfg.Enabled moduleConf[fileset+".input"] = filesetConf - logp.Debug("hints.builder", "generated config %+v", moduleConf) + l.log.Debugf("hints.builder", "generated config %+v", moduleConf) } config, _ = conf.NewConfigFrom(moduleConf) } - logp.Debug("hints.builder", "generated config %+v", config) + l.log.Debugf("hints.builder", "generated config %+v of logHints %+v", config, l) configs = append(configs, config) } - // Apply information in event to the template to generate the final config return template.ApplyConfigTemplate(event, configs) } @@ -222,7 +250,7 @@ func (l *logHints) getFilesets(hints mapstr.M, module string) map[string]*filese moduleFilesets, err := l.registry.ModuleAvailableFilesets(module) if err != nil { - logp.Err("Error retrieving module filesets: %+v", err) + l.log.Errorf("Error retrieving module filesets: %+v", err) return nil } diff --git a/filebeat/autodiscover/builder/hints/logs_test.go b/filebeat/autodiscover/builder/hints/logs_test.go index cd2b7236771..4dc889e44d7 100644 --- a/filebeat/autodiscover/builder/hints/logs_test.go +++ b/filebeat/autodiscover/builder/hints/logs_test.go @@ -31,7 +31,7 @@ import ( ) func TestGenerateHints(t *testing.T) { - customCfg := conf.MustNewConfigFrom(map[string]interface{}{ + customDockerCfg := conf.MustNewConfigFrom(map[string]interface{}{ "default_config": map[string]interface{}{ "type": "docker", "containers": map[string]interface{}{ @@ -43,7 +43,7 @@ func TestGenerateHints(t *testing.T) { }, }) - customProcessorCfg := conf.MustNewConfigFrom(map[string]interface{}{ + customContainerCfg := conf.MustNewConfigFrom(map[string]interface{}{ "default_config": map[string]interface{}{ "type": "container", "paths": []string{ @@ -61,6 +61,31 @@ func TestGenerateHints(t *testing.T) { }, }) + customFilestreamCfg := conf.MustNewConfigFrom(map[string]interface{}{ + "default_config": map[string]interface{}{ + "type": "filestream", + "id": "kubernetes-container-logs-${data.kubernetes.container.id}", + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "fingerprint.enabled": true, + "symlinks": true, + }, + }, + "file_identity.fingerprint": nil, + "paths": []string{ + "/var/log/containers/*-${data.kubernetes.container.id}.log", + }, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "all", + "format": "auto", + }, + }, + }, + }, + }) + defaultCfg := conf.NewConfig() defaultDisabled := conf.MustNewConfigFrom(map[string]interface{}{ @@ -77,7 +102,7 @@ func TestGenerateHints(t *testing.T) { result []mapstr.M }{ { - msg: "Default config is correct", + msg: "Default config is correct(default input)", config: defaultCfg, event: bus.Event{ "host": "1.2.3.4", @@ -95,8 +120,28 @@ func TestGenerateHints(t *testing.T) { len: 1, result: []mapstr.M{ { - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "type": "container", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, }, @@ -120,7 +165,7 @@ func TestGenerateHints(t *testing.T) { result: []mapstr.M{}, }, { - msg: "Hint to enable when disabled by default works", + msg: "Hint to enable when disabled by default works(filestream)", config: defaultDisabled, event: bus.Event{ "host": "1.2.3.4", @@ -144,15 +189,35 @@ func TestGenerateHints(t *testing.T) { len: 1, result: []mapstr.M{ { - "type": "container", - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, "exclude_lines": []interface{}{"^test2", "^test3"}, + "type": "filestream", }, }, }, { msg: "Hints without host should return nothing", - config: customCfg, + config: customDockerCfg, event: bus.Event{ "hints": mapstr.M{ "metrics": mapstr.M{ @@ -165,7 +230,7 @@ func TestGenerateHints(t *testing.T) { }, { msg: "Hints with logs.disable should return nothing", - config: customCfg, + config: customDockerCfg, event: bus.Event{ "hints": mapstr.M{ "logs": mapstr.M{ @@ -177,8 +242,8 @@ func TestGenerateHints(t *testing.T) { result: []mapstr.M{}, }, { - msg: "Empty event hints should return default config", - config: customCfg, + msg: "Empty event hints should return default config(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -204,8 +269,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with include|exclude_lines must be part of the input config", - config: customCfg, + msg: "Hint with include|exclude_lines must be part of the input config(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -239,8 +304,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hints with two sets of include|exclude_lines must be part of the input config", - config: customCfg, + msg: "Hints with two sets of include|exclude_lines must be part of the input config(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -285,8 +350,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with multiline config must have a multiline in the input config", - config: customCfg, + msg: "Hint with multiline config must have a multiline in the input config(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -324,8 +389,171 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with inputs config as json must be accepted", - config: customCfg, + msg: "Hint with multiline config must have a multiline in the input config parsers(filestream input)", + config: customFilestreamCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": mapstr.M{ + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + }, + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + "hints": mapstr.M{ + "logs": mapstr.M{ + "multiline": mapstr.M{ + "pattern": "^test", + "negate": "true", + }, + }, + }, + }, + len: 1, + result: []mapstr.M{ + { + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, + map[string]interface{}{ + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", + }, + }, + }, + { + msg: "Hint with json config options must include them in the input config ndjson parser(filestream input)", + config: customFilestreamCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": mapstr.M{ + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + }, + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + "hints": mapstr.M{ + "logs": mapstr.M{ + "json": mapstr.M{ + "add_error_key": true, + "expand_keys": true, + }, + }, + }, + }, + len: 1, + result: []mapstr.M{ + { + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "add_error_key": true, + "expand_keys": true, + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", + }, + }, + }, + { + msg: "Hint with json config options must include them in the input config(container input)", + config: customContainerCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": mapstr.M{ + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + }, + "container": mapstr.M{ + "name": "foobar", + "id": "abc", + }, + "hints": mapstr.M{ + "logs": mapstr.M{ + "json": mapstr.M{ + "add_error_key": true, + "expand_keys": true, + }, + }, + }, + }, + len: 1, + result: []mapstr.M{ + { + "type": "container", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, + "close_timeout": "true", + "json": map[string]interface{}{ + "add_error_key": true, + "expand_keys": true, + }, + "processors": []interface{}{ + map[string]interface{}{ + "add_tags": map[string]interface{}{ + "tags": []interface{}{"web"}, + "target": "environment", + }, + }, + }, + }, + }, + }, + { + msg: "Hint with inputs config as json must be accepted(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -359,8 +587,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with processors config must have a processors in the input config", - config: customCfg, + msg: "Hint with processors config must have a processors in the input config(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -408,8 +636,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Processors in hints must be appended in the processors of the default config", - config: customProcessorCfg, + msg: "Processors in hints must be appended in the processors of the default config(container input)", + config: customContainerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -463,8 +691,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with module should attach input to its filesets", - config: customCfg, + msg: "Hint with module should attach input to its filesets(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -513,8 +741,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with module should honor defined filesets", - config: customCfg, + msg: "Hint with module should honor defined filesets(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -564,8 +792,8 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with module should honor defined filesets with streams", - config: customCfg, + msg: "Hint with module should honor defined filesets with streams(docker input)", + config: customDockerCfg, event: bus.Event{ "host": "1.2.3.4", "kubernetes": mapstr.M{ @@ -616,7 +844,7 @@ func TestGenerateHints(t *testing.T) { }, }, { - msg: "Hint with module should attach input to its filesets", + msg: "Hint with module should attach input to its filesets(default input)", config: defaultCfg, event: bus.Event{ "host": "1.2.3.4", @@ -643,28 +871,62 @@ func TestGenerateHints(t *testing.T) { "error": map[string]interface{}{ "enabled": true, "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, "access": map[string]interface{}{ "enabled": true, "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, }, }, }, { - msg: "Hint with module should honor defined filesets", + msg: "Hint with module should honor defined filesets(default input)", config: defaultCfg, event: bus.Event{ "host": "1.2.3.4", @@ -692,28 +954,62 @@ func TestGenerateHints(t *testing.T) { "access": map[string]interface{}{ "enabled": true, "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, "error": map[string]interface{}{ "enabled": false, "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "all", + }, + }, }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, }, }, }, { - msg: "Hint with module should honor defined filesets with streams", + msg: "Hint with module should honor defined filesets with streams(default input)", config: defaultCfg, event: bus.Event{ "host": "1.2.3.4", @@ -742,21 +1038,55 @@ func TestGenerateHints(t *testing.T) { "access": map[string]interface{}{ "enabled": true, "input": map[string]interface{}{ - "type": "container", - "stream": "stdout", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "stdout", + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, + }, + "type": "filestream", }, }, "error": map[string]interface{}{ "enabled": true, "input": map[string]interface{}{ - "type": "container", - "stream": "stderr", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "id": "kubernetes-container-logs-abc", + "paths": []interface{}{"/var/log/containers/*-abc.log"}, + "parsers": []interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "format": "auto", + "stream": "stderr", + }, + }, + }, + "prospector": map[string]interface{}{ + "scanner": map[string]interface{}{ + "symlinks": true, + "fingerprint": map[string]interface{}{ + "enabled": true, + }, + }, + }, + "file_identity": map[string]interface{}{ + "fingerprint": nil, }, + "type": "filestream", }, }, }, diff --git a/filebeat/docs/autodiscover-hints.asciidoc b/filebeat/docs/autodiscover-hints.asciidoc index d4fa08aee3d..8a21edb8b2b 100644 --- a/filebeat/docs/autodiscover-hints.asciidoc +++ b/filebeat/docs/autodiscover-hints.asciidoc @@ -2,7 +2,7 @@ hints in Kubernetes Pod annotations or Docker labels that have the prefix `co.elastic.logs`. As soon as the container starts, {beatname_uc} will check if it contains any hints and launch the proper config for it. Hints tell {beatname_uc} how to get logs for the given container. By default logs will be retrieved -from the container using the `container` input. You can use hints to modify this behavior. This is the full +from the container using the `filestream` input. You can use hints to modify this behavior. This is the full list of supported hints: [float] @@ -23,7 +23,34 @@ Multiline settings. See <> for a full list of all supported [float] ===== `co.elastic.logs/json.*` -JSON settings. See <> for a full list of all supported options. +JSON settings. In case of `filestream` input(default) see <> for a full list of all supported options. +In case of `container` or `log` input see <> for a full list of all supported options. + +For example, the following hints with json options: +[source,yaml] +----- +co.elastic.logs/json.message_key: "log" +co.elastic.logs/json.add_error_key: "true" +----- +will lead to the following input configuration: + +* `filestream` +[source,yaml] +----- +parsers: + - ndjson: + message_key: "log" + add_error_key: "true" +----- + +* `log` +[source,yaml] +----- +json.message_key: "log" +json.add_error_key: "true" +----- + +NOTE: `keys_under_root` json option of `log` input is replaced with `target` option in filestream input. Read the documentation(<>) on how to use it correctly. [float] ===== `co.elastic.logs/include_lines` diff --git a/filebeat/docs/inputs/input-filestream-reader-options.asciidoc b/filebeat/docs/inputs/input-filestream-reader-options.asciidoc index 9a6e86c146d..59d9018cb9c 100644 --- a/filebeat/docs/inputs/input-filestream-reader-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-reader-options.asciidoc @@ -182,6 +182,7 @@ multiple lines. See <> for more information about configuring multiline options. [float] +[id="{beatname_lc}-input-{type}-ndjson"] ===== `ndjson` These options make it possible for {beatname_uc} to decode logs structured as diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go index f39a718573a..acfa1910851 100644 --- a/filebeat/harvester/util.go +++ b/filebeat/harvester/util.go @@ -21,12 +21,13 @@ import "github.com/elastic/beats/v7/libbeat/common/match" // Contains available input types const ( - LogType = "log" - StdinType = "stdin" - RedisType = "redis" - UdpType = "udp" - DockerType = "docker" - ContainerType = "container" + LogType = "log" + StdinType = "stdin" + RedisType = "redis" + UdpType = "udp" + DockerType = "docker" + ContainerType = "container" + FilestreamType = "filestream" ) // MatchAny checks if the text matches any of the regular expressions