Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use querier in alerts #5045

Merged
merged 6 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ee/query-service/signoz.db
ee/query-service/tests/test-deploy/data/

# local data
*.backup
*.db
/deploy/docker/clickhouse-setup/data/
/deploy/docker-swarm/clickhouse-setup/data/
Expand Down
3 changes: 2 additions & 1 deletion ee/query-service/app/api/dashboard.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package api

import (
"net/http"

"github.com/gorilla/mux"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/model"
"net/http"
)

func (ah *APIHandler) lockDashboard(w http.ResponseWriter, r *http.Request) {
Expand Down
1 change: 1 addition & 0 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func makeRulesManager(
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
}

// create Manager
Expand Down
110 changes: 4 additions & 106 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"text/template"
"time"

"github.com/SigNoz/govaluate"
"github.com/gorilla/mux"
jsoniter "github.com/json-iterator/go"
_ "github.com/mattn/go-sqlite3"
Expand All @@ -38,6 +37,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"

"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -3160,13 +3160,13 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
return
}

applyMetricLimit(result, queryRangeParams)
postprocess.ApplyMetricLimit(result, queryRangeParams)

sendQueryResultEvents(r, result, queryRangeParams)
// only adding applyFunctions instead of postProcess since experssion are
// are executed in clickhouse directly and we wanted to add support for timeshift
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
applyFunctions(result, queryRangeParams)
postprocess.ApplyFunctions(result, queryRangeParams)
}

resp := v3.QueryRangeResponse{
Expand Down Expand Up @@ -3418,7 +3418,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que

if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {

result, err = postProcessResult(result, queryRangeParams)
result, err = postprocess.PostProcessResult(result, queryRangeParams)
}

if err != nil {
Expand Down Expand Up @@ -3453,105 +3453,3 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {

aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
}

// postProcessResult applies having clause, metric limit, reduce function to the result
// This function is effective for metrics data source for now, but it can be extended to other data sources
// if needed
// Much of this work can be done in the ClickHouse query, but we decided to do it here because:
// 1. Effective use of caching
// 2. Easier to add new functions
func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error) {
// Having clause is not part of the clickhouse query, so we need to apply it here
// It's not included in the query because it doesn't work nicely with caching
// With this change, if you have a query with a having clause, and then you change the having clause
// to something else, the query will still be cached.
applyHavingClause(result, queryRangeParams)
// We apply the metric limit here because it's not part of the clickhouse query
// The limit in the context of the time series query is the number of time series
// So for the limit to work, we need to know what series to keep and what to discard
// For us to know that, we need to execute the query first, and then apply the limit
// which we found expensive, because we are executing the query twice on the same data
// So we decided to apply the limit here, after the query is executed
// The function is named applyMetricLimit because it only applies to metrics data source
// In traces and logs, the limit is achieved using subqueries
applyMetricLimit(result, queryRangeParams)
// Each series in the result produces N number of points, where N is (end - start) / step
// For the panel type table, we need to show one point for each series in the row
// We do that by applying a reduce function to each series
applyReduceTo(result, queryRangeParams)
// We apply the functions here it's easier to add new functions
applyFunctions(result, queryRangeParams)

// expressions are executed at query serivce so the value of time.now in the invdividual
// queries will be different so for table panel we are making it same.
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable {
tablePanelResultProcessor(result)
}

for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
// The way we distinguish between a formula and a query is by checking if the expression
// is the same as the query name
// TODO(srikanthccv): Update the UI to send a flag to distinguish between a formula and a query
if query.Expression != query.QueryName {
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs())
// This shouldn't happen here, because it should have been caught earlier in validation
if err != nil {
zap.L().Error("error in expression", zap.Error(err))
return nil, err
}
formulaResult, err := processResults(result, expression)
if err != nil {
zap.L().Error("error in expression", zap.Error(err))
return nil, err
}
formulaResult.QueryName = query.QueryName
result = append(result, formulaResult)
}
}
// we are done with the formula calculations, only send the results for enabled queries
removeDisabledQueries := func(result []*v3.Result) []*v3.Result {
var newResult []*v3.Result
for _, res := range result {
if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled {
continue
}
newResult = append(newResult, res)
}
return newResult
}
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
result = removeDisabledQueries(result)
}
return result, nil
}

