Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add allowedLabels to remove unwanted labels from loki #1639

Merged
merged 6 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 98 additions & 54 deletions log/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,83 +43,108 @@ type lokiHook struct {
limit int
msgMaxSize int
levels []logrus.Level
allowedLabels []string
pushPeriod time.Duration
client *http.Client
ctx context.Context
fallbackLogger logrus.FieldLogger
profile bool
droppedLabels map[string]string
droppedMsg string
}

func getDefaultLoki() *lokiHook {
return &lokiHook{
addr: "http://127.0.0.1:3100/loki/api/v1/push",
limit: 100,
levels: logrus.AllLevels,
pushPeriod: time.Second * 1,
msgMaxSize: 1024 * 1024, // 1mb
ch: make(chan *logrus.Entry, 1000),
allowedLabels: nil,
droppedMsg: "k6 dropped %d log messages because they were above the limit of %d messages / %s",
}
}

// LokiFromConfigLine returns a new logrus.Hook that pushes logrus.Entrys to loki and is configured
// through the provided line
//nolint:funlen
func LokiFromConfigLine(ctx context.Context, fallbackLogger logrus.FieldLogger, line string) (logrus.Hook, error) {
h := &lokiHook{
addr: "http://127.0.0.1:3100/loki/api/v1/push",
limit: 100,
levels: logrus.AllLevels,
pushPeriod: time.Second * 1,
ctx: ctx,
msgMaxSize: 1024 * 1024, // 1mb
ch: make(chan *logrus.Entry, 1000),
fallbackLogger: fallbackLogger,
}
if line == "loki" {
return h, nil
}
h := getDefaultLoki()

parts := strings.SplitN(line, "=", 2)
if parts[0] != "loki" {
return nil, fmt.Errorf("loki configuration should be in the form `loki=url-to-push` but is `%s`", line)
h.ctx = ctx
h.fallbackLogger = fallbackLogger

if line != "loki" {
parts := strings.SplitN(line, "=", 2)
if parts[0] != "loki" {
return nil, fmt.Errorf("loki configuration should be in the form `loki=url-to-push` but is `%s`", line)
}

err := h.parseArgs(line)
if err != nil {
return nil, err
}
}
args := strings.Split(parts[1], ",")
h.addr = args[0]
// TODO use something better ... maybe
// https://godoc.org/github.com/kubernetes/helm/pkg/strvals
// atleast until https://github.com/loadimpact/k6/issues/926?
if len(args) == 1 {
return h, nil
h.droppedLabels = make(map[string]string, 2+len(h.labels))
h.droppedLabels["level"] = logrus.WarnLevel.String()
for _, params := range h.labels {
h.droppedLabels[params[0]] = params[1]
}

for _, arg := range args[1:] {
paramParts := strings.SplitN(arg, "=", 2)
h.droppedMsg = h.filterLabels(h.droppedLabels, h.droppedMsg)

if len(paramParts) != 2 {
return nil, fmt.Errorf("loki arguments should be in the form `address,key1=value1,key2=value2`, got %s", arg)
}
h.client = &http.Client{Timeout: h.pushPeriod}

key, value := paramParts[0], paramParts[1]
go h.loop()

return h, nil
}

func (h *lokiHook) parseArgs(line string) error {
tokens, err := tokenize(line)
if err != nil {
return err
}

for _, token := range tokens {
key := token.key
value := token.value

var err error
switch key {
case "loki":
h.addr = value
case "pushPeriod":
h.pushPeriod, err = time.ParseDuration(value)
if err != nil {
return nil, fmt.Errorf("couldn't parse the loki pushPeriod %w", err)
return fmt.Errorf("couldn't parse the loki pushPeriod %w", err)
}
case "profile":
h.profile = true
case "limit":
h.limit, err = strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("couldn't parse the loki limit as a number %w", err)
return fmt.Errorf("couldn't parse the loki limit as a number %w", err)
}
if !(h.limit > 0) {
return nil, fmt.Errorf("loki limit needs to be a positive number, is %d", h.limit)
return fmt.Errorf("loki limit needs to be a positive number, is %d", h.limit)
}
case "msgMaxSize":
h.msgMaxSize, err = strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err)
return fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err)
}
if !(h.msgMaxSize > 0) {
return nil, fmt.Errorf("loki msgMaxSize needs to be a positive number, is %d", h.msgMaxSize)
return fmt.Errorf("loki msgMaxSize needs to be a positive number, is %d", h.msgMaxSize)
}
case "level":
h.levels, err = getLevels(value)
if err != nil {
return nil, err
return err
}
case "allowedLabels":
h.allowedLabels = strings.Split(value, ",")
default:
if strings.HasPrefix(key, "label.") {
labelKey := strings.TrimPrefix(key, "label.")
Expand All @@ -128,15 +153,11 @@ func LokiFromConfigLine(ctx context.Context, fallbackLogger logrus.FieldLogger,
continue
}

return nil, fmt.Errorf("unknown loki config key %s", key)
return fmt.Errorf("unknown loki config key %s", key)
}
}

h.client = &http.Client{Timeout: h.pushPeriod}

go h.loop()

return h, nil
return nil
}

