Skip to content

Commit

Permalink
Make labels in loghttp.Entry of type LabelSet
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Jun 29, 2023
1 parent 617e864 commit f43dfab
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 27 deletions.
4 changes: 0 additions & 4 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,18 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
{
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10),
"lineA",
"",
},
{
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10),
"lineB",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineC",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineD",
"",
},
},
})
Expand Down
65 changes: 53 additions & 12 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"

"github.com/grafana/loki/pkg/logproto"
)

func init() {
Expand All @@ -18,7 +20,21 @@ func init() {
type Entry struct {
Timestamp time.Time
Line string
Labels string
Labels LabelSet
}

func (e Entry) ToProto() logproto.Entry {
// If there are no labels, we return empty string instead of '{}'.
var labels string
if len(e.Labels) > 0 {
labels = e.Labels.String()
}

return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
Labels: labels,
}
}

func (e *Entry) UnmarshalJSON(data []byte) error {
Expand All @@ -28,32 +44,45 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
switch i {
case 0: // timestamp
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
il, err := jsonparser.ParseString(value)
if err != nil {
if t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
e.Labels = make(LabelSet)
if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
e.Labels[yoloString(key)] = yoloString(value)
return nil
}); err != nil {
parseError = err
return
}
e.Labels = il
}
i++
})
Expand All @@ -75,7 +104,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var labels string
var labels LabelSet
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
Expand All @@ -91,7 +120,12 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
}
return true
case 2:
labels = iter.ReadString()
labels = make(LabelSet)
iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool {
labelValue := iter.ReadString()
labels[labelName] = labelValue
return true
})
i++
if iter.Error != nil {
return false
Expand Down Expand Up @@ -143,8 +177,15 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
stream.WriteMore()
stream.WriteString(e.Labels)
if len(e.Labels) > 0 {
stream.WriteMore()
stream.WriteObjectStart()
for lName, lValue := range e.Labels {
stream.WriteObjectField(lName)
stream.WriteString(lValue)
}
stream.WriteObjectEnd()
}
stream.WriteArrayEnd()
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ func (s Streams) ToProto() []logproto.Stream {
}
result := make([]logproto.Stream, 0, len(s))
for _, s := range s {
entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries))
entries := make([]logproto.Entry, len(s.Entries), len(s.Entries))
for i, e := range s.Entries {
entries[i] = e.ToProto()
}
result = append(result, logproto.Stream{Labels: s.Labels.String(), Entries: entries})
}
return result
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,15 +1187,15 @@ var (
"test": "test"
},
"values":[
[ "123456789012345", "super line", "" ]
[ "123456789012345", "super line"]
]
},
{
"stream": {
"test": "test2"
},
"values":[
[ "123456789012346", "super line2", "" ]
[ "123456789012346", "super line2"]
]
}
]
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/marshal/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var queryTests = []struct {
"test": "test"
},
"values":[
[ "123456789012345", "super line", "" ]
[ "123456789012345", "super line"]
]
}
],
Expand Down Expand Up @@ -506,7 +506,7 @@ var tailTests = []struct {
"test": "test"
},
"values":[
[ "123456789012345", "super line", "" ]
[ "123456789012345", "super line"]
]
}
],
Expand Down Expand Up @@ -828,7 +828,7 @@ func Test_WriteTailResponseJSON(t *testing.T) {
},
},
WebsocketWriterFunc(func(i int, b []byte) error {
require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar",""]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b))
require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar"]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b))
return nil
}),
),
Expand Down
26 changes: 21 additions & 5 deletions pkg/util/marshal/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
)

Expand Down Expand Up @@ -96,18 +97,31 @@ func NewStream(s logproto.Stream) (loghttp.Stream, error) {
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
ret.Entries[i], err = NewEntry(e)
if err != nil {
return loghttp.Stream{}, err
}
}

return ret, nil
}

// NewEntry constructs an Entry from a logproto.Entry
func NewEntry(e logproto.Entry) loghttp.Entry {
func NewEntry(e logproto.Entry) (loghttp.Entry, error) {
var labels loghttp.LabelSet
if e.Labels != "" {
lbls, err := syntax.ParseLabels(e.Labels)
if err != nil {
return loghttp.Entry{}, errors.Wrapf(err, "err while creating labelset for entry %s", e.Labels)
}
labels = lbls.Map()
}

return loghttp.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
Labels: labels,
}, nil
}

func NewScalar(s promql.Scalar) loghttp.Scalar {
Expand Down Expand Up @@ -315,8 +329,10 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream) error {
s.WriteRaw(`"`)
s.WriteMore()
s.WriteStringWithHTMLEscaped(e.Line)
s.WriteMore()
s.WriteString(e.Labels)
if e.Labels != "" {
s.WriteMore()
s.WriteString(e.Labels)
}
s.WriteArrayEnd()

s.Flush()
Expand Down

0 comments on commit f43dfab

Please sign in to comment.