Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup labels parsing. #2929

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
cortex_util "github.com/cortexproject/cortex/pkg/util"
Expand All @@ -25,6 +24,7 @@ import (

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -206,14 +206,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
ls, err := util.ToClientLabels(stream.Labels)
ls, err := logql.ParseLabels(stream.Labels)
if err != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
continue
}
// ensure labels are correctly sorted.
// todo(ctovena) we should lru cache this
stream.Labels = cortex_client.FromLabelAdaptersToLabels(ls).String()
stream.Labels = ls.String()
if err := d.validator.ValidateLabels(userID, ls, stream); err != nil {
validationErr = err
continue
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -52,7 +52,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter, stream logproto.Stream) error {
func (v Validator) ValidateLabels(userID string, ls labels.Labels, stream logproto.Stream) error {
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
Expand All @@ -61,7 +61,7 @@ func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}

maxLabelNameLength := v.MaxLabelNameLength(userID)
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -101,7 +101,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{fed=\"bears\", foo=\"bar\", food=\"bars\"}", 3, 2)),
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2)),
},
{
"label name too long",
Expand Down Expand Up @@ -157,10 +157,10 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
}

func mustParseLabels(s string) []client.LabelAdapter {
labels, err := util.ToClientLabels(s)
func mustParseLabels(s string) labels.Labels {
ls, err := logql.ParseLabels(s)
if err != nil {
panic(err)
}
return labels
return ls
}
20 changes: 9 additions & 11 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -108,15 +107,15 @@ func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk,

// consumeChunk manually adds a chunk that was received during ingester chunk
// transfer.
func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

fp := i.getHashForLabels(labels)
fp := i.getHashForLabels(ls)

stream, ok := i.streamsByFP[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
Expand Down Expand Up @@ -175,13 +174,13 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}

labels, err := util.ToClientLabels(pushReqStream.Labels)
labels, err := logql.ParseLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
_ = i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, fp, labels, i.factory)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

Expand All @@ -192,11 +191,10 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return stream, nil
}

func (i *instance) getHashForLabels(labels []client.LabelAdapter) model.Fingerprint {
func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
lbsModel := client.FromLabelAdaptersToLabels(labels)
fp, i.buf = lbsModel.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), labels)
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), ls)
}

// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.
Expand Down
43 changes: 4 additions & 39 deletions pkg/ingester/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/ingester/client"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -51,7 +49,7 @@ func newFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *
// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) model.Fingerprint {
func (m *fpMapper) mapFP(fp model.Fingerprint, metric labels.Labels) model.Fingerprint {
// First check if we are in the reserved FP space, in which case this is
// automatically a collision that has to be mapped.
if fp <= maxMappedFP {
Expand All @@ -63,7 +61,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) mod
s := m.fpToLabels(fp)
if s != nil {
// FP exists in memory, but is it for the same metric?
if equalLabels(metric, s) {
if labels.Equal(metric, s) {
// Yupp. We are done.
return fp
}
Expand All @@ -89,43 +87,10 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) mod
return fp
}

func valueForName(s labels.Labels, name string) (string, bool) {
pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= name })
if pos == len(s) || s[pos].Name != name {
return "", false
}
return s[pos].Value, true
}

// Check if a and b contain the same name/value pairs
func equalLabels(a []client.LabelAdapter, b labels.Labels) bool {
if len(a) != len(b) {
return false
}
// Check as many as we can where the two sets are in the same order
i := 0
for ; i < len(a); i++ {
if b[i].Name != a[i].Name {
break
}
if b[i].Value != a[i].Value {
return false
}
}
// Now check remaining values using binary search
for ; i < len(a); i++ {
v, found := valueForName(b, a[i].Name)
if !found || v != a[i].Value {
return false
}
}
return true
}

