Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't throw error if unable to enrich metadata #2608

Merged
merged 8 commits into from May 17, 2023
54 changes: 22 additions & 32 deletions pkg/query-service/app/logs/v3/query_builder.go
Expand Up @@ -53,20 +53,22 @@ var logOperators = map[v3.FilterOperator]string{
// (todo) check contains/not contains/
}

func encrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) (v3.AttributeKey, error) {
func encrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) v3.AttributeKey {
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved
if field.Type == "" || field.DataType == "" {
// check if the field is present in the fields map
if existingField, ok := fields[field.Key]; ok {
if existingField.IsColumn {
return field, nil
return field
}
field.Type = existingField.Type
field.DataType = existingField.DataType
} else {
return field, fmt.Errorf("field not found to enrich metadata")
// enrich with default values if metadata is not found
field.Type = v3.AttributeKeyTypeTag
field.DataType = v3.AttributeKeyDataTypeString
}
}
return field, nil
return field
}

func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
Expand All @@ -92,13 +94,8 @@ func getClickhouseColumnName(key v3.AttributeKey, fields map[string]v3.Attribute
clickhouseColumn := key.Key
//if the key is present in the topLevelColumn then it will be only searched in those columns,
//regardless if it is indexed/present again in resource or column attribute
var err error
_, isTopLevelCol := constants.LogsTopLevelColumnsV3[key.Key]
if !isTopLevelCol && !key.IsColumn {
key, err = encrichFieldWithMetadata(key, fields)
if err != nil {
return "", err
}
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
Expand All @@ -113,7 +110,8 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
selectLabels = ""
} else {
for _, tag := range groupBy {
columnName, err := getClickhouseColumnName(tag, fields)
enrichedTag := encrichFieldWithMetadata(tag, fields)
columnName, err := getClickhouseColumnName(enrichedTag, fields)
if err != nil {
return "", err
}
Expand All @@ -128,33 +126,31 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,

if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
key := encrichFieldWithMetadata(item.Key, fields)
value, err := utils.ValidateAndCastValue(item.Value, key.DataType)
if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
}
if logsOp, ok := logOperators[op]; ok {
switch op {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
//(todo): refractor this later
key, err := encrichFieldWithMetadata(item.Key, fields)
if err != nil {
return "", err
}
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, item.Key.Key))
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, key.Key))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// generate the key
columnName, err := getClickhouseColumnName(item.Key, fields)
columnName, err := getClickhouseColumnName(key, fields)
if err != nil {
return "", err
}
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, item.Value))
default:
// generate the key
columnName, err := getClickhouseColumnName(item.Key, fields)
columnName, err := getClickhouseColumnName(key, fields)
if err != nil {
return "", err
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)

fmtVal := utils.ClickHouseFormattedValue(value)
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
}
} else {
Expand All @@ -165,10 +161,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,

// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
enrichedAttr, err := encrichFieldWithMetadata(attr, fields)
if err != nil {
return "", err
}
enrichedAttr := encrichFieldWithMetadata(attr, fields)
if !enrichedAttr.IsColumn {
columnType := getClickhouseLogsColumnType(enrichedAttr.Type)
columnDataType := getClickhouseLogsColumnDataType(enrichedAttr.DataType)
Expand Down Expand Up @@ -231,7 +224,8 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri

aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey, err = getClickhouseColumnName(mq.AggregateAttribute, fields)
enrichedAttribute := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
aggregationKey, err = getClickhouseColumnName(enrichedAttribute, fields)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -269,14 +263,10 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri
return query, nil
case v3.AggregateOpeatorCount:
if mq.AggregateAttribute.Key != "" {
field, err := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
if err != nil {
return "", err
}
field := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
columnType := getClickhouseLogsColumnType(field.Type)
columnDataType := getClickhouseLogsColumnDataType(field.DataType)
filterSubQuery = fmt.Sprintf("%s AND has(%s_%s_key, '%s')", filterSubQuery, columnType, columnDataType, mq.AggregateAttribute.Key)
// check having
}

op := "toFloat64(count(*))"
Expand Down
45 changes: 41 additions & 4 deletions pkg/query-service/app/logs/v3/query_builder_test.go
Expand Up @@ -100,6 +100,8 @@ var timeSeriesFilterQueryData = []struct {
FilterSet *v3.FilterSet
GroupBy []v3.AttributeKey
ExpectedFilter string
Fields map[string]v3.AttributeKey
Error string
}{
{
Name: "Test attribute and resource attribute",
Expand Down Expand Up @@ -173,6 +175,20 @@ var timeSeriesFilterQueryData = []struct {
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Test no metadata",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host"}, Value: "102.", Operator: "ncontains"},
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Test no metadata number",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'bytes')] = '102'",
},
{
Name: "Test groupBy",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
Expand All @@ -189,14 +205,35 @@ var timeSeriesFilterQueryData = []struct {
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Wrong data",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: true, Operator: "="},
}},
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}},
Error: "failed to validate and cast value for bytes: invalid data type, expected float, got bool",
},
{
Name: "Cast data",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
}},
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}},
ExpectedFilter: " AND attributes_int64_value[indexOf(attributes_int64_key, 'bytes')] = 102",
},
}

func TestBuildLogsTimeSeriesFilterQuery(t *testing.T) {
for _, tt := range timeSeriesFilterQueryData {
Convey("TestBuildLogsTimeSeriesFilterQuery", t, func() {
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, map[string]v3.AttributeKey{})
So(err, ShouldBeNil)
So(query, ShouldEqual, tt.ExpectedFilter)
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, tt.Fields)
if tt.Error != "" {
So(err.Error(), ShouldEqual, tt.Error)
} else {
So(err, ShouldBeNil)
So(query, ShouldEqual, tt.ExpectedFilter)
}

})
}
}
Expand Down Expand Up @@ -254,7 +291,7 @@ var testBuildLogsQueryData = []struct {
Expression: "A",
},
TableName: "logs",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100 AND has(attributes_string_key, 'user_name') group by ts order by ts",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100.000000 AND has(attributes_string_key, 'user_name') group by ts order by ts",
},
{
Name: "Test aggregate count distinct and order by value",
Expand Down