diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6a73499060d..48e462c722b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff] *Filebeat* - Default config for ignore_older is now infinite instead of 24h, means ignore_older is disabled by default. Use close_older to only close file handlers. +- Scalar values in used in the `fields` configuration setting are no longer automatically converted to strings. {pull}1092[1092] *Winlogbeat* @@ -72,6 +73,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff] - Improve logstash and elasticsearch backoff behavior. {pull}927[927] - Add experimental Kafka output. {pull}942[942] - Add config file option to configure GOMAXPROCS. {pull}969[969] +- Added a `fields` and `fields_under_root` as options available under the `shipper` configuration {pull}1092[1092] *Packetbeat* - Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803] @@ -84,10 +86,12 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff] - Add multiline support for combining multiple related lines into one event. {issue}461[461] - Add close_older configuration option to complete ignore_older https://github.com/elastic/filebeat/issues/181[181] - Add experimental option to enable filebeat publisher pipeline to operate asynchonrously {pull}782[782] +- Added the ability to set a list of tags for each prospector {pull}1092[1092] *Winlogbeat* - Add caching of event metadata handles and the system render context for the wineventlog API {pull}888[888] - Improve config validation by checking for unknown top-level YAML keys. {pull}1100[1100] +- Added the ability to set tags, fields, and fields_under_root as options for each event log {pull}1092[1092] ==== Deprecated diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 74c327ce12f..2abaf972849 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -57,11 +57,11 @@ type ProspectorConfig struct { } type HarvesterConfig struct { + common.EventMetadata `config:",inline"` // Fields and tags to add to events. + BufferSize int `config:"harvester_buffer_size"` DocumentType string `config:"document_type"` Encoding string `config:"encoding"` - Fields common.MapStr - FieldsUnderRoot bool `config:"fields_under_root"` InputType string `config:"input_type"` TailFiles bool `config:"tail_files"` Backoff string `config:"backoff"` diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 785c32166a7..e92190bc003 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -92,27 +92,51 @@ The following example configures Filebeat to ignore all the files that have a `g exclude_files: [".gz$"] ------------------------------------------------------------------------------------- +===== tags + +A list of tags that the Beat includes in the `tags` field of each published +event. Tags make it easy to select specific events in Kibana or apply +conditional filtering in Logstash. These tags will be appended to the list of +tags specified in the `shipper` configuration. + +Example: + +[source,yaml] +-------------------------------------------------------------------------------- +filebeat: + prospectors: + - paths: ["/var/log/app/*.json"] + tags: ["json"] +-------------------------------------------------------------------------------- + [[configuration-fields]] ===== fields -Optional fields that you can specify to add additional information to the output. For -example, you might add fields that you can use for filtering log data. Fields can be -scalar values, arrays, dictionaries, or any nested combination of these. All scalar values will be interpreted as strings. By default, -the fields that you specify here will be grouped under a `fields` sub-dictionary in the output document. To store the custom fields as top-level fields, set the `fields_under_root` option to true. +Optional fields that you can specify to add additional information to the +output. For example, you might add fields that you can use for filtering log +data. Fields can be scalar values, arrays, dictionaries, or any nested +combination of these. By default, the fields that you specify here will be +grouped under a `fields` sub-dictionary in the output document. To store the +custom fields as top-level fields, set the `fields_under_root` option to true. +If a duplicate field is declared in the `shipper` configuration, then its value +will be overwritten by the value declared here. [source,yaml] -------------------------------------------------------------------------------------- -fields: - level: debug - review: 1 +-------------------------------------------------------------------------------- +filebeat: + prospectors: + - paths: ["/var/log/app/*.log"] + fields: + app_id: query_engine_12 +-------------------------------------------------------------------------------- -------------------------------------------------------------------------------------- [[fields-under-root]] ===== fields_under_root -If this option is set to true, the custom <> are stored as top-level fields -in the output document instead of being grouped under a `fields` sub-dictionary. -If the custom field names conflict with other field names added by Filebeat, the custom fields overwrite the other fields. +If this option is set to true, the custom <> are stored as +top-level fields in the output document instead of being grouped under a +`fields` sub-dictionary. If the custom field names conflict with other field +names added by Filebeat, then the custom fields overwrite the other fields. [[ignore-older]] ===== ignore_older diff --git a/filebeat/filebeat.yml b/filebeat/filebeat.yml index 9127e16258b..6aac06a26c1 100644 --- a/filebeat/filebeat.yml +++ b/filebeat/filebeat.yml @@ -360,6 +360,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers. diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index c8e4825739a..a097f178ca3 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -127,20 +127,18 @@ func (h *Harvester) Harvest() { } if h.shouldExportLine(text) { - // Sends text to spooler event := &input.FileEvent{ - ReadTime: ts, - Source: &h.Path, - InputType: h.Config.InputType, - DocumentType: h.Config.DocumentType, - Offset: h.Offset, - Bytes: bytesRead, - Text: &text, - Fields: h.Config.Fields, - Fileinfo: &info, + EventMetadata: h.Config.EventMetadata, + ReadTime: ts, + Source: &h.Path, + InputType: h.Config.InputType, + DocumentType: h.Config.DocumentType, + Offset: h.Offset, + Bytes: bytesRead, + Text: &text, + Fileinfo: &info, } - event.SetFieldsUnderRoot(h.Config.FieldsUnderRoot) h.SpoolerChan <- event // ship the new event downstream } diff --git a/filebeat/input/file.go b/filebeat/input/file.go index 5a80b9e51cb..49c9df9611e 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file.go @@ -17,6 +17,7 @@ type File struct { // FileEvent is sent to the output and must contain all relevant information type FileEvent struct { + common.EventMetadata ReadTime time.Time Source *string InputType string @@ -24,10 +25,7 @@ type FileEvent struct { Offset int64 Bytes int Text *string - Fields common.MapStr Fileinfo *os.FileInfo - - fieldsUnderRoot bool } type FileState struct { @@ -57,37 +55,16 @@ func (f *FileEvent) GetState() *FileState { return state } -// SetFieldsUnderRoot sets whether the fields should be added -// top level to the output documentation (fieldsUnderRoot = true) or -// under a fields dictionary. -func (f *FileEvent) SetFieldsUnderRoot(fieldsUnderRoot bool) { - f.fieldsUnderRoot = fieldsUnderRoot -} - func (f *FileEvent) ToMapStr() common.MapStr { event := common.MapStr{ - "@timestamp": common.Time(f.ReadTime), - "source": f.Source, - "offset": f.Offset, // Offset here is the offset before the starting char. - "message": f.Text, - "type": f.DocumentType, - "input_type": f.InputType, - "count": 1, - } - - if f.Fields != nil { - if f.fieldsUnderRoot { - for key, value := range f.Fields { - // in case of conflicts, overwrite - _, found := event[key] - if found { - logp.Warn("Overwriting %s key", key) - } - event[key] = value - } - } else { - event["fields"] = f.Fields - } + common.EventMetadataKey: f.EventMetadata, + "@timestamp": common.Time(f.ReadTime), + "source": f.Source, + "offset": f.Offset, // Offset here is the offset before the starting char. + "message": f.Text, + "type": f.DocumentType, + "input_type": f.InputType, + "count": 1, } return event diff --git a/filebeat/input/file_test.go b/filebeat/input/file_test.go index 929621a6f6d..072d922ee0e 100644 --- a/filebeat/input/file_test.go +++ b/filebeat/input/file_test.go @@ -6,7 +6,6 @@ import ( "path/filepath" "testing" - "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) @@ -112,23 +111,3 @@ func TestFileEventToMapStr(t *testing.T) { _, found := mapStr["fields"] assert.False(t, found) } - -func TestFieldsUnderRoot(t *testing.T) { - event := FileEvent{ - Fields: common.MapStr{ - "hello": "world", - }, - } - event.SetFieldsUnderRoot(true) - mapStr := event.ToMapStr() - _, found := mapStr["fields"] - assert.False(t, found) - assert.Equal(t, "world", mapStr["hello"]) - - event.SetFieldsUnderRoot(false) - mapStr = event.ToMapStr() - _, found = mapStr["hello"] - assert.False(t, found) - _, found = mapStr["fields"] - assert.True(t, found) -} diff --git a/filebeat/tests/system/test_fields.py b/filebeat/tests/system/test_fields.py index a98205db604..f58bf422642 100644 --- a/filebeat/tests/system/test_fields.py +++ b/filebeat/tests/system/test_fields.py @@ -15,7 +15,7 @@ def test_custom_fields(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/test.log", - fields={"hello": "world"} + fields={"hello": "world", "number": 2} ) with open(self.working_dir + "/test.log", "w") as f: @@ -28,6 +28,7 @@ def test_custom_fields(self): output = self.read_output() doc = output[0] assert doc["fields.hello"] == "world" + assert doc["fields.number"] == 2 def test_custom_fields_under_root(self): """ diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index 81530b03990..e32a6473d81 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -2,13 +2,31 @@ package common import ( "encoding/json" + "errors" "fmt" "time" ) +const ( + EventMetadataKey = "_event_metadata" + FieldsKey = "fields" + TagsKey = "tags" +) + +var ErrorFieldsIsNotMapStr = errors.New("the value stored in fields is not a MapStr") +var ErrorTagsIsNotStringArray = errors.New("the value stored in tags is not a []string") + // Commonly used map of things, used in JSON creation and the like. type MapStr map[string]interface{} +// EventMetadata contains fields and tags that can be added to an event via +// configuration. +type EventMetadata struct { + Fields MapStr + FieldsUnderRoot bool `config:"fields_under_root"` + Tags []string +} + // Eventer defines a type its ability to fill a MapStr. type Eventer interface { // Add fields to MapStr. @@ -69,6 +87,7 @@ func (m MapStr) EnsureTimestampField(now func() time.Time) error { return fmt.Errorf("Don't know how to convert %v to a Time value", ts) } +// EnsureCountField sets the 'count' field to 1 if count does not already exist. func (m MapStr) EnsureCountField() error { _, exists := m["count"] if !exists { @@ -77,7 +96,7 @@ func (m MapStr) EnsureCountField() error { return nil } -// Prints the dict as a json +// String returns the MapStr as a JSON string. func (m MapStr) String() string { bytes, err := json.Marshal(m) if err != nil { @@ -86,51 +105,60 @@ func (m MapStr) String() string { return string(bytes) } -// UnmarshalYAML helps out with the YAML unmarshalling when the target -// variable is a MapStr. The default behavior is to unmarshal nested -// maps to map[interface{}]interface{} values, and such values can't -// be marshalled as JSON. +// MergeFields merges the top-level keys and values in each source hash (it does +// not perform a deep merge). If the same key exists in both, the value in +// fields takes precedence. If underRoot is true then the contents of the fields +// MapStr is merged with the value of the 'fields' key in ms. // -// The keys of map[interface{}]interface{} maps will be converted to -// strings with a %v format string, as will any scalar values that -// aren't already strings (i.e. numbers and boolean values). -// -// Since we want to modify the receiver it needs to be a pointer. -func (ms *MapStr) UnmarshalYAML(unmarshal func(interface{}) error) error { - var result map[interface{}]interface{} - err := unmarshal(&result) - if err != nil { - panic(err) +// An error is returned if underRoot is true and the value of ms.fields is not a +// MapStr. +func MergeFields(ms, fields MapStr, underRoot bool) error { + if ms == nil || fields == nil { + return nil + } + + fieldsMS := ms + if !underRoot { + f, ok := ms[FieldsKey] + if !ok { + fieldsMS = make(MapStr, len(fields)) + ms[FieldsKey] = fieldsMS + } else { + // Use existing 'fields' value. + fieldsMS, ok = f.(MapStr) + if !ok { + return ErrorFieldsIsNotMapStr + } + } } - *ms = cleanUpInterfaceMap(result) + + // Add fields and override. + for k, v := range fields { + fieldsMS[k] = v + } + return nil } -func cleanUpInterfaceArray(in []interface{}) []interface{} { - result := make([]interface{}, len(in)) - for i, v := range in { - result[i] = cleanUpMapValue(v) +// AddTag appends a tag to the tags field of ms. If the tags field does not +// exist then it will be created. If the tags field exists and is not a []string +// then an error will be returned. It does not deduplicate the list of tags. +func AddTags(ms MapStr, tags []string) error { + if ms == nil || len(tags) == 0 { + return nil } - return result -} -func cleanUpInterfaceMap(in map[interface{}]interface{}) MapStr { - result := make(MapStr) - for k, v := range in { - result[fmt.Sprintf("%v", k)] = cleanUpMapValue(v) + tagsIfc, ok := ms[TagsKey] + if !ok { + ms[TagsKey] = tags + return nil } - return result -} -func cleanUpMapValue(v interface{}) interface{} { - switch v := v.(type) { - case []interface{}: - return cleanUpInterfaceArray(v) - case map[interface{}]interface{}: - return cleanUpInterfaceMap(v) - case string: - return v - default: - return fmt.Sprintf("%v", v) + existingTags, ok := tagsIfc.([]string) + if !ok { + return ErrorTagsIsNotStringArray } + + ms[TagsKey] = append(existingTags, tags...) + return nil } diff --git a/libbeat/common/mapstr_test.go b/libbeat/common/mapstr_test.go index 9d9f95d24f4..011a33279fa 100644 --- a/libbeat/common/mapstr_test.go +++ b/libbeat/common/mapstr_test.go @@ -1,12 +1,10 @@ package common import ( - "strings" "testing" "time" "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v2" ) func TestMapStrUpdate(t *testing.T) { @@ -188,83 +186,148 @@ func TestString(t *testing.T) { } } -func TestUnmarshalYAML(t *testing.T) { +func TestMergeFields(t *testing.T) { type io struct { - InputLines []string - Output MapStr + UnderRoot bool + Event MapStr + Fields MapStr + Output MapStr + Err error } tests := []io{ - // should return nil for empty document + // underRoot = true, merges { - InputLines: []string{}, - Output: nil, + UnderRoot: true, + Event: MapStr{ + "a": "1", + }, + Fields: MapStr{ + "b": 2, + }, + Output: MapStr{ + "a": "1", + "b": 2, + }, }, - // should handle scalar values + + // underRoot = true, overwrites existing { - InputLines: []string{ - "a: b", - "c: true", - "123: 456", + UnderRoot: true, + Event: MapStr{ + "a": "1", + }, + Fields: MapStr{ + "a": 2, }, Output: MapStr{ - "a": "b", - "c": "true", - "123": "456", + "a": 2, }, }, - // should handle array with scalar values + + // underRoot = false, adds new 'fields' when it doesn't exist { - InputLines: []string{ - "a:", - " - b", - " - true", - " - 123", + UnderRoot: false, + Event: MapStr{ + "a": "1", + }, + Fields: MapStr{ + "a": 2, }, Output: MapStr{ - "a": []interface{}{"b", "true", "123"}, + "a": "1", + "fields": MapStr{ + "a": 2, + }, }, }, - // should handle array with nested map + + // underRoot = false, merge with existing 'fields' and overwrites existing keys { - InputLines: []string{ - "a:", - " - b: c", - " d: true", - " 123: 456", + UnderRoot: false, + Event: MapStr{ + "fields": MapStr{ + "a": "1", + "b": 2, + }, + }, + Fields: MapStr{ + "a": 3, + "c": 4, }, Output: MapStr{ - "a": []interface{}{ - MapStr{ - "b": "c", - "d": "true", - "123": "456", - }, + "fields": MapStr{ + "a": 3, + "b": 2, + "c": 4, }, }, }, - // should handle nested map + + // underRoot = false, error when 'fields' is wrong type { - InputLines: []string{ - "a: ", - " b: c", - " d: true", - " 123: 456", + UnderRoot: false, + Event: MapStr{ + "fields": "not a MapStr", + }, + Fields: MapStr{ + "a": 3, }, Output: MapStr{ - "a": MapStr{ - "b": "c", - "d": "true", - "123": "456", - }, + "fields": "not a MapStr", + }, + Err: ErrorFieldsIsNotMapStr, + }, + } + + for _, test := range tests { + err := MergeFields(test.Event, test.Fields, test.UnderRoot) + assert.Equal(t, test.Err, err) + assert.Equal(t, test.Output, test.Event) + } +} + +func TestAddTag(t *testing.T) { + type io struct { + Event MapStr + Tags []string + Output MapStr + Err error + } + tests := []io{ + // No existing tags, creates new tag array + { + Event: MapStr{}, + Tags: []string{"json"}, + Output: MapStr{ + "tags": []string{"json"}, + }, + }, + // Existing tags, appends + { + Event: MapStr{ + "tags": []string{"json"}, + }, + Tags: []string{"docker"}, + Output: MapStr{ + "tags": []string{"json", "docker"}, + }, + }, + // Existing tags is not a []string + { + Event: MapStr{ + "tags": "not a slice", }, + Tags: []string{"docker"}, + Output: MapStr{ + "tags": "not a slice", + }, + Err: ErrorTagsIsNotStringArray, }, } + for _, test := range tests { - var actual MapStr - if err := yaml.Unmarshal([]byte(strings.Join(test.InputLines, "\n")), &actual); err != nil { - assert.Fail(t, "YAML unmarshaling unexpectedly failed: %s", err) - continue - } - assert.Equal(t, test.Output, actual) + err := AddTags(test.Event, test.Tags) + assert.Equal(t, test.Err, err) + assert.Equal(t, test.Output, test.Event) } } diff --git a/libbeat/docs/shipperconfig.asciidoc b/libbeat/docs/shipperconfig.asciidoc index d6881e56bbe..8a1cdffd2f5 100644 --- a/libbeat/docs/shipperconfig.asciidoc +++ b/libbeat/docs/shipperconfig.asciidoc @@ -89,16 +89,51 @@ shipper: A list of tags that the Beat includes in the `tags` field of each published transaction. Tags make it easy to group servers by different logical properties. -For example, if you have a cluster of web servers, you can add the "webservers" tag -to the Beat on each server, and then use filters and queries in the -Kibana web interface to get visualisations for the whole group of servers. +For example, if you have a cluster of web servers, you can add the "webservers" +tag to the Beat on each server, and then use filters and queries in the Kibana +web interface to get visualisations for the whole group of servers. Example: [source,yaml] ------------------------------------------------------------------------------- +-------------------------------------------------------------------------------- shipper: tags: ["my-service", "hardware", "test"] +-------------------------------------------------------------------------------- + +[[configuration-fields]] +===== fields + +Optional fields that you can specify to add additional information to the +output. Fields can be scalar values, arrays, dictionaries, or any nested +combination of these. By default, the fields that you specify here will be +grouped under a `fields` sub-dictionary in the output document. To store the +custom fields as top-level fields, set the `fields_under_root` option to true. + +Example: + +[source,yaml] +------------------------------------------------------------------------------ +shipper: + fields: {project: "myproject", instance-id: "574734885120952459"} +------------------------------------------------------------------------------ + +===== fields_under_root + +If this option is set to true, the custom <> are stored as +top-level fields in the output document instead of being grouped under a +`fields` sub-dictionary. If the custom field names conflict with other field +names, then the custom fields overwrite the other fields. + +Example: + +[source,yaml] +------------------------------------------------------------------------------ +shipper: + fields_under_root: true + fields: + instance_id: i-10a64379 + region: us-east-1 ------------------------------------------------------------------------------ ===== ignore_outgoing diff --git a/libbeat/etc/libbeat.yml b/libbeat/etc/libbeat.yml index 9ff9c4427cb..6fa33e9a4db 100644 --- a/libbeat/etc/libbeat.yml +++ b/libbeat/etc/libbeat.yml @@ -188,6 +188,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers. diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index bf02e20b0aa..21e08784c8f 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -14,6 +14,30 @@ var ( ) // Client is used by beats to publish new events. +// +// The publish methods add fields that are common to all events. Both methods +// add the 'beat' field that contains name and hostname. Also they add 'tags' +// and 'fields'. +// +// Event publishers can override the default index for an event by adding a +// 'beat' field whose value is a common.MapStr that contains an 'index' field +// specifying the destination index. +// +// event := common.MapStr{ +// // Setting a custom index for a single event. +// "beat": common.MapStr{"index": "custom-index"}, +// } +// +// Event publishers can add fields and tags to an event. The fields will take +// precedence over the global fields defined in the shipper configuration. +// +// event := common.MapStr{ +// // Add custom fields to the root of the event. +// common.EventMetadataKey: common.EventMetadata{ +// UnderRoot: true, +// Fields: common.MapStr{"env": "production"} +// } +// } type Client interface { // PublishEvent publishes one event with given options. If Sync option is set, // PublishEvent will block until output plugins report success or failure state @@ -41,10 +65,9 @@ type PublishMessage struct { } type client struct { - publisher *PublisherType - - beatMeta common.MapStr - tags []string + publisher *PublisherType + beatMeta common.MapStr // Beat metadata that is added to all events. + globalEventMetadata common.EventMetadata // Fields and tags that are added to all events. } // ClientOption allows API users to set additional options when publishing events. @@ -82,7 +105,7 @@ func newClient(pub *PublisherType) *client { "name": pub.name, "hostname": pub.hostname, }, - tags: pub.tags, + globalEventMetadata: pub.globalEventMetadata, } } @@ -104,19 +127,34 @@ func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) boo return client.PublishEvents(ctx, events) } +// annotateEvent adds fields that are common to all events. This adds the 'beat' +// field that contains name and hostname. It also adds 'tags' and 'fields'. See +// the documentation for Client for more information. func (c *client) annotateEvent(event common.MapStr) { - - // Check if index was set dynamically - if _, ok := event["beat"]; ok { - beatTemp := event["beat"].(common.MapStr) - if _, ok := beatTemp["index"]; ok { - c.beatMeta["index"] = beatTemp["index"] + // Allow an event to override the destination index for an event by setting + // beat.index in an event. + beatMeta := c.beatMeta + if beatIfc, ok := event["beat"]; ok { + ms, ok := beatIfc.(common.MapStr) + if ok { + // Copy beatMeta so the defaults are not changed. + beatMeta = common.MapStrUnion(beatMeta, ms) } } - - event["beat"] = c.beatMeta - if len(c.tags) > 0 { - event["tags"] = c.tags + event["beat"] = beatMeta + + // Add the global tags and fields defined under shipper. + common.AddTags(event, c.globalEventMetadata.Tags) + common.MergeFields(event, c.globalEventMetadata.Fields, c.globalEventMetadata.FieldsUnderRoot) + + // Add the event specific fields last so that they precedence over globals. + if metaIfc, ok := event[common.EventMetadataKey]; ok { + eventMetadata, ok := metaIfc.(common.EventMetadata) + if ok { + common.AddTags(event, eventMetadata.Tags) + common.MergeFields(event, eventMetadata.Fields, eventMetadata.FieldsUnderRoot) + } + delete(event, common.EventMetadataKey) } if logp.IsDebug("publish") { diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index c3c2cbb4401..92d80d17e56 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -53,7 +53,6 @@ type PublisherType struct { hostname string // Host name as returned by the operation system name string // The shipperName if configured, the hostname otherwise IpAddrs []string - tags []string disabled bool Index string Output []*outputWorker @@ -61,6 +60,8 @@ type PublisherType struct { IgnoreOutgoing bool GeoLite *libgeo.GeoIP + globalEventMetadata common.EventMetadata // Fields and tags to add to each event. + RefreshTopologyTimer <-chan time.Time // wsOutput and wsPublisher should be used for proper shutdown of publisher @@ -78,11 +79,11 @@ type PublisherType struct { } type ShipperConfig struct { + common.EventMetadata `config:",inline"` // Fields and tags to add to each event. Name string Refresh_topology_freq int Ignore_outgoing bool Topology_expire int - Tags []string Geoip common.Geoip // internal publisher queue sizes @@ -291,7 +292,7 @@ func (publisher *PublisherType) init( } logp.Info("Publisher name: %s", publisher.name) - publisher.tags = shipper.Tags + publisher.globalEventMetadata = shipper.EventMetadata //Store the publisher's IP addresses publisher.IpAddrs, err = common.LocalIpAddrsAsStrings(false) diff --git a/metricbeat/metricbeat.yml b/metricbeat/metricbeat.yml index 483eca8dc52..218f238486f 100644 --- a/metricbeat/metricbeat.yml +++ b/metricbeat/metricbeat.yml @@ -209,6 +209,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers. diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index 405f5285c23..5ada30a98c3 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -29,6 +29,28 @@ protocols: # Enable ICMPv4 and ICMPv6 monitoring. Default: false enabled: true + amqp: + # Configure the ports where to listen for AMQP traffic. You can disable + # the AMQP protocol by commenting out the list of ports. + ports: [5672] + # Truncate messages that are published and avoid huge messages being + # indexed. + # Default: 1000 + #max_body_length: 1000 + + # Hide the header fields in header frames. + # Default: false + #parse_headers: false + + # Hide the additional arguments of method frames. + # Default: false + #parse_arguments: false + + # Hide all methods relative to connection negociation between server and + # client. + # Default: true + #hide_connection_information: true + dns: # Configure the ports where to listen for DNS traffic. You can disable # the DNS protocol by commenting out the list of ports. @@ -52,24 +74,6 @@ protocols: # send_request: true # send_response: true - amqp: - # Configure the ports where to listen for AMQP traffic. You can disable - # the AMQP protocol by commenting out the list of ports. - ports: [5672] - # Truncate messages that are published and avoid huge messages being - # indexed. - # Default: 1000 - #max_body_length: 1000 - # Hide the header fields in header frames. - # Default: false - #parse_headers: false - # Hide the additional arguments of method frames. - # Default: false - #parse_arguments: false - # See all methods relative to connection negociation between server and - # client. - #hide_connection_information: false - http: # Configure the ports where to listen for HTTP traffic. You can disable # the HTTP protocol by commenting out the list of ports. @@ -166,7 +170,6 @@ protocols: # # - process: app # cmdline_grep: gunicorn - ############################################################################### ############################# Libbeat Config ################################## # Base config file used by all other beats for using libbeat features @@ -357,6 +360,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers. @@ -424,3 +438,5 @@ logging: # Sets log level. The default log level is error. # Available log levels are: critical, error, warning, info, debug #level: error + + diff --git a/topbeat/topbeat.yml b/topbeat/topbeat.yml index 0995cd19666..d8ec37aa45f 100644 --- a/topbeat/topbeat.yml +++ b/topbeat/topbeat.yml @@ -214,6 +214,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers. diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index 7ed70ff86b9..a5bd509420b 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -140,8 +140,9 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error { debugf("Initializing EventLog[%s]", eventLogConfig.Name) eventLog, err := eventlog.New(eventlog.Config{ - Name: eventLogConfig.Name, - API: eventLogConfig.API, + Name: eventLogConfig.Name, + API: eventLogConfig.API, + EventMetadata: eventLogConfig.EventMetadata, }) if err != nil { return fmt.Errorf("Failed to create new event log for %s. %v", diff --git a/winlogbeat/config/config.go b/winlogbeat/config/config.go index f2642506f1c..6259cf48750 100644 --- a/winlogbeat/config/config.go +++ b/winlogbeat/config/config.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/elastic/beats/libbeat/common" "github.com/joeshaw/multierror" ) @@ -132,9 +133,10 @@ func (mc MetricsConfig) Validate() error { // EventLogConfig holds the configuration data that specifies which event logs // to monitor. type EventLogConfig struct { - Name string - IgnoreOlder string `config:"ignore_older"` - API string + common.EventMetadata `config:",inline"` + Name string + IgnoreOlder string `config:"ignore_older"` + API string } // Validate validates the EventLogConfig data and returns an error describing diff --git a/winlogbeat/docs/reference/configuration/winlogbeat-options.asciidoc b/winlogbeat/docs/reference/configuration/winlogbeat-options.asciidoc index 1e7ad7b9fac..605a58dcf6f 100644 --- a/winlogbeat/docs/reference/configuration/winlogbeat-options.asciidoc +++ b/winlogbeat/docs/reference/configuration/winlogbeat-options.asciidoc @@ -124,6 +124,52 @@ winlogbeat: ignore_older: 168h -------------------------------------------------------------------------------- +===== event_logs.tags + +A list of tags that the Beat includes in the `tags` field of each published +event. Tags make it easy to select specific events in Kibana or apply +conditional filtering in Logstash. These tags will be appended to the list of +tags specified in the `shipper` configuration. + +Example: + +[source,yaml] +-------------------------------------------------------------------------------- +winlogbeat: + event_logs: + - name: CustomLog + tags: ["web"] +-------------------------------------------------------------------------------- + +[[configuration-fields]] +===== event_logs.fields + +Optional fields that you can specify to add additional information to the +output. For example, you might add fields that you can use for filtering event +data. Fields can be scalar values, arrays, dictionaries, or any nested +combination of these. By default, the fields that you specify here will be +grouped under a `fields` sub-dictionary in the output document. To store the +custom fields as top-level fields, set the `fields_under_root` option to true. +If a duplicate field is declared in the `shipper` configuration, then its value +will be overwritten by the value declared here. + +[source,yaml] +-------------------------------------------------------------------------------- +winlogbeat: + event_logs: + - name: CustomLog + fields: + customer_id: 51415432 +-------------------------------------------------------------------------------- + +[[fields-under-root]] +===== event_logs.fields_under_root + +If this option is set to true, the custom <> are stored as +top-level fields in the output document instead of being grouped under a +`fields` sub-dictionary. If the custom field names conflict with other field +names added by Winlogbeat, then the custom fields overwrite the other fields. + ===== metrics.bindaddress The hostname and port where the Beat will host an HTTP web service that provides diff --git a/winlogbeat/eventlog/eventlog.go b/winlogbeat/eventlog/eventlog.go index 30d9805e26c..ca9d80fd117 100644 --- a/winlogbeat/eventlog/eventlog.go +++ b/winlogbeat/eventlog/eventlog.go @@ -50,6 +50,8 @@ type Record struct { Message string // The message from the event log. MessageInserts []string // The raw message data logged by an application. MessageErr error // The error that occurred while reading and formatting the message from the event log. + + common.EventMetadata // Fields and tags to add to the event. } // String returns a string representation of Record. @@ -66,10 +68,11 @@ func (r Record) String() string { // ToMapStr returns a new MapStr containing the data from this Record. func (r Record) ToMapStr() common.MapStr { m := common.MapStr{ - "@timestamp": common.Time(r.TimeGenerated), - "log_name": r.EventLogName, - "source_name": r.SourceName, - "computer_name": r.ComputerName, + common.EventMetadataKey: r.EventMetadata, + "@timestamp": common.Time(r.TimeGenerated), + "log_name": r.EventLogName, + "source_name": r.SourceName, + "computer_name": r.ComputerName, // Use a string to represent this uint64 data because its value can // be outside the range represented by a Java long. "record_number": strconv.FormatUint(r.RecordNumber, 10), diff --git a/winlogbeat/eventlog/eventlogging.go b/winlogbeat/eventlog/eventlogging.go index 2bf34103658..8e2c4ce3925 100644 --- a/winlogbeat/eventlog/eventlogging.go +++ b/winlogbeat/eventlog/eventlogging.go @@ -6,6 +6,7 @@ import ( "fmt" "syscall" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" sys "github.com/elastic/beats/winlogbeat/sys/eventlogging" ) @@ -22,13 +23,14 @@ var _ EventLog = &eventLogging{} // eventLogging implements the EventLog interface for reading from the Event // Logging API. type eventLogging struct { - uncServerPath string // UNC name of remote server. - name string // Name of the log that is opened. - handle sys.Handle // Handle to the event log. - readBuf []byte // Buffer for reading in events. - formatBuf []byte // Buffer for formatting messages. - handles *messageFilesCache // Cached mapping of source name to event message file handles. - logPrefix string // Prefix to add to all log entries. + uncServerPath string // UNC name of remote server. + name string // Name of the log that is opened. + handle sys.Handle // Handle to the event log. + readBuf []byte // Buffer for reading in events. + formatBuf []byte // Buffer for formatting messages. + handles *messageFilesCache // Cached mapping of source name to event message file handles. + logPrefix string // Prefix to add to all log entries. + eventMetadata common.EventMetadata // Fields and tags to add to each event. recordNumber uint32 // First record number to read. seek bool // Read should use seek. @@ -132,6 +134,7 @@ func (l *eventLogging) Read() ([]Record, error) { Message: e.Message, MessageInserts: e.MessageInserts, MessageErr: e.MessageErr, + EventMetadata: l.eventMetadata, } if e.TimeGenerated != nil { @@ -211,9 +214,10 @@ func newEventLogging(c Config) (EventLog, error) { name: c.Name, handles: newMessageFilesCache(c.Name, sys.QueryEventMessageFiles, sys.FreeLibrary), - logPrefix: fmt.Sprintf("EventLogging[%s]", c.Name), - readBuf: make([]byte, 0, sys.MaxEventBufferSize), - formatBuf: make([]byte, sys.MaxFormatMessageBufferSize), + logPrefix: fmt.Sprintf("EventLogging[%s]", c.Name), + readBuf: make([]byte, 0, sys.MaxEventBufferSize), + formatBuf: make([]byte, sys.MaxFormatMessageBufferSize), + eventMetadata: c.EventMetadata, }, nil } diff --git a/winlogbeat/eventlog/factory.go b/winlogbeat/eventlog/factory.go index 3338a82438b..e60c9399a7c 100644 --- a/winlogbeat/eventlog/factory.go +++ b/winlogbeat/eventlog/factory.go @@ -4,12 +4,15 @@ import ( "fmt" "sort" "strings" + + "github.com/elastic/beats/libbeat/common" ) // Config is the configuration data used to instantiate a new EventLog. type Config struct { - Name string // Name of the event log or channel. - RemoteAddress string // Remote computer to connect to. Optional. + Name string // Name of the event log or channel. + RemoteAddress string // Remote computer to connect to. Optional. + common.EventMetadata // Fields and tags to add to each event. API string // Name of the API to use. Optional. } diff --git a/winlogbeat/eventlog/wineventlog.go b/winlogbeat/eventlog/wineventlog.go index 3cceab097c3..6b9021353b0 100644 --- a/winlogbeat/eventlog/wineventlog.go +++ b/winlogbeat/eventlog/wineventlog.go @@ -5,6 +5,7 @@ package eventlog import ( "fmt" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/winlogbeat/sys/eventlogging" sys "github.com/elastic/beats/winlogbeat/sys/wineventlog" @@ -38,7 +39,8 @@ type winEventLog struct { systemCtx sys.EvtHandle // System render context. cache *messageFilesCache // Cached mapping of source name to event message file handles. - logPrefix string // String to prefix on log messages. + logPrefix string // String to prefix on log messages. + eventMetadata common.EventMetadata // Field and tags to add to each event. } // Name returns the name of the event log (i.e. Application, Security, etc.). @@ -101,16 +103,17 @@ func (l *winEventLog) Read() ([]Record, error) { } r := Record{ - API: winEventLogAPIName, - EventLogName: e.Channel, - SourceName: e.ProviderName, - ComputerName: e.Computer, - RecordNumber: e.RecordID, - EventID: uint32(e.EventID), - Level: e.Level, - Category: e.Task, - Message: e.Message, - MessageErr: e.MessageErr, + API: winEventLogAPIName, + EventLogName: e.Channel, + SourceName: e.ProviderName, + ComputerName: e.Computer, + RecordNumber: e.RecordID, + EventID: uint32(e.EventID), + Level: e.Level, + Category: e.Task, + Message: e.Message, + MessageErr: e.MessageErr, + EventMetadata: l.eventMetadata, } if e.TimeCreated != nil { @@ -163,13 +166,14 @@ func newWinEventLog(c Config) (EventLog, error) { } return &winEventLog{ - channelName: c.Name, - remoteServer: c.RemoteAddress, - maxRead: defaultMaxNumRead, - renderBuf: make([]byte, renderBufferSize), - systemCtx: ctx, - cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle), - logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name), + channelName: c.Name, + remoteServer: c.RemoteAddress, + maxRead: defaultMaxNumRead, + renderBuf: make([]byte, renderBufferSize), + systemCtx: ctx, + cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle), + logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name), + eventMetadata: c.EventMetadata, }, nil } diff --git a/winlogbeat/tests/system/config/winlogbeat.yml.j2 b/winlogbeat/tests/system/config/winlogbeat.yml.j2 index ac5471cce8d..f3ad748c980 100644 --- a/winlogbeat/tests/system/config/winlogbeat.yml.j2 +++ b/winlogbeat/tests/system/config/winlogbeat.yml.j2 @@ -1,21 +1,34 @@ ############################################################################### ############################# Winlogbeat ###################################### winlogbeat: - {% if ignore_older %} + {%- if ignore_older %} ignore_older: {{ignore_older}} {% endif %} - {% if event_logs %} + {%- if event_logs %} event_logs: - {% for log in event_logs %} + {% for log in event_logs -%} - name: {{ log.name }} - {% if log.ignore_older %} + {%- if log.ignore_older %} ignore_older: {{ log.ignore_older }} {% endif %} - {% if log.api %} + {%- if log.api %} api: {{ log.api }} {% endif %} - {% endfor %} + {%- if log.tags %} + tags: + {% for tag in log.tags -%} + - {{ tag }} + {% endfor -%} + {% endif -%} + {%- if log.fields %} + {% if log.fields_under_root %}fields_under_root: true{% endif %} + fields: + {% for k, v in log.fields.items() -%} + {{ k }}: {{ v }} + {% endfor -%} + {% endif %} + {% endfor -%} {% endif %} ############################################################################### @@ -35,18 +48,23 @@ output: ############################# Shipper ######################################### shipper: - {% if shipper_name %} - name: {{ shipper_name }} - {% endif %} - - {% if tags %} - tags: [ - {%- if agent_tags -%} - {%- for tag in agent_tags -%} - "{{ tag }}" - {%- if not loop.last %}, {% endif -%} - {%- endfor -%} - {%- endif -%}] - {% endif %} + {%- if shipper_name %} + name: {{ shipper_name }} + {% endif %} + + {%- if tags %} + tags: + {% for tag in tags -%} + - {{ tag }} + {% endfor -%} + {% endif %} + + {%- if fields %} + {% if fields_under_root %}fields_under_root: true{% endif %} + fields: + {% for k, v in fields.items() -%} + {{ k }}: {{ v }} + {% endfor -%} + {% endif %} # vim: set ft=jinja: diff --git a/winlogbeat/tests/system/test_config.py b/winlogbeat/tests/system/test_config.py index f791e06b5c1..d581fcd59c8 100644 --- a/winlogbeat/tests/system/test_config.py +++ b/winlogbeat/tests/system/test_config.py @@ -9,8 +9,8 @@ class Test(BaseTest): def test_valid_config(self): """ - With -configtest and an error in the configuration, it should - return a non-zero error code. + With -configtest and valid config, it should return a non-zero error + code. """ self.render_config_template( ignore_older="1h", diff --git a/winlogbeat/tests/system/test_eventlog.py b/winlogbeat/tests/system/test_eventlog.py index 99d7e0f31bb..beb395c4d95 100644 --- a/winlogbeat/tests/system/test_eventlog.py +++ b/winlogbeat/tests/system/test_eventlog.py @@ -37,7 +37,7 @@ def clear_event_log(self): win32evtlog.ClearEventLog(hlog, None) win32evtlog.CloseEventLog(hlog) - def write_event_log(self, message, eventID, sid=None): + def write_event_log(self, message, eventID=10, sid=None): if sid == None: sid = self.get_sid() @@ -220,3 +220,94 @@ def read_unknown_sid(self, api): assert evt["message"] == msg return evt + + @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") + def test_eventlogging_fields_under_root(self): + """ + Event Logging - Fields Under Root + """ + self.fields_under_root("eventlogging") + + @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") + def test_wineventlog_fields_under_root(self): + """ + Win Event Log - Fields Under Root + """ + self.fields_under_root("wineventlog") + + def fields_under_root(self, api): + msg = "Add fields under root" + self.write_event_log(msg) + + # Run Winlogbeat + self.render_config_template( + tags = ["global"], + fields = {"global": "field", "env": "prod", "type": "overwrite"}, + fields_under_root = True, + event_logs = [ + {"name": self.providerName, + "api": api, + "tags": ["local"], + "fields_under_root": True, + "fields": {"local": "field", "env": "dev"}} + ] + ) + proc = self.start_beat() + self.wait_until(lambda: self.output_has(1)) + proc.check_kill_and_wait() + + # Verify output + events = self.read_output() + self.assertEqual(len(events), 1) + evt = events[0] + self.assertDictContainsSubset({ + "global": "field", + "env": "dev", + "type": "overwrite", + "local": "field", + "tags": ["global", "local"], + }, evt) + + @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") + def test_eventlogging_fields_not_under_root(self): + """ + Event Logging - Fields Not Under Root + """ + self.fields_not_under_root("eventlogging") + + @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") + def test_wineventlog_fields_not_under_root(self): + """ + Win Event Log - Fields Not Under Root + """ + self.fields_not_under_root("wineventlog") + + def fields_not_under_root(self, api): + msg = "Add fields" + self.write_event_log(msg) + + # Run Winlogbeat + self.render_config_template( + fields = {"global": "field", "env": "prod", "type": "overwrite"}, + event_logs = [ + {"name": self.providerName, + "api": api, + "fields": {"local": "field", "env": "dev", "num": 1}} + ] + ) + proc = self.start_beat() + self.wait_until(lambda: self.output_has(1)) + proc.check_kill_and_wait() + + # Verify output + events = self.read_output() + self.assertEqual(len(events), 1) + evt = events[0] + assert "tags" not in evt, "tags present in event" + self.assertDictContainsSubset({ + "fields.global": "field", + "fields.env": "dev", + "fields.type": "overwrite", + "fields.local": "field", + "fields.num": 1, + }, evt) diff --git a/winlogbeat/winlogbeat.yml b/winlogbeat/winlogbeat.yml index 91bf3c099fa..951d75741e5 100644 --- a/winlogbeat/winlogbeat.yml +++ b/winlogbeat/winlogbeat.yml @@ -213,6 +213,17 @@ shipper: # logical properties. #tags: ["service-X", "web-tier"] + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # If this option is set to true, the custom fields are stored as top-level + # fields in the output document instead of being grouped under a fields + # sub-dictionary. Default is false. + #fields_under_root: false + # Uncomment the following if you want to ignore transactions created # by the server on which the shipper is installed. This option is useful # to remove duplicates if shippers are installed on multiple servers.