// applyFunctions applies functions for each query in the composite query
// The functions can be more than one, and they are applied in the order they are defined
func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
for idx, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries

if builderQueries != nil {
functions := builderQueries[result.QueryName].Functions

for _, function := range functions {
results[idx] = queryBuilder.ApplyFunction(function, result)
}
}
}
}

func tablePanelResultProcessor(results []*v3.Result) {
var ts int64
for ridx := range results {
for sidx := range results[ridx].Series {
for pidx := range results[ridx].Series[sidx].Points {
if ts == 0 {
ts = results[ridx].Series[sidx].Points[pidx].Timestamp
} else {
results[ridx].Series[sidx].Points[pidx].Timestamp = ts
}
}
}
}
}
27 changes: 27 additions & 0 deletions pkg/query-service/app/metrics/v3/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,33 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
adjustStep := int64(math.Min(float64(mq.StepInterval), 60))
end = end - (end % (adjustStep * 1000))

// if the aggregate operator is a histogram quantile, and user has not forgotten
// the le tag in the group by then add the le tag to the group by
if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
found := false
for _, tag := range mq.GroupBy {
if tag.Key == "le" {
found = true
break
}
}
if !found {
mq.GroupBy = append(
mq.GroupBy,
v3.AttributeKey{
Key: "le",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
)
}
}

var query string
var err error
if mq.Temporality == v3.Delta {
Expand Down
31 changes: 2 additions & 29 deletions pkg/query-service/app/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.signoz.io/signoz/pkg/query-service/utils"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
)
Expand Down Expand Up @@ -1007,7 +1008,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
// Formula query
// Check if the queries used in the expression can be joined
if query.QueryName != query.Expression {
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs())
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, postprocess.EvalFuncs())
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -1065,34 +1066,6 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
}
query.ShiftBy = timeShiftBy

// for metrics v3
// if the aggregate operator is a histogram quantile, and user has not forgotten
// the le tag in the group by then add the le tag to the group by
if query.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
query.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
query.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
query.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
query.AggregateOperator == v3.AggregateOperatorHistQuant99 {
found := false
for _, tag := range query.GroupBy {
if tag.Key == "le" {
found = true
break
}
}
if !found {
query.GroupBy = append(
query.GroupBy,
v3.AttributeKey{
Key: "le",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
},
)
}
}

if query.Filters == nil || len(query.Filters.Items) == 0 {
continue
}
Expand Down
1 change: 1 addition & 0 deletions pkg/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ func makeRulesManager(
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
}

// create Manager
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"fmt"
Expand Down Expand Up @@ -162,7 +162,7 @@ func processResults(results []*v3.Result, expression *govaluate.EvaluableExpress

var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians", "now", "toUnixTimestamp"}

func evalFuncs() map[string]govaluate.ExpressionFunction {
func EvalFuncs() map[string]govaluate.ExpressionFunction {
GoValuateFuncs := make(map[string]govaluate.ExpressionFunction)
// Returns e to the power of the given argument.
GoValuateFuncs["exp"] = func(args ...interface{}) (interface{}, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"math"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"strings"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"math"
Expand All @@ -9,8 +9,8 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

// applyMetricLimit applies limit to the metrics query results
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// ApplyMetricLimit applies limit to the metrics query results
func ApplyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// apply limit if any for metrics
// use the grouping set points to apply the limit

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app
package postprocess

import (
"testing"
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestApplyLimitOnMetricResult(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result := c.inputResult
applyMetricLimit(result, c.params)
ApplyMetricLimit(result, c.params)
if len(result) != len(c.expectedResult) {
t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result))
}
Expand Down
Loading
Loading