Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom decode function to gunzip cloudwatch kinesis messages #12119

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion x-pack/functionbeat/provider/aws/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,33 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"

"bytes"
"compress/gzip"
"io"
)

// Centralize anything related to ECS into a common file.
// TODO: Look at the fields to align them with ECS.
// TODO: how to keep the fields in sync with AWS?
// TODO: api gateway proxy a lot more information is available.

// Custom decode for cloudwatch kinesis messages
thenom marked this conversation as resolved.
Show resolved Hide resolved
func DecodeByteMessage(message []byte) (decoded string) {
//Uncompress gzip message
b := bytes.NewBuffer(message)

var r io.Reader
r, _ = gzip.NewReader(b)

var resB bytes.Buffer
resB.ReadFrom(r)

decoded = string(resB.Bytes())

return
}

// CloudwatchLogs takes an CloudwatchLogsData and transform it into a beat event.
func CloudwatchLogs(request events.CloudwatchLogsData) []beat.Event {
events := make([]beat.Event, len(request.LogEvents))
Expand Down Expand Up @@ -62,6 +82,7 @@ func APIGatewayProxyRequest(request events.APIGatewayProxyRequest) beat.Event {
func KinesisEvent(request events.KinesisEvent) []beat.Event {
events := make([]beat.Event, len(request.Records))
for idx, record := range request.Records {
decodedMessage := DecodeByteMessage(record.Kinesis.Data)
events[idx] = beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
Expand All @@ -71,7 +92,7 @@ func KinesisEvent(request events.KinesisEvent) []beat.Event {
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
"message": string(record.Kinesis.Data),
"message": decodedMessage,
"kinesis_partition_key": record.Kinesis.PartitionKey,
"kinesis_schema_version": record.Kinesis.KinesisSchemaVersion,
"kinesis_sequence_number": record.Kinesis.SequenceNumber,
Expand Down