Skip to content

Commit

Permalink
lambda-promtail: Add support for WAF logs in S3
Browse files Browse the repository at this point in the history
  • Loading branch information
lpugoy committed Sep 1, 2023
1 parent a05744a commit 77011d1
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Loki

##### Enhancements
* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Add support for WAF logs in S3
* [10324](https://github.com/grafana/loki/pull/10324) **ashwanthgoli**: Deprecate ingester.unordered-writes and a few unused configs(log.use-buffered, log.use-sync, frontend.forward-headers-list)

* [10322](https://github.com/grafana/loki/pull/10322) **chaudum**: Deprecate misleading setting `-ruler.evaluation-delay-duration`.
Expand Down
50 changes: 47 additions & 3 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"regexp"
"strconv"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -31,6 +33,8 @@ type parserConfig struct {
timestampRegex *regexp.Regexp
// time format to use to convert the timestamp to time.Time
timestampFormat string
// how the matched timestamp is interpreted: "string" (parsed with timestampFormat) or "unix" (numeric Unix timestamp)
timestampType string
// how many lines or jsonToken to skip at the beginning of the file
skipHeaderCount int
// key of the metadata label to use as a value for the __aws_<logType>_owner label
Expand All @@ -45,6 +49,7 @@ const (
CLOUDFRONT_LOG_TYPE string = "cloudfront"
LB_NLB_TYPE string = "net"
LB_ALB_TYPE string = "app"
WAF_LOG_TYPE string = "WAFLogs"
)

var (
Expand All @@ -66,18 +71,25 @@ var (
// CloudFront
// source https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
// AWS WAF logs
// source: https://docs.aws.amazon.com/waf/latest/developerguide/logging-s3.html
// format: aws-waf-logs-suffix[/prefix]/AWSLogs/aws-account-id/WAFLogs/region/webacl-name/year/month/day/hour/minute/aws-account-id_waflogs_region_webacl-name_timestamp_hash.log.gz
// example: aws-waf-logs-test/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz
defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?P<lb_type>app|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
defaultTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+(?:\.\d+Z)?)`)
cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<organization_id>o-[a-z0-9]{10,32})?\/?(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
wafFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+`)
wafTimestampRegex = regexp.MustCompile(`"timestamp":\s*(?P<timestamp>\d+),`)
parsers = map[string]parserConfig{
FLOW_LOG_TYPE: {
logTypeLabel: "s3_vpc_flow",
filenameRegex: defaultFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: defaultTimestampRegex,
timestampFormat: time.RFC3339,
timestampType: "string",
skipHeaderCount: 1,
},
LB_LOG_TYPE: {
Expand All @@ -86,6 +98,7 @@ var (
ownerLabelKey: "account_id",
timestampFormat: time.RFC3339,
timestampRegex: defaultTimestampRegex,
timestampType: "string",
},
CLOUDTRAIL_LOG_TYPE: {
logTypeLabel: "s3_cloudtrail",
Expand All @@ -99,8 +112,16 @@ var (
ownerLabelKey: "prefix",
timestampRegex: cloudfrontTimestampRegex,
timestampFormat: "2006-01-02\x0915:04:05",
timestampType: "string",
skipHeaderCount: 2,
},
WAF_LOG_TYPE: {
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
timestampType: "unix",
},
}
)

Expand Down Expand Up @@ -182,9 +203,19 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
match[1] += "Z"
}
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err

if parser.timestampType == "string" {
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err
}
} else if parser.timestampType == "unix" {
// convert to microseconds so that we only use one function
usec, err := toMicroseconds(match[1])
if err != nil {
return err
}
timestamp = time.UnixMicro(usec).UTC()
}
}

Expand Down Expand Up @@ -301,3 +332,16 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
}
return result, nil
}

