Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lambda-promtail: Add support for WAF logs in S3 #10416

Merged
merged 11 commits into from
Sep 6, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Loki

##### Enhancements
* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Lambda-Promtail: Add support for WAF logs in S3
* [10324](https://github.com/grafana/loki/pull/10324) **ashwanthgoli**: Deprecate ingester.unordered-writes and a few unused configs(log.use-buffered, log.use-sync, frontend.forward-headers-list)

* [10322](https://github.com/grafana/loki/pull/10322) **chaudum**: Deprecate misleading setting `-ruler.evaluation-delay-duration`.
Expand Down
83 changes: 78 additions & 5 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"regexp"
"strconv"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -31,6 +33,8 @@ type parserConfig struct {
timestampRegex *regexp.Regexp
// time format to use to convert the timestamp to time.Time
timestampFormat string
// how to interpret the matched timestamp: "string" to parse it with timestampFormat, or "unix" for a Unix timestamp
timestampType string
// how many lines or jsonToken to skip at the beginning of the file
skipHeaderCount int
// key of the metadata label to use as a value for the __aws_<logType>_owner label
Expand All @@ -45,6 +49,7 @@ const (
CLOUDFRONT_LOG_TYPE string = "cloudfront"
LB_NLB_TYPE string = "net"
LB_ALB_TYPE string = "app"
WAF_LOG_TYPE string = "WAFLogs"
)

var (
Expand All @@ -66,18 +71,25 @@ var (
// CloudFront
// source https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
// AWS WAF logs
// source: https://docs.aws.amazon.com/waf/latest/developerguide/logging-s3.html
// format: aws-waf-logs-suffix[/prefix]/AWSLogs/aws-account-id/WAFLogs/region/webacl-name/year/month/day/hour/minute/aws-account-id_waflogs_region_webacl-name_timestamp_hash.log.gz
// example: aws-waf-logs-test/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz
defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?P<lb_type>app|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
defaultTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+(?:\.\d+Z)?)`)
cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<organization_id>o-[a-z0-9]{10,32})?\/?(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
wafFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+`)
wafTimestampRegex = regexp.MustCompile(`"timestamp":\s*(?P<timestamp>\d+),`)
parsers = map[string]parserConfig{
FLOW_LOG_TYPE: {
logTypeLabel: "s3_vpc_flow",
filenameRegex: defaultFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: defaultTimestampRegex,
timestampFormat: time.RFC3339,
timestampType: "string",
skipHeaderCount: 1,
},
LB_LOG_TYPE: {
Expand All @@ -86,6 +98,7 @@ var (
ownerLabelKey: "account_id",
timestampFormat: time.RFC3339,
timestampRegex: defaultTimestampRegex,
timestampType: "string",
},
CLOUDTRAIL_LOG_TYPE: {
logTypeLabel: "s3_cloudtrail",
Expand All @@ -99,8 +112,16 @@ var (
ownerLabelKey: "prefix",
timestampRegex: cloudfrontTimestampRegex,
timestampFormat: "2006-01-02\x0915:04:05",
timestampType: "string",
skipHeaderCount: 2,
},
WAF_LOG_TYPE: {
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
timestampType: "unix",
},
}
)

Expand All @@ -120,7 +141,7 @@ func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
return s3Client, nil
}

func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser, log *log.Logger) error {
parser, ok := parsers[labels["type"]]
if !ok {
if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
Expand Down Expand Up @@ -182,9 +203,21 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
match[1] += "Z"
}
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err

switch parser.timestampType {
case "string":
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err
}
case "unix":
sec, nsec, err := getUnixSecNsec(match[1])
if err != nil {
return err
}
timestamp = time.Unix(sec, nsec).UTC()
default:
level.Warn(*log).Log("msg", fmt.Sprintf("timestamp type of %s parser unknown, using current time", labels["type"]))
}
}

Expand Down Expand Up @@ -250,7 +283,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
if err != nil {
return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
}
err = parseS3Log(ctx, batch, labels, obj.Body)
err = parseS3Log(ctx, batch, labels, obj.Body, log)
if err != nil {
return err
}
Expand Down Expand Up @@ -301,3 +334,43 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
}
return result, nil
}

