Skip to content

Commit

Permalink
feat(detectedFields): add parser to response (#12872)
Browse files Browse the repository at this point in the history
  • Loading branch information
svennergr committed May 3, 2024
1 parent 11e02cc commit 2b3ae48
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 178 deletions.
1 change: 1 addition & 0 deletions pkg/loghttp/detected.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type DetectedField struct {
Label string `json:"label,omitempty"`
Type logproto.DetectedFieldType `json:"type,omitempty"`
Cardinality uint64 `json:"cardinality,omitempty"`
Parser string `json:"parser,omitempty"`
}
396 changes: 226 additions & 170 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ message DetectedField {
string label = 1;
string type = 2 [(gogoproto.casttype) = "DetectedFieldType"];
uint64 cardinality = 3;
bytes sketch = 4 [(gogoproto.jsontag) = "sketch,omitempty"];
string parser = 4;
bytes sketch = 5 [(gogoproto.jsontag) = "sketch,omitempty"];
}

message DetectedLabelsRequest {
Expand Down
24 changes: 17 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,7 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
Type: v.fieldType,
Cardinality: v.Estimate(),
Sketch: sketch,
Parser: v.parser,
}

fieldCount++
Expand All @@ -1124,13 +1125,19 @@ type parsedFields struct {
sketch *hyperloglog.Sketch
isTypeDetected bool
fieldType logproto.DetectedFieldType
parser string
}

func newParsedFields() *parsedFields {
func newParsedFields(parser *string) *parsedFields {
p := ""
if parser != nil {
p = *parser
}
return &parsedFields{
sketch: hyperloglog.New(),
isTypeDetected: false,
fieldType: logproto.DetectedFieldString,
parser: p,
}
}

Expand Down Expand Up @@ -1185,11 +1192,12 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))

for _, entry := range stream.Entries {
detected := parseLine(entry.Line)
detected, parser := parseLine(entry.Line)
for k, vals := range detected {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
df = newParsedFields()

df = newParsedFields(parser)
detectedFields[k] = df
fieldCount++
}
Expand Down Expand Up @@ -1217,17 +1225,19 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
return detectedFields
}

func parseLine(line string) map[string][]string {
func parseLine(line string) (map[string][]string, *string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)
jsonParser := logql_log.NewJSONParser()

lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0)
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
parser = "json"
jsonParser := logql_log.NewJSONParser()
lbls.Reset()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
return map[string][]string{}
return map[string][]string{}, nil
}
}

Expand All @@ -1249,7 +1259,7 @@ func parseLine(line string) map[string][]string {
result[lbl] = vals
}

return result
return result, &parser
}

// streamsForFieldDetection reads the streams from the iterator and returns them sorted.
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/detected/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type UnmarshaledDetectedField struct {
Label string
Type logproto.DetectedFieldType
Parser string
Sketch *hyperloglog.Sketch
}

Expand All @@ -22,6 +23,7 @@ func UnmarshalDetectedField(f *logproto.DetectedField) (*UnmarshaledDetectedFiel
return &UnmarshaledDetectedField{
Label: f.Label,
Type: f.Type,
Parser: f.Parser,
Sketch: sketch,
}, nil
}
Expand Down Expand Up @@ -77,6 +79,7 @@ func MergeFields(
Label: field.Label,
Type: field.Type,
Cardinality: field.Sketch.Estimate(),
Parser: field.Parser,
Sketch: nil,
}
result = append(result, detectedField)
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/detected/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func Test_MergeFields(t *testing.T) {
Type: logproto.DetectedFieldString,
Cardinality: 1,
Sketch: marshalledFooSketch,
Parser: "logfmt",
},
{
Label: "bar",
Expand Down Expand Up @@ -65,6 +66,7 @@ func Test_MergeFields(t *testing.T) {

assert.Equal(t, logproto.DetectedFieldString, foo.Type)
assert.Equal(t, uint64(3), foo.Cardinality)
assert.Equal(t, "logfmt", foo.Parser)
})

t.Run("returns up to limit number of fields", func(t *testing.T) {
Expand Down

0 comments on commit 2b3ae48

Please sign in to comment.