Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.17](backport #34874) [Azure] Sanitize message in case of malformed json #35623

Merged
merged 5 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*


*Auditbeat*


*Filebeat*

- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]

*Heartbeat*

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/azureeventhub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
}

const ephContainerName = "filebeat"
Expand Down Expand Up @@ -62,6 +64,14 @@ func (conf *azureInputConfig) Validate() error {
return err
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
}
}

return nil
}

Expand Down
27 changes: 17 additions & 10 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -41,7 +39,7 @@ type azureInput struct {
workerWg sync.WaitGroup // waits on worker goroutine.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
hub *eventhub.Hub // hub will be assigned
ackChannel chan int
// ackChannel chan int
}

const (
Expand All @@ -51,7 +49,7 @@ const (
func init() {
err := input.Register(inputName, NewInput)
if err != nil {
panic(errors.Wrapf(err, "failed to register %v input", inputName))
panic(fmt.Errorf("failed to register %v input: %w", inputName, err))
}
}

Expand All @@ -63,7 +61,7 @@ func NewInput(
) (input.Input, error) {
var config azureInputConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrapf(err, "reading %s input config", inputName)
return nil, fmt.Errorf("reading %s input config: %w", inputName, err)
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
Expand Down Expand Up @@ -152,13 +150,13 @@ func (a *azureInput) Stop() {
if a.hub != nil {
err := a.hub.Close(a.workerCtx)
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhub"), "error", err)
a.log.Errorw("error while closing eventhub", "error", err)
}
}
if a.processor != nil {
err := a.processor.Close(a.workerCtx)
if err != nil {
a.log.Errorw(fmt.Sprintf("error while closing eventhostprocessor"), "error", err)
a.log.Errorw("error while closing eventhostprocessor", "error", err)
}
}
a.workerCancel()
Expand All @@ -180,9 +178,9 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
}
messages := a.parseMultipleMessages(event.Data)
for _, msg := range messages {
azure.Put("offset", event.SystemProperties.Offset)
azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
_, _ = azure.Put("offset", event.SystemProperties.Offset)
_, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber)
_, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime)
ok := a.outlet.OnEvent(beat.Event{
Timestamp: timestamp,
Fields: common.MapStr{
Expand All @@ -202,6 +200,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

// Clean up the message for known issues [1] where Azure services produce malformed JSON documents.
// Sanitization occurs if options are available and the message contains an invalid JSON.
//
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
}

// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
Expand Down
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"bytes"
"errors"
)

// sanitizationOption names a clean-up step that sanitize can apply to a
// message. Option strings passed to sanitize are converted to this type
// before being matched against the constants below.
type sanitizationOption string

// Sanitization options recognized by sanitize.
const (
// newLines selects sanitizeNewLines, which removes newline characters.
newLines sanitizationOption = "NEW_LINES"
// singleQuotes selects sanitizeSingleQuotes, which turns single quotes found
// outside double-quoted text into double quotes.
singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate reports whether s names a supported sanitization
// option. It returns nil for "NEW_LINES" and "SINGLE_QUOTES" and an error for
// every other value; the comparison is case-sensitive and exact.
func sanitizeOptionsValidate(s string) error {
	if s == "NEW_LINES" || s == "SINGLE_QUOTES" {
		return nil
	}

	return errors.New("invalid sanitization option")
}

// sanitize runs the requested clean-up steps over jsonByte and returns the
// result. Options are applied one after another in the order given, each step
// working on the output of the previous one. Option names that match no known
// step are skipped, and with no options at all the input is returned as is.
func sanitize(jsonByte []byte, opts ...string) []byte {
	cleaned := jsonByte

	for i := range opts {
		kind := sanitizationOption(opts[i])
		if kind == newLines {
			cleaned = sanitizeNewLines(cleaned)
		} else if kind == singleQuotes {
			cleaned = sanitizeSingleQuotes(cleaned)
		}
	}

	return cleaned
}

// sanitizeNewLines returns a copy of jsonByte with every "\n" byte dropped,
// including those that appear inside quoted text.
func sanitizeNewLines(jsonByte []byte) []byte {
	lineFeed := []byte{'\n'}
	// A negative count places no limit on the number of replacements.
	return bytes.Replace(jsonByte, lineFeed, nil, -1)
}

// sanitizeSingleQuotes returns a copy of jsonByte in which every single quote
// found outside double-quoted text is replaced by a double quote. Single
// quotes located between a pair of double quotes are left untouched.
//
// A double quote opens or closes a quoted section only when it is not escaped.
// Escaping is tracked by parity: a backslash that is itself escaped ("\\")
// does not escape the byte that follows it, so a string ending in an escaped
// backslash (e.g. "C:\\") is still recognized as closed.
func sanitizeSingleQuotes(jsonByte []byte) []byte {
	var result bytes.Buffer
	// The output has exactly as many bytes as the input.
	result.Grow(len(jsonByte))

	inDoubleQuotes := false
	// escaped is true when the previous byte was a backslash that was not
	// itself escaped, i.e. the current byte is the target of an escape.
	escaped := false

	for _, r := range jsonByte {
		switch {
		case r == '"' && !escaped:
			inDoubleQuotes = !inDoubleQuotes
			result.WriteByte(r)
		case r == '\'' && !inDoubleQuotes:
			result.WriteByte('"')
		default:
			result.WriteByte(r)
		}

		escaped = r == '\\' && !escaped
	}

	return result.Bytes()
}
82 changes: 82 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
)

// TestParseMultipleMessagesSanitization feeds parseMultipleMessages a
// "records" document that is not valid JSON (single-quoted keys and values,
// raw newlines between fields) with both sanitization options enabled, and
// checks that three messages come back and that each one matches one of the
// expected, well-formed JSON documents.
func TestParseMultipleMessagesSanitization(t *testing.T) {
	msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
		"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
		"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
	msgs := []string{
		"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
		"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
		"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
	}

	input := azureInput{
		log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
		config: azureInputConfig{
			SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
		},
	}

	messages := input.parseMultipleMessages([]byte(msg))
	assert.NotNil(t, messages)
	// assert.Len reports the slice contents on failure and avoids the
	// expected/actual argument-order pitfall of assert.Equal.
	assert.Len(t, messages, 3)
	for _, ms := range messages {
		assert.Contains(t, msgs, ms)
	}
}

// TestSanitize runs sanitize over one malformed JSON document with no
// options, each option alone, and both options together, comparing the
// returned bytes with the expected output for each combination.
func TestSanitize(t *testing.T) {
	raw := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")

	type scenario struct {
		name string
		opts []string
		want []byte
	}

	scenarios := []scenario{
		{
			name: "no options",
			opts: []string{},
			want: raw,
		},
		{
			name: "NEW_LINES option",
			opts: []string{"NEW_LINES"},
			want: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
		},
		{
			name: "SINGLE_QUOTES option",
			opts: []string{"SINGLE_QUOTES"},
			want: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
		},
		{
			name: "both options",
			opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
			want: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
		},
	}

	// Each scenario runs as its own subtest, in declaration order.
	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			got := sanitize(raw, sc.opts...)
			assert.Equal(t, sc.want, got)
		})
	}
}
Loading