Skip to content

Commit

Permalink
feat: Enable warnings in Loki query responses (#12425)
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle committed Apr 3, 2024
1 parent 9994f56 commit cf04ec1
Show file tree
Hide file tree
Showing 31 changed files with 1,070 additions and 372 deletions.
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"

lokilog "github.com/grafana/loki/v3/pkg/logql/log"

"github.com/go-kit/log"
Expand Down Expand Up @@ -874,6 +876,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
_, ctx = metadata.NewContext(ctx)

if req.Plan == nil {
parsed, err := syntax.ParseLogSelector(req.Selector, true)
Expand Down Expand Up @@ -933,6 +936,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
_, ctx = metadata.NewContext(ctx)
sp := opentracing.SpanFromContext(ctx)

// If the plan is empty we want all series to be returned.
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"syscall"
"time"

"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"

"github.com/grafana/loki/v3/pkg/util/httpreq"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -981,6 +983,7 @@ type QuerierQueryServer interface {

func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
stats := stats.FromContext(ctx)
metadata := metadata.FromContext(ctx)

// send until the limit is reached.
for limit != 0 && !isDone(ctx) {
Expand All @@ -999,6 +1002,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ

stats.AddIngesterBatch(int64(batchSize))
batch.Stats = stats.Ingester()
batch.Warnings = metadata.Warnings()

if isDone(ctx) {
break
Expand All @@ -1013,6 +1017,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
}

stats.Reset()
metadata.Reset()
}
return nil
}
Expand All @@ -1021,6 +1026,7 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
sp := opentracing.SpanFromContext(ctx)

stats := stats.FromContext(ctx)
metadata := metadata.FromContext(ctx)
for !isDone(ctx) {
batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
if err != nil {
Expand All @@ -1029,6 +1035,8 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer

stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()
batch.Warnings = metadata.Warnings()

if isDone(ctx) {
break
}
Expand All @@ -1042,6 +1050,8 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
}

stats.Reset()
metadata.Reset()

if sp != nil {
sp.LogKV("event", "sent batch", "size", size)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/iter/entry_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util"
Expand Down Expand Up @@ -379,6 +381,7 @@ func (i *queryClientIterator) Next() bool {
return false
}
stats.JoinIngesters(ctx, batch.Stats)
_ = metadata.AddWarnings(ctx, batch.Warnings...)
i.curr = NewQueryResponseIterator(batch, i.direction)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io"
"sync"

"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util"
Expand Down Expand Up @@ -490,6 +492,8 @@ func (i *sampleQueryClientIterator) Next() bool {
return false
}
stats.JoinIngesters(ctx, batch.Stats)
_ = metadata.AddWarnings(ctx, batch.Warnings...)

i.curr = NewSampleQueryResponseIterator(batch)
}
return true
Expand Down
29 changes: 27 additions & 2 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,34 @@ type QueryStatus string
const (
QueryStatusSuccess = "success"
QueryStatusFail = "fail"
// How much stack space to allocate for unescaping JSON strings; if a string longer
// than this needs to be escaped, it will result in a heap allocation
unescapeStackBufSize = 64
)

// QueryResponse represents the http json response to a Loki range and instant query
type QueryResponse struct {
Status string `json:"status"`
Data QueryResponseData `json:"data"`
Status string `json:"status"`
Warnings []string `json:"warnings,omitempty"`
Data QueryResponseData `json:"data"`
}

func (q *QueryResponse) UnmarshalJSON(data []byte) error {
return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
switch string(key) {
case "status":
q.Status = string(value)
case "warnings":
var warnings []string
if _, err := jsonparser.ArrayEach(value, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
if dataType == jsonparser.String {
warnings = append(warnings, unescapeJSONString(value))
}
}); err != nil {
return err
}

q.Warnings = warnings
case "data":
var responseData QueryResponseData
if err := responseData.UnmarshalJSON(value); err != nil {
Expand All @@ -62,6 +77,16 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}

func unescapeJSONString(b []byte) string {
var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings
bU, err := jsonparser.Unescape(b, stackbuf[:])
if err != nil {
return ""
}

return string(bU)
}

// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []LogProtoStream `json:"streams"`
Expand Down
Loading

0 comments on commit cf04ec1

Please sign in to comment.