Skip to content

Commit

Permalink
Use slice of labels for json and proto structures
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Jul 11, 2023
1 parent 4c60a9e commit 4028758
Show file tree
Hide file tree
Showing 15 changed files with 1,303 additions and 231 deletions.
47 changes: 18 additions & 29 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"

"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/prometheus/model/labels"
)

func init() {
Expand All @@ -20,21 +19,7 @@ func init() {
type Entry struct {
Timestamp time.Time
Line string
NonIndexedLabels LabelSet
}

func (e Entry) ToProto() logproto.Entry {
// If there are no labels, we return empty string instead of '{}'.
var nonIndexedLabels string
if len(e.NonIndexedLabels) > 0 {
nonIndexedLabels = e.NonIndexedLabels.String()
}

return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
NonIndexedLabels: nonIndexedLabels,
}
NonIndexedLabels labels.Labels
}

func (e *Entry) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -72,17 +57,21 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
parseError = jsonparser.MalformedObjectError
return
}
e.NonIndexedLabels = make(LabelSet)
var nonIndexedLabels labels.Labels
if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
e.NonIndexedLabels[yoloString(key)] = yoloString(value)
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: yoloString(key),
Value: yoloString(value),
})
return nil
}); err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
Expand All @@ -104,7 +93,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var labels LabelSet
var nonIndexedLabels labels.Labels
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
Expand All @@ -120,10 +109,12 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
}
return true
case 2:
labels = make(LabelSet)
iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool {
labelValue := iter.ReadString()
labels[labelName] = labelValue
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: labelName,
Value: labelValue,
})
return true
})
i++
Expand All @@ -140,7 +131,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
NonIndexedLabels: labels,
NonIndexedLabels: nonIndexedLabels,
})
return true
}
Expand Down Expand Up @@ -180,14 +171,12 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
if len(e.NonIndexedLabels) > 0 {
stream.WriteMore()
stream.WriteObjectStart()
var idx int
for lName, lValue := range e.NonIndexedLabels {
if idx > 0 {
for i, lbl := range e.NonIndexedLabels {
if i > 0 {
stream.WriteMore()
}
stream.WriteObjectField(lName)
stream.WriteString(lValue)
idx++
stream.WriteObjectField(lbl.Name)
stream.WriteString(lbl.Value)
}
stream.WriteObjectEnd()
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"unsafe"

"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/prometheus/prometheus/model/labels"

"github.com/buger/jsonparser"
json "github.com/json-iterator/go"
Expand Down Expand Up @@ -156,19 +157,22 @@ func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
}
e.Line = v
case 2: // nonIndexedLabels
nonIndexedLabels := make(LabelSet)
var nonIndexedLabels labels.Labels
err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels[yoloString(key)] = yoloString(val)
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: yoloString(key),
Value: yoloString(val),
})
return nil
})
if err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels.String()
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
Expand Down Expand Up @@ -238,10 +242,7 @@ func (s Streams) ToProto() []logproto.Stream {
}
result := make([]logproto.Stream, 0, len(s))
for _, s := range s {
entries := make([]logproto.Entry, len(s.Entries), len(s.Entries))
for i, e := range s.Entries {
entries[i] = e.ToProto()
}
entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries))
result = append(result, logproto.Stream{Labels: s.Labels.String(), Entries: entries})
}
return result
Expand Down
36 changes: 29 additions & 7 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -153,14 +154,20 @@ func TestStreams_ToProto(t *testing.T) {
Labels: map[string]string{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
{
Labels: map[string]string{"foo": "bar", "lvl": "error"},
Entries: []Entry{
{Timestamp: time.Unix(0, 3), Line: "3"},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
Expand All @@ -169,14 +176,20 @@ func TestStreams_ToProto(t *testing.T) {
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}.String()},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
{
Labels: `{foo="bar", lvl="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 3), Line: "3"},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}.String()},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
Expand Down Expand Up @@ -210,7 +223,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
Labels: LabelSet{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
Expand All @@ -230,7 +246,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
Labels: LabelSet{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "log line 1"},
{Timestamp: time.Unix(0, 2), Line: "some log line 2", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}},
{Timestamp: time.Unix(0, 2), Line: "some log line 2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
Stream{
Expand All @@ -240,7 +259,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: LabelSet{"foo": "a", "bar": "b"}},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/push/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/gogo/protobuf v1.3.2
github.com/prometheus/prometheus v0.45.0
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.52.0
)
Expand Down

0 comments on commit 4028758

Please sign in to comment.