forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kinesis.go
81 lines (66 loc) · 1.79 KB
/
kinesis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"bytes"
"compress/gzip"
"context"
"io"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
)
func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error {
if ev == nil {
return nil
}
for _, record := range ev.Records {
timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)
labels := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("kinesis"),
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
}
labels = applyLabels(labels)
// Check if the data is gzipped by inspecting the 'data' field
if isGzipped(record.Kinesis.Data) {
uncompressedData, err := ungzipData(record.Kinesis.Data)
if err != nil {
return err
}
b.add(ctx, entry{labels, logproto.Entry{
Line: string(uncompressedData),
Timestamp: timestamp,
}})
} else {
b.add(ctx, entry{labels, logproto.Entry{
Line: string(record.Kinesis.Data),
Timestamp: timestamp,
}})
}
}
return nil
}
func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent, pClient Client) error {
batch, _ := newBatch(ctx, pClient)
err := parseKinesisEvent(ctx, batch, ev)
if err != nil {
return err
}
err = pClient.sendToPromtail(ctx, batch)
if err != nil {
return err
}
return nil
}
// isGzipped checks if the input data is gzipped
func isGzipped(data []byte) bool {
return len(data) >= 2 && data[0] == 0x1F && data[1] == 0x8B
}
// unzipData decompress the gzipped data
func ungzipData(data []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, err
}
defer reader.Close()
return io.ReadAll(reader)
}