From fea6dad543ad495a2519b650b8b0b06c77adfabc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 1 Jul 2019 12:20:16 +0200 Subject: [PATCH 1/3] add decompress_gzip_field processor --- auditbeat/auditbeat.reference.yml | 7 + filebeat/filebeat.reference.yml | 7 + heartbeat/heartbeat.reference.yml | 7 + journalbeat/journalbeat.reference.yml | 7 + libbeat/_meta/config.reference.yml.tmpl | 7 + libbeat/docs/processors-using.asciidoc | 35 ++++ .../actions/decompress_gzip_field.go | 133 ++++++++++++ .../actions/decompress_gzip_field_test.go | 191 ++++++++++++++++++ metricbeat/metricbeat.reference.yml | 7 + packetbeat/packetbeat.reference.yml | 7 + winlogbeat/winlogbeat.reference.yml | 7 + x-pack/auditbeat/auditbeat.reference.yml | 7 + x-pack/filebeat/filebeat.reference.yml | 7 + x-pack/functionbeat/_meta/beat.reference.yml | 9 + x-pack/functionbeat/_meta/beat.yml | 9 + .../functionbeat/functionbeat.reference.yml | 16 ++ x-pack/functionbeat/functionbeat.yml | 9 + x-pack/metricbeat/metricbeat.reference.yml | 7 + x-pack/winlogbeat/winlogbeat.reference.yml | 7 + 19 files changed, 486 insertions(+) create mode 100644 libbeat/processors/actions/decompress_gzip_field.go create mode 100644 libbeat/processors/actions/decompress_gzip_field_test.go diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 52cd25fc677..95c31e50b7e 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -330,6 +330,13 @@ auditbeat.modules: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index aad973b1572..c7a4cb99f77 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1031,6 +1031,13 @@ filebeat.inputs: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 3ae13d68bb0..008eb35636d 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -474,6 +474,13 @@ heartbeat.scheduler: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index a149db1e68f..42989dfd619 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -270,6 +270,13 @@ setup.template.settings: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/libbeat/_meta/config.reference.yml.tmpl b/libbeat/_meta/config.reference.yml.tmpl index 85035b50a31..e48ff516b80 100644 --- a/libbeat/_meta/config.reference.yml.tmpl +++ b/libbeat/_meta/config.reference.yml.tmpl @@ -218,6 +218,13 @@ # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index c0df787eb38..b0d04b1b37e 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -215,6 +215,7 @@ ifdef::has_decode_csv_fields_processor[] endif::[] * <> * <> + * <> * <> * <> * <> @@ -899,6 +900,40 @@ continues also if an error happened during decoding. Default is `true`. See <> for a list of supported conditions. +[[decompress-gzip-field]] +=== Decompress gzip fields + +The `decompress_gzip_field` processor specifies a field to gzip decompress. +The `field` key contains a `from: old-key` and a `to: new-key` pair. `from` is +the origin and `to` the target name of the field. + +To overwrite fields either first rename the target field or use the `drop_fields` +processor to drop the field and then rename the field. + +[source,yaml] +------- +processors: +- decompress_gzip_field: + from: "field1" + to: "field2" + ignore_missing: false + fail_on_error: true +------- + +In the example above: + - field1 is decoded in field2 + +The `decompress_gzip_field` processor has the following configuration settings: + +`ignore_missing`:: (Optional) If set to true, no error is logged in case a key +which should be base64 decoded is missing. Default is `false`. + +`fail_on_error`:: (Optional) If set to true, in case of an error the base6 4decode +of fields is stopped and the original event is returned. If set to false, decoding +continues also if an error happened during decoding. Default is `true`. + +See <> for a list of supported conditions. + [[community-id]] === Community ID Network Flow Hash diff --git a/libbeat/processors/actions/decompress_gzip_field.go b/libbeat/processors/actions/decompress_gzip_field.go new file mode 100644 index 00000000000..03e168c199f --- /dev/null +++ b/libbeat/processors/actions/decompress_gzip_field.go @@ -0,0 +1,133 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "fmt" + "io" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" + + "bytes" + "compress/gzip" +) + +type decompressGzipField struct { + config decompressGzipFieldConfig + log *logp.Logger +} + +type decompressGzipFieldConfig struct { + Field fromTo `config:"field"` + IgnoreMissing bool `config:"ignore_missing"` + FailOnError bool `config:"fail_on_error"` +} + +func init() { + processors.RegisterPlugin("decompress_gzip_field", + checks.ConfigChecked(NewDecompressGzipFields, + checks.RequireFields("field"), + checks.AllowedFields("field", "ignore_missing", "overwrite_keys", "overwrite_keys", "fail_on_error"))) +} + +// NewDecompressGzipFields construct a new decompress_gzip_fields processor. +func NewDecompressGzipFields(c *common.Config) (processors.Processor, error) { + config := decompressGzipFieldConfig{ + IgnoreMissing: false, + FailOnError: true, + } + + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the decompress_gzip_fields configuration: %+v", err) + } + + return &decompressGzipField{config: config, log: logp.NewLogger("decompress_gzip_field")}, nil +} + +// Run applies the decompress_gzip_fields processor to an event. +func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) { + var backup common.MapStr + if f.config.FailOnError { + backup = event.Fields.Clone() + } + + err := f.decompressGzipField(event) + if err != nil { + errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err) + f.log.Debug(errMsg.Error()) + if f.config.FailOnError { + event.Fields = backup + event.PutValue("error.message", errMsg.Error()) + return event, err + } + } + return event, nil +} + +func (f *decompressGzipField) decompressGzipField(event *beat.Event) error { + data, err := event.GetValue(f.config.Field.From) + if err != nil { + if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return fmt.Errorf("could not fetch value for key: %s, Error: %v", f.config.Field.From, err) + } + + var inBuf *bytes.Buffer + switch txt := data.(type) { + case []byte: + inBuf = bytes.NewBuffer(txt) + case string: + inBuf = bytes.NewBufferString(txt) + default: + return fmt.Errorf("cannot decompress type %+v", txt) + } + + r, err := gzip.NewReader(inBuf) + if err != nil { + return errors.Wrapf(err, "error decompressing field %s", f.config.Field.From) + } + + var outBuf bytes.Buffer + _, err = io.Copy(&outBuf, r) + if err != nil { + return fmt.Errorf("error while decompressing field: %v", err) + } + + err = r.Close() + if err != nil { + return fmt.Errorf("error closing gzip reader: %v", err) + } + + if _, err = event.PutValue(f.config.Field.To, outBuf.String()); err != nil { + return fmt.Errorf("could not put decompressed data: %v", err) + } + return nil +} + +// String returns a string representation of this processor. +func (f decompressGzipField) String() string { + return fmt.Sprintf("decompress_gzip_fields=%+v", f.config.Field) +} diff --git a/libbeat/processors/actions/decompress_gzip_field_test.go b/libbeat/processors/actions/decompress_gzip_field_test.go new file mode 100644 index 00000000000..c7b8097997a --- /dev/null +++ b/libbeat/processors/actions/decompress_gzip_field_test.go @@ -0,0 +1,191 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package actions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func TestDecompressGzip(t *testing.T) { + var testCases = []struct { + description string + config decompressGzipFieldConfig + input common.MapStr + output common.MapStr + error bool + }{ + { + description: "bytes field gzip decompress", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "field2", + }, + IgnoreMissing: false, + FailOnError: true, + }, + input: common.MapStr{ + "field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}, + }, + output: common.MapStr{ + "field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}, + "field2": "decompressed data", + }, + error: false, + }, + { + description: "string field gzip decompress", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "field2", + }, + IgnoreMissing: false, + FailOnError: true, + }, + input: common.MapStr{ + "field1": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), + }, + output: common.MapStr{ + "field1": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), + "field2": "decompressed data", + }, + error: false, + }, + { + description: "simple field gzip decompress in place", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "field1", + }, + IgnoreMissing: false, + FailOnError: true, + }, + input: common.MapStr{ + "field1": []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}, + }, + output: common.MapStr{ + "field1": "decompressed data", + }, + error: false, + }, + { + description: "invalid data - fail on error", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "field1", + }, + IgnoreMissing: false, + FailOnError: true, + }, + input: common.MapStr{ + "field1": "invalid gzipped data", + }, + output: common.MapStr{ + "field1": "invalid gzipped data", + "error": common.MapStr{ + "message": "Failed to decompress field in decompress_gzip_field processor: error decompressing field field1: gzip: invalid header", + }, + }, + error: true, + }, + { + description: "invalid data - do not fail", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field1", To: "field2", + }, + IgnoreMissing: false, + FailOnError: false, + }, + input: common.MapStr{ + "field1": "invalid gzipped data", + }, + output: common.MapStr{ + "field1": "invalid gzipped data", + }, + error: false, + }, + { + description: "missing field - do not ignore it", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field2", To: "field3", + }, + IgnoreMissing: false, + FailOnError: true, + }, + input: common.MapStr{ + "field1": "my value", + }, + output: common.MapStr{ + "field1": "my value", + "error": common.MapStr{ + "message": "Failed to decompress field in decompress_gzip_field processor: could not fetch value for key: field2, Error: key not found", + }, + }, + error: true, + }, + { + description: "missing field ignore", + config: decompressGzipFieldConfig{ + Field: fromTo{ + From: "field2", To: "field3", + }, + IgnoreMissing: true, + FailOnError: true, + }, + input: common.MapStr{ + "field1": "my value", + }, + output: common.MapStr{ + "field1": "my value", + }, + error: false, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.description, func(t *testing.T) { + t.Parallel() + + f := &decompressGzipField{ + log: logp.NewLogger("decompress_gzip_field"), + config: test.config, + } + + event := &beat.Event{ + Fields: test.input, + } + + newEvent, err := f.Run(event) + if !test.error { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + } + + assert.Equal(t, test.output, newEvent.Fields) + }) + } +} diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index d83832f63d6..4b1c9e50e47 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -988,6 +988,13 @@ metricbeat.modules: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 6e707cc4891..89d4b85bd84 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -698,6 +698,13 @@ packetbeat.ignore_outgoing: false # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index b81ed28b0ea..cba009eed0f 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -251,6 +251,13 @@ winlogbeat.event_logs: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index 277fcf822bf..4c1961fb2f4 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -381,6 +381,13 @@ auditbeat.modules: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index ff470413f24..b54c221c734 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1179,6 +1179,13 @@ filebeat.inputs: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/x-pack/functionbeat/_meta/beat.reference.yml b/x-pack/functionbeat/_meta/beat.reference.yml index 3e792d15e8b..bc84eed5d83 100644 --- a/x-pack/functionbeat/_meta/beat.reference.yml +++ b/x-pack/functionbeat/_meta/beat.reference.yml @@ -150,6 +150,15 @@ functionbeat.provider.aws.functions: # Define custom processors for this function. #processors: + # This example extracts the raw data from events. + # - decode_base64_field: + # field: + # from: message + # to: message + # - decompress_gzip_field: + # field: + # from: message + # to: message # - decode_json_fields: # fields: ["message"] # process_array: false diff --git a/x-pack/functionbeat/_meta/beat.yml b/x-pack/functionbeat/_meta/beat.yml index 6e5b969549f..889dc4fdae9 100644 --- a/x-pack/functionbeat/_meta/beat.yml +++ b/x-pack/functionbeat/_meta/beat.yml @@ -151,6 +151,15 @@ functionbeat.provider.aws.functions: # Define custom processors for this function. #processors: + # This example extracts the raw data from events. + # - decode_base64_field: + # field: + # from: message + # to: message + # - decompress_gzip_field: + # field: + # from: message + # to: message # - decode_json_fields: # fields: ["message"] # process_array: false diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index dd79ef974ad..a9b0c88445e 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -150,6 +150,15 @@ functionbeat.provider.aws.functions: # Define custom processors for this function. #processors: + # This example extracts the raw data from events. + # - decode_base64_field: + # field: + # from: message + # to: message + # - decompress_gzip_field: + # field: + # from: message + # to: message # - decode_json_fields: # fields: ["message"] # process_array: false @@ -389,6 +398,13 @@ functionbeat.provider.aws.functions: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index 4c903d2f3b7..9eea5038c93 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -151,6 +151,15 @@ functionbeat.provider.aws.functions: # Define custom processors for this function. #processors: + # This example extracts the raw data from events. + # - decode_base64_field: + # field: + # from: message + # to: message + # - decompress_gzip_field: + # field: + # from: message + # to: message # - decode_json_fields: # fields: ["message"] # process_array: false diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 81732105269..86d7ccbb3bb 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1072,6 +1072,13 @@ metricbeat.modules: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 6292e7614f5..ae1594907c2 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -263,6 +263,13 @@ winlogbeat.event_logs: # target: "" # overwrite_keys: false # +#processors: +#- decompress_gzip_field: +# from: "field1" +# to: "field2" +# ignore_missing: false +# fail_on_error: true +# # The following example copies the value of message to message_copied # #processors: From a2fa6c25d8d62030193323e09016a4460557ffe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 1 Jul 2019 12:42:57 +0200 Subject: [PATCH 2/3] add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 873103cdfcd..f248ac74904 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254] - Add `decode_base64_field` processor for decoding base64 field. {pull}11914[11914] - Add aws overview dashboard. {issue}11007[11007] {pull}12175[12175] +- Add `decompress_gzip_field` processor. {pull}12733[12733] *Auditbeat* From 11d66df34d66dbc168690f6b01c1282bf90f4bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 1 Jul 2019 14:50:43 +0200 Subject: [PATCH 3/3] address review notes --- libbeat/processors/actions/decompress_gzip_field.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/actions/decompress_gzip_field.go b/libbeat/processors/actions/decompress_gzip_field.go index 03e168c199f..efa3e04ef82 100644 --- a/libbeat/processors/actions/decompress_gzip_field.go +++ b/libbeat/processors/actions/decompress_gzip_field.go @@ -18,6 +18,8 @@ package actions import ( + "bytes" + "compress/gzip" "fmt" "io" @@ -28,9 +30,6 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/checks" - - "bytes" - "compress/gzip" ) type decompressGzipField struct { @@ -113,6 +112,7 @@ func (f *decompressGzipField) decompressGzipField(event *beat.Event) error { var outBuf bytes.Buffer _, err = io.Copy(&outBuf, r) if err != nil { + r.Close() return fmt.Errorf("error while decompressing field: %v", err) }