Skip to content

Commit

Permalink
[Azure] Sanitize message in case of malformed json (#34874)
Browse files Browse the repository at this point in the history
* Add sanitization function and test for azure input

(cherry picked from commit 4096f9b)
  • Loading branch information
lucian-ioan authored and mergify[bot] committed May 31, 2023
1 parent 192fb07 commit 65ab265
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 0 deletions.
57 changes: 57 additions & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -70,6 +70,63 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- add documentation for decode_xml_wineventlog processor field mappings. {pull}32456[32456]
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Update `aws.vpcflow` dataset in AWS module have a configurable log `format` and to produce ECS 8.x fields. {pull}33699[33699]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
- Add `decode_duration`, `move_fields` processors. {pull}31301[31301]
- Add backup to bucket and delete functionality for the `aws-s3` input. {issue}30696[30696] {pull}33559[33559]
- Add metrics for UDP packet processing. {pull}33870[33870]
- Convert UDP input to v2 input. {pull}33930[33930]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]
- Add new Entity Analytics input with Azure Active Directory support. {pull}34305[34305]
- Added metric `sqs_lag_time` for aws-s3 input. {pull}34306[34306]
- Add metrics for TCP packet processing. {pull}34333[34333]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]
- Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065]
- Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302]
- Added support for HTTP destination override to Google Cloud Storage input. {pull}34413[34413]
- Added metric `sqs_messages_waiting_gauge` for aws-s3 input. {pull}34488[34488]
- Add support for new Rabbitmq timestamp format for logs {pull}34211[34211]
- Allow user configuration of timezone offset in Cisco ASA and FTD modules. {pull}34436[34436]
- Allow user configuration of timezone offset in Checkpoint module. {pull}34472[34472]
- Add support for Okta debug attributes, `risk_reasons`, `risk_behaviors` and `factor`. {issue}33677[33677] {pull}34508[34508]
- Fill okta.request.ip_chain.* as a flattened object in Okta module. {pull}34621[34621]
- Fixed GCS log format issues. {pull}34659[34659]
- Add nginx.ingress_controller.upstream.ip to related.ip {issue}34645[34645] {pull}34672[34672]
- Include NAT and firewall IPs in `related.ip` in Fortinet Firewall module. {issue}34640[34640] {pull}34673[34673]
- Add Basic Authentication support on constructed requests to CEL input {issue}34609[34609] {pull}34689[34689]
- Add string manipulation extensions to CEL input {issue}34610[34610] {pull}34689[34689]
- Add unix socket log parsing for nginx ingress_controller {pull}34732[34732]
- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793]
- Improve CEL input documentation {pull}34831[34831]
- Add metrics documentation for CEL and AWS CloudWatch inputs. {issue}34887[34887] {pull}34889[34889]
- Register MIME handlers for CSV types in CEL input. {pull}34934[34934]
- Add MySQL authentication message parsing and `related.ip` and `related.user` fields {pull}34810[34810]
- Mention `mito` CEL tool in CEL input docs. {pull}34959[34959]
- Add nginx ingress_controller parsing if one of upstreams fails to return response {pull}34787[34787]
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]
- Add oracle authentication messages parsing {pull}35127[35127]
- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/azureeventhub/config.go
Expand Up @@ -23,6 +23,8 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
}

const ephContainerName = "filebeat"
Expand Down Expand Up @@ -62,6 +64,14 @@ func (conf *azureInputConfig) Validate() error {
return err
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
}
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/azureeventhub/input.go
Expand Up @@ -202,6 +202,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var mapObject map[string][]interface{}
var messages []string

// Clean up the message for known issues [1] where Azure services produce malformed JSON documents.
// Sanitization occurs if options are available and the message contains an invalid JSON.
//
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
}

// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
Expand Down
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization.go
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"bytes"
"errors"
)

type sanitizationOption string

const (
newLines sanitizationOption = "NEW_LINES"
singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate validates for supported sanitization options
func sanitizeOptionsValidate(s string) error {
switch s {
case "NEW_LINES":
return nil
case "SINGLE_QUOTES":
return nil
default:
return errors.New("invalid sanitization option")
}
}

// sanitize applies the sanitization options specified in the config
// if no sanitization options are provided, the message remains unchanged
func sanitize(jsonByte []byte, opts ...string) []byte {
res := jsonByte

for _, opt := range opts {
switch sanitizationOption(opt) {
case newLines:
res = sanitizeNewLines(res)
case singleQuotes:
res = sanitizeSingleQuotes(res)
}
}

return res
}

// sanitizeNewLines removes newlines found in the message
func sanitizeNewLines(jsonByte []byte) []byte {
return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{})
}

// sanitizeSingleQuotes replaces single quotes with double quotes in the message
// single quotes that are in between double quotes remain unchanged
func sanitizeSingleQuotes(jsonByte []byte) []byte {
var result bytes.Buffer
var prevChar byte

inDoubleQuotes := false

for _, r := range jsonByte {
if r == '"' && prevChar != '\\' {
inDoubleQuotes = !inDoubleQuotes
}

if r == '\'' && !inDoubleQuotes {
result.WriteRune('"')
} else {
result.WriteByte(r)
}
prevChar = r
}

return result.Bytes()
}
82 changes: 82 additions & 0 deletions x-pack/filebeat/input/azureeventhub/sanitization_test.go
@@ -0,0 +1,82 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix
// +build !aix

package azureeventhub

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestParseMultipleMessagesSanitization(t *testing.T) {
msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
msgs := []string{
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
}

input := azureInput{
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),

Check failure on line 30 in x-pack/filebeat/input/azureeventhub/sanitization_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

cannot use logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) (value of type *"github.com/elastic/elastic-agent-libs/logp".Logger) as *"github.com/elastic/beats/v7/libbeat/logp".Logger value in struct literal (typecheck)

Check failure on line 30 in x-pack/filebeat/input/azureeventhub/sanitization_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot use logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) (value of type *"github.com/elastic/elastic-agent-libs/logp".Logger) as *"github.com/elastic/beats/v7/libbeat/logp".Logger value in struct literal (typecheck)
config: azureInputConfig{
SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
},
}

messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
}

func TestSanitize(t *testing.T) {
jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")

testCases := []struct {
name string
opts []string
expected []byte
}{
{
name: "no options",
opts: []string{},
expected: jsonByte,
},
{
name: "NEW_LINES option",
opts: []string{"NEW_LINES"},
expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "SINGLE_QUOTES option",
opts: []string{"SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
{
name: "both options",
opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
},
}

// Run test cases
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := sanitize(jsonByte, tc.opts...)
assert.Equal(t, tc.expected, res)
})
}
}

0 comments on commit 65ab265

Please sign in to comment.