// toMicroseconds converts a decimal Unix timestamp of unknown resolution
// (seconds, milliseconds, microseconds or nanoseconds) into microseconds.
//
// The resolution is inferred from the number of decimal digits: the value is
// scaled by a power of ten until it has 16 digits, which is the length of a
// present-day Unix time expressed in microseconds. Digits finer than a
// microsecond are truncated. Zero is returned unchanged.
//
// An error is returned, with usec set to 0, when s is not a base-10 int64 or
// when it is negative.
func toMicroseconds(s string) (usec int64, err error) {
	// Unix time in microseconds has 16 digits
	const usecDigits = 16

	i, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, err
	}
	if i < 0 {
		// The digit-count heuristic is only meaningful for non-negative values.
		return 0, fmt.Errorf("negative Unix timestamp %q", s)
	}
	if i == 0 {
		// Zero has no magnitude to infer a resolution from; it is the epoch
		// in every unit.
		return 0, nil
	}

	// Count digits on the canonical integer form (no sign, no leading zeros)
	// and scale with integer arithmetic. Going through float64 would round
	// values above 2^53 and could miscount digits or lose a microsecond.
	digits := len(strconv.FormatInt(i, 10))
	switch {
	case digits < usecDigits:
		// Small powers of ten are exact in float64, so the conversion to
		// int64 is lossless; the product has 16 digits and cannot overflow.
		usec = i * int64(math.Pow10(usecDigits-digits))
	case digits > usecDigits:
		// An int64 has at most 19 digits, so the divisor is at most 1000.
		usec = i / int64(math.Pow10(digits-usecDigits))
	default:
		usec = i
	}

	return usec, nil
}
118 changes: 118 additions & 0 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,41 @@ func Test_getLabels(t *testing.T) {
},
wantErr: false,
},
{
name: "s3_waf",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "waf_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
},
},
},
},
want: map[string]string{
"account_id": "11111111111",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"bucket": "waf_logs_test",
"day": "28",
"hour": "19",
"key": "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
"minute": "50",
"month": "10",
"region": "us-east-1",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
"year": "2021",
},
wantErr: false,
},
{
name: "missing_type",
args: args{
Expand Down Expand Up @@ -443,6 +478,27 @@ func Test_parseS3Log(t *testing.T) {
},
wantErr: false,
},
{
name: "waflogs",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedTimestamps: []time.Time{
time.Date(2023, time.August, 31, 4, 57, 42, 729000000, time.UTC),
},
wantErr: false,
},
{
name: "missing_parser",
args: args{
Expand Down Expand Up @@ -595,3 +651,65 @@ func TestProcessSQSEvent(t *testing.T) {
require.Nil(t, err)
require.True(t, handlerCalled)
}

// TestToMicroseconds runs toMicroseconds against timestamps given in seconds,
// milliseconds, microseconds and nanoseconds, plus a non-numeric input, and
// checks both the error outcome and the returned microsecond value.
func TestToMicroseconds(t *testing.T) {
	cases := []struct {
		name     string
		input    string
		wantUsec int64
		wantErr  bool
	}{
		{
			name:     "timestamp in seconds",
			input:    "1234567890",
			wantUsec: 1234567890000000,
		},
		{
			name:     "timestamp in milliseconds",
			input:    "1234567890123",
			wantUsec: 1234567890123000,
		},
		{
			name:     "timestamp in microseconds",
			input:    "1234567890123456",
			wantUsec: 1234567890123456,
		},
		{
			name:     "timestamp in nanoseconds",
			input:    "1234567890123456789",
			wantUsec: 1234567890123456,
		},
		{
			// Not a number: the parse error is expected and the value stays 0.
			name:     "strconv error",
			input:    "string",
			wantUsec: 0,
			wantErr:  true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := toMicroseconds(tc.input)

			// A mismatch in the error outcome is reported but does not stop
			// the value comparison below.
			failed := err != nil
			if failed != tc.wantErr {
				t.Errorf("toMicroseconds() error = %v, wantErr %v", err, tc.wantErr)
			}
			require.Equal(t, tc.wantUsec, got)
		})
	}
}
Binary file added tools/lambda-promtail/testdata/waflog.log.gz
Binary file not shown.

0 comments on commit 77011d1

Please sign in to comment.