Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lambda-promtail: Add support for WAF logs in S3 #10416

Merged
merged 11 commits into from
Sep 6, 2023
57 changes: 43 additions & 14 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
return s3Client, nil
}

func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser, log *log.Logger) error {
parser, ok := parsers[labels["type"]]
if !ok {
if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
Expand Down Expand Up @@ -204,18 +204,20 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
match[1] += "Z"
}

if parser.timestampType == "string" {
switch parser.timestampType {
case "string":
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err
}
} else if parser.timestampType == "unix" {
// convert to microseconds so that we only use one function
usec, err := toMicroseconds(match[1])
case "unix":
sec, nsec, err := getUnixSecNsec(match[1])
if err != nil {
return err
}
timestamp = time.UnixMicro(usec).UTC()
timestamp = time.Unix(sec, nsec).UTC()
default:
level.Warn(*log).Log("msg", fmt.Sprintf("timestamp type of %s parser unknown, using current time", labels["type"]))
}
}

Expand Down Expand Up @@ -281,7 +283,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
if err != nil {
return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
}
err = parseS3Log(ctx, batch, labels, obj.Body)
err = parseS3Log(ctx, batch, labels, obj.Body, log)
if err != nil {
return err
}
Expand Down Expand Up @@ -333,15 +335,42 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
return result, nil
}

func toMicroseconds(s string) (usec int64, err error) {
// Unix time in microseconds has 16 digits
// getUnixSecNsec returns the Unix time seconds and nanoseconds in the string s.
// It assumes that the first 10 digits of the parsed int are the Unix time in seconds and that any remaining digits are the fractional-second part.
// This assumption holds for times from 2001-09-09 01:46:40 UTC (when Unix time in seconds first reached 10 digits) until 2286-11-20 17:46:40 UTC (when it reaches 11 digits).
// It also makes use of the fact that the integer part of the log10 of a positive number is its number of decimal digits - 1.
// It returns early if the fractional part is 0 because the log10 of 0 is -Inf.
// For example, given a string 1234567890123:
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
const (
UNIX_SEC_LOG10 = 9
UNIX_NANOSEC_LOG10 = 8
)

i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return usec, err
return sec, nsec, err
}
iPow10 := int(math.Log10(float64(i)))
multiplier := math.Pow10(15 - iPow10)
usec = int64(float64(i) * multiplier)

return usec, err
iLog10 := int(math.Log10(float64(i)))
multiplier := math.Pow10(UNIX_SEC_LOG10 - iLog10)
sec = int64(float64(i) * multiplier)

fractionalSec := float64(i % sec)
if fractionalSec == 0 {
return sec, 0, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to set an actual value for err here? otherwise it has the nil value from strconv.ParseInt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, err would be nil if fractionalSec is 0. There would only be an error during parsing, fractionalSec being 0 just means that the Unix time only has seconds resolution. It is not an error.

}

fractionalSecLog10 := int(math.Log10(fractionalSec))
multiplier = math.Pow10(UNIX_NANOSEC_LOG10 - fractionalSecLog10)
nsec = int64(fractionalSec * multiplier)

return sec, nsec, err
}
71 changes: 54 additions & 17 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"context"
"io"
"os"
Expand All @@ -9,9 +10,9 @@ import (
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/require"

"github.com/go-kit/log"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

func Test_getLabels(t *testing.T) {
Expand Down Expand Up @@ -348,6 +349,7 @@ func Test_parseS3Log(t *testing.T) {
expectedLen int
expectedStream string
expectedTimestamps []time.Time
expectedLog string
}{
{
name: "vpcflowlogs",
Expand Down Expand Up @@ -519,6 +521,31 @@ func Test_parseS3Log(t *testing.T) {
expectedStream: "",
wantErr: true,
},
{
name: "no_timestamp_type",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": "no_type",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedLog: `level=warn msg="timestamp type of no_type parser unknown, using current time"` + "\n",
wantErr: false,
},
}
parsers["no_type"] = parserConfig{
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -528,7 +555,9 @@ func Test_parseS3Log(t *testing.T) {
if err != nil {
t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
}
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
buf := &bytes.Buffer{}
log := log.NewLogfmtLogger(buf)
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj, &log); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
require.Len(t, tt.args.b.streams, tt.expectedLen)
Expand All @@ -542,6 +571,7 @@ func Test_parseS3Log(t *testing.T) {
require.Equal(t, tt.expectedTimestamps[i], entry.Timestamp)
}
}
require.Equal(t, tt.expectedLog, buf.String())
}
})
}
Expand Down Expand Up @@ -652,64 +682,71 @@ func TestProcessSQSEvent(t *testing.T) {
require.True(t, handlerCalled)
}

func TestToMicroseconds(t *testing.T) {
func TestGetUnixSecNsec(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test needs a case for the fractionalSec == 0 case as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests fractionalSec == 0.

type args struct {
s string
}
tests := []struct {
name string
args args
wantErr bool
expectedUsec int64
expectedSec int64
expectedNsec int64
}{
{
name: "timestamp in seconds",
name: "timestamp_in_seconds",
args: args{
s: "1234567890",
},
expectedUsec: 1234567890000000,
expectedSec: 1234567890,
expectedNsec: 0,
wantErr: false,
},
{
name: "timestamp in milliseconds",
name: "timestamp_in_milliseconds",
args: args{
s: "1234567890123",
},
expectedUsec: 1234567890123000,
expectedSec: 1234567890,
expectedNsec: 123000000,
wantErr: false,
},
{
name: "timestamp in microseconds",
name: "timestamp_in_microseconds",
args: args{
s: "1234567890123456",
},
expectedUsec: 1234567890123456,
expectedSec: 1234567890,
expectedNsec: 123456000,
wantErr: false,
},
{
name: "timestamp in nanoseconds",
name: "timestamp_in_nanoseconds",
args: args{
s: "1234567890123456789",
},
expectedUsec: 1234567890123456,
expectedSec: 1234567890,
expectedNsec: 123456789,
wantErr: false,
},
{
name: "strconv error",
name: "strconv_error",
args: args{
s: "string",
},
expectedUsec: 0,
expectedSec: 0,
expectedNsec: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
usec, err := toMicroseconds(tt.args.s)
sec, nsec, err := getUnixSecNsec(tt.args.s)
if (err != nil) != tt.wantErr {
t.Errorf("toMicroseconds() error = %v, wantErr %v", err, tt.wantErr)
}
require.Equal(t, tt.expectedUsec, usec)
require.Equal(t, tt.expectedSec, sec)
require.Equal(t, tt.expectedNsec, nsec)
})
}
}