Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom decode function to gunzip cloudwatch kinesis messages #12119

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion x-pack/functionbeat/provider/aws/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,38 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"

"bytes"
"compress/gzip"
"io"
)

// Centralize anything related to ECS into a common file.
// TODO: Look at the fields to align them with ECS.
// TODO: how to keep the fields in sync with AWS?
// TODO: api gateway proxy a lot more information is available.

// DecodeByteMessage for cloudwatch kinesis messages
func DecodeByteMessage(message []byte) (decoded string) {
//Uncompress gzip message
var err error
b := bytes.NewBuffer(message)

var r io.Reader
r, err = gzip.NewReader(b)

if err == nil {
var resB bytes.Buffer
resB.ReadFrom(r)

decoded = string(resB.Bytes())
} else {
decoded = string(message)
}

return
}

// CloudwatchLogs takes an CloudwatchLogsData and transform it into a beat event.
func CloudwatchLogs(request events.CloudwatchLogsData) []beat.Event {
events := make([]beat.Event, len(request.LogEvents))
Expand Down Expand Up @@ -62,6 +87,7 @@ func APIGatewayProxyRequest(request events.APIGatewayProxyRequest) beat.Event {
func KinesisEvent(request events.KinesisEvent) []beat.Event {
events := make([]beat.Event, len(request.Records))
for idx, record := range request.Records {
decodedMessage := DecodeByteMessage(record.Kinesis.Data)
events[idx] = beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
Expand All @@ -71,7 +97,7 @@ func KinesisEvent(request events.KinesisEvent) []beat.Event {
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
"message": string(record.Kinesis.Data),
"message": decodedMessage,
"kinesis_partition_key": record.Kinesis.PartitionKey,
"kinesis_schema_version": record.Kinesis.KinesisSchemaVersion,
"kinesis_sequence_number": record.Kinesis.SequenceNumber,
Expand Down
27 changes: 27 additions & 0 deletions x-pack/functionbeat/provider/aws/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

func TestKinesis(t *testing.T) {
// Test uncompressed Kinesis data message
request := events.KinesisEvent{
Records: []events.KinesisEventRecord{
events.KinesisEventRecord{
Expand All @@ -34,6 +35,27 @@ func TestKinesis(t *testing.T) {
},
}

// Test compressed Kinesis data message
requestCompressed := events.KinesisEvent{
Records: []events.KinesisEventRecord{
events.KinesisEventRecord{
AwsRegion: "us-east-1",
EventID: "1234",
EventName: "connect",
EventSource: "web",
EventVersion: "1.0",
EventSourceArn: "arn:aws:iam::00000000:role/functionbeat",
Kinesis: events.KinesisRecord{
Data: []byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 202, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0, 0, 0, 255, 255, 1, 0, 0, 255, 255, 133, 17, 74, 13, 11, 0, 0, 0},
PartitionKey: "abc123",
SequenceNumber: "12345",
KinesisSchemaVersion: "1.0",
EncryptionType: "test",
},
},
},
}

events := KinesisEvent(request)
assert.Equal(t, 1, len(events))

Expand All @@ -52,4 +74,9 @@ func TestKinesis(t *testing.T) {
}

assert.Equal(t, fields, events[0].Fields)

eventsCompressed := KinesisEvent(requestCompressed)
assert.Equal(t, 1, len(eventsCompressed))

assert.Equal(t, fields, eventsCompressed[0].Fields)
}