// maybeAddMapping is only used internally. It takes a detected collision and
// adds it to the collisions map if not yet there. In any case, it returns the
// truly unique fingerprint for the colliding metric.
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []client.LabelAdapter) model.Fingerprint {
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric labels.Labels) model.Fingerprint {
ms := metricToUniqueString(collidingMetric)
m.mtx.RLock()
mappedFPs, ok := m.mappings[fp]
Expand Down Expand Up @@ -177,7 +142,7 @@ func (m *fpMapper) nextMappedFP() model.Fingerprint {
// FastFingerprint function, and its result is not suitable as a key for maps
// and indexes as it might become really large, causing a lot of hashing effort
// in maps and a lot of storage overhead in indexes.
func metricToUniqueString(m []client.LabelAdapter) string {
func metricToUniqueString(m labels.Labels) string {
parts := make([]string, 0, len(m))
for _, pair := range m {
parts = append(parts, pair.Name+separatorString+pair.Value)
Expand Down
27 changes: 13 additions & 14 deletions pkg/ingester/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"sort"
"testing"

"github.com/cortexproject/cortex/pkg/ingester/client"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
Expand All @@ -20,34 +18,34 @@ var (
fp1 = model.Fingerprint(maxMappedFP + 1)
fp2 = model.Fingerprint(maxMappedFP + 2)
fp3 = model.Fingerprint(1)
cm11 = []client.LabelAdapter{
{Name: "foo", Value: "bar"},
cm11 = []labels.Label{
{Name: "dings", Value: "bumms"},
{Name: "foo", Value: "bar"},
}
cm12 = []client.LabelAdapter{
cm12 = []labels.Label{
{Name: "bar", Value: "foo"},
}
cm13 = []client.LabelAdapter{
cm13 = []labels.Label{
{Name: "foo", Value: "bar"},
}
cm21 = []client.LabelAdapter{
{Name: "foo", Value: "bumms"},
cm21 = []labels.Label{
{Name: "dings", Value: "bar"},
{Name: "foo", Value: "bumms"},
}
cm22 = []client.LabelAdapter{
{Name: "dings", Value: "foo"},
cm22 = []labels.Label{
{Name: "bar", Value: "bumms"},
{Name: "dings", Value: "foo"},
}
cm31 = []client.LabelAdapter{
cm31 = []labels.Label{
{Name: "bumms", Value: "dings"},
}
cm32 = []client.LabelAdapter{
{Name: "bumms", Value: "dings"},
cm32 = []labels.Label{
{Name: "bar", Value: "foo"},
{Name: "bumms", Value: "dings"},
}
)

func copyValuesAndSort(a []client.LabelAdapter) labels.Labels {
func copyValuesAndSort(a []labels.Label) labels.Labels {
c := make(labels.Labels, len(a))
for i, pair := range a {
c[i].Name = pair.Name
Expand Down Expand Up @@ -131,6 +129,7 @@ func TestFPMapper(t *testing.T) {

// assertFingerprintEqual asserts that two fingerprints are equal.
func assertFingerprintEqual(t *testing.T, gotFP, wantFP model.Fingerprint) {
t.Helper()
if gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"os"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"

Expand Down Expand Up @@ -101,9 +101,9 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)

userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)

lbls := []client.LabelAdapter{}
lbls := make([]labels.Label, 0, len(chunkSet.Labels))
for _, lbl := range chunkSet.Labels {
lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
lbls = append(lbls, labels.Label{Name: lbl.Name, Value: lbl.Value})
}

instance := i.getOrCreateInstance(chunkSet.UserId)
Expand Down
13 changes: 0 additions & 13 deletions pkg/util/conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,13 @@ package util

import (
"math"
"sort"
"time"
"unsafe"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
)

// ToClientLabels parses the labels and converts them to the Cortex type.
func ToClientLabels(labels string) ([]client.LabelAdapter, error) {
ls, err := parser.ParseMetric(labels)
if err != nil {
return nil, err
}
sort.Sort(ls)
return client.FromLabelsToLabelAdapters(ls), nil
}

// ModelLabelSetToMap convert a model.LabelSet to a map[string]string
func ModelLabelSetToMap(m model.LabelSet) map[string]string {
if len(m) == 0 {
Expand Down