Skip to content

Commit

Permalink
feat: split detected fields queries (#12491)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Apr 17, 2024
1 parent a8b172b commit 6c33809
Show file tree
Hide file tree
Showing 16 changed files with 781 additions and 191 deletions.
3 changes: 3 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ schema_config:
ruler:
alertmanager_url: http://localhost:9093

frontend:
encoding: protobuf

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time)
return start
}

func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{
{
Expand All @@ -1374,6 +1374,7 @@ func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFiel
Cardinality: 1,
},
},
FieldLimit: r.GetFieldLimit(),
}, nil
}

Expand Down
431 changes: 266 additions & 165 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,16 @@ message DetectedFieldsRequest {

message DetectedFieldsResponse {
repeated DetectedField fields = 1;
uint32 fieldLimit = 2;
}

// TODO: make the detected field include the serialized sketch
// we only want cardinality in the JSON response
message DetectedField {
string label = 1;
string type = 2 [(gogoproto.casttype) = "DetectedFieldType"];
uint64 cardinality = 3;
bytes sketch = 4 [(gogoproto.jsontag) = "-"]; //serialized hyperloglog sketch
}

message DetectedLabelsRequest {
Expand Down
7 changes: 6 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/labels").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/patterns").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_labels").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler)
Expand All @@ -1105,6 +1104,12 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}

// We don't marshal the hyperloglog sketch in the detected fields response to JSON, so this endpoint
// only works correctly in V2 with protobuf encoding enabled.
if frontendV2 != nil && frontendV2.IsProtobufEncoded() {
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
}

if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {
if t.stopper != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ func (f *Frontend) CheckReady(_ context.Context) error {
return errors.New(msg)
}

func (f *Frontend) IsProtobufEncoded() bool {
return f.cfg.Encoding == EncodingProtobuf
}

func (f *Frontend) IsJSONEncoded() bool {
return f.cfg.Encoding == EncodingJSON
}

const stripeSize = 1 << 6

type requestsInProgress struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.De
"msg", "queried store for detected fields that does not support it, no response from querier.DetectedFields",
)
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
Fields: []*logproto.DetectedField{},
FieldLimit: req.GetFieldLimit(),
}, nil
}
return resp, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/multi_tenant_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ func (q *MultiTenantQuerier) DetectedFields(ctx context.Context, req *logproto.D
)

return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
Fields: []*logproto.DetectedField{},
FieldLimit: req.GetFieldLimit(),
}, nil
}

Expand Down
34 changes: 25 additions & 9 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,27 +1018,39 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.

// TODO(twhitney): converting from a step to a duration should be abstracted and reused,
// doing this in a few places now.
streams, err := streamsForFieldDetection(iters, req.LineLimit, time.Duration(req.Step*1e6))
streams, err := streamsForFieldDetection(iters, req.LineLimit, time.Duration(req.Step))
if err != nil {
return nil, err
}

detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)

//TODO: detected field needs to contain the sketch
// make sure response to frontend is GRPC
//only want cardinality in JSON
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
sketch, err := v.sketch.MarshalBinary()
if err != nil {
level.Warn(q.logger).Log("msg", "failed to marshal hyperloglog sketch", "err", err)
continue
}

fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Sketch: sketch,
}

fieldCount++
}

//TODO: detected fields response needs to include the sketch
return &logproto.DetectedFieldsResponse{
Fields: fields,
Fields: fields,
FieldLimit: req.GetFieldLimit(),
}, nil
}

Expand All @@ -1064,6 +1076,10 @@ func (p *parsedFields) Estimate() uint64 {
return p.sketch.Estimate()
}

func (p *parsedFields) Marshal() ([]byte, error) {
return p.sketch.MarshalBinary()
}

func (p *parsedFields) DetermineType(value string) {
p.fieldType = determineType(value)
p.isTypeDetected = true
Expand Down Expand Up @@ -1098,20 +1114,22 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
fieldCount := uint32(0)

for _, stream := range streams {

level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))

for _, entry := range stream.Entries {
detected := parseLine(entry.Line)
for k, vals := range detected {
if fieldCount >= limit {
return detectedFields
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
df = newParsedFields()
detectedFields[k] = df
fieldCount++
}

if _, ok := detectedFields[k]; !ok {
detectedFields[k] = newParsedFields()
if df == nil {
continue
}

for _, v := range vals {
Expand All @@ -1126,8 +1144,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("detected field %s with %d values", k, len(vals)))

fieldCount++
}
}
}
Expand Down
106 changes: 97 additions & 9 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/exp/maps"

"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/storage/detected"
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"