// getUnixSecNsec splits the Unix timestamp in the decimal string s into its
// seconds and nanoseconds parts.
//
// The first 10 digits of the parsed int are taken as the Unix time in seconds
// and any remaining digits as the fractional second (milliseconds,
// microseconds or nanoseconds, depending on how many digits there are).
// This assumption will hold until 2286-11-20 17:46:40 UTC, so it's a safe
// assumption. A value of 10 digits or fewer is treated as whole seconds.
//
// The split is done with integer arithmetic on the digit count, so leading
// zeros in the fractional part are preserved (a trailing 012 is 12ms, not
// 120ms) and 19-digit nanosecond values are not rounded through float64.
//
// For example, given a string 1234567890123:
//
//	fracDigits = 3          // 13 digits, 10 of which are the seconds part
//	sec        = 1234567890 // 1234567890123 / 10^3
//	frac       = 123        // 1234567890123 % 10^3
//	nsec       = 123000000  // 123 * 10^(9-3)
//
// It returns an error if s is not a base 10 int64 or if it is negative.
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
	const (
		unixSecDigits = 10 // digits in the seconds part of the timestamp
		nanosecDigits = 9  // digits in a full nanoseconds fraction
	)

	i, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, 0, err
	}
	if i < 0 {
		return 0, 0, fmt.Errorf("negative Unix timestamp %q is not supported", s)
	}

	// Count digits on the parsed value rather than on s so that a sign or
	// leading zeros in the input do not shift the split point.
	fracDigits := len(strconv.FormatInt(i, 10)) - unixSecDigits
	if fracDigits <= 0 {
		// No fractional digits just means the Unix time only has seconds
		// resolution. It is not an error.
		return i, 0, nil
	}

	// An int64 has at most 19 digits, so fracDigits is in [1, 9] here and
	// both powers of ten are exactly representable.
	scale := int64(math.Pow10(fracDigits))
	sec = i / scale
	nsec = (i % scale) * int64(math.Pow10(nanosecDigits-fracDigits))

	return sec, nsec, nil
}
161 changes: 158 additions & 3 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"context"
"io"
"os"
Expand All @@ -9,9 +10,9 @@ import (
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/require"

"github.com/go-kit/log"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

func Test_getLabels(t *testing.T) {
Expand Down Expand Up @@ -257,6 +258,41 @@ func Test_getLabels(t *testing.T) {
},
wantErr: false,
},
{
name: "s3_waf",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "waf_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
},
},
},
},
want: map[string]string{
"account_id": "11111111111",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"bucket": "waf_logs_test",
"day": "28",
"hour": "19",
"key": "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
"minute": "50",
"month": "10",
"region": "us-east-1",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
"year": "2021",
},
wantErr: false,
},
{
name: "missing_type",
args: args{
Expand Down Expand Up @@ -313,6 +349,7 @@ func Test_parseS3Log(t *testing.T) {
expectedLen int
expectedStream string
expectedTimestamps []time.Time
expectedLog string
}{
{
name: "vpcflowlogs",
Expand Down Expand Up @@ -443,6 +480,27 @@ func Test_parseS3Log(t *testing.T) {
},
wantErr: false,
},
{
name: "waflogs",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedTimestamps: []time.Time{
time.Date(2023, time.August, 31, 4, 57, 42, 729000000, time.UTC),
},
wantErr: false,
},
{
name: "missing_parser",
args: args{
Expand All @@ -463,6 +521,31 @@ func Test_parseS3Log(t *testing.T) {
expectedStream: "",
wantErr: true,
},
{
name: "no_timestamp_type",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": "no_type",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedLog: `level=warn msg="timestamp type of no_type parser unknown, using current time"` + "\n",
wantErr: false,
},
}
parsers["no_type"] = parserConfig{
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -472,7 +555,9 @@ func Test_parseS3Log(t *testing.T) {
if err != nil {
t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
}
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
buf := &bytes.Buffer{}
log := log.NewLogfmtLogger(buf)
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj, &log); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
require.Len(t, tt.args.b.streams, tt.expectedLen)
Expand All @@ -486,6 +571,7 @@ func Test_parseS3Log(t *testing.T) {
require.Equal(t, tt.expectedTimestamps[i], entry.Timestamp)
}
}
require.Equal(t, tt.expectedLog, buf.String())
}
})
}
Expand Down Expand Up @@ -595,3 +681,72 @@ func TestProcessSQSEvent(t *testing.T) {
require.Nil(t, err)
require.True(t, handlerCalled)
}

// TestGetUnixSecNsec runs getUnixSecNsec over a table of timestamp strings and
// checks the returned seconds, nanoseconds and error presence for each one.
func TestGetUnixSecNsec(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test needs a case for the fractionalSec == 0 case as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests fractionalSec == 0.

type args struct {
s string
}
// The cases cover seconds, milliseconds, microseconds and nanoseconds
// resolution, plus an input that strconv cannot parse.
tests := []struct {
name string
args args
wantErr bool
expectedSec int64
expectedNsec int64
}{
{
// No digits beyond the seconds part, so the expected nanoseconds are 0.
name: "timestamp_in_seconds",
args: args{
s: "1234567890",
},
expectedSec: 1234567890,
expectedNsec: 0,
wantErr: false,
},
{
name: "timestamp_in_milliseconds",
args: args{
s: "1234567890123",
},
expectedSec: 1234567890,
expectedNsec: 123000000,
wantErr: false,
},
{
name: "timestamp_in_microseconds",
args: args{
s: "1234567890123456",
},
expectedSec: 1234567890,
expectedNsec: 123456000,
wantErr: false,
},
{
name: "timestamp_in_nanoseconds",
args: args{
s: "1234567890123456789",
},
expectedSec: 1234567890,
expectedNsec: 123456789,
wantErr: false,
},
{
// A non-numeric input is expected to produce an error and zero values.
name: "strconv_error",
args: args{
s: "string",
},
expectedSec: 0,
expectedNsec: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sec, nsec, err := getUnixSecNsec(tt.args.s)
// Error presence is reported with t.Errorf, so the value checks below still run.
if (err != nil) != tt.wantErr {
t.Errorf("getUnixSecNsec() error = %v, wantErr %v", err, tt.wantErr)
}
require.Equal(t, tt.expectedSec, sec)
require.Equal(t, tt.expectedNsec, nsec)
})
}
}
Binary file added tools/lambda-promtail/testdata/waflog.log.gz
Binary file not shown.