func getLevels(level string) ([]logrus.Level, error) {
Expand Down Expand Up @@ -176,8 +197,7 @@ func (h *lokiHook) loop() {
cutOff := <-ch
close(ch) // signal that more buffering can continue

copy(oldLogs[len(oldLogs):len(oldLogs)+oldCount], msgsToPush[:oldCount])
oldLogs = oldLogs[:len(oldLogs)+oldCount]
oldLogs = append(oldLogs, msgsToPush[:oldCount]...)

t := time.Now()
cutOffIndex := sortAndSplitMsgs(oldLogs, cutOff)
Expand Down Expand Up @@ -244,12 +264,13 @@ func (h *lokiHook) loop() {
labels[params[0]] = params[1]
}
labels["level"] = entry.Level.String()
msg := h.filterLabels(labels, entry.Message) // TODO we can do this while constructing
// have the cutoff here ?
// if we cutoff here we can cut somewhat on the backbuffers and optimize the inserting
// in/creating of the final Streams that we push
msgs[count] = tmpMsg{
labels: labels,
msg: entry.Message,
msg: msg,
t: entry.Time.UnixNano(),
}
count++
Expand All @@ -269,6 +290,36 @@ func (h *lokiHook) loop() {
}
}

func (h *lokiHook) filterLabels(labels map[string]string, msg string) string {
imiric marked this conversation as resolved.
Show resolved Hide resolved
if h.allowedLabels == nil {
return msg
}
// TODO both can be reused as under load this will just generate a lot of *probably* fairly
// similar objects.
var b strings.Builder
keys := make([]string, 0, len(labels))
for key := range labels {
keys = append(keys, key)
}
sort.Strings(keys)
b.WriteString(msg)
outer:
for _, key := range keys {
for _, label := range h.allowedLabels {
if label == key {
continue outer
}
}
b.WriteRune(' ')
b.WriteString(key)
b.WriteRune('=')
b.WriteString(labels[key])
delete(labels, key)
}

return b.String()
}

func sortAndSplitMsgs(msgs []tmpMsg, cutOff int64) int {
if len(msgs) == 0 {
return 0
Expand All @@ -294,17 +345,10 @@ func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *l
pushMsg.add(msg)
}
if dropped != 0 {
labels := make(map[string]string, 2+len(h.labels))
labels["level"] = logrus.WarnLevel.String()
for _, params := range h.labels {
labels[params[0]] = params[1]
}

msg := tmpMsg{
labels: labels,
msg: fmt.Sprintf("k6 dropped %d log messages because they were above the limit of %d messages / %s",
dropped, h.limit, h.pushPeriod),
t: msgs[cutOffIndex-1].t,
labels: h.droppedLabels,
msg: fmt.Sprintf(h.droppedMsg, dropped, h.limit, h.pushPeriod),
t: msgs[cutOffIndex-1].t,
}
pushMsg.add(msg)
}
Expand Down
78 changes: 64 additions & 14 deletions log/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ package log
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -40,24 +42,29 @@ func TestSyslogFromConfigLine(t *testing.T) {
{
line: "loki", // default settings
res: lokiHook{
ctx: context.Background(),
addr: "http://127.0.0.1:3100/loki/api/v1/push",
limit: 100,
pushPeriod: time.Second * 1,
levels: logrus.AllLevels,
msgMaxSize: 1024 * 1024,
ctx: context.Background(),
addr: "http://127.0.0.1:3100/loki/api/v1/push",
limit: 100,
pushPeriod: time.Second * 1,
levels: logrus.AllLevels,
msgMaxSize: 1024 * 1024,
droppedLabels: map[string]string{"level": "warning"},
droppedMsg: "k6 dropped %d log messages because they were above the limit of %d messages / %s",
},
},
{
line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=info,pushPeriod=5m32s,msgMaxSize=1231",
line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=info,allowedLabels=[something],pushPeriod=5m32s,msgMaxSize=1231",
res: lokiHook{
ctx: context.Background(),
addr: "somewhere:1233",
limit: 32,
pushPeriod: time.Minute*5 + time.Second*32,
levels: logrus.AllLevels[:5],
labels: [][2]string{{"something", "else"}, {"foo", "bar"}},
msgMaxSize: 1231,
ctx: context.Background(),
addr: "somewhere:1233",
limit: 32,
pushPeriod: time.Minute*5 + time.Second*32,
levels: logrus.AllLevels[:5],
labels: [][2]string{{"something", "else"}, {"foo", "bar"}},
msgMaxSize: 1231,
allowedLabels: []string{"something"},
droppedLabels: map[string]string{"something": "else"},
droppedMsg: "k6 dropped %d log messages because they were above the limit of %d messages / %s foo=bar level=warning",
},
},
{
Expand Down Expand Up @@ -112,3 +119,46 @@ func TestLogEntryMarshal(t *testing.T) {

require.Equal(t, expected, s)
}

func TestFilterLabels(t *testing.T) {
cases := []struct {
allowedLabels []string
labels map[string]string
expectedLabels map[string]string
msg string
result string
}{
{
allowedLabels: []string{"a", "b"},
labels: map[string]string{"a": "1", "b": "2", "d": "3", "c": "4", "e": "5"},
expectedLabels: map[string]string{"a": "1", "b": "2"},
msg: "some msg",
result: "some msg c=4 d=3 e=5",
},
{
allowedLabels: []string{"a", "b"},
labels: map[string]string{"d": "3", "c": "4", "e": "5"},
expectedLabels: map[string]string{},
msg: "some msg",
result: "some msg c=4 d=3 e=5",
},
{
allowedLabels: []string{"a", "b"},
labels: map[string]string{"a": "1", "d": "3", "c": "4", "e": "5"},
expectedLabels: map[string]string{"a": "1"},
msg: "some msg",
result: "some msg c=4 d=3 e=5",
},
}

for i, c := range cases {
c := c
t.Run(fmt.Sprint(i), func(t *testing.T) {
h := &lokiHook{}
h.allowedLabels = c.allowedLabels
result := h.filterLabels(c.labels, c.msg)
assert.Equal(t, c.result, result)
assert.Equal(t, c.expectedLabels, c.labels)
})
}
}
Loading