Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This logs queries with latency tag when recording stats. #1733

Merged
merged 2 commits into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.4.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200201141823-27e183090ab1
go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status = "400"
}
}
RecordMetrics(status, q.String(), rangeType, statResult)
RecordMetrics(ctx, q, status, statResult)

return Result{
Data: data,
Expand Down Expand Up @@ -181,7 +181,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
defer cancel()

qs := q.String()
qs := q.Query()

expr, err := ParseExpr(qs)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (

// Params details the parameters associated with a loki request
type Params interface {
String() string
Query() string
Start() time.Time
End() time.Time
Step() time.Duration
Expand All @@ -42,7 +42,7 @@ type LiteralParams struct {
}

// Query impls Params
func (p LiteralParams) String() string { return p.qs }
func (p LiteralParams) Query() string { return p.qs }

// Start impls Params
func (p LiteralParams) Start() time.Time { return p.start }
Expand Down
26 changes: 23 additions & 3 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package logql

import (
"context"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -59,12 +62,12 @@ var (
})
)

func RecordMetrics(status, query string, rangeType QueryRangeType, stats stats.Result) {
queryType, err := QueryType(query)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(rangeType)
rt := string(GetRangeType(p))

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
Expand All @@ -73,6 +76,23 @@ func RecordMetrics(status, query string, rangeType QueryRangeType, stats stats.R
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(
// ensure we have traceID & orgId
util.WithContext(ctx, util.Logger),
).Log(
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
"step", p.Step(),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"throughput_mb", float64(stats.Summary.BytesProcessedPerSeconds)/10e6,
"total_bytes_mb", float64(stats.Summary.TotalBytesProcessed)/10e6,
)

bytesPerSeconds.WithLabelValues(status, queryType, rt, latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSeconds))
execLatency.WithLabelValues(status, queryType, rt).
Expand Down
46 changes: 46 additions & 0 deletions pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
package logql

import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
)

func TestQueryType(t *testing.T) {
Expand Down Expand Up @@ -32,3 +46,35 @@ func TestQueryType(t *testing.T) {
})
}
}

func TestLogSlowQuery(t *testing.T) {
buf := bytes.NewBufferString("")
util.Logger = log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordMetrics(ctx, LiteralParams{
qs: `{foo="bar"} |= "buzz"`,
direction: logproto.BACKWARD,
end: now,
start: now.Add(-1 * time.Hour),
limit: 1000,
step: time.Minute,
}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSeconds: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo trace_id=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput_mb=0.01 total_bytes_mb=0.01\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util.Logger = log.NewNopLogger()
}
2 changes: 1 addition & 1 deletion pkg/logql/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/logql/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message Summary {
int64 totalBytesProcessed = 3 [(gogoproto.jsontag) = "totalBytesProcessed"];
// Total lines processed.
int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"];
// Execution time in nanoseconds.
// Execution time in seconds.
double execTime = 5 [(gogoproto.jsontag) = "execTime"];
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func paramsFromRequest(req queryrange.Request) *paramsWrapper {
}
}

func (p paramsWrapper) String() string {
return p.Query
func (p paramsWrapper) Query() string {
return p.LokiRequest.Query
}
func (p paramsWrapper) Start() time.Time {
return p.StartTs
Expand Down
25 changes: 12 additions & 13 deletions pkg/querier/queryrange/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,26 @@ type ctxKeyType string
const ctxKey ctxKeyType = "stats"

var (
defaultMetricRecorder = metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
logql.RecordMetrics(status, query, rangeType, stats)
defaultMetricRecorder = metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
logql.RecordMetrics(ctx, p, status, stats)
})
// StatsHTTPMiddleware is an http middleware to record stats for query_range filter.
StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder)
)

type metricRecorder interface {
Record(status, query string, rangeType logql.QueryRangeType, stats stats.Result)
Record(ctx context.Context, p logql.Params, status string, stats stats.Result)
}

