Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda/promtail): support dropping labels #10755

1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -41,6 +41,7 @@

* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Lambda-Promtail: Add support for WAF logs in S3
* [10301](https://github.com/grafana/loki/pull/10301) **wildum**: users can now define `additional_fields` in cloudflare configuration.
* [10755](https://github.com/grafana/loki/pull/10755) **hainenber**: Lambda-Promtail: Add support for dropping labels passed via env var

##### Changes

Expand Down
4 changes: 2 additions & 2 deletions tools/lambda-promtail/README.md
Expand Up @@ -54,12 +54,12 @@ Then use Terraform to deploy:

```bash
## use cloudwatch log group
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
```

```bash
## use kinesis data stream
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
```

or CloudFormation:
Expand Down
2 changes: 1 addition & 1 deletion tools/lambda-promtail/lambda-promtail/cw.go
Expand Up @@ -26,7 +26,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(data.LogStream)
}

labels = applyExtraLabels(labels)
labels = applyLabels(labels)

for _, event := range data.LogEvents {
timestamp := time.UnixMilli(event.Timestamp)
Expand Down
2 changes: 1 addition & 1 deletion tools/lambda-promtail/lambda-promtail/kinesis.go
Expand Up @@ -26,7 +26,7 @@ func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent)
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
}

labels = applyExtraLabels(labels)
labels = applyLabels(labels)

// Check if the data is gzipped by inspecting the 'data' field
if isGzipped(record.Kinesis.Data) {
Expand Down
50 changes: 39 additions & 11 deletions tools/lambda-promtail/lambda-promtail/main.go
Expand Up @@ -25,18 +25,19 @@ const (

maxErrMsgLen = 1024

invalidExtraLabelsError = "Invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
invalidExtraLabelsError = "invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
)

var (
writeAddress *url.URL
username, password, extraLabelsRaw, tenantID, bearerToken string
keepStream bool
batchSize int
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
skipTlsVerify bool
printLogLine bool
writeAddress *url.URL
username, password, extraLabelsRaw, dropLabelsRaw, tenantID, bearerToken string
keepStream bool
batchSize int
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
dropLabels []model.LabelName
skipTlsVerify bool
printLogLine bool
)

func setupArguments() {
Expand All @@ -60,6 +61,11 @@ func setupArguments() {
panic(err)
}

dropLabels, err = getDropLabels()
if err != nil {
panic(err)
}

username = os.Getenv("USERNAME")
password = os.Getenv("PASSWORD")
// If either username or password is set then both must be.
Expand Down Expand Up @@ -128,8 +134,30 @@ func parseExtraLabels(extraLabelsRaw string, omitPrefix bool) (model.LabelSet, e
return extractedLabels, nil
}

func applyExtraLabels(labels model.LabelSet) model.LabelSet {
return labels.Merge(extraLabels)
func getDropLabels() ([]model.LabelName, error) {
var result []model.LabelName

dropLabelsRaw = os.Getenv("DROP_LABELS")
dropLabelsRawSplit := strings.Split(dropLabelsRaw, ",")
for _, dropLabelRaw := range dropLabelsRawSplit {
dropLabel := model.LabelName(dropLabelRaw)
if !dropLabel.IsValid() {
return []model.LabelName{}, fmt.Errorf("invalid label name %s", dropLabelRaw)
}
result = append(result, dropLabel)
}

return result, nil
}

func applyLabels(labels model.LabelSet) model.LabelSet {
finalLabels := labels.Merge(extraLabels)

for _, dropLabel := range dropLabels {
delete(finalLabels, dropLabel)
}

return finalLabels
}

func checkEventType(ev map[string]interface{}) (interface{}, error) {
Expand Down
25 changes: 25 additions & 0 deletions tools/lambda-promtail/lambda-promtail/main_test.go
@@ -1,6 +1,7 @@
package main

import (
"os"
"testing"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -34,3 +35,27 @@ func TestLambdaPromtail_TestParseLabelsNoneProvided(t *testing.T) {
require.Len(t, extraLabels, 0)
require.Nil(t, err)
}

func TestLambdaPromtail_TestDropLabels(t *testing.T) {
os.Setenv("DROP_LABELS", "A1,A2")

// Reset the shared global variables
defer func() {
os.Unsetenv("DROP_LABELS")
dropLabels = []model.LabelName{}
}()

var err error
dropLabels, err = getDropLabels()
require.Nil(t, err)
require.Contains(t, dropLabels, model.LabelName("A1"))

defaultLabelSet := model.LabelSet{
model.LabelName("default"): model.LabelValue("default"),
model.LabelName("A1"): model.LabelValue("A1"),
model.LabelName("B2"): model.LabelValue("B2"),
}
modifiedLabels := applyLabels(defaultLabelSet)
require.NotContains(t, modifiedLabels, model.LabelName("A1"))
require.Contains(t, modifiedLabels, model.LabelName("B2"))
}
8 changes: 5 additions & 3 deletions tools/lambda-promtail/lambda-promtail/promtail.go
Expand Up @@ -126,9 +126,11 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
}

func (b *batch) flushBatch(ctx context.Context) error {
err := b.client.sendToPromtail(ctx, b)
if err != nil {
return err
if b.client != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably uncalled for but this thing got crashed when I was fixing the failed unit tests. Made a nil check here :D

err := b.client.sendToPromtail(ctx, b)
if err != nil {
return err
}
}
b.resetBatch()

Expand Down
17 changes: 9 additions & 8 deletions tools/lambda-promtail/lambda-promtail/s3.go
Expand Up @@ -162,7 +162,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
}

ls = applyExtraLabels(ls)
ls = applyLabels(ls)

// extract the timestamp of the nested event and sends the rest as raw json
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
Expand Down Expand Up @@ -341,13 +341,14 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
// It also makes use of the fact that the log10 of a number in base 10 is its number of digits - 1.
// It returns early if the fractional seconds is 0 because getting the log10 of 0 results in -Inf.
// For example, given a string 1234567890123:
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
//
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
const (
UNIX_SEC_LOG10 = 9
Expand Down
1 change: 1 addition & 0 deletions tools/lambda-promtail/main.tf
Expand Up @@ -174,6 +174,7 @@ resource "aws_lambda_function" "this" {
KEEP_STREAM = var.keep_stream
BATCH_SIZE = var.batch_size
EXTRA_LABELS = var.extra_labels
DROP_LABELS = var.drop_labels
OMIT_EXTRA_LABELS_PREFIX = var.omit_extra_labels_prefix ? "true" : "false"
TENANT_ID = var.tenant_id
SKIP_TLS_VERIFY = var.skip_tls_verify
Expand Down
6 changes: 6 additions & 0 deletions tools/lambda-promtail/variables.tf
Expand Up @@ -72,6 +72,12 @@ variable "extra_labels" {
default = ""
}

variable "drop_labels" {
type = string
description = "Comma separated list of labels to be drop, in the format 'name1,name2,...,nameN' to be omitted to entries forwarded by lambda-promtail."
default = ""
}

variable "omit_extra_labels_prefix" {
type = bool
description = "Whether or not to omit the prefix `__extra_` from extra labels defined in the variable `extra_labels`."
Expand Down