diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ea91d54add8..d499e93e13e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -68,12 +68,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* - -*Auditbeat* - - -*Filebeat* - +- Add sanitization capabilities to azure-eventhub input {pull}34874[34874] *Heartbeat* diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index f1d1fa63a5b..b2c7841e181 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -23,6 +23,8 @@ type azureInputConfig struct { SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` + // cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES + SanitizeOptions []string `config:"sanitize_options"` } const ephContainerName = "filebeat" @@ -62,6 +64,14 @@ func (conf *azureInputConfig) Validate() error { return err } + // log a warning for each sanitization option not supported + for _, opt := range conf.SanitizeOptions { + err := sanitizeOptionsValidate(opt) + if err != nil { + logger.Warnf("%s: %v", opt, err) + } + } + return nil } diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 5f0d1b3df5e..2be67af4bd0 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -12,8 +12,6 @@ import ( "sync" "time" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" @@ -41,7 +39,7 @@ type azureInput struct { workerWg sync.WaitGroup // waits on worker goroutine. processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option hub *eventhub.Hub // hub will be assigned - ackChannel chan int + // ackChannel chan int } const ( @@ -51,7 +49,7 @@ const ( func init() { err := input.Register(inputName, NewInput) if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", inputName)) + panic(fmt.Errorf("failed to register %v input: %w", inputName, err)) } } @@ -63,7 +61,7 @@ func NewInput( ) (input.Input, error) { var config azureInputConfig if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrapf(err, "reading %s input config", inputName) + return nil, fmt.Errorf("reading %s input config: %w", inputName, err) } inputCtx, cancelInputCtx := context.WithCancel(context.Background()) @@ -152,13 +150,13 @@ func (a *azureInput) Stop() { if a.hub != nil { err := a.hub.Close(a.workerCtx) if err != nil { - a.log.Errorw(fmt.Sprintf("error while closing eventhub"), "error", err) + a.log.Errorw("error while closing eventhub", "error", err) } } if a.processor != nil { err := a.processor.Close(a.workerCtx) if err != nil { - a.log.Errorw(fmt.Sprintf("error while closing eventhostprocessor"), "error", err) + a.log.Errorw("error while closing eventhostprocessor", "error", err) } } a.workerCancel() @@ -180,9 +178,9 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo } messages := a.parseMultipleMessages(event.Data) for _, msg := range messages { - azure.Put("offset", event.SystemProperties.Offset) - azure.Put("sequence_number", event.SystemProperties.SequenceNumber) - azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime) + _, _ = azure.Put("offset", event.SystemProperties.Offset) + _, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber) + _, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime) ok := a.outlet.OnEvent(beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ @@ -202,6 +200,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string + + // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. + // Sanitization occurs if options are available and the message contains an invalid JSON. + // + // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps + if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { + bMessage = sanitize(bMessage, a.config.SanitizeOptions...) + } + // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) if err == nil { diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go new file mode 100644 index 00000000000..537d29951c9 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import ( + "bytes" + "errors" +) + +type sanitizationOption string + +const ( + newLines sanitizationOption = "NEW_LINES" + singleQuotes sanitizationOption = "SINGLE_QUOTES" +) + +// sanitizeOptionsValidate validates for supported sanitization options +func sanitizeOptionsValidate(s string) error { + switch s { + case "NEW_LINES": + return nil + case "SINGLE_QUOTES": + return nil + default: + return errors.New("invalid sanitization option") + } +} + +// sanitize applies the sanitization options specified in the config +// if no sanitization options are provided, the message remains unchanged +func sanitize(jsonByte []byte, opts ...string) []byte { + res := jsonByte + + for _, opt := range opts { + switch sanitizationOption(opt) { + case newLines: + res = sanitizeNewLines(res) + case singleQuotes: + res = sanitizeSingleQuotes(res) + } + } + + return res +} + +// sanitizeNewLines removes newlines found in the message +func sanitizeNewLines(jsonByte []byte) []byte { + return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{}) +} + +// sanitizeSingleQuotes replaces single quotes with double quotes in the message +// single quotes that are in between double quotes remain unchanged +func sanitizeSingleQuotes(jsonByte []byte) []byte { + var result bytes.Buffer + var prevChar byte + + inDoubleQuotes := false + + for _, r := range jsonByte { + if r == '"' && prevChar != '\\' { + inDoubleQuotes = !inDoubleQuotes + } + + if r == '\'' && !inDoubleQuotes { + result.WriteRune('"') + } else { + result.WriteByte(r) + } + prevChar = r + } + + return result.Bytes() +} diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go new file mode 100644 index 00000000000..d9b31219474 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -0,0 +1,82 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix +// +build !aix + +package azureeventhub + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestParseMultipleMessagesSanitization(t *testing.T) { + msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}" + msgs := []string{ + "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", + "{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}", + } + + input := azureInput{ + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + config: azureInputConfig{ + SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, + }, + } + + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } +} + +func TestSanitize(t *testing.T) { + jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") + + testCases := []struct { + name string + opts []string + expected []byte + }{ + { + name: "no options", + opts: []string{}, + expected: jsonByte, + }, + { + name: "NEW_LINES option", + opts: []string{"NEW_LINES"}, + expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "SINGLE_QUOTES option", + opts: []string{"SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "both options", + opts: []string{"NEW_LINES", "SINGLE_QUOTES"}, + expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + } + + // Run test cases + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + res := sanitize(jsonByte, tc.opts...) + assert.Equal(t, tc.expected, res) + }) + } +}