From f43dfab7febf8784e6624ea3459826ac0ec0309b Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 29 Jun 2023 10:43:23 +0200 Subject: [PATCH] Make labels in loghttp.Entry of type LabelSet --- .../loki_micro_services_delete_test.go | 4 -- pkg/loghttp/entry.go | 65 +++++++++++++++---- pkg/loghttp/query.go | 5 +- pkg/querier/queryrange/codec_test.go | 4 +- pkg/util/marshal/marshal_test.go | 6 +- pkg/util/marshal/query.go | 26 ++++++-- 6 files changed, 83 insertions(+), 27 deletions(-) diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 78d1b44b8774..1ba0e3e2c20c 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -99,22 +99,18 @@ func TestMicroServicesDeleteRequest(t *testing.T) { { strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10), "lineA", - "", }, { strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10), "lineB", - "", }, { strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10), "lineC", - "", }, { strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10), "lineD", - "", }, }, }) diff --git a/pkg/loghttp/entry.go b/pkg/loghttp/entry.go index afcda0262cb9..43b268b06446 100644 --- a/pkg/loghttp/entry.go +++ b/pkg/loghttp/entry.go @@ -8,6 +8,8 @@ import ( "github.com/buger/jsonparser" jsoniter "github.com/json-iterator/go" "github.com/modern-go/reflect2" + + "github.com/grafana/loki/pkg/logproto" ) func init() { @@ -18,7 +20,21 @@ func init() { type Entry struct { Timestamp time.Time Line string - Labels string + Labels LabelSet +} + +func (e Entry) ToProto() logproto.Entry { + // If there are no labels, we return empty string instead of '{}'. + var labels string + if len(e.Labels) > 0 { + labels = e.Labels.String() + } + + return logproto.Entry{ + Timestamp: e.Timestamp, + Line: e.Line, + Labels: labels, + } } func (e *Entry) UnmarshalJSON(data []byte) error { @@ -28,12 +44,12 @@ func (e *Entry) UnmarshalJSON(data []byte) error { ) _, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) { // assert that both items in array are of type string - if t != jsonparser.String { - parseError = jsonparser.MalformedStringError - return - } switch i { case 0: // timestamp + if t != jsonparser.String { + parseError = jsonparser.MalformedStringError + return + } ts, err := jsonparser.ParseInt(value) if err != nil { parseError = err @@ -41,6 +57,10 @@ func (e *Entry) UnmarshalJSON(data []byte) error { } e.Timestamp = time.Unix(0, ts) case 1: // value + if t != jsonparser.String { + parseError = jsonparser.MalformedStringError + return + } v, err := jsonparser.ParseString(value) if err != nil { parseError = err @@ -48,12 +68,21 @@ func (e *Entry) UnmarshalJSON(data []byte) error { } e.Line = v case 2: // labels - il, err := jsonparser.ParseString(value) - if err != nil { + if t != jsonparser.Object { + parseError = jsonparser.MalformedObjectError + return + } + e.Labels = make(LabelSet) + if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error { + if dataType != jsonparser.String { + return jsonparser.MalformedStringError + } + e.Labels[yoloString(key)] = yoloString(value) + return nil + }); err != nil { parseError = err return } - e.Labels = il } i++ }) @@ -75,7 +104,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { i := 0 var ts time.Time var line string - var labels string + var labels LabelSet ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { var ok bool switch i { @@ -91,7 +120,12 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { } return true case 2: - labels = iter.ReadString() + labels = make(LabelSet) + iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool { + labelValue := iter.ReadString() + labels[labelName] = labelValue + return true + }) i++ if iter.Error != nil { return false @@ -143,8 +177,15 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) { stream.WriteRaw(`"`) stream.WriteMore() stream.WriteStringWithHTMLEscaped(e.Line) - stream.WriteMore() - stream.WriteString(e.Labels) + if len(e.Labels) > 0 { + stream.WriteMore() + stream.WriteObjectStart() + for lName, lValue := range e.Labels { + stream.WriteObjectField(lName) + stream.WriteString(lValue) + } + stream.WriteObjectEnd() + } stream.WriteArrayEnd() } diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index ddc66193138f..e1bb65b0cf6d 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -233,7 +233,10 @@ func (s Streams) ToProto() []logproto.Stream { } result := make([]logproto.Stream, 0, len(s)) for _, s := range s { - entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)) + entries := make([]logproto.Entry, len(s.Entries), len(s.Entries)) + for i, e := range s.Entries { + entries[i] = e.ToProto() + } result = append(result, logproto.Stream{Labels: s.Labels.String(), Entries: entries}) } return result diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index a1a284c78db9..532219baa7ed 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -1187,7 +1187,7 @@ var ( "test": "test" }, "values":[ - [ "123456789012345", "super line", "" ] + [ "123456789012345", "super line"] ] }, { @@ -1195,7 +1195,7 @@ var ( "test": "test2" }, "values":[ - [ "123456789012346", "super line2", "" ] + [ "123456789012346", "super line2"] ] } ] diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index ecd03e920710..6025abd4528d 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -48,7 +48,7 @@ var queryTests = []struct { "test": "test" }, "values":[ - [ "123456789012345", "super line", "" ] + [ "123456789012345", "super line"] ] } ], @@ -506,7 +506,7 @@ var tailTests = []struct { "test": "test" }, "values":[ - [ "123456789012345", "super line", "" ] + [ "123456789012345", "super line"] ] } ], @@ -828,7 +828,7 @@ func Test_WriteTailResponseJSON(t *testing.T) { }, }, WebsocketWriterFunc(func(i int, b []byte) error { - require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar",""]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b)) + require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar"]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b)) return nil }), ), diff --git a/pkg/util/marshal/query.go b/pkg/util/marshal/query.go index bcc4f500b75c..5a322027bd05 100644 --- a/pkg/util/marshal/query.go +++ b/pkg/util/marshal/query.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" ) @@ -96,18 +97,31 @@ func NewStream(s logproto.Stream) (loghttp.Stream, error) { } for i, e := range s.Entries { - ret.Entries[i] = NewEntry(e) + ret.Entries[i], err = NewEntry(e) + if err != nil { + return loghttp.Stream{}, err + } } return ret, nil } // NewEntry constructs an Entry from a logproto.Entry -func NewEntry(e logproto.Entry) loghttp.Entry { +func NewEntry(e logproto.Entry) (loghttp.Entry, error) { + var labels loghttp.LabelSet + if e.Labels != "" { + lbls, err := syntax.ParseLabels(e.Labels) + if err != nil { + return loghttp.Entry{}, errors.Wrapf(err, "err while creating labelset for entry %s", e.Labels) + } + labels = lbls.Map() + } + return loghttp.Entry{ Timestamp: e.Timestamp, Line: e.Line, - } + Labels: labels, + }, nil } func NewScalar(s promql.Scalar) loghttp.Scalar { @@ -315,8 +329,10 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream) error { s.WriteRaw(`"`) s.WriteMore() s.WriteStringWithHTMLEscaped(e.Line) - s.WriteMore() - s.WriteString(e.Labels) + if e.Labels != "" { + s.WriteMore() + s.WriteString(e.Labels) + } s.WriteArrayEnd() s.Flush()