Skip to content

Commit

Permalink
Merge pull request #442 from go-faster/perf/optimize-group-entries
Browse files Browse the repository at this point in the history
perf(logqlengine): re-use key buffer
  • Loading branch information
tdakkota committed Jun 28, 2024
2 parents d90e1f3 + 2082cc7 commit 5030322
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 19 deletions.
13 changes: 9 additions & 4 deletions internal/iterators/iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ type SliceIterator[T any] struct {

// Slice creates new SliceIterator from given values.
func Slice[T any](vals []T) *SliceIterator[T] {
return &SliceIterator[T]{
data: vals,
n: 0,
}
i := new(SliceIterator[T])
i.Reset(vals)
return i
}

// Reset restores state of iterator using given data.
func (i *SliceIterator[T]) Reset(vals []T) {
i.data = vals
i.n = 0
}

// Next returns true, if there is element and fills t.
Expand Down
7 changes: 4 additions & 3 deletions internal/logql/logqlengine/engine_log_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,20 @@ func (q *LogQuery) eval(ctx context.Context, params EvalParams) (data lokiapi.St
func groupEntries(iter EntryIterator) (s lokiapi.Streams, total int, _ error) {
var (
e Entry
buf = make([]byte, 0, 1024)
streams = map[string]lokiapi.Stream{}
)
for iter.Next(&e) {
// FIXME(tdakkota): allocates a string for every record.
key := e.Set.String()
stream, ok := streams[key]
buf = e.Set.AppendString(buf[:0])
stream, ok := streams[string(buf)]
if !ok {
stream = lokiapi.Stream{
Stream: lokiapi.NewOptLabelSet(e.Set.AsLokiAPI()),
}
}
stream.Values = append(stream.Values, lokiapi.LogEntry{T: uint64(e.Timestamp), V: e.Line})
streams[key] = stream
streams[string(buf)] = stream
total++
}
if err := iter.Err(); err != nil {
Expand Down
193 changes: 193 additions & 0 deletions internal/logql/logqlengine/engine_log_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package logqlengine

import (
"cmp"
"encoding/binary"
"fmt"
"slices"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/otelstorage"
)

func labelSet(keyval ...string) logqlabels.LabelSet {
set := logqlabels.NewLabelSet()
for i := 0; i < len(keyval); i += 2 {
set.Set(logql.Label(keyval[i]), pcommon.NewValueStr(keyval[i+1]))
}
return set
}

func apiLabels(keyval ...string) lokiapi.OptLabelSet {
set := lokiapi.LabelSet{}
for i := 0; i < len(keyval); i += 2 {
set[keyval[i]] = keyval[i+1]
}
return lokiapi.NewOptLabelSet(set)
}

func Test_groupEntries(t *testing.T) {
tests := []struct {
entries []Entry
wantS lokiapi.Streams
wantErr bool
}{
{
nil,
lokiapi.Streams{},
false,
},
{
[]Entry{
{1, "line 1", labelSet("key", "a")},
{2, "line 2", labelSet("key", "a")},
{4, "line 4", labelSet("key", "a")},
{3, "line 3", labelSet("key", "b")},
{5, "line 5", labelSet("key", "a", "key2", "a")},
},
lokiapi.Streams{
{
Stream: apiLabels("key", "a"),
Values: []lokiapi.LogEntry{
{T: 1, V: "line 1"},
{T: 2, V: "line 2"},
{T: 4, V: "line 4"},
},
},
{
Stream: apiLabels("key", "b"),
Values: []lokiapi.LogEntry{
{T: 3, V: "line 3"},
},
},
{
Stream: apiLabels("key", "a", "key2", "a"),
Values: []lokiapi.LogEntry{
{T: 5, V: "line 5"},
},
},
},
false,
},
}
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
got, gotTotal, err := groupEntries(iterators.Slice(tt.entries))
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.ElementsMatch(t, tt.wantS, got)
require.Equal(t, len(tt.entries), gotTotal)

for _, stream := range got {
require.True(t,
slices.IsSortedFunc(stream.Values, func(a, b lokiapi.LogEntry) int {
return cmp.Compare(a.T, b.T)
}),
"Entries must be sorted",
)
}
})
}
}

func generateLogs(now time.Time) (entries []Entry) {
type httpLogBatch struct {
Method string
Status int
Count int
IP string
Protocol string
}
for j, b := range []httpLogBatch{
{Method: "GET", Status: 200, Count: 11, IP: "200.1.1.1", Protocol: "HTTP/1.0"},
{Method: "GET", Status: 200, Count: 10, IP: "200.1.1.1", Protocol: "HTTP/1.1"},
{Method: "DELETE", Status: 200, Count: 20, IP: "200.1.1.1", Protocol: "HTTP/2.0"},
{Method: "POST", Status: 200, Count: 21, IP: "200.1.1.1", Protocol: "HTTP/1.0"},
{Method: "PATCH", Status: 200, Count: 19, IP: "200.1.1.1", Protocol: "HTTP/1.0"},
{Method: "HEAD", Status: 200, Count: 15, IP: "200.1.1.1", Protocol: "HTTP/2.0"},
{Method: "HEAD", Status: 200, Count: 4, IP: "200.1.1.1", Protocol: "HTTP/1.0"},
{Method: "HEAD", Status: 200, Count: 1, IP: "236.7.233.166", Protocol: "HTTP/2.0"},
{Method: "HEAD", Status: 500, Count: 2, IP: "200.1.1.1", Protocol: "HTTP/2.0"},
{Method: "PUT", Status: 200, Count: 20, IP: "200.1.1.1", Protocol: "HTTP/2.0"},
} {
for i := 0; i < b.Count; i++ {
var (
spanID otelstorage.SpanID
traceID otelstorage.TraceID
)
{
// Predictable IDs for testing.
binary.PutUvarint(spanID[:], uint64(i+1056123959+j*100))
spanID[7] = byte(j)
spanID[6] = byte(i)
binary.PutUvarint(traceID[:], uint64(i+3959+j*1000))
binary.PutUvarint(traceID[8:], uint64(i+13+j*1000))
traceID[15] = byte(j)
traceID[14] = byte(i)
}
now = now.Add(time.Millisecond * 120)
severity := plog.SeverityNumberInfo
switch b.Status / 100 {
case 2:
severity = plog.SeverityNumberInfo
case 3:
severity = plog.SeverityNumberWarn
case 4:
severity = plog.SeverityNumberError
case 5:
severity = plog.SeverityNumberFatal
}
entries = append(entries, Entry{
Timestamp: pcommon.Timestamp(now.UnixNano()),
Line: "line",
Set: labelSet(
"level", strings.ToLower(severity.String()),
"span_id", spanID.Hex(),
"trace_id", traceID.Hex(),
"method", b.Method,
"status", fmt.Sprint(b.Status),
"bytes", fmt.Sprint(250),
"protocol", b.Protocol,
"ip", b.IP,
"url", "/api/v1/series",
"ref", "https://api.go-faster.org",
),
})
}
}
return entries
}

func Benchmark_groupEntries(b *testing.B) {
var (
entries = generateLogs(time.Now())

iter = iterators.Slice(entries)
sink lokiapi.Streams
)
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
iter.Reset(entries)
sink, _, _ = groupEntries(iter)
}

if len(sink) == 0 {
b.Fatal("at least one stream expected")
}
}
37 changes: 25 additions & 12 deletions internal/logql/logqlengine/logqlabels/label_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package logqlabels
import (
"slices"
"strconv"
"strings"

"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -55,27 +54,41 @@ func (l *LabelSet) AsMap() map[string]string {
return set
}

