Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for include_fields and drop_fields #1120

Merged
merged 1 commit into from Mar 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Expand Up @@ -124,4 +124,18 @@ output:
rotate_every_kb: 1000
#number_of_files: 7

{% if filter_enabled %}

filter:

{%- if drop_fields %}
- drop_fields:
fields: {{drop_fields}}
{%- endif %}
{%- if include_fields is not none %}
- include_fields:
fields: {{include_fields}}
{%- endif %}
{% endif %}

# vim: set ft=jinja:
53 changes: 53 additions & 0 deletions filebeat/tests/system/test_filtering.py
@@ -0,0 +1,53 @@
from filebeat import BaseTest
import os

"""
Contains tests for filtering.
"""


class Test(BaseTest):
def test_dropfields(self):
"""
Check drop_fields filtering action
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
drop_fields=["beat"],
include_fields=None,
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "beat.name" not in output
assert "message" in output

def test_include_fields(self):
"""
Check drop_fields filtering action
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
include_fields=["source", "offset", "message"]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "beat.name" not in output
assert "message" in output
17 changes: 13 additions & 4 deletions libbeat/beat/beat.go
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/filter"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/service"
Expand Down Expand Up @@ -84,6 +85,7 @@ type BeatConfig struct {
Output map[string]*ucfg.Config
Logging logp.Logging
Shipper publisher.ShipperConfig
Filter []filter.FilterConfig
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we don't introduce a new top level config here as we are in the process to get rid of it. This leads to problems when we want for example run multiple beats as one binary. We should only have one common name space which is beat.

Alternatively this could also be added to every single beat config to have it "local". Means it would be filebeat.filter or packetbeat.filer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also do this change in a separate PR once it's merged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we merge it like that we probably also going to keep it that way as also tests below and config changes for topbeat were already made for it. I suggest to currently add it under shipper which should be beat, same as we have the fields etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think shipper or beat (to be honest I think beat is too generic, maybe host?) should be the section that includes only the configuration options for that host like geoip configuration, max_queue, tags and shouldn't include fields and filter as these are not a configuration of the host and more of an enhancement of the output data.
I suggest to leave it like this for now and change it in a separate PR if we decide t o structure differently the configuration file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1165 will allow to place it under beat.filter as we do we fields and tags.

}

var printVersion *bool
Expand Down Expand Up @@ -149,7 +151,7 @@ func (b *Beat) Start() error {
// Additional command line args are used to overwrite config options
err, exit := b.CommandLineSetup()
if err != nil {
return err
return fmt.Errorf("fails to load command line setup: %v\n", err)
}

if exit {
Expand All @@ -159,13 +161,13 @@ func (b *Beat) Start() error {
// Loads base config
err = b.LoadConfig()
if err != nil {
return err
return fmt.Errorf("fails to load the config: %v\n", err)
}

// Configures beat
err = b.BT.Config(b)
if err != nil {
return err
return fmt.Errorf("fails to load the beat config: %v\n", err)
}
b.setState(ConfigState)

Expand Down Expand Up @@ -229,13 +231,20 @@ func (b *Beat) LoadConfig() error {

pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper)
if err != nil {
return fmt.Errorf("error Initialising publisher: %v\n", err)
return fmt.Errorf("error initializing publisher: %v\n", err)
}

filters, err := filter.New(b.Config.Filter)
if err != nil {
return fmt.Errorf("error initializing filters: %v\n", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could apply the same to the errors above in case they are not good enough, means we "enhance" the error message.

}

b.Publisher = pub
pub.RegisterFilter(filters)
b.Events = pub.Client()

logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version)
logp.Info("Filter %v", filters)

return nil
}
Expand Down
86 changes: 86 additions & 0 deletions libbeat/common/event.go
@@ -0,0 +1,86 @@
package common

import (
"encoding/json"
"reflect"
"time"

"github.com/elastic/beats/libbeat/logp"
)

func MarshallUnmarshall(v interface{}) (MapStr, error) {
// decode and encode JSON
marshaled, err := json.Marshal(v)
if err != nil {
logp.Warn("marshal err: %v", err)
return nil, err
}
var v1 MapStr
err = json.Unmarshal(marshaled, &v1)
if err != nil {
logp.Warn("unmarshal err: %v")
return nil, err
}

return v1, nil
}

func ConvertToGenericEvent(v MapStr) MapStr {

for key, value := range v {

switch value.(type) {
case Time, *Time:
continue
case time.Location, *time.Location:
continue
case MapStr:
v[key] = ConvertToGenericEvent(value.(MapStr))
continue
case *MapStr:
v[key] = ConvertToGenericEvent(*value.(*MapStr))
continue
default:

typ := reflect.TypeOf(value)

if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}

switch typ.Kind() {
case reflect.Bool:
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
case reflect.Uintptr:
case reflect.Float32, reflect.Float64:
case reflect.Complex64, reflect.Complex128:
case reflect.String:
case reflect.UnsafePointer:
case reflect.Array, reflect.Slice:
//case reflect.Chan:
//case reflect.Func:
//case reflect.Interface:
case reflect.Map:
anothermap, err := MarshallUnmarshall(value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember we had some issue in the past with JSON marshalling / unmarshalling. Could this have quite an impact on performance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the plan is to call unmarshall/Marshall only when it is really needed. It shouldn't be called for official supported beats, but it might be needed for the community beats that are exporting the data into a structure instead of a Mapstr.

if err != nil {
logp.Warn("fail to marschall & unmarshall map %v", key)
continue
}
v[key] = anothermap

case reflect.Struct:
anothermap, err := MarshallUnmarshall(value)
if err != nil {
logp.Warn("fail to marschall & unmarshall struct %v", key)
continue
}
v[key] = anothermap
default:
logp.Warn("unknown type %v", typ)
continue
}
}
}
return v
}
123 changes: 123 additions & 0 deletions libbeat/common/event_test.go
@@ -0,0 +1,123 @@
package common

import (
"testing"

"github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"
)

func TestConvertNestedMapStr(t *testing.T) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})

type io struct {
Input MapStr
Output MapStr
}

type String string

tests := []io{
io{
Input: MapStr{
"key": MapStr{
"key1": "value1",
},
},
Output: MapStr{
"key": MapStr{
"key1": "value1",
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": String("value1"),
},
},
Output: MapStr{
"key": MapStr{
"key1": String("value1"),
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": []string{"value1", "value2"},
},
},
Output: MapStr{
"key": MapStr{
"key1": []string{"value1", "value2"},
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": []String{"value1", "value2"},
},
},
Output: MapStr{
"key": MapStr{
"key1": []String{"value1", "value2"},
},
},
},
io{
Input: MapStr{
"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
},
Output: MapStr{
"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
},
},
}

for _, test := range tests {
assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input))
}

}

func TestConvertNestedStruct(t *testing.T) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})

type io struct {
Input MapStr
Output MapStr
}

type TestStruct struct {
A string
B int
}

tests := []io{
io{
Input: MapStr{
"key": MapStr{
"key1": TestStruct{
A: "hello",
B: 5,
},
},
},
Output: MapStr{
"key": MapStr{
"key1": MapStr{
"A": "hello",
"B": float64(5),
},
},
},
},
}

for _, test := range tests {
assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input))
}

}