Skip to content

Commit

Permalink
[Filebeat] Update compatibility function to remove processor descript…
Browse files Browse the repository at this point in the history
…ion on ES < 7.9.0 (elastic#27774)

* Update Filebeat compatibility function to remove processor description field on ES < 7.9.0

* Fix function description

Co-authored-by: Andrew Kroh <andrew.kroh@elastic.co>
  • Loading branch information
2 people authored and wiwen committed Nov 1, 2021
1 parent 6409040 commit 32c550a
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638]
- Store offset in `log.offset` field of events from the filestream input. {pull}27688[27688]
- Fix `httpjson` input rate limit processing and documentation. {pull}[]
- Update Filebeat compatibility function to remove processor description field on ES < 7.9.0 {pull}27774[27774]

*Heartbeat*

Expand Down
26 changes: 23 additions & 3 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ var processorCompatibilityChecks = []processorCompatibility{
},
adaptConfig: deleteProcessor,
},
{
procType: "*",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.9.0"))
},
adaptConfig: removeDescription,
},
}

// Processor represents and Ingest Node processor definition.
Expand Down Expand Up @@ -273,17 +280,17 @@ nextProcessor:

// Run compatibility checks on the processor.
for _, proc := range processorCompatibilityChecks {
if processor.Name() != proc.procType {
if processor.Name() != proc.procType && proc.procType != "*" {
continue
}

if !proc.checkVersion(&esVersion) {
continue
}

processor, err = proc.adaptConfig(processor, log.With("processor_type", proc.procType, "processor_index", i))
processor, err = proc.adaptConfig(processor, log.With("processor_type", processor.Name(), "processor_index", i))
if err != nil {
return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err)
return fmt.Errorf("failed to adapt %q processor at index %d: %w", processor.Name(), i, err)
}
if processor.IsNil() {
continue nextProcessor
Expand Down Expand Up @@ -408,3 +415,16 @@ func replaceConvertIP(processor Processor, log *logp.Logger) (Processor, error)
log.Debug("processor output=", processor.String())
return processor, nil
}

// removeDescription removes the description config option so ES less than 7.9 will work.
func removeDescription(processor Processor, log *logp.Logger) (Processor, error) {
_, ok := processor.GetString("description")
if !ok {
return processor, nil
}

log.Debug("Removing unsupported 'description' from processor.")
processor.Delete("description")

return processor, nil
}
116 changes: 115 additions & 1 deletion filebeat/fileset/compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,6 @@ func TestReplaceConvertIPWithGrok(t *testing.T) {
"^%{IP:bar}$",
},
"ignore_missing": true,
"description": "foo bar",
"if": "condition",
"ignore_failure": false,
"tag": "myTag",
Expand Down Expand Up @@ -1341,3 +1340,118 @@ func TestReplaceAlternativeFlowProcessors(t *testing.T) {
})
}
}

func TestRemoveDescription(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.9.0",
esVersion: common.MustNewVersion("7.8.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
map[string]interface{}{
"script": map[string]interface{}{
"source": "abcd",
"lang": "painless",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
},
},
map[string]interface{}{
"script": map[string]interface{}{
"source": "abcd",
"lang": "painless",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.9.0",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
},
},
isErrExpected: false,
},
{
name: "ES > 7.9.0",
esVersion: common.MustNewVersion("8.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"description": "This is a description",
},
},
},
},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
if test.isErrExpected {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, test.expected, test.content, test.name)
}
})
}
}

0 comments on commit 32c550a

Please sign in to comment.