Skip to content

Commit

Permalink
app/vmalert: detect alerting rules which don't match any series at all (
Browse files Browse the repository at this point in the history
#4198)

app/vmalert: detect alerting rules which don't match any series at all

vmalert starts to understand /query responses which contain object:
```
"stats":{"seriesFetched": "42"}
```
If object is present, vmalert parses it and populates a new field
`SeriesFetched`. This field is then used to populate the new metric
`vmalert_alerting_rules_last_evaluation_series_fetched` and to
display warnings in the vmalert's UI.

If response doesn't contain the new object (Prometheus or
VictoriaMetrics earlier than v1.90), then `SeriesFetched=nil`.
In this case, UI will contain no additional warnings.
And `vmalert_alerting_rules_last_evaluation_series_fetched` will
be set to `-1`. Negative value of the metric will help to compile
correct alerting rule in follow-up.

Thanks for the initial implementation to @Haleygo
See #4056

See #4039

Signed-off-by: hagen1778 <roman@victoriametrics.com>
  • Loading branch information
hagen1778 authored and valyala committed May 10, 2023
1 parent 2856e15 commit 4edb97f
Show file tree
Hide file tree
Showing 18 changed files with 837 additions and 536 deletions.
19 changes: 18 additions & 1 deletion app/vmalert/README.md
Expand Up @@ -29,7 +29,8 @@ Use this feature for the following cases:
* Recording and Alerting rules backfilling (aka `replay`). See [these docs](#rules-backfilling);
* Lightweight and without extra dependencies.
* Supports [reusable templates](#reusable-templates) for annotations;
* Load of recording and alerting rules from local filesystem, GCS and S3.
* Load of recording and alerting rules from local filesystem, GCS and S3;
* Detect alerting rules which [don't match any series](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4039).

## Limitations

Expand Down Expand Up @@ -812,6 +813,22 @@ and vmalert will start printing additional log messages:
2022-09-15T13:36:56.153Z DEBUG rule "TestGroup":"Conns" (2601299393013563564) at 2022-09-15T15:36:56+02:00: alert 10705778000901301787 {alertgroup="TestGroup",alertname="Conns",cluster="east-1",instance="localhost:8429",replica="a"} PENDING => FIRING: 1m0s since becoming active at 2022-09-15 15:35:56.126006 +0200 CEST m=+39.384575417
```

### Never-firing alerts

vmalert can detect if alert's expression doesn't match any time series in runtime. This problem usually happens
when alerting expression selects time series which aren't present in the datasource (i.e. wrong `job` label)
or there is a typo in the series selector (i.e. `env=rpod`). Such alerting rules will be marked with special icon in
vmalert's UI and exposed via `vmalert_alerting_rules_last_evaluation_series_fetched` metric. The metric's value will
show how many time series were matched before the filtering by rule's expression. If metric's value is `-1`, then
this feature is not supported by the datasource (old versions of VictoriaMetrics). The following expression can be
used to detect rules matching no series:
```
max(vmalert_alerting_rules_last_evaluation_series_fetched) by(group, alertname) == 0
```

See more details [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4039).
This feature is available only if vmalert is using VictoriaMetrics v1.90 or higher as a datasource.


## Profiling

Expand Down
78 changes: 46 additions & 32 deletions app/vmalert/alerting.go
Expand Up @@ -47,10 +47,11 @@ type AlertingRule struct {
}

type alertingRuleMetrics struct {
errors *utils.Gauge
pending *utils.Gauge
active *utils.Gauge
samples *utils.Gauge
errors *utils.Gauge
pending *utils.Gauge
active *utils.Gauge
samples *utils.Gauge
seriesFetched *utils.Gauge
}

func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule {
Expand Down Expand Up @@ -121,6 +122,15 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
e := ar.state.getLast()
return float64(e.samples)
})
ar.metrics.seriesFetched = utils.GetOrCreateGauge(fmt.Sprintf(`vmalert_alerting_rules_last_evaluation_series_fetched{%s}`, labels),
func() float64 {
e := ar.state.getLast()
if e.seriesFetched == nil {
// means seriesFetched is unsupported
return -1
}
return float64(*e.seriesFetched)
})
return ar
}

Expand All @@ -130,6 +140,7 @@ func (ar *AlertingRule) Close() {
ar.metrics.pending.Unregister()
ar.metrics.errors.Unregister()
ar.metrics.samples.Unregister()
ar.metrics.seriesFetched.Unregister()
}

// String implements Stringer interface
Expand Down Expand Up @@ -234,15 +245,15 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
// to get time series for backfilling.
// It returns ALERT and ALERT_FOR_STATE time series as result.
func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
res, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
if err != nil {
return nil, err
}
var result []prompbmarshal.TimeSeries
qFn := func(query string) ([]datasource.Metric, error) {
return nil, fmt.Errorf("`query` template isn't supported in replay mode")
}
for _, s := range series {
for _, s := range res.Data {
a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
if err != nil {
return nil, fmt.Errorf("failed to create alert: %s", err)
Expand Down Expand Up @@ -282,14 +293,15 @@ const resolvedRetention = 15 * time.Minute
// Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]prompbmarshal.TimeSeries, error) {
start := time.Now()
qMetrics, req, err := ar.q.Query(ctx, ar.Expr, ts)
res, req, err := ar.q.Query(ctx, ar.Expr, ts)
curState := ruleStateEntry{
time: start,
at: ts,
duration: time.Since(start),
samples: len(qMetrics),
err: err,
curl: requestToCurl(req),
time: start,
at: ts,
duration: time.Since(start),
samples: len(res.Data),
seriesFetched: res.SeriesFetched,
err: err,
curl: requestToCurl(req),
}

defer func() {
Expand All @@ -315,11 +327,11 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time, limit int) ([]pr

qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res, err
return res.Data, err
}
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range qMetrics {
for _, m := range res.Data {
ls, err := ar.toLabels(m, qFn)
if err != nil {
curState.err = fmt.Errorf("failed to expand labels: %s", err)
Expand Down Expand Up @@ -485,22 +497,23 @@ func (ar *AlertingRule) AlertAPI(id uint64) *APIAlert {
func (ar *AlertingRule) ToAPI() APIRule {
lastState := ar.state.getLast()
r := APIRule{
Type: "alerting",
DatasourceType: ar.Type.String(),
Name: ar.Name,
Query: ar.Expr,
Duration: ar.For.Seconds(),
Labels: ar.Labels,
Annotations: ar.Annotations,
LastEvaluation: lastState.time,
EvaluationTime: lastState.duration.Seconds(),
Health: "ok",
State: "inactive",
Alerts: ar.AlertsToAPI(),
LastSamples: lastState.samples,
MaxUpdates: ar.state.size(),
Updates: ar.state.getAll(),
Debug: ar.Debug,
Type: "alerting",
DatasourceType: ar.Type.String(),
Name: ar.Name,
Query: ar.Expr,
Duration: ar.For.Seconds(),
Labels: ar.Labels,
Annotations: ar.Annotations,
LastEvaluation: lastState.time,
EvaluationTime: lastState.duration.Seconds(),
Health: "ok",
State: "inactive",
Alerts: ar.AlertsToAPI(),
LastSamples: lastState.samples,
LastSeriesFetched: lastState.seriesFetched,
MaxUpdates: ar.state.size(),
Updates: ar.state.getAll(),
Debug: ar.Debug,

// encode as strings to avoid rounding in JSON
ID: fmt.Sprintf("%d", ar.ID()),
Expand Down Expand Up @@ -637,11 +650,12 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, ts ti

ar.logDebugf(ts, nil, "restoring alert state via query %q", expr)

qMetrics, _, err := q.Query(ctx, expr, ts)
res, _, err := q.Query(ctx, expr, ts)
if err != nil {
return err
}

qMetrics := res.Data
if len(qMetrics) < 1 {
ar.logDebugf(ts, nil, "no response was received from restore query")
continue
Expand Down
15 changes: 13 additions & 2 deletions app/vmalert/datasource/datasource.go
Expand Up @@ -13,11 +13,22 @@ type Querier interface {
// It returns list of Metric in response, the http.Request used for sending query
// and error if any. Returned http.Request can't be reused and its body is already read.
// Query should stop once ctx is cancelled.
Query(ctx context.Context, query string, ts time.Time) ([]Metric, *http.Request, error)
Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error)
// QueryRange executes range request with the given query on the given time range.
// It returns list of Metric in response and error if any.
// QueryRange should stop once ctx is cancelled.
QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error)
QueryRange(ctx context.Context, query string, from, to time.Time) (Result, error)
}

// Result represents expected response from the datasource
type Result struct {
// Data contains list of received Metric
Data []Metric
// SeriesFetched contains amount of time series processed by datasource
// during query evaluation.
// If nil, then this feature is not supported by the datasource.
// SeriesFetched is supported by VictoriaMetrics since v1.90.
SeriesFetched *int
}

// QuerierBuilder builds Querier with given params.
Expand Down
20 changes: 10 additions & 10 deletions app/vmalert/datasource/vm.go
Expand Up @@ -99,10 +99,10 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
}

// Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Metric, *http.Request, error) {
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
req, err := s.newRequestPOST()
if err != nil {
return nil, nil, err
return Result{}, nil, err
}

switch s.dataSourceType {
Expand All @@ -111,12 +111,12 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Me
case datasourceGraphite:
s.setGraphiteReqParams(req, query, ts)
default:
return nil, nil, fmt.Errorf("engine not found: %q", s.dataSourceType)
return Result{}, nil, fmt.Errorf("engine not found: %q", s.dataSourceType)
}

resp, err := s.do(ctx, req)
if err != nil {
return nil, req, err
return Result{}, req, err
}
defer func() {
_ = resp.Body.Close()
Expand All @@ -133,24 +133,24 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Me
// QueryRange executes the given query on the given time range.
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// Graphite type isn't supported.
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) ([]Metric, error) {
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
if s.dataSourceType != datasourcePrometheus {
return nil, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
return res, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
}
req, err := s.newRequestPOST()
if err != nil {
return nil, err
return res, err
}
if start.IsZero() {
return nil, fmt.Errorf("start param is missing")
return res, fmt.Errorf("start param is missing")
}
if end.IsZero() {
return nil, fmt.Errorf("end param is missing")
return res, fmt.Errorf("end param is missing")
}
s.setPrometheusRangeReqParams(req, query, start, end)
resp, err := s.do(ctx, req)
if err != nil {
return nil, err
return res, err
}
defer func() {
_ = resp.Body.Close()
Expand Down
6 changes: 3 additions & 3 deletions app/vmalert/datasource/vm_graphite_api.go
Expand Up @@ -35,12 +35,12 @@ func (r graphiteResponse) metrics() []Metric {
return ms
}

func parseGraphiteResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
func parseGraphiteResponse(req *http.Request, resp *http.Response) (Result, error) {
r := &graphiteResponse{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL.Redacted(), err)
return Result{}, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL.Redacted(), err)
}
return r.metrics(), nil
return Result{Data: r.metrics()}, nil
}

const (
Expand Down
43 changes: 31 additions & 12 deletions app/vmalert/datasource/vm_prom_api.go
Expand Up @@ -22,6 +22,10 @@ type promResponse struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`
// Stats supported by VictoriaMetrics since v1.90
Stats struct {
SeriesFetched *string `json:"seriesFetched,omitempty"`
} `json:"stats,omitempty"`
}

type promInstant struct {
Expand Down Expand Up @@ -96,39 +100,54 @@ const (
rtVector, rtMatrix, rScalar = "vector", "matrix", "scalar"
)

func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result, err error) {
r := &promResponse{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err)
if err = json.NewDecoder(resp.Body).Decode(r); err != nil {
return res, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err)
}
if r.Status == statusError {
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error)
return res, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error)
}
if r.Status != statusSuccess {
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
return res, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
}
var parseFn func() ([]Metric, error)
switch r.Data.ResultType {
case rtVector:
var pi promInstant
if err := json.Unmarshal(r.Data.Result, &pi.Result); err != nil {
return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result))
return res, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result))
}
return pi.metrics()
parseFn = pi.metrics
case rtMatrix:
var pr promRange
if err := json.Unmarshal(r.Data.Result, &pr.Result); err != nil {
return nil, err
return res, err
}
return pr.metrics()
parseFn = pr.metrics
case rScalar:
var ps promScalar
if err := json.Unmarshal(r.Data.Result, &ps); err != nil {
return nil, err
return res, err
}
return ps.metrics()
parseFn = ps.metrics
default:
return nil, fmt.Errorf("unknown result type %q", r.Data.ResultType)
return res, fmt.Errorf("unknown result type %q", r.Data.ResultType)
}

ms, err := parseFn()
if err != nil {
return res, err
}
res = Result{Data: ms}
if r.Stats.SeriesFetched != nil {
intV, err := strconv.Atoi(*r.Stats.SeriesFetched)
if err != nil {
return res, fmt.Errorf("failed to convert stats.seriesFetched to int: %w", err)
}
res.SeriesFetched = &intV
}
return res, nil
}

func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
Expand Down

0 comments on commit 4edb97f

Please sign in to comment.