-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Enable warnings in Loki query responses #12425
Conversation
@@ -874,6 +875,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / | |||
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { | |||
// initialize stats collection for ingester queries. | |||
_, ctx := stats.NewContext(queryServer.Context()) | |||
_, ctx = metadata.NewContext(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This creates a new metadata context for the ingester. Any warnings will be attached in the returned batch stats.
@@ -971,6 +973,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ | |||
|
|||
stats.AddIngesterBatch(int64(batchSize)) | |||
batch.Stats = stats.Ingester() | |||
batch.Warnings = metadata.Warnings() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we attach warnings to the ingester batch response. From here on out, it's a matter of encoding and decoding (and encoding and decoding) in the scheduler and frontend to propagate them to the user
@@ -266,6 +266,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { | |||
Data: data, | |||
Statistics: statResult, | |||
Headers: metadataCtx.Headers(), | |||
Warnings: metadataCtx.Warnings(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the crux of the PR on the frontend. Everything comes through here so this is where we attach the warnings to be propagated to the scheduler and querier.
@@ -1145,7 +1144,8 @@ func decodeResponseJSONFrom(buf []byte, req queryrangebase.Request, headers http | |||
ResultType: loghttp.ResultTypeMatrix, | |||
Result: toProtoMatrix(resp.Data.Result.(loghttp.Matrix)), | |||
}, | |||
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(headers)), | |||
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(headers)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this file, we have to attach warnings to each of the response types that might possibly have them
for _, res := range responses { | ||
lokiResult := res.(*LokiResponse) | ||
mergedStats.MergeSplit(lokiResult.Statistics) | ||
lokiResponses = append(lokiResponses, lokiResult) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loki splits a query and then merges the responses into one. Here we make sure all the warnings from subqueries make it into the merged query. We'll do the same below for metrics queries
@@ -164,8 +164,28 @@ func (p prometheusCodec) MergeResponse(responses ...Response) (Response, error) | |||
// Merge the responses. | |||
sort.Sort(byFirstTime(promResponses)) | |||
|
|||
uniqueWarnings := map[string]struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merging metrics queries
s.WriteObjectStart() | ||
s.WriteObjectField("status") | ||
s.WriteString("success") | ||
|
||
if len(warnings) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Custom json marshalling
} | ||
|
||
func (q *QueryResponse) UnmarshalJSON(data []byte) error { | ||
return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, offset int) error { | ||
switch string(key) { | ||
case "status": | ||
q.Status = string(value) | ||
case "warnings": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Custom json unmarshalling
@@ -94,3 +119,19 @@ func ExtendHeaders(dst map[string][]string, src []*definitions.PrometheusRespons | |||
dst[header.Name] = header.Values | |||
} | |||
} | |||
|
|||
func AddWarnings(ctx context.Context, warnings ...string) error { | |||
context, ok := ctx.Value(metadataKey).(*Context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Presumably, most of the time there will be no warnings, and looking up keys in contexts can be surprisingly expensive, not to mention the locking, so a short circuit here is probably a good idea 🤔
context, ok := ctx.Value(metadataKey).(*Context) | |
if len(warnings) == 0 { | |
return nil | |
} | |
context, ok := ctx.Value(metadataKey).(*Context) |
pkg/querier/queryrange/codec.go
Outdated
} | ||
} | ||
|
||
warnings := make([]string, 0, len(uniqueWarnings)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but since we already import golang.org/x/exp
, we might as well use maps.Keys
from it
pkg/logqlmodel/metadata/context.go
Outdated
c.mtx.Lock() | ||
defer c.mtx.Unlock() | ||
|
||
warnings := make([]string, 0, len(c.warnings)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as https://github.com/grafana/loki/pull/12425/files#r1548053553, we can use maps.Keys()
here
pkg/logql/accumulator.go
Outdated
@@ -353,6 +355,14 @@ func (acc *AccumulatedStreams) Result() []logqlmodel.Result { | |||
) | |||
} | |||
|
|||
warnings := make([]string, 0, len(acc.warnings)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, maps.Keys()
can simplify this
} | ||
} | ||
|
||
warnings := make([]string, 0, len(uniqueWarnings)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, maps.Keys()
😅
@@ -252,6 +252,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que | |||
return nil, err | |||
} | |||
|
|||
// TODO(tpatterson): add warnings to response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO
seems obsolete, you seem to have added the warnings to all of the response types below ?
pkg/loghttp/query.go
Outdated
case "warnings": | ||
var warnings []string | ||
if _, err := jsonparser.ArrayEach(value, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { | ||
warnings = append(warnings, string(value)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm considering JSON can contain all sorts of junk in an array, this probably needs a bit more validation that we are actually getting strings here, presumably through the dataType
🤔
I assume that value
doesn't contain the string quote characters, but I wonder if it contains escape characters, e.g. if the warning itself contains a quote symbol, I assume it will be escaped in value
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand how every part of this is threaded and fits together, but overall the code LGTM, with the minor nitpicks I added as inline comment.
I'll 👍 to unblock, though the only inline comment that is somewhat blocking is this one, though this one is probably also quite nice to have, considering how AddWarnings()
seems to be very much on the hot path.
Under certain conditions, it might be useful for Loki to be able to return warnings to the frontend. The prometheus response types support this so it's just a matter of wiring it in.