Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new parser called include_message to filter messages #32094

Merged
merged 17 commits into from Jun 27, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add `auth.oauth2.google.jwt_json` option to `httpjson` input. {pull}31750[31750]
- Add authentication fields to RabbitMQ module documents. {issue}31159[31159] {pull}31680[31680]
- Add template helper function for decoding hexadecimal strings. {pull}31886[31886]
- Add new `parser` called `include_message` to filter based on message contents. {issue}31794[31794] {pull}32094[32094]

*Auditbeat*

Expand Down
14 changes: 14 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Expand Up @@ -267,11 +267,15 @@ filebeat.inputs:
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, no lines are dropped.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, all the lines are exported.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

### Prospector options
Expand Down Expand Up @@ -327,6 +331,16 @@ filebeat.inputs:
# be used.
#add_error_key: false

#### Filtering messages

# You can filter messsages in the parsers pipeline. Use this method if you would like to
# include or exclude lines before they are aggregated into multiline or the JSON contents
# are parsed.

#parsers:
#- include_message.patterns:
- ["WARN", "ERR"]

#### Multiline options

# Multiline can be used for log messages spanning multiple lines. This is common
Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.yml.tmpl
Expand Up @@ -22,10 +22,14 @@ filebeat.inputs:

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
Expand Down
22 changes: 22 additions & 0 deletions filebeat/docs/inputs/input-filestream-reader-options.asciidoc
Expand Up @@ -318,3 +318,25 @@ The RFC 5424 format accepts the following forms of timestamps:
** `2003-10-11T22:14:15.123456-06:00`

Formats with an asterisk (*) are a non-standard allowance.

[float]
===== `include_message`

Use the `include_message` parser to filter messages in the parsers pipeline. Messages that
match the provided pattern are passed to the next parser, the others are dropped.

You should use `include_message` instead of `include_lines` if you would like to
control when the filtering happens. `include_lines` runs after the parsers, `include_message`
runs in the parsers pipeline.

*`patterns`*:: List of regexp patterns to match.

This example shows you how to include messages that start with the string ERR or WARN:

[source,yaml]
----
paths:
- "/var/log/containers/*.log"
parsers:
- include_message.patterns: ["^ERR", "^WARN"]
----
14 changes: 14 additions & 0 deletions filebeat/filebeat.reference.yml
Expand Up @@ -674,11 +674,15 @@ filebeat.inputs:
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, no lines are dropped.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, all the lines are exported.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

### Prospector options
Expand Down Expand Up @@ -734,6 +738,16 @@ filebeat.inputs:
# be used.
#add_error_key: false

#### Filtering messages

# You can filter messsages in the parsers pipeline. Use this method if you would like to
# include or exclude lines before they are aggregated into multiline or the JSON contents
# are parsed.

#parsers:
#- include_message.patterns:
- ["WARN", "ERR"]

#### Multiline options

# Multiline can be used for log messages spanning multiple lines. This is common
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.yml
Expand Up @@ -34,10 +34,14 @@ filebeat.inputs:

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
Expand Down
84 changes: 84 additions & 0 deletions libbeat/reader/filter/filter.go
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filter

import (
"context"
"io"

"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/match"
"github.com/elastic/go-concert/ctxtool"
)

type Config struct {
Patterns []match.Matcher `config:"patterns" validate:"required"`
}

func DefaultConfig() Config {
return Config{}
}

// FilterParser accepts a list of matchers to determine if a line
// should be kept or not. If one of the patterns matches the
// contents of the message, it is returned to the next reader.
// If not, the message is dropped.
type FilterParser struct {
ctx ctxtool.CancelContext
logger *logp.Logger
r reader.Reader
matchers []match.Matcher
}

func NewParser(r reader.Reader, c *Config) *FilterParser {
return &FilterParser{
ctx: ctxtool.WithCancelContext(context.Background()),
logger: logp.NewLogger("filter_parser"),
r: r,
matchers: c.Patterns,
}
}

func (p *FilterParser) Next() (reader.Message, error) {
for p.ctx.Err() == nil {
message, err := p.r.Next()
if err != nil {
return message, err
}
if p.matchAny(string(message.Content)) {
return message, err
}
p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content))
}
return reader.Message{}, io.EOF
}

func (p *FilterParser) matchAny(text string) bool {
for _, m := range p.matchers {
if m.MatchString(text) {
return true
}
}
return false
}

func (p *FilterParser) Close() error {
p.ctx.Cancel()
return p.r.Close()
}
130 changes: 130 additions & 0 deletions libbeat/reader/filter/filter_test.go
@@ -0,0 +1,130 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filter

import (
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/elastic-agent-libs/config"
)

func TestParser(t *testing.T) {
tests := map[string]struct {
config map[string]interface{}
input []reader.Message
expectedMessageContent [][]byte
}{
"keep all messages": {
config: map[string]interface{}{
"patterns": []string{"this matches*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("this matches again"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
[]byte("this matches again"),
},
},
"keep all messages with multiple patterns": {
config: map[string]interface{}{
"patterns": []string{"this matches*", "should match as well*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("should match as well"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
[]byte("should match as well"),
},
},
"keep one message": {
config: map[string]interface{}{
"patterns": []string{"this matches*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("this does not match"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
var c Config
cfg := config.MustNewConfigFrom(test.config)
err := cfg.Unpack(&c)
require.NoError(t, err)
r := NewParser(newTestReader(test.input), &c)

contents := make([][]byte, 0)
msg, err := r.Next()
for err == nil {
contents = append(contents, msg.Content)
msg, err = r.Next()
}
require.ElementsMatch(t, test.expectedMessageContent, contents)
})

}
}

type testReader struct {
msg []reader.Message
idx int
}

func newTestReader(input []reader.Message) reader.Reader {
return &testReader{
msg: input,
idx: 0,
}
}

func (r *testReader) Next() (reader.Message, error) {
if r.idx == len(r.msg) {
return reader.Message{}, io.EOF
}

m := r.msg[r.idx]
r.idx += 1
return m, nil
}

func (r *testReader) Close() error { return nil }