Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inflight-logging: Add extra metadata to inflight requests logging #11243

Merged
merged 7 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
* [11110](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Change the default of the `metrics-namespace` flag to 'loki'.
* [11086](https://github.com/grafana/loki/pull/11086) **kandrew5**: Helm: Allow topologySpreadConstraints
* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2
# Docker image info
IMAGE_PREFIX ?= grafana

BUILD_IMAGE_PREFIX ?= grafana

IMAGE_TAG ?= $(shell ./tools/image-tag)

# Version info for binaries
Expand Down Expand Up @@ -102,7 +104,7 @@ RM := --rm
TTY := --tty

DOCKER_BUILDKIT ?= 1
BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
PUSH_OCI=docker push
TAG_OCI=docker tag
ifeq ($(CI), true)
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"

"github.com/grafana/loki/pkg/util"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
for _, b := range blocks {

if b.Hash > 0 {
if b.Hash == HashedQuery(query) {
if b.Hash == util.HashedQuery(query) {
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
return qb.block(b, typ, logger)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/logql/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -124,15 +125,15 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
"correct FNV32 hash matches",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery),
Hash: util.HashedQuery(defaultQuery),
},
}, logqlmodel.ErrBlocked,
},
{
"incorrect FNV32 hash does not match",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery) + 1,
Hash: util.HashedQuery(defaultQuery) + 1,
},
}, nil,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
)

if q.logExecQuery {
queryHash := HashedQuery(q.params.Query())
queryHash := util.HashedQuery(q.params.Query())
if GetRangeType(q.params) == InstantType {
level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,7 @@ func TestHashingStability(t *testing.T) {
{`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`},
} {
params.qs = test.qs
expectedQueryHash := HashedQuery(test.qs)
expectedQueryHash := util.HashedQuery(test.qs)

// check that both places will end up having the same query hash, even though they're emitting different log lines.
require.Regexp(t,
Expand Down
18 changes: 6 additions & 12 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logql

import (
"context"
"hash/fnv"
"strconv"
"strings"
"time"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
Expand Down Expand Up @@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics(
logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_hash", HashedQuery(p.Query()),
"query_hash", util.HashedQuery(p.Query()),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
Expand Down Expand Up @@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}

func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}

func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
Expand Down Expand Up @@ -225,7 +219,7 @@ func RecordLabelQueryMetrics(
"status", status,
"label", label,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

Expand Down Expand Up @@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"query_hash", HashedQuery(PrintMatches(match)),
"query_hash", util.HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
Expand Down Expand Up @@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)
Expand Down Expand Up @@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
Expand Down
7 changes: 4 additions & 3 deletions pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -191,11 +192,11 @@ func Test_testToKeyValues(t *testing.T) {
}

func TestQueryHashing(t *testing.T) {
h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
h1 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
// check that it capture differences of order.
require.NotEqual(t, h1, h2)
h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h3 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
// check that it evaluate same queries as same hashes, even if evaluated at different timestamps.
require.Equal(t, h1, h3)
}
20 changes: 19 additions & 1 deletion pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
MaxRetries: 0,
}
bk := backoff.New(ctx, cfg)

start := req.GetStart()
end := req.GetEnd()
query := req.GetQuery()

for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err)
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"err", err,
)
bk.Wait()
continue
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
logutil "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log(
"msg", "executing query",
"type", "range",
"query", op.Query,
"start", op.StartTs.Format(time.RFC3339Nano),
"end", op.EndTs.Format(time.RFC3339Nano),
"start_delta", time.Since(op.StartTs),
"end_delta", time.Since(op.EndTs),
"length", op.EndTs.Sub(op.StartTs),
"step", op.Step,
"query_hash", queryHash,
)

switch e := expr.(type) {
case syntax.SampleExpr:
Expand Down Expand Up @@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash)

switch expr.(type) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/template"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/util"
rulerutil "github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util"
)

// RulesLimits is the one function we need from limits.Overrides, and
Expand All @@ -40,7 +40,7 @@ type RulesLimits interface {
RulerRemoteWriteURL(userID string) string
RulerRemoteWriteTimeout(userID string) time.Duration
RulerRemoteWriteHeaders(userID string) map[string]string
RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig
RulerRemoteWriteRelabelConfigs(userID string) []*rulerutil.RelabelConfig
RulerRemoteWriteConfig(userID string, id string) *config.RemoteWriteConfig
RulerRemoteWriteQueueCapacity(userID string) int
RulerRemoteWriteQueueMinShards(userID string) int
Expand All @@ -60,7 +60,7 @@ type RulesLimits interface {
// and passing an altered timestamp.
func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
hash := logql.HashedQuery(qs)
hash := util.HashedQuery(qs)
detail := rules.FromOriginContext(ctx)
detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
)

// EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing
Expand Down Expand Up @@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has
}

func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs))
logger := log.With(e.logger, "query", qs, "query_hash", util.HashedQuery(qs))
jitter := e.calculateJitter(qs, logger)

if jitter > 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
args.Set("time", ts.Format(time.RFC3339Nano))
}
body := []byte(args.Encode())
hash := logql.HashedQuery(query)
hash := util.HashedQuery(query)

req := httpgrpc.HTTPRequest{
Method: http.MethodPost,
Expand Down
13 changes: 12 additions & 1 deletion pkg/util/hash_fp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package util

import "github.com/prometheus/common/model"
import (
"hash/fnv"

"github.com/prometheus/common/model"
)

// HashFP simply moves entropy from the most significant 48 bits of the
// fingerprint into the least significant 16 bits (by XORing) so that a simple
Expand All @@ -12,3 +16,10 @@ import "github.com/prometheus/common/model"
func HashFP(fp model.Fingerprint) uint32 {
return uint32(fp ^ (fp >> 32) ^ (fp >> 16))
}

// HashedQuery returns a unique hash value for the given `query`.
func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}