diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 53c9e4f11c6..a06d49bee9f 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -333,6 +333,7 @@ field. You can revert this change by configuring tags for the module and omittin
 - Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065]
 - Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]
 - Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881]
+- Add support for array parsing in azure-eventhub input. {pull}18585[18585]
 - Added `observer.vendor`, `observer.product`, and `observer.type` to PANW module events. {pull}18223[18223]
 - The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095]
 - Improve ECS categorization field mappings in envoyproxy module. {issue}16161[16161] {pull}18395[18395]
diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go
index 2cf6494f8d7..de9c7cf9d79 100644
--- a/x-pack/filebeat/input/azureeventhub/input.go
+++ b/x-pack/filebeat/input/azureeventhub/input.go
@@ -203,14 +203,32 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
 
 // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
 func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
-	var obj map[string][]interface{}
-	err := json.Unmarshal(bMessage, &obj)
-	if err != nil {
-		a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err)
-	}
+	var mapObject map[string][]interface{}
 	var messages []string
-	if len(obj[expandEventListFromField]) > 0 {
-		for _, ms := range obj[expandEventListFromField] {
+	// check if the message is a "records" object containing a list of events
+	err := json.Unmarshal(bMessage, &mapObject)
+	if err == nil {
+		if len(mapObject[expandEventListFromField]) > 0 {
+			for _, ms := range mapObject[expandEventListFromField] {
+				js, err := json.Marshal(ms)
+				if err == nil {
+					messages = append(messages, string(js))
+				} else {
+					a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
+				}
+			}
+		}
+	} else {
+		a.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err)
+		// in some cases the message is an array
+		var arrayObject []interface{}
+		err = json.Unmarshal(bMessage, &arrayObject)
+		if err != nil {
+			// return entire message
+			a.log.Debugf("deserializing multiple messages to an array returning error: %s", err)
+			return []string{string(bMessage)}
+		}
+		for _, ms := range arrayObject {
 			js, err := json.Marshal(ms)
 			if err == nil {
 				messages = append(messages, string(js))
diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go
index 6e6cd47484c..8a48593c1ca 100644
--- a/x-pack/filebeat/input/azureeventhub/input_test.go
+++ b/x-pack/filebeat/input/azureeventhub/input_test.go
@@ -10,6 +10,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/elastic/beats/v7/libbeat/logp"
+
 	eventhub "github.com/Azure/azure-event-hubs-go/v3"
 	"github.com/stretchr/testify/assert"
 
@@ -70,17 +72,38 @@ func TestProcessEvents(t *testing.T) {
 }
 
 func TestParseMultipleMessages(t *testing.T) {
+	// records object
 	msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
 		"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
 		"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"
-	input := azureInput{}
-	messages := input.parseMultipleMessages([]byte(msg))
-	assert.NotNil(t, messages)
-	assert.Equal(t, len(messages), 3)
 	msgs := []string{
 		fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
 		fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
 		fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}")}
+	input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
+	messages := input.parseMultipleMessages([]byte(msg))
+	assert.NotNil(t, messages)
+	assert.Equal(t, len(messages), 3)
+	for _, ms := range messages {
+		assert.Contains(t, msgs, ms)
+	}
+
+	// array of events
+	msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
+		"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
+		"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]"
+	messages = input.parseMultipleMessages([]byte(msg1))
+	assert.NotNil(t, messages)
+	assert.Equal(t, len(messages), 3)
+	for _, ms := range messages {
+		assert.Contains(t, msgs, ms)
+	}
+
+	// one event only
+	msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"
+	messages = input.parseMultipleMessages([]byte(msg2))
+	assert.NotNil(t, messages)
+	assert.Equal(t, len(messages), 1)
 	for _, ms := range messages {
 		assert.Contains(t, msgs, ms)
 	}