Skip to content

Commit

Permalink
vmalert: add evalAlignment for rule group and fix evaluation timesta…
Browse files Browse the repository at this point in the history
…mp (#5066)

* vmalert: add `query_time_alignment` for rule group

1. add `eval_alignment` attribute for group, which is true by default. The group's rule query timestamp will then be aligned with the interval and propagated to ALERT metrics and to the messages for alertmanager;
2. deprecate `datasource.queryTimeAlignment` flag.

#5049
(cherry picked from commit 2aa0f5f)
  • Loading branch information
Haleygo authored and hagen1778 committed Oct 10, 2023
1 parent 1cc6cd3 commit b52f1d1
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 151 deletions.
14 changes: 10 additions & 4 deletions app/vmalert/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ name: <string>
# By default, "prometheus" type is used.
[ type: <string> ]

# Optional
# The evaluation timestamp will be aligned with group's interval,
# instead of using the actual timestamp that evaluation happens at.
# By default, it's enabled to get more predictable results
# and to visually align with results plotted via Grafana or vmui.
# See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
# Available starting from v1.95
[ eval_alignment: <bool> | default true ]

# Optional list of HTTP URL parameters
# applied for all rules requests within a group
# For example:
Expand Down Expand Up @@ -527,10 +536,6 @@ Alertmanagers.
To avoid recording rules results and alerts state duplication in VictoriaMetrics server
don't forget to configure [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication).
The recommended value for `-dedup.minScrapeInterval` must be a multiple of vmalert's `-evaluationInterval`.
If you observe inconsistent or "jumping" values in series produced by vmalert, try disabling `-datasource.queryTimeAlignment`
command line flag. Because of alignment, two or more vmalert HA pairs will produce results with the same timestamps.
But due to backfilling (data delivered to the datasource with some delay), values of such results may differ,
which would affect deduplication logic and result in "jumping" datapoints.

Alertmanager will automatically deduplicate alerts with identical labels, so ensure that
all `vmalert`s have the same config.
Expand Down Expand Up @@ -975,6 +980,7 @@ The shortlist of configuration flags is the following:
-datasource.queryStep duration
How far a value can fallback to when evaluating queries. For example, if -datasource.queryStep=15s then param "step" with value "15s" will be added to every query. If set to 0, rule's evaluation interval will be used instead. (default 5m0s)
-datasource.queryTimeAlignment
Flag is deprecated and will be removed in next releases, please use `eval_alignment` in rule group instead.
Whether to align "time" parameter with evaluation interval. Alignment is supposed to produce deterministic results regardless of the number of vmalert replicas or the time they were started. See more details here https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1257 (default true)
-datasource.roundDigits int
Adds "round_digits" GET param to datasource requests. In VM "round_digits" limits the number of digits after the decimal point in response values.
Expand Down
1 change: 0 additions & 1 deletion app/vmalert/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
EvalOffset: group.EvalOffset,
QueryParams: group.Params,
Headers: group.Headers,
Debug: cfg.Debug,
Expand Down
2 changes: 2 additions & 0 deletions app/vmalert/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Group struct {
Headers []Header `yaml:"headers,omitempty"`
// NotifierHeaders contains optional HTTP headers sent to notifiers for generated notifications
NotifierHeaders []Header `yaml:"notifier_headers,omitempty"`
// EvalAlignment will make the timestamp of group query requests be aligned with interval
EvalAlignment *bool `yaml:"eval_alignment,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
Expand Down
1 change: 0 additions & 1 deletion app/vmalert/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type QuerierBuilder interface {
type QuerierParams struct {
DataSourceType string
EvaluationInterval time.Duration
EvalOffset *time.Duration
QueryParams url.Values
Headers map[string]string
Debug bool
Expand Down
7 changes: 6 additions & 1 deletion app/vmalert/datasource/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

var (
Expand Down Expand Up @@ -46,7 +47,8 @@ var (
queryStep = flag.Duration("datasource.queryStep", 5*time.Minute, "How far a value can fallback to when evaluating queries. "+
"For example, if -datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query. "+
"If set to 0, rule's evaluation interval will be used instead.")
queryTimeAlignment = flag.Bool("datasource.queryTimeAlignment", true, `Whether to align "time" parameter with evaluation interval.`+
queryTimeAlignment = flag.Bool("datasource.queryTimeAlignment", true, "Flag is deprecated and will be removed in next releases, please use `eval_alignment` in rule group instead."+
`Whether to align "time" parameter with evaluation interval.`+
"Alignment supposed to produce deterministic results despite number of vmalert replicas or time they were started. See more details here https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1257")
maxIdleConnections = flag.Int("datasource.maxIdleConnections", 100, `Defines the number of idle (keep-alive connections) to each configured datasource. Consider setting this value equal to the value: groups_total * group.concurrency. Too low a value may result in a high number of sockets in TIME_WAIT state.`)
disableKeepAlive = flag.Bool("datasource.disableKeepAlive", false, `Whether to disable long-lived connections to the datasource. `+
Expand Down Expand Up @@ -79,6 +81,9 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
if *addr == "" {
return nil, fmt.Errorf("datasource.url is empty")
}
if !*queryTimeAlignment {
logger.Warnf("flag `datasource.queryTimeAlignment` is deprecated and will be removed in next releases, please use `eval_alignment` in rule group instead")
}

tr, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
if err != nil {
Expand Down
9 changes: 1 addition & 8 deletions app/vmalert/datasource/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,8 @@ type VMStorage struct {
queryStep time.Duration
dataSourceType datasourceType

// evaluationInterval will align the request's timestamp
// if `datasource.queryTimeAlignment` is enabled,
// will set request's `step` param as well.
// evaluationInterval will help setting request's `step` param.
evaluationInterval time.Duration
// evaluationOffset shifts the request's timestamp, will be equal
// to the offset specified evaluationInterval.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4693
evaluationOffset *time.Duration
// extraParams contains params to be attached to each HTTP request
extraParams url.Values
// extraHeaders are headers to be attached to each HTTP request
Expand Down Expand Up @@ -95,7 +89,6 @@ func (s *VMStorage) Clone() *VMStorage {
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
s.dataSourceType = toDatasourceType(params.DataSourceType)
s.evaluationInterval = params.EvaluationInterval
s.evaluationOffset = params.EvalOffset
if params.QueryParams != nil {
if s.extraParams == nil {
s.extraParams = url.Values{}
Expand Down
35 changes: 3 additions & 32 deletions app/vmalert/datasource/vm_prom_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string,
r.URL.Path += "/api/v1/query"
}
q := r.URL.Query()

timestamp = s.adjustReqTimestamp(timestamp)
if s.lookBack > 0 {
timestamp = timestamp.Add(-s.lookBack)
}
q.Set("time", timestamp.Format(time.RFC3339))
if !*disableStepParam && s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
Expand All @@ -186,9 +187,6 @@ func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, s
r.URL.Path += "/api/v1/query_range"
}
q := r.URL.Query()
if s.evaluationOffset != nil {
start = start.Truncate(s.evaluationInterval).Add(*s.evaluationOffset)
}
q.Add("start", start.Format(time.RFC3339))
q.Add("end", end.Format(time.RFC3339))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
Expand All @@ -213,30 +211,3 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
q.Set("query", query)
r.URL.RawQuery = q.Encode()
}

// adjustReqTimestamp returns the timestamp that should be sent to the
// datasource for an evaluation which happened at the given timestamp.
//
// When evaluationOffset is set, the result is derived only from
// evaluationInterval and evaluationOffset; queryTimeAlignment and lookBack
// are not applied in that case. Otherwise the timestamp is optionally
// truncated to evaluationInterval (queryTimeAlignment flag) and then
// shifted back by lookBack when lookBack is positive.
func (s *VMStorage) adjustReqTimestamp(timestamp time.Time) time.Time {
	if offset := s.evaluationOffset; offset != nil {
		// start of the current interval shifted by the configured offset
		shifted := timestamp.Truncate(s.evaluationInterval).Add(*offset)
		if !timestamp.Before(shifted) {
			// the offset point of the current interval has already been
			// reached, so it is used as is.
			return shifted
		}
		// The offset point of the current interval is still ahead,
		// so fall back to the one from the previous evaluation round.
		// E.g. with evaluationInterval=1h and evaluationOffset=30m
		// an evaluation at 11:20 is adjusted to 10:30.
		return shifted.Add(-s.evaluationInterval)
	}

	adjusted := timestamp
	if *queryTimeAlignment {
		// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
		adjusted = adjusted.Truncate(s.evaluationInterval)
	}
	if s.lookBack > 0 {
		adjusted = adjusted.Add(-s.lookBack)
	}
	return adjusted
}
72 changes: 0 additions & 72 deletions app/vmalert/datasource/vm_prom_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package datasource
import (
"encoding/json"
"testing"
"time"
)

func BenchmarkMetrics(b *testing.B) {
Expand All @@ -19,74 +18,3 @@ func BenchmarkMetrics(b *testing.B) {
}
})
}

// TestGetPrometheusReqTimestamp checks how VMStorage.adjustReqTimestamp
// combines evaluationOffset, evaluationInterval, lookBack and the
// queryTimeAlignment flag when calculating the request timestamp.
//
// Every case runs as a named subtest, so a failure reports which case broke
// and the remaining cases still run.
func TestGetPrometheusReqTimestamp(t *testing.T) {
	offset := 30 * time.Minute
	testCases := []struct {
		name               string
		s                  *VMStorage
		queryTimeAlignment bool
		originTS, expTS    string
	}{
		{
			name: "with eval_offset, find previous offset point",
			s: &VMStorage{
				evaluationOffset:   &offset,
				evaluationInterval: time.Hour,
				lookBack:           1 * time.Minute,
			},
			queryTimeAlignment: false,
			originTS:           "2023-08-28T11:11:00+00:00",
			expTS:              "2023-08-28T10:30:00+00:00",
		},
		{
			name: "with eval_offset",
			s: &VMStorage{
				evaluationOffset:   &offset,
				evaluationInterval: time.Hour,
			},
			queryTimeAlignment: true,
			originTS:           "2023-08-28T11:41:00+00:00",
			expTS:              "2023-08-28T11:30:00+00:00",
		},
		{
			name: "with query align",
			s: &VMStorage{
				evaluationInterval: time.Hour,
			},
			queryTimeAlignment: true,
			originTS:           "2023-08-28T11:11:00+00:00",
			expTS:              "2023-08-28T11:00:00+00:00",
		},
		{
			name: "with query align and lookback",
			s: &VMStorage{
				evaluationInterval: time.Hour,
				lookBack:           1 * time.Minute,
			},
			queryTimeAlignment: true,
			originTS:           "2023-08-28T11:11:00+00:00",
			expTS:              "2023-08-28T10:59:00+00:00",
		},
		{
			name: "without query align",
			s: &VMStorage{
				evaluationInterval: time.Hour,
			},
			queryTimeAlignment: false,
			originTS:           "2023-08-28T11:11:00+00:00",
			expTS:              "2023-08-28T11:11:00+00:00",
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// The flag is process-wide state. Restore it via defer, since
			// t.Fatalf stops the goroutine and would skip a trailing restore.
			oldAlignPara := *queryTimeAlignment
			*queryTimeAlignment = tc.queryTimeAlignment
			defer func() { *queryTimeAlignment = oldAlignPara }()

			// A malformed fixture must fail loudly instead of silently
			// comparing zero time values.
			originT, err := time.Parse(time.RFC3339, tc.originTS)
			if err != nil {
				t.Fatalf("cannot parse origin timestamp %q: %s", tc.originTS, err)
			}
			expT, err := time.Parse(time.RFC3339, tc.expTS)
			if err != nil {
				t.Fatalf("cannot parse expected timestamp %q: %s", tc.expTS, err)
			}

			gotTS := tc.s.adjustReqTimestamp(originT)
			if !gotTS.Equal(expT) {
				t.Fatalf("get wrong prometheus request timestamp, expect %s, got %s", expT, gotTS)
			}
		})
	}
}
7 changes: 2 additions & 5 deletions app/vmalert/datasource/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ func TestRequestParams(t *testing.T) {
},
func(t *testing.T, r *http.Request) {
evalInterval := 15 * time.Second
tt := timestamp.Truncate(evalInterval)
exp := url.Values{"query": {query}, "step": {evalInterval.String()}, "time": {tt.Format(time.RFC3339)}}
exp := url.Values{"query": {query}, "step": {evalInterval.String()}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
},
},
Expand All @@ -521,7 +520,6 @@ func TestRequestParams(t *testing.T) {
func(t *testing.T, r *http.Request) {
evalInterval := 15 * time.Second
tt := timestamp.Add(-time.Minute)
tt = tt.Truncate(evalInterval)
exp := url.Values{"query": {query}, "step": {evalInterval.String()}, "time": {tt.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
},
Expand Down Expand Up @@ -549,8 +547,7 @@ func TestRequestParams(t *testing.T) {
},
func(t *testing.T, r *http.Request) {
evalInterval := 3 * time.Hour
tt := timestamp.Truncate(evalInterval)
exp := url.Values{"query": {query}, "step": {fmt.Sprintf("%ds", int(evalInterval.Seconds()))}, "time": {tt.Format(time.RFC3339)}}
exp := url.Values{"query": {query}, "step": {fmt.Sprintf("%ds", int(evalInterval.Seconds()))}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
},
},
Expand Down
39 changes: 35 additions & 4 deletions app/vmalert/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Group struct {
evalCancel context.CancelFunc

metrics *groupMetrics
// evalAlignment will make the timestamp of group query
// requests be aligned with interval
evalAlignment *bool
}

type groupMetrics struct {
Expand Down Expand Up @@ -106,6 +109,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Headers: make(map[string]string),
NotifierHeaders: make(map[string]string),
Labels: cfg.Labels,
evalAlignment: cfg.EvalAlignment,

doneCh: make(chan struct{}),
finishedCh: make(chan struct{}),
Expand Down Expand Up @@ -285,10 +289,11 @@ var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
defer func() { close(g.finishedCh) }()

evalTS := time.Now()
// sleep random duration to spread group rules evaluation
// over time in order to reduce load on datasource.
if !skipRandSleepOnGroupStart {
sleepBeforeStart := delayBeforeStart(time.Now(), g.ID(), g.Interval, g.EvalOffset)
sleepBeforeStart := delayBeforeStart(evalTS, g.ID(), g.Interval, g.EvalOffset)
g.infof("will start in %v", sleepBeforeStart)

sleepTimer := time.NewTimer(sleepBeforeStart)
Expand All @@ -301,10 +306,9 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
return
case <-sleepTimer.C:
}
evalTS = evalTS.Add(sleepBeforeStart)
}

evalTS := time.Now()

e := &executor{
rw: rw,
notifiers: nts,
Expand All @@ -326,6 +330,7 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
}

resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
ts = g.adjustReqTimestamp(ts)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
for err := range errs {
if err != nil {
Expand Down Expand Up @@ -424,7 +429,7 @@ func delayBeforeStart(ts time.Time, key uint64, interval time.Duration, offset *
randSleep += *offset
}
}
return randSleep.Truncate(time.Second)
return randSleep
}

func (g *Group) infof(format string, args ...interface{}) {
Expand All @@ -446,6 +451,32 @@ func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Du
return resolveDuration
}

// adjustReqTimestamp converts the moment an evaluation was triggered into
// the timestamp that should be used for the group's query requests.
//
// If EvalOffset is set, the result is derived from Interval and EvalOffset
// only. Otherwise the timestamp is truncated to Interval unless
// evalAlignment is explicitly set to false.
func (g *Group) adjustReqTimestamp(timestamp time.Time) time.Time {
	switch {
	case g.EvalOffset != nil:
		// EvalOffset is handled on its own: evalAlignment is not
		// consulted in this branch.
		shifted := timestamp.Truncate(g.Interval).Add(*g.EvalOffset)
		if !timestamp.Before(shifted) {
			return shifted
		}
		// The offset point of the current interval hasn't been reached
		// yet, so use the one from the previous evaluation round.
		// E.g. with Interval=1h and EvalOffset=30m an evaluation
		// at 11:20 is adjusted to 10:30.
		return shifted.Add(-g.Interval)
	case g.evalAlignment == nil || *g.evalAlignment:
		// Alignment is on when unset. Aligning the query time with the
		// interval gives results similar to what grafana plots.
		// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5049
		// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
		return timestamp.Truncate(g.Interval)
	default:
		return timestamp
	}
}

type executor struct {
notifiers func() []notifier.Notifier
notifierHeaders map[string]string
Expand Down

0 comments on commit b52f1d1

Please sign in to comment.