LogQL: Labels and Metrics Extraction #2769
Conversation
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
… parsing and moar tests
Also add duration conversion for unwrap. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
The auth middleware was happening after the stats one, so org_id was not set 🤦. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
This patch extends the duration label filter with support for byte sizes such as `1kB` and `42MiB`.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Support byte sizes in label filters.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
I'm overall very positive! This will bring better querying and filtering of logs, e.g. filtering on request IP address, unique user IDs, etc. Awesome! 🎖️ A few thoughts:
We're thinking about a CIDR filter :)
Sure thing, a log would look like this:
It would transform it into:
Whitespace outside of string literals (`"abc "`) is not significant anywhere, actually.
Publishing initial feedback so I don't lose it - will pick up where I left off.
```go
	}
}
var stream *logproto.Stream
lhash := parsedLbs.Hash()
```
Can we add a method to the `logql.Pipeline` interface like `HasLabelMappings() bool`, so we can avoid all the label parsing/hashing for queries which don't use them?
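A minimal sketch of what the reviewer's suggestion could look like. `HasLabelMappings` is the reviewer's hypothetical method name, and `Pipeline` here is a simplified stand-in for the real `logql.Pipeline` interface, not Loki's actual definition:

```go
package main

import "fmt"

// Pipeline is a simplified stand-in for logql.Pipeline.
type Pipeline interface {
	// HasLabelMappings reports whether any stage can add or mutate labels
	// (parsers, label formatters, ...). When it returns false, callers can
	// reuse the stream's labels and hash instead of re-parsing per line.
	HasLabelMappings() bool
}

type noopPipeline struct{}

func (noopPipeline) HasLabelMappings() bool { return false }

type parserPipeline struct{}

func (parserPipeline) HasLabelMappings() bool { return true }

// process shows how an ingester-side caller could branch on the flag.
func process(p Pipeline) string {
	if !p.HasLabelMappings() {
		return "fast path: reuse stream labels/hash"
	}
	return "slow path: re-parse and re-hash labels"
}

func main() {
	fmt.Println(process(noopPipeline{}))
	fmt.Println(process(parserPipeline{}))
}
```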
```go
var found bool
var s *logproto.Series
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
```
nit: this can be `if s, found := series[lhash]; !found {` instead of pre-declaring `found`, as it's unused elsewhere.
```go
}
var found bool
var s *logproto.Series
lhash := parsedLabels.Hash()
```
future idea: keep a bounded least-recently-used cache that memoizes hashing operations.
edit: that was the most jargon-y line I think I've ever posted 😭. tl;dr: can we cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should try something for sure, same for `Labels.String()`.
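A hedged sketch of the LRU idea discussed above, assuming a labels-string key. `fnv` is used here only to keep the example self-contained; Loki uses `xxhash` and `Labels.Hash()` in practice, and the type names are hypothetical:

```go
package main

import (
	"container/list"
	"fmt"
	"hash/fnv"
)

// hashCache memoizes labels-string -> hash with a bounded LRU policy.
type hashCache struct {
	max   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // key -> list element holding an *entry
}

type entry struct {
	key  string
	hash uint64
}

func newHashCache(max int) *hashCache {
	return &hashCache{max: max, order: list.New(), items: map[string]*list.Element{}}
}

// Hash returns the memoized hash for labelsString, computing and
// inserting it on a miss and evicting the least recently used entry
// once the cache exceeds its bound.
func (c *hashCache) Hash(labelsString string) uint64 {
	if el, ok := c.items[labelsString]; ok {
		c.order.MoveToFront(el) // hit: refresh recency
		return el.Value.(*entry).hash
	}
	h := fnv.New64a()
	h.Write([]byte(labelsString))
	sum := h.Sum64()
	c.items[labelsString] = c.order.PushFront(&entry{key: labelsString, hash: sum})
	if c.order.Len() > c.max { // evict least recently used
		last := c.order.Back()
		c.order.Remove(last)
		delete(c.items, last.Value.(*entry).key)
	}
	return sum
}

func main() {
	c := newHashCache(2)
	a := c.Hash(`{app="foo"}`)
	b := c.Hash(`{app="foo"}`) // second call is served from the cache
	fmt.Println(a == b)
}
```

Whether this wins in practice depends on hit rate versus the cost of building the string key, which is exactly what the `Labels.String()` remark above is about.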
```go
s.Samples = append(s.Samples, logproto.Sample{
	Timestamp: e.t,
	Value:     value,
	Hash:      xxhash.Sum64([]byte(e.s)),
```
future idea: depending on how we use these, I wonder if it's better to add them to our chunk format & precompute/embed them there.
yeah we could. I think however the hash is SUPER SUPER fast, which is why I picked this one. But yes, let's benchmark without it.
```go
}
// we always decode the line length and ts as varints
si.stats.DecompressedBytes += int64(len(line)) + 2*binary.MaxVarintLen64
```
todo: I'd like to have this encapsulated somewhere else (memchunk?), but it seems fine for now.
```go
	return aux / count
}

func stddevOverTime(samples []promql.Point) float64 {
```
Is this expressible as `math.Sqrt(stdvarOverTime)`?
```go
	return min
}

func stdvarOverTime(samples []promql.Point) float64 {
```
```suggestion
// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func stdvarOverTime(samples []promql.Point) float64 {
```
```go
	return func(samples []promql.Point) float64 {
		values := make(vectorByValueHeap, 0, len(samples))
		for _, v := range samples {
			values = append(values, promql.Sample{Point: promql.Point{V: v.V}})
```
Don't you need to `Push` (via the heap API) instead of `append`?
edit: This is dangerous - I think you just want a sorted `[]promql.Point`, but the type you're using suggests it's a heap, which can be expected to have a very different internal structure when sorted. I'd suggest implementing a new type that implements the sort interface and using that instead.
```go
// If 'values' has zero elements, NaN is returned.
// If q<0, -Inf is returned.
// If q>1, +Inf is returned.
func quantile(q float64, values vectorByValueHeap) float64 {
```
Is there a reason we can't just validate that the quantile has reasonable bounds (i.e. not < 0 or > 1)?
```go
)

var (
	_ Stage = &LineFormatter{}
```
```suggestion
	// Compile time checks for interface inhabitants.
	_ Stage = &LineFormatter{}
```
Alright, finally got to the end! All in all it's excellent. There were a few bits here and there I'd like your feedback on, but let's get this merged soon.
```go
)

var (
	_ LabelFilterer = &BinaryLabelFilter{}
```
```suggestion
	// Compile time assertions
	_ LabelFilterer = &BinaryLabelFilter{}
```
```go
}
line, rok := b.Right.Process(line, lbs)
if !b.and {
	return line, lok || rok
```
I think this can be simplified to `return line, true`.
```go
// ToSampleExtractor transforms a LineExtractor into a SampleExtractor.
// Useful for metric conversion without a log Pipeline.
func (l LineExtractor) ToSampleExtractor() SampleExtractor {
```
Couldn't this more easily be a method?

```go
func (l LineExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
	return l(line), lbs, true
}
```
```go
	return lineSampleExtractor{Stage: m.Reduce(), LineExtractor: ex}, nil
}

type convertionFn func(value string) (float64, error)
```
```suggestion
type conversionFn func(value string) (float64, error)
```
```go
)

var (
	_ Stage = &JSONParser{}
```
```suggestion
	// Compile time assurance
	_ Stage = &JSONParser{}
```
```diff
@@ -113,7 +113,7 @@ func QueryType(query string) (string, error) {
 		return QueryTypeMetric, nil
 	case *matchersExpr:
```
Nit: It would be nice if we could use all interface types in here to make refactoring & extending safer/easier.
```go
			mustNewMatcher(labels.MatchRegexp, "foo", "bar\\w+"),
		},
	},
	// {
```
Why are these all commented out?
```diff
@@ -278,6 +278,13 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh
 }

 func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shardRecorder) SampleExpr {
 	if hasLabelModifier(expr) {
```
Awesome!
```diff
@@ -289,6 +296,22 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shar
 	}
 }

 // hasLabelModifier tells if an expression contains pipelines that can modify stream labels;
 // parsers, for instance, introduce new labels but do not alter the original ones.
 func hasLabelModifier(expr *rangeAggregationExpr) bool {
```
I think this would work better as an interface method on `SampleExpr`, like how we've done `Operations`:

```go
type SampleExpr interface {
	// Selector is the LogQL selector to apply when retrieving logs.
	Selector() LogSelectorExpr
	Extractor() (SampleExtractor, error)
	// Operations returns the list of operations used in this SampleExpr
	Operations() []string
	Expr
}
```

This would allow us to add new types in the future with confidence. I think this only applies to sample expressions, because sharded `LogSelectorExpr` are recombined via `HeapIterator`s and we don't drop their lines or combine them in non-associative ways.
LGTM, will address some of these in followup PRs.
For example, I'm checking this out as I have problems comparing a string duration with a duration in ms/s/us (in this case 250ms).
What this PR does / why we need it:
This PR introduces all the enhancements suggested in the following design doc: https://docs.google.com/document/d/1xRGO5YK8Mw2e7Pdtvo_HVL95EWYxv3KD4mdIjCGqNVk/edit#heading=h.poq71alhk873
Log selectors can now contain:

- Parser expressions such as `| logfmt`, `| json`, or `| regexp "(?P<label_name>\w+)"` (e.g. `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt`).
- Label filters on durations (`| foo > 250ms`), bytes (`| foo >= 20KB`), floats (`| foo = 5.0`), and strings (same as Prometheus: `|foo=~"abc"`). Filters can be combined with `and` and `or`, which respectively express the "and" and "or" binary operations; `and` can also simply be replaced by a comma, a space, or another pipe.
- `| line_format "{{.namespace}} => {{.query}}"` to rewrite the log line using a template.
- `| label_fmt foo=bar` renames `bar` into `foo`; `| label_fmt foo="buzz{{.bar}}"` sets the `foo` label value by executing a template (same template engine and principles as `line_format`); both forms can be combined: `| label_fmt foo=bar,blip="{{.boop}}"`.

Full example:
Metrics queries can take advantage of everything above. We've also introduced new over_time aggregations, which require you to select the label to use as the sample value with `| unwrap latency` as the last pipe; without `unwrap`, those new aggregations will fail.

New additions:

- Non-associative operations such as `avg_over_time`, `stddev_over_time`, `stdvar_over_time`, and `quantile_over_time` support grouping directly, without vector aggregation, e.g. `quantile_over_time(0.99, {job="tempo-dev/query-frontend"} | json | unwrap duration(duration) [1m]) by (caller)`. For count/sum/min/max you can use normal vector grouping to reduce label cardinality, e.g. `sum by (caller) (count_over_time({job="tempo-dev/query-frontend"} | json | unwrap duration(duration) [1m]))`.
- `unwrap` also supports the `duration` (or `duration_seconds`) function, `| unwrap duration(label_foo)` as shown above, to parse durations such as `1s` or `5m25s200ms` from `label_foo`. More functions will be added.

Errors:
If anything goes wrong within unwrap, label filters, parsers, and such, a new label `__error__` is added to each sample/log line. You can use a label filter to filter those out, for example: `{job="cortex-ops/query-frontend"} | json | __error__=""`.

Results from metric queries cannot contain an `__error__` label, otherwise Loki will return an API error. The error should be filtered out after it appears: if the unwrap conversion generates an error, the label filter should be placed after it. Only label filters can be placed after unwrap.

example:
Performance:
So far it's not that bad: filters have taken a hit, but I have a plan to get them back to their previous speed. Regexp and metrics queries can be slow; make sure to reduce labels when using logfmt and json.
I've run some benchmarks already and it seems label handling is what's mostly expensive. Again, I've got plenty of placeholders for this.
The frontend can split and shard queries; some are excluded for now, as they required some more thinking.
Special notes for your reviewer:
I've left a ton of TODOs for later, because I could go on like this forever.
Next:
✌️