Skip to content

Commit

Permalink
inflight-logging: Add extra metadata to inflight requests logging (#1…
Browse files Browse the repository at this point in the history
…1243)

**What this PR does / why we need it**:
logging: Add extra metadata to inflight requests

This adds extra metadata (similar to what we have in `metrics.go`) but
for queries in in-flight (both started and retrying)

Changes:
    Adds following data
    1. Query Hash
    2. Start and End time
    3. Start and End delta
    4. Length of the query
5. Moved the helper util to `queryutil` package because of cyclic
dependencies with `logql` package.
   
**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:
Find the screenshots of log entries looks like (both in `retry.go` and
`roundtrip.go`)

![Screenshot 2023-11-16 at 13 01
32](https://github.com/grafana/loki/assets/3735252/177e97ed-6ee8-41dd-b088-2e4f49562ba0)
![Screenshot 2023-11-16 at 13 02
15](https://github.com/grafana/loki/assets/3735252/fb328a37-dbe3-483e-b083-f21327858029)

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk committed Nov 20, 2023
1 parent c716e49 commit 30d0030
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
* [11110](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Change the default of the `metrics-namespace` flag to 'loki'.
* [11086](https://github.com/grafana/loki/pull/11086) **kandrew5**: Helm: Allow topologySpreadConstraints
* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2
# Docker image info
IMAGE_PREFIX ?= grafana

BUILD_IMAGE_PREFIX ?= grafana

IMAGE_TAG ?= $(shell ./tools/image-tag)

# Version info for binaries
Expand Down Expand Up @@ -102,7 +104,7 @@ RM := --rm
TTY := --tty

DOCKER_BUILDKIT ?= 1
BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
PUSH_OCI=docker push
TAG_OCI=docker tag
ifeq ($(CI), true)
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"

"github.com/grafana/loki/pkg/util"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
)
Expand Down Expand Up @@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
for _, b := range blocks {

if b.Hash > 0 {
if b.Hash == HashedQuery(query) {
if b.Hash == util.HashedQuery(query) {
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
return qb.block(b, typ, logger)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/logql/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -124,15 +125,15 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
"correct FNV32 hash matches",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery),
Hash: util.HashedQuery(defaultQuery),
},
}, logqlmodel.ErrBlocked,
},
{
"incorrect FNV32 hash does not match",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery) + 1,
Hash: util.HashedQuery(defaultQuery) + 1,
},
}, nil,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
)

if q.logExecQuery {
queryHash := HashedQuery(q.params.Query())
queryHash := util.HashedQuery(q.params.Query())
if GetRangeType(q.params) == InstantType {
level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,7 @@ func TestHashingStability(t *testing.T) {
{`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`},
} {
params.qs = test.qs
expectedQueryHash := HashedQuery(test.qs)
expectedQueryHash := util.HashedQuery(test.qs)

// check that both places will end up having the same query hash, even though they're emitting different log lines.
require.Regexp(t,
Expand Down
18 changes: 6 additions & 12 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logql

import (
"context"
"hash/fnv"
"strconv"
"strings"
"time"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
Expand Down Expand Up @@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics(
logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_hash", HashedQuery(p.Query()),
"query_hash", util.HashedQuery(p.Query()),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
Expand Down Expand Up @@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}

func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}

func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
Expand Down Expand Up @@ -225,7 +219,7 @@ func RecordLabelQueryMetrics(
"status", status,
"label", label,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

Expand Down Expand Up @@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"query_hash", HashedQuery(PrintMatches(match)),
"query_hash", util.HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
Expand Down Expand Up @@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)
Expand Down Expand Up @@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
Expand Down
7 changes: 4 additions & 3 deletions pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -191,11 +192,11 @@ func Test_testToKeyValues(t *testing.T) {
}

func TestQueryHashing(t *testing.T) {
h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
h1 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
// check that it capture differences of order.
require.NotEqual(t, h1, h2)
h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h3 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
// check that it evaluate same queries as same hashes, even if evaluated at different timestamps.
require.Equal(t, h1, h3)
}
20 changes: 19 additions & 1 deletion pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
MaxRetries: 0,
}
bk := backoff.New(ctx, cfg)

start := req.GetStart()
end := req.GetEnd()
query := req.GetQuery()

for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err)
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"err", err,
)
bk.Wait()
continue
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
logutil "github.com/grafana/loki/pkg/util/log"
)
Expand Down Expand Up @@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log(
"msg", "executing query",
"type", "range",
"query", op.Query,
"start", op.StartTs.Format(time.RFC3339Nano),
"end", op.EndTs.Format(time.RFC3339Nano),
"start_delta", time.Since(op.StartTs),
"end_delta", time.Since(op.EndTs),
"length", op.EndTs.Sub(op.StartTs),
"step", op.Step,
"query_hash", queryHash,
)

switch e := expr.(type) {
case syntax.SampleExpr:
Expand Down Expand Up @@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash)

switch expr.(type) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/template"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/util"
rulerutil "github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util"
)

// RulesLimits is the one function we need from limits.Overrides, and
Expand All @@ -40,7 +40,7 @@ type RulesLimits interface {
RulerRemoteWriteURL(userID string) string
RulerRemoteWriteTimeout(userID string) time.Duration
RulerRemoteWriteHeaders(userID string) map[string]string
RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig
RulerRemoteWriteRelabelConfigs(userID string) []*rulerutil.RelabelConfig
RulerRemoteWriteConfig(userID string, id string) *config.RemoteWriteConfig
RulerRemoteWriteQueueCapacity(userID string) int
RulerRemoteWriteQueueMinShards(userID string) int
Expand All @@ -60,7 +60,7 @@ type RulesLimits interface {
// and passing an altered timestamp.
func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
hash := logql.HashedQuery(qs)
hash := util.HashedQuery(qs)
detail := rules.FromOriginContext(ctx)
detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
)

// EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing
Expand Down Expand Up @@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has
}

func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs))
logger := log.With(e.logger, "query", qs, "query_hash", util.HashedQuery(qs))
jitter := e.calculateJitter(qs, logger)

if jitter > 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
args.Set("time", ts.Format(time.RFC3339Nano))
}
body := []byte(args.Encode())
hash := logql.HashedQuery(query)
hash := util.HashedQuery(query)

req := httpgrpc.HTTPRequest{
Method: http.MethodPost,
Expand Down
13 changes: 12 additions & 1 deletion pkg/util/hash_fp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package util

import "github.com/prometheus/common/model"
import (
"hash/fnv"

"github.com/prometheus/common/model"
)

// HashFP simply moves entropy from the most significant 48 bits of the
// fingerprint into the least significant 16 bits (by XORing) so that a simple
Expand All @@ -12,3 +16,10 @@ import "github.com/prometheus/common/model"
func HashFP(fp model.Fingerprint) uint32 {
return uint32(fp ^ (fp >> 32) ^ (fp >> 16))
}

// HashedQuery returns a unique hash value for the given `query`.
func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}

0 comments on commit 30d0030

Please sign in to comment.