"github.com/grafana/dskit/httpgrpc"
Expand Down Expand Up @@ -972,9 +973,12 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
return req.WithContext(ctx), nil
case *DetectedFieldsRequest:
params := url.Values{
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"query": []string{request.GetQuery()},
"query": []string{request.GetQuery()},
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"line_limit": []string{fmt.Sprintf("%d", request.GetLineLimit())},
"field_limit": []string{fmt.Sprintf("%d", request.GetFieldLimit())},
"step": []string{fmt.Sprintf("%d", request.GetStep())},
}

u := &url.URL{
Expand Down Expand Up @@ -1587,6 +1591,29 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
Response: seriesvolume.Merge(resps, resp0.Response.Limit),
Headers: headers,
}, nil
case *DetectedFieldsResponse:
resp0 := responses[0].(*DetectedFieldsResponse)
headers := resp0.Headers
fieldLimit := resp0.Response.GetFieldLimit()

fields := []*logproto.DetectedField{}
for _, r := range responses {
fields = append(fields, r.(*DetectedFieldsResponse).Response.Fields...)
}

mergedFields, err := detected.MergeFields(fields, fieldLimit)

if err != nil {
return nil, err
}

return &DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: mergedFields,
FieldLimit: fieldLimit,
},
Headers: headers,
}, nil
default:
return nil, fmt.Errorf("unknown response type (%T) in merging responses", responses[0])
}
Expand Down Expand Up @@ -1781,8 +1808,12 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) {
return &paramsStatsWrapper{
IndexStatsRequest: r,
}, nil
case *DetectedFieldsRequest:
return &paramsDetectedFieldsWrapper{
DetectedFieldsRequest: r,
}, nil
default:
return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got (%T)", r)
return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, got (%T)", r)
}
}

Expand Down Expand Up @@ -1950,6 +1981,47 @@ func (p paramsStatsWrapper) Shards() []string {
return make([]string, 0)
}

type paramsDetectedFieldsWrapper struct {
*DetectedFieldsRequest
}

func (p paramsDetectedFieldsWrapper) QueryString() string {
return p.GetQuery()
}

func (p paramsDetectedFieldsWrapper) GetExpression() syntax.Expr {
expr, err := syntax.ParseExpr(p.GetQuery())
if err != nil {
return nil
}

return expr
}

func (p paramsDetectedFieldsWrapper) Start() time.Time {
return p.GetStartTs()
}

func (p paramsDetectedFieldsWrapper) End() time.Time {
return p.GetEndTs()
}

func (p paramsDetectedFieldsWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}

func (p paramsDetectedFieldsWrapper) Interval() time.Duration {
return 0
}

func (p paramsDetectedFieldsWrapper) Direction() logproto.Direction {
return logproto.BACKWARD
}
func (p paramsDetectedFieldsWrapper) Limit() uint32 { return p.DetectedFieldsRequest.LineLimit }
func (p paramsDetectedFieldsWrapper) Shards() []string {
return make([]string, 0)
}

func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader {
var promHeaders []queryrangebase.PrometheusResponseHeader
for h, hv := range httpHeaders {
Expand Down Expand Up @@ -2071,12 +2143,15 @@ type DetectedFieldsRequest struct {
path string
}

func NewDetectedFieldsRequest(start, end time.Time, query, path string) *DetectedFieldsRequest {
func NewDetectedFieldsRequest(start, end time.Time, lineLimit, fieldLimit uint32, step int64, query, path string) *DetectedFieldsRequest {
return &DetectedFieldsRequest{
DetectedFieldsRequest: logproto.DetectedFieldsRequest{
Start: start,
End: end,
Query: query,
Start: start,
End: end,
Query: query,
LineLimit: lineLimit,
FieldLimit: fieldLimit,
Step: step,
},
path: path,
}
Expand All @@ -2103,7 +2178,15 @@ func (r *DetectedFieldsRequest) GetStartTs() time.Time {
}

func (r *DetectedFieldsRequest) GetStep() int64 {
return 0
return r.Step
}

func (r *DetectedFieldsRequest) GetLineLimit() uint32 {
return r.LineLimit
}

func (r *DetectedFieldsRequest) GetFieldLimit() uint32 {
return r.FieldLimit
}

func (r *DetectedFieldsRequest) Path() string {
Expand Down Expand Up @@ -2132,6 +2215,11 @@ func (r *DetectedFieldsRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(
otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()),
otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()),
otlog.String("query", r.GetQuery()),
otlog.Int64("step (ms)", r.GetStep()),
otlog.Int64("line_limit", int64(r.GetLineLimit())),
otlog.Int64("field_limit", int64(r.GetFieldLimit())),
otlog.String("step", fmt.Sprintf("%d", r.GetStep())),
)
}

Expand Down
Loading

0 comments on commit 6c33809

Please sign in to comment.