Skip to content

Commit

Permalink
feat: don't throw error if unable to enrich metadata (#2608)
Browse files Browse the repository at this point in the history
* feat: don't throw error if unable to enrich metadata

* feat: remove isDefaultEnriched key

* feat: validate and cast added

* feat: function name corrected

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
nityanandagohain and srikanthccv committed May 17, 2023
1 parent b6a455d commit d1a256a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 36 deletions.
54 changes: 22 additions & 32 deletions pkg/query-service/app/logs/v3/query_builder.go
Expand Up @@ -53,20 +53,22 @@ var logOperators = map[v3.FilterOperator]string{
// (todo) check contains/not contains/
}

func encrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) (v3.AttributeKey, error) {
func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) v3.AttributeKey {
if field.Type == "" || field.DataType == "" {
// check if the field is present in the fields map
if existingField, ok := fields[field.Key]; ok {
if existingField.IsColumn {
return field, nil
return field
}
field.Type = existingField.Type
field.DataType = existingField.DataType
} else {
return field, fmt.Errorf("field not found to enrich metadata")
// enrich with default values if metadata is not found
field.Type = v3.AttributeKeyTypeTag
field.DataType = v3.AttributeKeyDataTypeString
}
}
return field, nil
return field
}

func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
Expand All @@ -92,13 +94,8 @@ func getClickhouseColumnName(key v3.AttributeKey, fields map[string]v3.Attribute
clickhouseColumn := key.Key
//if the key is present in the topLevelColumn then it will be only searched in those columns,
//regardless if it is indexed/present again in resource or column attribute
var err error
_, isTopLevelCol := constants.LogsTopLevelColumnsV3[key.Key]
if !isTopLevelCol && !key.IsColumn {
key, err = encrichFieldWithMetadata(key, fields)
if err != nil {
return "", err
}
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
Expand All @@ -113,7 +110,8 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
selectLabels = ""
} else {
for _, tag := range groupBy {
columnName, err := getClickhouseColumnName(tag, fields)
enrichedTag := enrichFieldWithMetadata(tag, fields)
columnName, err := getClickhouseColumnName(enrichedTag, fields)
if err != nil {
return "", err
}
Expand All @@ -128,33 +126,31 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,

if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
key := enrichFieldWithMetadata(item.Key, fields)
value, err := utils.ValidateAndCastValue(item.Value, key.DataType)
if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
}
if logsOp, ok := logOperators[op]; ok {
switch op {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
//(todo): refractor this later
key, err := encrichFieldWithMetadata(item.Key, fields)
if err != nil {
return "", err
}
columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, item.Key.Key))
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, key.Key))
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
// generate the key
columnName, err := getClickhouseColumnName(item.Key, fields)
columnName, err := getClickhouseColumnName(key, fields)
if err != nil {
return "", err
}
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, item.Value))
default:
// generate the key
columnName, err := getClickhouseColumnName(item.Key, fields)
columnName, err := getClickhouseColumnName(key, fields)
if err != nil {
return "", err
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)

fmtVal := utils.ClickHouseFormattedValue(value)
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
}
} else {
Expand All @@ -165,10 +161,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,

// add group by conditions to filter out log lines which doesn't have the key
for _, attr := range groupBy {
enrichedAttr, err := encrichFieldWithMetadata(attr, fields)
if err != nil {
return "", err
}
enrichedAttr := enrichFieldWithMetadata(attr, fields)
if !enrichedAttr.IsColumn {
columnType := getClickhouseLogsColumnType(enrichedAttr.Type)
columnDataType := getClickhouseLogsColumnDataType(enrichedAttr.DataType)
Expand Down Expand Up @@ -231,7 +224,8 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri

aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey, err = getClickhouseColumnName(mq.AggregateAttribute, fields)
enrichedAttribute := enrichFieldWithMetadata(mq.AggregateAttribute, fields)
aggregationKey, err = getClickhouseColumnName(enrichedAttribute, fields)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -269,14 +263,10 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri
return query, nil
case v3.AggregateOperatorCount:
if mq.AggregateAttribute.Key != "" {
field, err := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
if err != nil {
return "", err
}
field := enrichFieldWithMetadata(mq.AggregateAttribute, fields)
columnType := getClickhouseLogsColumnType(field.Type)
columnDataType := getClickhouseLogsColumnDataType(field.DataType)
filterSubQuery = fmt.Sprintf("%s AND has(%s_%s_key, '%s')", filterSubQuery, columnType, columnDataType, mq.AggregateAttribute.Key)
// check having
}

op := "toFloat64(count(*))"
Expand Down
45 changes: 41 additions & 4 deletions pkg/query-service/app/logs/v3/query_builder_test.go
Expand Up @@ -100,6 +100,8 @@ var timeSeriesFilterQueryData = []struct {
FilterSet *v3.FilterSet
GroupBy []v3.AttributeKey
ExpectedFilter string
Fields map[string]v3.AttributeKey
Error string
}{
{
Name: "Test attribute and resource attribute",
Expand Down Expand Up @@ -173,6 +175,20 @@ var timeSeriesFilterQueryData = []struct {
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Test no metadata",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host"}, Value: "102.", Operator: "ncontains"},
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Test no metadata number",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'bytes')] = '102'",
},
{
Name: "Test groupBy",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
Expand All @@ -189,14 +205,35 @@ var timeSeriesFilterQueryData = []struct {
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
},
{
Name: "Wrong data",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: true, Operator: "="},
}},
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}},
Error: "failed to validate and cast value for bytes: invalid data type, expected float, got bool",
},
{
Name: "Cast data",
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
}},
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}},
ExpectedFilter: " AND attributes_int64_value[indexOf(attributes_int64_key, 'bytes')] = 102",
},
}

func TestBuildLogsTimeSeriesFilterQuery(t *testing.T) {
for _, tt := range timeSeriesFilterQueryData {
Convey("TestBuildLogsTimeSeriesFilterQuery", t, func() {
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, map[string]v3.AttributeKey{})
So(err, ShouldBeNil)
So(query, ShouldEqual, tt.ExpectedFilter)
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, tt.Fields)
if tt.Error != "" {
So(err.Error(), ShouldEqual, tt.Error)
} else {
So(err, ShouldBeNil)
So(query, ShouldEqual, tt.ExpectedFilter)
}

})
}
}
Expand Down Expand Up @@ -254,7 +291,7 @@ var testBuildLogsQueryData = []struct {
Expression: "A",
},
TableName: "logs",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100 AND has(attributes_string_key, 'user_name') group by ts order by ts",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100.000000 AND has(attributes_string_key, 'user_name') group by ts order by ts",
},
{
Name: "Test aggregate count distinct and order by value",
Expand Down

0 comments on commit d1a256a

Please sign in to comment.