type metricRecorderFn func(status, query string, rangeType logql.QueryRangeType, stats stats.Result)
type metricRecorderFn func(ctx context.Context, p logql.Params, status string, stats stats.Result)

func (m metricRecorderFn) Record(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
m(status, query, rangeType, stats)
func (m metricRecorderFn) Record(ctx context.Context, p logql.Params, status string, stats stats.Result) {
m(ctx, p, status, stats)
}

type queryData struct {
query string
params logql.Params
statistics *stats.Result
rangeType logql.QueryRangeType
recorded bool
}

Expand All @@ -53,9 +52,10 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := &queryData{}
interceptor := &interceptor{ResponseWriter: w, statusCode: http.StatusOK}
r = r.WithContext(context.WithValue(r.Context(), ctxKey, data))
next.ServeHTTP(
interceptor,
r.WithContext(context.WithValue(r.Context(), ctxKey, data)),
r,
)
// HTTP middlewares run for every HTTP request,
// but we only want to record query_range filters.
Expand All @@ -64,9 +64,9 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
data.statistics = &stats.Result{}
}
recorder.Record(
r.Context(),
data.params,
strconv.Itoa(interceptor.statusCode),
data.query,
data.rangeType,
*data.statistics,
)
}
Expand Down Expand Up @@ -104,9 +104,8 @@ func StatsCollectorMiddleware() queryrange.Middleware {
ctxValue := ctx.Value(ctxKey)
if data, ok := ctxValue.(*queryData); ok {
data.recorded = true
data.query = req.GetQuery()
data.statistics = statistics
data.rangeType = logql.GetRangeType(paramsFromRequest(req))
data.params = paramsFromRequest(req)
}
return resp, err
})
Expand Down
56 changes: 34 additions & 22 deletions pkg/querier/queryrange/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@ import (
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)

func TestStatsCollectorMiddleware(t *testing.T) {
// no stats
data := &queryData{}
var (
data = &queryData{}
now = time.Now()
)
ctx := context.WithValue(context.Background(), ctxKey, data)
_, _ = StatsCollectorMiddleware().Wrap(queryrange.HandlerFunc(func(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
return nil, nil
})).Do(ctx, &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, "foo", data.query)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, true, data.recorded)
require.Equal(t, logql.RangeType, data.rangeType)
require.Equal(t, now, data.params.Start())
require.Nil(t, data.statistics)

// no context.
Expand All @@ -37,7 +41,7 @@ func TestStatsCollectorMiddleware(t *testing.T) {
return nil, nil
})).Do(context.Background(), &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, false, data.recorded)

Expand All @@ -54,27 +58,27 @@ func TestStatsCollectorMiddleware(t *testing.T) {
}, nil
})).Do(ctx, &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, "foo", data.query)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, true, data.recorded)
require.Equal(t, logql.RangeType, data.rangeType)
require.Equal(t, now, data.params.Start())
require.Equal(t, int32(10), data.statistics.Ingester.TotalReached)
}

func Test_StatsHTTP(t *testing.T) {
for _, test := range []struct {
name string
next http.Handler
expect func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result)
expect func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result)
}{
{
"should not record metric if nothing is recorded",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = false
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) {
t.Fail()
},
},
Expand All @@ -83,14 +87,18 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.rangeType = logql.RangeType
data.query = "foo"
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = nil
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), status)
require.Equal(t, logql.RangeType, rangeType)
require.Equal(t, "foo", query)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, stats.Result{}, s)
},
},
Expand All @@ -99,22 +107,26 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.rangeType = logql.RangeType
data.query = "foo"
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = &statsResult
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), status)
require.Equal(t, logql.RangeType, rangeType)
require.Equal(t, "foo", query)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, statsResult, s)
},
},
} {
t.Run(test.name, func(t *testing.T) {
statsHTTPMiddleware(metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
test.expect(t, status, query, rangeType, stats)
statsHTTPMiddleware(metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
test.expect(t, ctx, p, status, stats)
})).Wrap(test.next).ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/foo", strings.NewReader("")))
})
}
Expand Down