Skip to content

Commit

Permalink
Merge pull request #437 from go-faster/feat/logqlbench
Browse files Browse the repository at this point in the history
feat(logqlbench): add LogQL benchmark tool
  • Loading branch information
tdakkota committed Jun 20, 2024
2 parents 62757f3 + bbf64e2 commit 34a7907
Show file tree
Hide file tree
Showing 21 changed files with 1,224 additions and 333 deletions.
183 changes: 183 additions & 0 deletions cmd/otelbench/chtracker/chtracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Package chtracker provides Clickhouse query tracker.
package chtracker

import (
"context"
"net/http"
"sync"
"time"

"github.com/go-faster/errors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/go-faster/oteldb/internal/tempoapi"
)

// Tracker is a query tracker.
type Tracker[Q any] struct {
senderName string
trace bool

batchSpanProcessor sdktrace.SpanProcessor
tracerProvider trace.TracerProvider
tracer trace.Tracer

tempo *tempoapi.Client

queriesMux sync.Mutex
queries []TrackedQuery[Q]
}

// TrackedQuery stores info about tracked query.
type TrackedQuery[Q any] struct {
TraceID string
Duration time.Duration
Meta Q
}

// Track creates a tracked span and calls given callback.
func (t *Tracker[Q]) Track(ctx context.Context, meta Q, cb func(context.Context, Q) error) (rerr error) {
start := time.Now()

var traceID string
if t.trace {
traceCtx, span := t.tracer.Start(ctx, "chtracker.Track",
trace.WithSpanKind(trace.SpanKindClient),
)
traceID = span.SpanContext().TraceID().String()
ctx = traceCtx

defer func() {
if rerr != nil {
span.RecordError(rerr)
span.SetStatus(codes.Error, rerr.Error())
} else {
span.SetStatus(codes.Ok, "")
}
span.End()
}()
}

if err := cb(ctx, meta); err != nil {
return errors.Wrap(err, "send tracked")
}
duration := time.Since(start)

t.queriesMux.Lock()
t.queries = append(t.queries, TrackedQuery[Q]{
TraceID: traceID,
Duration: duration,
Meta: meta,
})
t.queriesMux.Unlock()
return nil
}

// Report iterates over tracked queries.
func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, TrackedQuery[Q], []QueryReport) error) error {
if err := t.Flush(ctx); err != nil {
return err
}

t.queriesMux.Lock()
defer t.queriesMux.Unlock()

for _, tq := range t.queries {
reports, err := t.retrieveReports(ctx, tq)
if err != nil {
return errors.Wrapf(err, "retrieve reports for %q", tq.TraceID)
}

if err := cb(ctx, tq, reports); err != nil {
return errors.Wrap(err, "report callback")
}
}

return nil
}

// HTTPClient returns instrumented HTTP client to use.
func (t *Tracker[Q]) HTTPClient() *http.Client {
propagator := propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
return &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport,
otelhttp.WithTracerProvider(t.TracerProvider()),
otelhttp.WithPropagators(propagator),
),
}
}

// TracerProvider returns tracer provider to use.
func (t *Tracker[Q]) TracerProvider() trace.TracerProvider {
return t.tracerProvider
}

// Flush writes buffered traces to storage.
func (t *Tracker[Q]) Flush(ctx context.Context) error {
if !t.trace {
return nil
}

ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

if err := t.batchSpanProcessor.ForceFlush(ctx); err != nil {
return errors.Wrap(err, "flush")
}
return nil
}

// SetupOptions defines options for [Setup].
type SetupOptions struct {
// Trace enables tracing.
Trace bool
// TempoAddr sets URL to Tempo API to retrieve traces.
TempoAddr string
}

func (opts *SetupOptions) setDefaults() {
if opts.TempoAddr == "" {
opts.TempoAddr = "http://127.0.0.1:3200"
}
}

// Setup creates new [Tracker].
func Setup[Q any](ctx context.Context, senderName string, opts SetupOptions) (*Tracker[Q], error) {
opts.setDefaults()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
return nil, errors.Wrap(err, "create exporter")
}
t := &Tracker[Q]{
senderName: senderName,
trace: opts.Trace,
}

t.batchSpanProcessor = sdktrace.NewBatchSpanProcessor(exporter)
t.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithResource(resource.NewSchemaless(
attribute.String("service.name", "otelbench."+senderName),
)),
sdktrace.WithSpanProcessor(t.batchSpanProcessor),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
t.tracer = t.tracerProvider.Tracer(senderName)

tempoClient, err := tempoapi.NewClient(opts.TempoAddr)
if err != nil {
return nil, errors.Wrap(err, "create tempo client")
}
t.tempo = tempoClient
return t, nil
}
174 changes: 174 additions & 0 deletions cmd/otelbench/chtracker/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package chtracker

import (
"context"
"io"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/go-faster/oteldb/internal/tempoapi"
)