// String returns text representation of labels.
func (l *LabelSet) String() string {
var sb strings.Builder
sb.WriteByte('{')
// AppendString appends text representation of labels.
func (l *LabelSet) AppendString(buf []byte) []byte {
buf = append(buf, '{')

keys := maps.Keys(l.labels)
const stackThreshold = 24
var keys []logql.Label
if len(l.labels) < stackThreshold {
keys = make([]logql.Label, 0, stackThreshold)
} else {
keys = make([]logql.Label, 0, len(l.labels))
}

for key := range l.labels {
keys = append(keys, key)
}
slices.Sort(keys)

i := 0
for _, k := range keys {
v := l.labels[k]
if i != 0 {
sb.WriteByte(',')
buf = append(buf, ',')
}
sb.WriteString(string(k))
sb.WriteByte('=')
sb.WriteString(strconv.Quote(v.AsString()))
buf = append(buf, k...)
buf = append(buf, '=')
buf = strconv.AppendQuote(buf, v.AsString())
i++
}
sb.WriteByte('}')
return sb.String()
buf = append(buf, '}')
return buf
}

// String returns text representation of labels.
func (l *LabelSet) String() string {
return string(l.AppendString(nil))
}

// SetFromRecord sets labels from given log record.
Expand Down

0 comments on commit 5030322

Please sign in to comment.