Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: align semantics of metric and log query label extraction #11587

Merged
merged 9 commits into from
Jan 11, 2024
5 changes: 4 additions & 1 deletion pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
return "", false
}

if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
_, alwaysExtract := keys[sanitized]
if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
return sanitized, true
})

if !ok {
continue
}
Expand Down Expand Up @@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
}
}
}

if l.strict && l.dec.Err() != nil {
addErrLabel(errLogfmt, l.dec.Err(), lbs)
return line, true
Expand Down
31 changes: 15 additions & 16 deletions pkg/logql/log/parser_hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ type Hints struct {
}

func (p *Hints) ShouldExtract(key string) bool {
if len(p.requiredLabels) == 0 {
return true
}

for _, l := range p.extracted {
if l == key {
return false
Expand All @@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool {
}
}

return false
return len(p.requiredLabels) == 0
}

func (p *Hints) ShouldExtractPrefix(prefix string) bool {
Expand All @@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool {
}

func (p *Hints) RecordExtracted(key string) {
for _, l := range p.requiredLabels {
if l == key {
p.extracted = append(p.extracted, key)
return
}
}
p.extracted = append(p.extracted, key)
}

func (p *Hints) AllRequiredExtracted() bool {
if len(p.requiredLabels) == 0 {
if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) {
return false
}
return len(p.extracted) == len(p.requiredLabels)

found := map[string]interface{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to say found := make(map[string]struct{}, len(p.requiredLabels)) here. struct{} is smaller than interface{} and initializing with the max len we'll need ensures only one alloc

for _, e := range p.extracted {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be worth seeing if it's faster to just compare the list in two loops. It's an n^2 algorithm but it might be faster because it requires no allocs. (This function is called a lot)

It's the same reason extractedLabels and requiredLabels are []string here rather than map[string]. It's actually faster to iterate a small slice than index all these things from a map!

for _, l := range p.requiredLabels {
if e == l {
found[l] = nil
break
}
}
}

return len(p.requiredLabels) == len(found)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed because previously, RecordExtracted was only recording required fields that were extracted. As a result, it was previously acceptable to just test the length of the 2 slices against each other. However, now that we're recording all extracted labels, we have to actually compare extracted to required. I'm using a map to prevent duplicate extractions from causing an incorrect result here.

}

func (p *Hints) Reset() {
Expand Down Expand Up @@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool,
return ph
}

ph.requiredLabels = hints
ph.shouldPreserveError = containsError(hints)

return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ var (
"response": {
"status": 204,
"latency_seconds": "30.001"
}
},
"message": {
"message": "foo",
}
}`)

packedLine = []byte(`{
Expand Down Expand Up @@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
1.0,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
Expand Down Expand Up @@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
30.001,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
Expand Down Expand Up @@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) {
0,
``,
},
{
`sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`,
jsonLine,
true,
1,
`{app="nginx", message_message="foo"}`,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
Expand Down