LIBBEAT: Enhancement replace_string processor for replacing strings values of fields. #17342

Merged
merged 23 commits into from
Apr 28, 2020
Changes from 3 commits
23 commits
0eaa435
Add processor for replacing strings values
premendrasingh Mar 30, 2020
121b558
Update CHANGELOG.next.asciidoc
premendrasingh Mar 30, 2020
0ccce99
Changed signature according to review comments.
premendrasingh Apr 1, 2020
52b4d6f
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 1, 2020
e2ba62a
Changed pattern to *regexp.Regexp as suggested.
premendrasingh Apr 2, 2020
cf2608a
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 2, 2020
f80ec20
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 3, 2020
9ed18dd
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 16, 2020
1256e55
Fix formatting errors. Add doc for replace processor
premendrasingh Apr 17, 2020
a396379
Fix documentation for replace processor
premendrasingh Apr 17, 2020
147c67c
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 17, 2020
0c0c44f
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 17, 2020
4d9f433
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 17, 2020
277978a
Merge branch 'processor_replace_string' of github.com:premendrasingh/…
premendrasingh Apr 18, 2020
851f33a
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 18, 2020
2bb9cbc
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 20, 2020
dcd2244
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 21, 2020
b4f7d2c
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 21, 2020
2561c12
Changed to a simpler example. Thanks for the suggestion
premendrasingh Apr 21, 2020
6bc80cb
Fix sentence
premendrasingh Apr 21, 2020
889c0cf
Update imports to use github.com/elastic/beats/v7/libbeat
premendrasingh Apr 22, 2020
2acc815
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 22, 2020
4a3427d
Merge remote-tracking branch 'beats_upstream/master' into processor_r…
premendrasingh Apr 26, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update RPM packages contained in Beat Docker images. {issue}17035[17035]
- Update supported versions of `redis` output. {pull}17198[17198]
- Update documentation for system.process.memory fields to include clarification on Windows os's. {pull}17268[17268]
- Add `replace` processor for replacing string values of fields. {pull}17342[17342]

*Auditbeat*

118 changes: 118 additions & 0 deletions libbeat/processors/actions/replace.go
@@ -0,0 +1,118 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"
"regexp"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor"
)

type replaceString struct {
config replaceStringConfig
}

type replaceStringConfig struct {
Fields []replaceConfig `config:"fields"`
IgnoreMissing bool `config:"ignore_missing"`
FailOnError bool `config:"fail_on_error"`
}

type replaceConfig struct {
Field string `config:"field"`
Pattern string `config:"pattern"`
Contributor

@vjsamuel vjsamuel Apr 2, 2020

@urso @exekias Should https://github.com/elastic/beats/blob/master/libbeat/common/match/matchers.go be enhanced to replace strings, so that matcher.Match can be used here?

Using a plain regex could be slower than the optimized matcher in libbeat.

For Pattern in the config, use Pattern *regexp.Regexp. Config unpacking will automatically try to compile the regex and, if compilation fails, return an error that includes the setting name.

matcher.Match is an optimization for a few special cases only, and it falls back to regexp once the pattern becomes more 'complicated'. Given the optimizations we have in matcher, the only case I see it helping is a constant string match (which becomes a substring search or, in some cases, a string prefix/suffix comparison).

The matcher package also replaces capturing groups with non-capturing groups, which greatly reduces allocations. With gsub-style patterns and replacements, do we want to allow users to reference capturing groups in the replacement in the future? For example:

pattern: 'some (?P<important>[a-zA-Z]) string'
replace: 'found: {{important}}'

For now I would not enhance the matcher package. I would only do so if this turns out to be a common problem, and doing so might force us to remove some of its optimizations. If we find that we really need to optimize, a separate type (e.g. matcher.Replacer) might give us more flexibility to apply the optimizations this use case needs without un-optimizing matcher.Matcher.

Contributor Author

@urso Changed to pattern, as suggested.
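
A rough, standalone sketch of what storing the pattern in compiled form buys: the regex is compiled once when the configuration is built, an invalid pattern surfaces as an error at that point, and every event reuses the same compiled value. The names rule, newRule and apply are hypothetical and use only the standard library. In libbeat the compilation would happen during config unpacking, as described in the review comment above.

```go
package main

import (
	"fmt"
	"regexp"
)

// rule is a hypothetical stand-in for replaceConfig with the pattern
// stored in compiled form.
type rule struct {
	Field       string
	Pattern     *regexp.Regexp
	Replacement string
}

// newRule compiles the pattern once and reports a bad pattern as an
// error instead of panicking.
func newRule(field, pattern, replacement string) (rule, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return rule{}, fmt.Errorf("invalid pattern for field %q: %w", field, err)
	}
	return rule{Field: field, Pattern: re, Replacement: replacement}, nil
}

// apply reuses the already compiled pattern for every value.
func (r rule) apply(value string) string {
	return r.Pattern.ReplaceAllString(value, r.Replacement)
}

func main() {
	r, err := newRule("f", "a.*c", "cab")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(r.apply("abbbc")) // prints "cab"

	// An unbalanced parenthesis is rejected when the rule is built.
	_, err = newRule("g", "(", "x")
	fmt.Println(err != nil) // prints "true"
}
```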

Replacement string `config:"replacement"`
}

func init() {
processors.RegisterPlugin("replace",
checks.ConfigChecked(NewReplaceString,
checks.RequireFields("fields")))

jsprocessor.RegisterPlugin("Replace", NewReplaceString)
}

// NewReplaceString returns a new replace processor.
func NewReplaceString(c *common.Config) (processors.Processor, error) {
config := replaceStringConfig{
IgnoreMissing: false,
FailOnError: true,
}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("failed to unpack the replace configuration: %s", err)
}

f := &replaceString{
config: config,
}
return f, nil
}

func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
// Creates a copy of the event to revert in case of failure
if f.config.FailOnError {
backup = event.Fields.Clone()
}

for _, field := range f.config.Fields {
err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields)
if err != nil {
errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err)
logp.Debug("replace", "%v", errMsg)
if f.config.FailOnError {
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
}
}

return event, nil
}

func (f *replaceString) replaceField(field string, pattern string, replacement string, fields common.MapStr) error {
currentValue, err := fields.GetValue(field)
if err != nil {
// Ignore ErrKeyNotFound errors
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
}
return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err)
}

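// Compile returns an error for an invalid pattern instead of panicking.
re, err := regexp.Compile(pattern)
if err != nil {
return fmt.Errorf("invalid pattern %q for key %s: %v", pattern, field, err)
}
// Guard the type assertion: only string values can be replaced.
currentString, ok := currentValue.(string)
if !ok {
return fmt.Errorf("value of key %s is not a string: %T", field, currentValue)
}
updatedString := re.ReplaceAllString(currentString, replacement)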
_, err = fields.Put(field, updatedString)
if err != nil {
return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err)
}
return nil
}

func (f *replaceString) String() string {
return "replace=" + fmt.Sprintf("%+v", f.config.Fields)
}
247 changes: 247 additions & 0 deletions libbeat/processors/actions/replace_test.go
@@ -0,0 +1,247 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestReplaceRun(t *testing.T) {
var tests = []struct {
description string
Fields []replaceConfig
IgnoreMissing bool
FailOnError bool
Input common.MapStr
Output common.MapStr
error bool
}{
{
description: "simple field replacing",
Fields: []replaceConfig{
{
Field: "f",
Pattern: "a",
Replacement: "b",
},
},
Input: common.MapStr{
"f": "abc",
},
Output: common.MapStr{
"f": "bbc",
},
error: false,
IgnoreMissing: false,
FailOnError: true,
},
{
description: "Add one more hierarchy to event",
Fields: []replaceConfig{
{
Field: "f.b",
Pattern: "a",
Replacement: "b",
},
},
Input: common.MapStr{
"f": common.MapStr{
"b": "abc",
},
},
Output: common.MapStr{
"f": common.MapStr{
"b": "bbc",
},
},
error: false,
IgnoreMissing: false,
FailOnError: true,
},
{
description: "replace two fields at the same time.",
Fields: []replaceConfig{
{
Field: "f",
Pattern: "a.*c",
Replacement: "cab",
},
{
Field: "g",
Pattern: "ef",
Replacement: "oor",
},
},
Input: common.MapStr{
"f": "abbbc",
"g": "def",
},
Output: common.MapStr{
"f": "cab",
"g": "door",
},
error: false,
IgnoreMissing: false,
FailOnError: true,
},
{
description: "test missing fields",
Fields: []replaceConfig{
{
Field: "f",
Pattern: "abc",
Replacement: "xyz",
},
{
Field: "g",
Pattern: "def",
Replacement: "",
},
},
Input: common.MapStr{
"m": "abc",
"n": "def",
},
Output: common.MapStr{
"m": "abc",
"n": "def",
"error": common.MapStr{
"message": "Failed to replace fields in processor: could not fetch value for key: f, Error: key not found",
},
},
error: true,
IgnoreMissing: false,
FailOnError: true,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
f := &replaceString{
config: replaceStringConfig{
Fields: test.Fields,
IgnoreMissing: test.IgnoreMissing,
FailOnError: test.FailOnError,
},
}
event := &beat.Event{
Fields: test.Input,
}

newEvent, err := f.Run(event)
if !test.error {
assert.Nil(t, err)
} else {
assert.NotNil(t, err)
}

assert.True(t, reflect.DeepEqual(newEvent.Fields, test.Output))
})
}
}

func TestReplaceField(t *testing.T) {
var tests = []struct {
Field string
Pattern string
Replacement string
ignoreMissing bool
failOnError bool
Input common.MapStr
Output common.MapStr
error bool
description string
}{
{
description: "replace part of field value with another string",
Field: "f",
Pattern: "a",
Replacement: "b",
Input: common.MapStr{
"f": "abc",
},
Output: common.MapStr{
"f": "bbc",
},
error: false,
failOnError: true,
ignoreMissing: false,
},
{
description: "Add hierarchy to event and replace",
Field: "f.b",
Pattern: "a",
Replacement: "b",
Input: common.MapStr{
"f": common.MapStr{
"b": "abc",
},
},
Output: common.MapStr{
"f": common.MapStr{
"b": "bbc",
},
},
error: false,
ignoreMissing: false,
failOnError: true,
},
{
description: "try replacing value of missing fields in event",
Field: "f",
Pattern: "abc",
Replacement: "xyz",
Input: common.MapStr{
"m": "abc",
"n": "def",
},
Output: common.MapStr{
"m": "abc",
"n": "def",
},
error: true,
ignoreMissing: false,
failOnError: true,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {

f := &replaceString{
config: replaceStringConfig{
IgnoreMissing: test.ignoreMissing,
FailOnError: test.failOnError,
},
}

err := f.replaceField(test.Field, test.Pattern, test.Replacement, test.Input)
// Check both directions: an expected error must occur, and an unexpected one must not.
assert.Equal(t, test.error, err != nil)

assert.True(t, reflect.DeepEqual(test.Input, test.Output))
})
}
}