Skip to content

Commit

Permalink
Add support for EventBridge s3 events in lambda-promtail (#10449)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Lambda promtail supports s3 events, which are used for scraping several
log sources such as ALB access logs. This works by configuring at the S3
bucket level "s3 event notification", that are configured to target the
lambda deployment of lambda-promtail.

However, if one is configuring this through CloudFormation, there's a
known issue with AWS that doesn't allow to configure both the lambda,
the bucket, and the notifications in the same stack. See [this
issue](aws-cloudformation/cloudformation-coverage-roadmap#79)
for details.

For that, AWS introduced EventBridge notifications, which can be used to
ship s3 events to a lambda deployment as well. This flow looks like:
s3 -> eventbridge bus -> eventbridge rule -> lambda

EventBridge has it's own message structure for s3 notifications. This PR
adds a translation layer, just for `Object created` events (since they
are the only ones we should take into account), so that EventBridge
events can be received, and trigger the lambda as if they were from s3.

**Which issue(s) this PR fixes**:
Fixes #10209

**Special notes for your reviewer**:

- [x] Pending testing this with an actual deployment of the s3 -> event
bridge -> lambda flow
- [x] ~~Add CF template for the `s3 -> event bridge -> lambda`
deployment~~ Follow up PR

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
thepalbi committed Sep 9, 2023
1 parent bfe558c commit 5034b1b
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 1 deletion.
61 changes: 61 additions & 0 deletions tools/lambda-promtail/lambda-promtail/eventbridge.go
@@ -0,0 +1,61 @@
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/go-kit/log"
)

// S3Detail encodes the message structure in EventBridge s3 notifications.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
type S3Detail struct {
Version string `json:"version"`
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object S3ObjectDetail `json:"object"`
}

type S3ObjectDetail struct {
Key string `json:"key"`
Size int `json:"size"`
ETag string `json:"etag"`
VersionID string `json:"version-id"`
Sequencer string `json:"sequencer"`
}

type s3EventProcessor func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error

func processEventBridgeEvent(ctx context.Context, ev *events.CloudWatchEvent, pc Client, log *log.Logger, process s3EventProcessor) error {
// lambda-promtail should only be used with S3 object creation events, since those indicate that a new file has been
// added to bucket, and need to be fetched and parsed accordingly.
if !(ev.Source == "aws.s3" && ev.DetailType == "Object Created") {
return fmt.Errorf("event bridge event type not supported")
}

var eventDetail S3Detail
if err := json.Unmarshal(ev.Detail, &eventDetail); err != nil {
return err
}

// TODO(thepalbi): how to fill bucket owner?
var s3Event = events.S3Event{
Records: []events.S3EventRecord{
{
AWSRegion: ev.Region,
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: eventDetail.Bucket.Name,
},
Object: events.S3Object{
Key: eventDetail.Object.Key,
},
},
},
},
}

return process(ctx, &s3Event, pc, log)
}
63 changes: 63 additions & 0 deletions tools/lambda-promtail/lambda-promtail/eventbridge_test.go
@@ -0,0 +1,63 @@
package main

import (
"context"
"encoding/json"
"github.com/aws/aws-lambda-go/events"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"os"
"testing"
)

type testPromtailClient struct{}

func (t testPromtailClient) sendToPromtail(ctx context.Context, b *batch) error {
return nil
}

func Test_processEventBridgeEvent(t *testing.T) {
logger := log.NewNopLogger()
t.Run("s3 object created event", func(t *testing.T) {
bs, err := os.ReadFile("../testdata/eventbridge-s3-event.json")
require.NoError(t, err)

var ebEvent events.CloudWatchEvent
require.NoError(t, json.Unmarshal(bs, &ebEvent))

processor := s3EventProcessor(func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error {
require.Len(t, ev.Records, 1)
require.Equal(t, events.S3EventRecord{
AWSRegion: "us-east-2",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "bucket",
},
Object: events.S3Object{
Key: "pizza.txt",
},
},
}, ev.Records[0])
return nil
})

err = processEventBridgeEvent(context.Background(), &ebEvent, testPromtailClient{}, &logger, processor)
require.NoError(t, err)

t.Run("s3 object created event", func(t *testing.T) {
var ebEvent = events.CloudWatchEvent{
Source: "aws.s3",
// picking a different s3 event type
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-mapping-troubleshooting.html
DetailType: "Object Restore Initiated",
}

processor := s3EventProcessor(func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error {
return nil
})

err = processEventBridgeEvent(context.Background(), &ebEvent, testPromtailClient{}, &logger, processor)
require.Error(t, err, "expected process to fail due to unsupported event type")
})
})
}
5 changes: 4 additions & 1 deletion tools/lambda-promtail/lambda-promtail/main.go
Expand Up @@ -139,8 +139,9 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) {
var kinesisEvent events.KinesisEvent
var sqsEvent events.SQSEvent
var snsEvent events.SNSEvent
var eventBridgeEvent events.CloudWatchEvent

types := [...]interface{}{&s3Event, &s3TestEvent, &cwEvent, &kinesisEvent, &sqsEvent, &snsEvent}
types := [...]interface{}{&s3Event, &s3TestEvent, &cwEvent, &kinesisEvent, &sqsEvent, &snsEvent, &eventBridgeEvent}

j, _ := json.Marshal(ev)
reader := strings.NewReader(string(j))
Expand Down Expand Up @@ -185,6 +186,8 @@ func handler(ctx context.Context, ev map[string]interface{}) error {
}

switch evt := event.(type) {
case *events.CloudWatchEvent:
err = processEventBridgeEvent(ctx, evt, pClient, pClient.log, processS3Event)
case *events.S3Event:
err = processS3Event(ctx, evt, pClient, pClient.log)
case *events.CloudwatchLogsEvent:
Expand Down
28 changes: 28 additions & 0 deletions tools/lambda-promtail/testdata/eventbridge-s3-event.json
@@ -0,0 +1,28 @@
{
"version": "0",
"id": "52f34d1b-bd8e-5a96-4e0a-0343037b0e63",
"detail-type": "Object Created",
"source": "aws.s3",
"account": "123",
"time": "2023-08-25T17:41:58Z",
"region": "us-east-2",
"resources": [
"arn:aws:s3:::thepalbi-test"
],
"detail": {
"version": "0",
"bucket": {
"name": "bucket"
},
"object": {
"key": "pizza.txt",
"size": 100,
"etag": "8adc5937e635f6c9af646f0b23560fae",
"sequencer": "0064E8E7E6404BABBD"
},
"request-id": "VHS8EE09Q94HJZDZ",
"requester": "366620023056",
"source-ip-address": "1.167.76.151",
"reason": "PutObject"
}
}

0 comments on commit 5034b1b

Please sign in to comment.