Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Azure] Sanitize message in case of malformed json #34874

Merged
merged 21 commits into from May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -286,6 +286,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]
- Add oracle authentication messages parsing {pull}35127[35127]
- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/azureeventhub/config.go
Expand Up @@ -25,6 +25,8 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
}

const ephContainerName = "filebeat"
Expand Down Expand Up @@ -63,6 +65,14 @@ func (conf *azureInputConfig) Validate() error {
return err
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
}
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/azureeventhub/input.go
Expand Up @@ -166,6 +166,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

// Clean up the message for known issues [1] where Azure services produce malformed JSON documents.
// Sanitization occurs if options are available and the message contains an invalid JSON.
//
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
}

// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
Expand Down
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization.go
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"bytes"
"errors"
)

type sanitizationOption string

const (
newLines sanitizationOption = "NEW_LINES"
singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate validates for supported sanitization options
func sanitizeOptionsValidate(s string) error {
switch s {
case "NEW_LINES":
return nil
case "SINGLE_QUOTES":
return nil
default:
return errors.New("invalid sanitization option")
}
}

// sanitize applies the sanitization options specified in the config
// if no sanitization options are provided, the message remains unchanged
func sanitize(jsonByte []byte, opts ...string) []byte {
res := jsonByte

for _, opt := range opts {
switch sanitizationOption(opt) {
case newLines:
res = sanitizeNewLines(res)
case singleQuotes:
res = sanitizeSingleQuotes(res)
}
}

return res
}

// sanitizeNewLines removes newlines found in the message
func sanitizeNewLines(jsonByte []byte) []byte {
return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{})
}

// sanitizeSingleQuotes replaces single quotes with double quotes in the message
// single quotes that are in between double quotes remain unchanged
func sanitizeSingleQuotes(jsonByte []byte) []byte {
var result bytes.Buffer
var prevChar byte

inDoubleQuotes := false

for _, r := range jsonByte {
if r == '"' && prevChar != '\\' {
inDoubleQuotes = !inDoubleQuotes
}

if r == '\'' && !inDoubleQuotes {
result.WriteRune('"')
} else {
result.WriteByte(r)
}
prevChar = r
}

return result.Bytes()
}
82 changes: 82 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization_test.go
@@ -0,0 +1,82 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestParseMultipleMessagesSanitization(t *testing.T) {
msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
}

input := azureInput{
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
config: azureInputConfig{
SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
},
}

messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
}

func TestSanitize(t *testing.T) {
jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")

testCases := []struct {
name string
opts []string
expected []byte
}{
{
name: "no options",
opts: []string{},
expected: jsonByte,
},
{
name: "NEW_LINES option",
opts: []string{"NEW_LINES"},
expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "SINGLE_QUOTES option",
opts: []string{"SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "both options",
opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
}

// Run test cases
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := sanitize(jsonByte, tc.opts...)
assert.Equal(t, tc.expected, res)
})
}
}