// QueryReport is a Clickhouse query stats retrieved from trace.
type QueryReport struct {
DurationNanos int64 `json:"duration_nanos,omitempty" yaml:"duration_nanos,omitempty"`
Query string `json:"query,omitempty" yaml:"query,omitempty"`
ReadBytes int64 `json:"read_bytes,omitempty" yaml:"read_bytes,omitempty"`
ReadRows int64 `json:"read_rows,omitempty" yaml:"read_rows,omitempty"`
MemoryUsage int64 `json:"memory_usage,omitempty" yaml:"memory_usage,omitempty"`

ReceivedRows int64 `json:"recevied_rows,omitempty" yaml:"recevied_rows,omitempty"`
}

func (t *Tracker[Q]) retrieveReports(ctx context.Context, tq TrackedQuery[Q]) (reports []QueryReport, _ error) {
if !t.trace {
return reports, nil
}

ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

bo := backoff.NewConstantBackOff(time.Millisecond * 100)
res, err := backoff.RetryWithData(func() (v ptrace.Traces, err error) {
res, err := t.tempo.TraceByID(ctx, tempoapi.TraceByIDParams{
TraceID: tq.TraceID,
Accept: "application/protobuf",
})
if err != nil {
return v, backoff.Permanent(err)
}
switch r := res.(type) {
case *tempoapi.TraceByIDNotFound:
return v, errors.Errorf("trace %q not found", tq.TraceID)
case *tempoapi.TraceByID:
var um ptrace.ProtoUnmarshaler
buf, err := io.ReadAll(r.Data)
if err != nil {
return v, backoff.Permanent(errors.Wrap(err, "read data"))
}
traces, err := um.UnmarshalTraces(buf)
if err != nil {
return v, backoff.Permanent(errors.Wrap(err, "unmarshal traces"))
}

services := make(map[string]int)
list := traces.ResourceSpans()
for i := 0; i < list.Len(); i++ {
rs := list.At(i)
attrValue, ok := rs.Resource().Attributes().Get("service.name")
if !ok {
return v, backoff.Permanent(errors.New("service name not found"))
}
services[attrValue.AsString()]++
}
for _, svc := range []string{
"otelbench." + t.senderName,
"go-faster.oteldb",
"clickhouse",
} {
if _, ok := services[svc]; !ok {
return v, errors.Errorf("service %q not found", svc)
}
}

return traces, nil
default:
return v, backoff.Permanent(errors.Errorf("unknown response type %T", res))
}
}, backoff.WithContext(bo, ctx))
if err != nil {
return nil, errors.Wrap(err, "get trace")
}
if res.SpanCount() < 1 {
return nil, errors.New("response is empty, no spans returned")
}

// For each clickhouse query ID, save query.
type queryReport struct {
// "query" span coming from Clickhouse
clickhouseSpan ptrace.Span
// "Do" span coming from ch-go
chgoSpan ptrace.Span
}
var (
rsl = res.ResourceSpans()
queries = map[string]queryReport{}
)
for i := 0; i < rsl.Len(); i++ {
rs := rsl.At(i)
spansSlices := rs.ScopeSpans()
for j := 0; j < spansSlices.Len(); j++ {
spans := spansSlices.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)

attrs := span.Attributes()
if _, ok := attrs.Get("db.statement"); !ok {
continue
}

switch span.Name() {
case "query":
queryIDVal, ok := attrs.Get("clickhouse.query_id")
if !ok {
continue
}
queryID := queryIDVal.AsString()

report := queries[queryID]
report.clickhouseSpan = span
queries[queryID] = report
case "Do":
queryIDVal, ok := attrs.Get("ch.query.id")
if !ok {
continue
}
queryID := queryIDVal.AsString()

report := queries[queryID]
report.chgoSpan = span
queries[queryID] = report
default:
continue
}
}
}
}

for _, r := range queries {
var report QueryReport

if span := r.clickhouseSpan; span != (ptrace.Span{}) {
attrs := span.Attributes()

if statement, ok := attrs.Get("db.statement"); ok {
report.Query = statement.AsString()
}
if readBytes, ok := attrs.Get("clickhouse.read_bytes"); ok {
report.ReadBytes = readBytes.Int()
}
if readRows, ok := attrs.Get("clickhouse.read_rows"); ok {
report.ReadRows = readRows.Int()
}
if memoryUsage, ok := attrs.Get("clickhouse.memory_usage"); ok {
report.MemoryUsage = memoryUsage.Int()
}
report.DurationNanos = span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Nanoseconds()
} else {
continue
}

if span := r.chgoSpan; span != (ptrace.Span{}) {
attrs := span.Attributes()
if receivedRows, ok := attrs.Get("ch.rows_received"); ok {
report.ReceivedRows = receivedRows.Int()
}
}

reports = append(reports, report)
}
return reports, nil
}
14 changes: 14 additions & 0 deletions cmd/otelbench/logql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import "github.com/spf13/cobra"

func newLogQLCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "logql",
Short: "Suite for LogQL benchmarks",
}
cmd.AddCommand(
newLogQLBenchmarkCommand(),
)
return cmd
}
Loading

0 comments on commit 34a7907

Please sign in to comment.