Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: align semantics of metric and log query label extraction #11587

Merged
merged 9 commits into from
Jan 11, 2024
5 changes: 4 additions & 1 deletion pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
return "", false
}

if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
_, alwaysExtract := keys[sanitized]
if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
return sanitized, true
})

if !ok {
continue
}
Expand Down Expand Up @@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
}
}
}

if l.strict && l.dec.Err() != nil {
addErrLabel(errLogfmt, l.dec.Err(), lbs)
return line, true
Expand Down
31 changes: 15 additions & 16 deletions pkg/logql/log/parser_hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ type Hints struct {
}

func (p *Hints) ShouldExtract(key string) bool {
if len(p.requiredLabels) == 0 {
return true
}

for _, l := range p.extracted {
if l == key {
return false
Expand All @@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool {
}
}

return false
return len(p.requiredLabels) == 0
}

func (p *Hints) ShouldExtractPrefix(prefix string) bool {
Expand All @@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool {
}

func (p *Hints) RecordExtracted(key string) {
for _, l := range p.requiredLabels {
if l == key {
p.extracted = append(p.extracted, key)
return
}
}
p.extracted = append(p.extracted, key)
}

func (p *Hints) AllRequiredExtracted() bool {
if len(p.requiredLabels) == 0 {
if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) {
return false
}
return len(p.extracted) == len(p.requiredLabels)

found := 0
for _, l := range p.requiredLabels {
for _, e := range p.extracted {
if l == e {
found++
break
}
}
}

return len(p.requiredLabels) == found
}

func (p *Hints) Reset() {
Expand Down Expand Up @@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool,
return ph
}

ph.requiredLabels = hints
ph.shouldPreserveError = containsError(hints)

return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ var (
"response": {
"status": 204,
"latency_seconds": "30.001"
}
},
"message": {
"message": "foo",
}
}`)

packedLine = []byte(`{
Expand Down Expand Up @@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
1.0,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
Expand Down Expand Up @@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
30.001,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
Expand Down Expand Up @@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) {
0,
``,
},
{
`sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`,
jsonLine,
true,
1,
`{app="nginx", message_message="foo"}`,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
Expand Down