Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-2.8.x] Expose optional label matcher for label values handler (#8824) #8960

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,12 @@

##### Enhancements

* [8824](https://github.com/grafana/loki/pull/8824) **periklis**: Expose optional label matcher for label values handler
* [8852](https://github.com/grafana/loki/pull/8852) **wtchangdm**: Loki: Add `route_randomly` to Redis options.
* [8848](https://github.com/grafana/loki/pull/8848) **dannykopping**: Ruler: add configurable rule evaluation jitter.
* [8752](https://github.com/grafana/loki/pull/8752) **chaudum**: Add query fairness control across actors within a tenant to scheduler, which can be enabled by passing the `X-Loki-Actor-Path` header to the HTTP request of the query.
* [8786](https://github.com/grafana/loki/pull/8786) **DylanGuedes**: Ingester: add new /ingester/prepare_shutdown endpoint.
* [8744](https://github.com/grafana/loki/pull/8744) **dannykopping**: Ruler: remote rule evaluation.
periklis marked this conversation as resolved.
Show resolved Hide resolved
* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.
* [8682](https://github.com/grafana/loki/pull/8682) **dannykopping**: Add fetched chunk size distribution metric `loki_chunk_fetcher_fetched_size_bytes`.
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects
Expand Down
1 change: 1 addition & 0 deletions docs/sources/api/_index.md
Expand Up @@ -483,6 +483,7 @@ It accepts the following query parameters in the URL:
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to 6 hours ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `since`: A `duration` used to calculate `start` relative to `end`. If `end` is in the future, `start` is calculated as this duration before now. Any value specified for `start` supersedes this parameter.
- `query`: A set of log stream selector that selects the streams to match and return label values for `<name>`. Example: `{"app": "myapp", "environment": "dev"}`

In microservices mode, `/loki/api/v1/label/<name>/values` is exposed by the querier.

Expand Down
11 changes: 10 additions & 1 deletion pkg/ingester/ingester.go
Expand Up @@ -828,7 +828,16 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
if err != nil {
return nil, err
}
resp, err := instance.Label(ctx, req)

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query)
if err != nil {
return nil, err
}
}

resp, err := instance.Label(ctx, req, matchers...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/loghttp/labels.go
Expand Up @@ -82,5 +82,7 @@ func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) {
}
req.Start = &start
req.End = &end

req.Query = query(r)
return req, nil
}
319 changes: 188 additions & 131 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
Expand Up @@ -119,6 +119,7 @@ message LabelRequest {
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
];
string query = 5; // Naming this query instead of match because this should be with queryrangebase.Request interface
}

message LabelResponse {
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/metrics.go
Expand Up @@ -177,7 +177,7 @@ func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
label, status string,
label, query, status string,
stats logql_stats.Result,
) {
var (
Expand All @@ -199,6 +199,7 @@ func RecordLabelQueryMetrics(
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"query", query,
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/metrics_test.go
Expand Up @@ -101,7 +101,7 @@ func TestLogLabelsQuery(t *testing.T) {
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordLabelQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, "foo", "200", stats.Result{
RecordLabelQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, "foo", "", "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
Expand All @@ -111,7 +111,7 @@ func TestLogLabelsQuery(t *testing.T) {
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Expand Up @@ -222,7 +222,7 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
status, _ = server.ClientHTTPStatusAndError(err)
}

logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

if err != nil {
serverutil.WriteError(err, w)
Expand Down
11 changes: 10 additions & 1 deletion pkg/querier/querier.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -364,6 +365,14 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
return nil, err
}

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query)
if err != nil {
return nil, err
}
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(ctx, userID)
// TODO: remove this clause once we remove the deprecated query-timeout flag.
Expand Down Expand Up @@ -401,7 +410,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
)

if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/queryrange/codec.go
Expand Up @@ -186,10 +186,6 @@ func (r *LokiLabelNamesRequest) WithQuery(query string) queryrangebase.Request {
return &new
}

func (r *LokiLabelNamesRequest) GetQuery() string {
return ""
}

func (r *LokiLabelNamesRequest) GetStep() int64 {
return 0
}
Expand Down Expand Up @@ -259,6 +255,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []
StartTs: *req.Start,
EndTs: *req.End,
Path: r.URL.Path,
Query: req.Query,
}, nil
case IndexStatsOp:
req, err := loghttp.ParseIndexStatsQuery(r)
Expand Down Expand Up @@ -340,6 +337,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http
params := url.Values{
"start": []string{fmt.Sprintf("%d", request.StartTs.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())},
"query": []string{request.GetQuery()},
}

u := &url.URL{
Expand Down
35 changes: 35 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
strings "strings"
"testing"
"time"
Expand Down Expand Up @@ -80,6 +81,15 @@ func Test_codec_DecodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}, false},
{"label_values", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/label/test/values?start=%d&end=%d&query={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiLabelNamesRequest{
Path: "/label/test/values",
StartTs: start,
EndTs: end,
Query: `{foo="bar"}`,
}, false},
{"index_stats", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Expand Down Expand Up @@ -334,20 +344,45 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
Path: "/loki/api/v1/labels/__name__/values",
StartTs: start,
EndTs: end,
Query: `{foo="bar"}`,
}
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// testing a full roundtrip
req, err = LokiCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.StartTs, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiLabelNamesRequest).EndTs)
require.Equal(t, toEncode.Query, req.(*LokiLabelNamesRequest).Query)
require.Equal(t, "/loki/api/v1/labels/__name__/values", req.(*LokiLabelNamesRequest).Path)
}

func Test_codec_labels_DecodeRequest(t *testing.T) {
ctx := context.Background()
u, err := url.Parse(`/loki/api/v1/labels/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r := &http.Request{URL: u}
req, err := LokiCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, end, req.(*LokiLabelNamesRequest).EndTs)
require.Equal(t, `{foo="bar"}`, req.(*LokiLabelNamesRequest).Query)
require.Equal(t, "/loki/api/v1/labels/__name__/values", req.(*LokiLabelNamesRequest).Path)

got, err := LokiCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))
}

func Test_codec_index_stats_EncodeRequest(t *testing.T) {
Expand Down