Add decompress_gzip_field processor #12733

Merged 3 commits on Jul 1, 2019. Changes shown from 2 commits.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254]
- Add `decode_base64_field` processor for decoding base64 field. {pull}11914[11914]
- Add aws overview dashboard. {issue}11007[11007] {pull}12175[12175]
- Add `decompress_gzip_field` processor. {pull}12733[12733]

*Auditbeat*

7 changes: 7 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -330,6 +330,13 @@ auditbeat.modules:
# target: ""
# overwrite_keys: false
#
#processors:
#- decompress_gzip_field:
#    from: "field1"
#    to: "field2"
#    ignore_missing: false
#    fail_on_error: true
#
# The following example copies the value of message to message_copied
#
#processors:
7 changes: 7 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -1031,6 +1031,13 @@ filebeat.inputs:
# target: ""
# overwrite_keys: false
#
#processors:
#- decompress_gzip_field:
#    from: "field1"
#    to: "field2"
#    ignore_missing: false
#    fail_on_error: true
#
# The following example copies the value of message to message_copied
#
#processors:
7 changes: 7 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -474,6 +474,13 @@ heartbeat.scheduler:
# target: ""
# overwrite_keys: false
#
#processors:
#- decompress_gzip_field:
#    from: "field1"
#    to: "field2"
#    ignore_missing: false
#    fail_on_error: true
#
# The following example copies the value of message to message_copied
#
#processors:
7 changes: 7 additions & 0 deletions journalbeat/journalbeat.reference.yml
@@ -270,6 +270,13 @@ setup.template.settings:
# target: ""
# overwrite_keys: false
#
#processors:
#- decompress_gzip_field:
#    from: "field1"
#    to: "field2"
#    ignore_missing: false
#    fail_on_error: true
#
# The following example copies the value of message to message_copied
#
#processors:
7 changes: 7 additions & 0 deletions libbeat/_meta/config.reference.yml.tmpl
@@ -218,6 +218,13 @@
# target: ""
# overwrite_keys: false
#
#processors:
#- decompress_gzip_field:
#    from: "field1"
#    to: "field2"
#    ignore_missing: false
#    fail_on_error: true
#
# The following example copies the value of message to message_copied
#
#processors:
35 changes: 35 additions & 0 deletions libbeat/docs/processors-using.asciidoc
@@ -215,6 +215,7 @@ ifdef::has_decode_csv_fields_processor[]
endif::[]
* <<decode-json-fields,`decode_json_fields`>>
* <<decode-base64-field,`decode_base64_field`>>
* <<decompress-gzip-field,`decompress_gzip_field`>>
* <<dissect, `dissect`>>
* <<extract-array,`extract_array`>>
* <<processor-dns, `dns`>>
@@ -899,6 +900,40 @@ continues also if an error happened during decoding. Default is `true`.

See <<conditions>> for a list of supported conditions.

[[decompress-gzip-field]]
=== Decompress gzip fields

The `decompress_gzip_field` processor specifies a field to gzip-decompress.
The `field` key contains a `from: old-key` and a `to: new-key` pair. `from` is
the origin and `to` is the target name of the field.

To overwrite a field, either rename the target field first, or use the
`drop_fields` processor to drop the field and then rename it. A sketch of this
pattern follows the example below.

[source,yaml]
-------
processors:
- decompress_gzip_field:
    from: "field1"
    to: "field2"
    ignore_missing: false
    fail_on_error: true
-------

In the example above, the decompressed content of `field1` is written to
`field2`.
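
As mentioned above, overwriting a field in place takes a small pipeline. A
minimal sketch of that pattern, combining this processor with `drop_fields`
and `rename` (the `message` field name is purely illustrative):

[source,yaml]
-------
processors:
- decompress_gzip_field:
    from: "message"
    to: "message_decompressed"
- drop_fields:
    fields: ["message"]
- rename:
    fields:
      - from: "message_decompressed"
        to: "message"
-------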

The `decompress_gzip_field` processor has the following configuration settings:

`ignore_missing`:: (Optional) If set to true, no error is logged when the key
which should be decompressed is missing. Default is `false`.

`fail_on_error`:: (Optional) If set to true, decompression of fields is
stopped in case of an error and the original event is returned. If set to
false, decompression continues even if an error happens. Default is `true`.

See <<conditions>> for a list of supported conditions.
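
A condition can restrict the processor to events that are known to carry
gzip-compressed payloads. A sketch using an `equals` condition (the
`content_encoding` field is purely illustrative):

[source,yaml]
-------
processors:
- decompress_gzip_field:
    when:
      equals:
        content_encoding: "gzip"
    from: "field1"
    to: "field2"
-------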

[[community-id]]
=== Community ID Network Flow Hash

133 changes: 133 additions & 0 deletions libbeat/processors/actions/decompress_gzip_field.go
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/pkg/errors"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/processors"
	"github.com/elastic/beats/libbeat/processors/checks"
)

type decompressGzipField struct {
	config decompressGzipFieldConfig
	log    *logp.Logger
}

type decompressGzipFieldConfig struct {
	Field         fromTo `config:"field"`
	IgnoreMissing bool   `config:"ignore_missing"`
	FailOnError   bool   `config:"fail_on_error"`
}

func init() {
	processors.RegisterPlugin("decompress_gzip_field",
		checks.ConfigChecked(NewDecompressGzipFields,
			checks.RequireFields("field"),
			checks.AllowedFields("field", "ignore_missing", "fail_on_error")))
}

// NewDecompressGzipFields constructs a new decompress_gzip_fields processor.
func NewDecompressGzipFields(c *common.Config) (processors.Processor, error) {
	config := decompressGzipFieldConfig{
		IgnoreMissing: false,
		FailOnError:   true,
	}

	err := c.Unpack(&config)
	if err != nil {
		return nil, fmt.Errorf("failed to unpack the decompress_gzip_fields configuration: %+v", err)
	}

	return &decompressGzipField{config: config, log: logp.NewLogger("decompress_gzip_field")}, nil
}

// Run applies the decompress_gzip_fields processor to an event.
func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
	// Keep a copy of the original fields so the event can be restored
	// if decompression fails and fail_on_error is enabled.
	var backup common.MapStr
	if f.config.FailOnError {
		backup = event.Fields.Clone()
	}

	err := f.decompressGzipField(event)
	if err != nil {
		errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err)
		f.log.Debug(errMsg.Error())
		if f.config.FailOnError {
			event.Fields = backup
			event.PutValue("error.message", errMsg.Error())
			return event, err
		}
	}
	return event, nil
}

func (f *decompressGzipField) decompressGzipField(event *beat.Event) error {
	data, err := event.GetValue(f.config.Field.From)
	if err != nil {
		if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
			return nil
		}
		return fmt.Errorf("could not fetch value for key: %s, Error: %v", f.config.Field.From, err)
	}

	// The source field may hold the compressed payload as raw bytes or as a string.
	var inBuf *bytes.Buffer
	switch txt := data.(type) {
	case []byte:
		inBuf = bytes.NewBuffer(txt)
	case string:
		inBuf = bytes.NewBufferString(txt)
	default:
		return fmt.Errorf("cannot decompress type %T", txt)
	}

	r, err := gzip.NewReader(inBuf)
	if err != nil {
		return errors.Wrapf(err, "error decompressing field %s", f.config.Field.From)
	}

	var outBuf bytes.Buffer
	_, err = io.Copy(&outBuf, r)
	if err != nil {
		return fmt.Errorf("error while decompressing field: %v", err)
	}

	err = r.Close()
	if err != nil {
		return fmt.Errorf("error closing gzip reader: %v", err)
	}

	if _, err = event.PutValue(f.config.Field.To, outBuf.String()); err != nil {
		return fmt.Errorf("could not put decompressed data: %v", err)
	}
	return nil
}

// String returns a string representation of this processor.
func (f decompressGzipField) String() string {
	return fmt.Sprintf("decompress_gzip_fields=%+v", f.config.Field)
}
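
This diff does not include a unit test; the following is a minimal sketch of how the processor could be exercised, assuming the testify assertions used elsewhere in libbeat (the test file and test name are hypothetical):

// decompress_gzip_field_test.go (hypothetical)
package actions

import (
	"bytes"
	"compress/gzip"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
)

func TestDecompressGzipField(t *testing.T) {
	// Build a gzip-compressed payload for the source field.
	var compressed bytes.Buffer
	w := gzip.NewWriter(&compressed)
	w.Write([]byte("decompressed data"))
	w.Close()

	// Configure the processor to read field1 and write field2.
	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"field": map[string]string{"from": "field1", "to": "field2"},
	})
	assert.NoError(t, err)

	p, err := NewDecompressGzipFields(cfg)
	assert.NoError(t, err)

	// Run the processor and verify the decompressed text lands in the target field.
	event, err := p.Run(&beat.Event{Fields: common.MapStr{"field1": compressed.Bytes()}})
	assert.NoError(t, err)

	result, err := event.GetValue("field2")
	assert.NoError(t, err)
	assert.Equal(t, "decompressed data", result)
}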