Alerting: Attempt to retry retryable errors (#79161)
* Alerting: Attempt to retry retryable errors

Retrying has been broken for a good while now (at least since version 9.4). This change re-introduces retries in their simplest and safest possible form.

I first introduced #79095 to make sure we don't disrupt or put additional load on our customers' data sources with this change in a patch release. Paired with this change, retries now work as expected.

There are two small differences between how retries work now and how they used to work in legacy alerting (a rough sketch follows below):

1. Retries only occur for valid alert definitions; if we suspect that the error comes from a malformed alert definition, we skip retrying.
2. We have added a constant backoff of 1s between retries.
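
For illustration, here is a minimal sketch of the retry loop described above. This is not the actual scheduler code: evaluateWithRetries and evaluateOnce are made-up names, and imports of context and time are assumed.

// Simplified sketch of the retry behaviour; evaluateOnce is a stand-in that is expected
// to return nil on success or when the error comes from a malformed alert definition.
func evaluateWithRetries(ctx context.Context, maxAttempts int64, retryDelay time.Duration,
	evaluateOnce func(ctx context.Context, attempt int64) error) {
	for attempt := int64(1); attempt <= maxAttempts; attempt++ {
		if err := evaluateOnce(ctx, attempt); err == nil {
			return // success, or a non-retryable error we deliberately do not retry
		}
		if attempt == maxAttempts {
			return // all attempts exhausted
		}
		select {
		case <-ctx.Done():
			return // stop retrying when the rule's context is cancelled
		case <-time.After(retryDelay): // constant 1s backoff between attempts
		}
	}
}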

---------

Signed-off-by: gotjosh <josue.abreu@gmail.com>
gotjosh committed Dec 6, 2023
1 parent ea36336 commit c631261
Showing 4 changed files with 176 additions and 46 deletions.
29 changes: 29 additions & 0 deletions pkg/services/ngalert/eval/eval.go
@@ -157,6 +157,23 @@ func (evalResults Results) HasErrors() bool {
return false
}

// HasNonRetryableErrors returns true if we have at least one result with:
// 1. A `State` of `Error`
// 2. A non-nil `Error` attribute
// 3. An `Error` that wraps `*invalidEvalResultFormatError`
// The rationale behind this approach is that we don't want to retry errors caused by an invalid alert definition format.
func (evalResults Results) HasNonRetryableErrors() bool {
for _, r := range evalResults {
if r.State == Error && r.Error != nil {
var nonRetryableError *invalidEvalResultFormatError
if errors.As(r.Error, &nonRetryableError) {
return true
}
}
}
return false
}

// IsError returns true when Results contains at least one element and all elements are errors
func (evalResults Results) IsError() bool {
for _, r := range evalResults {
@@ -177,6 +194,18 @@ func (evalResults Results) IsNoData() bool {
return true
}

// Error returns the aggregated `error` of all results whose state is `Error`.
func (evalResults Results) Error() error {
var errs []error
for _, result := range evalResults {
if result.State == Error && result.Error != nil {
errs = append(errs, result.Error)
}
}

return errors.Join(errs...)
}

// Result contains the evaluated State of an alert instance
// identified by its labels.
type Result struct {
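
For orientation, here is a condensed sketch of how the scheduler consumes these two new helpers (see the schedule.go changes further down). retryableEvalError is a hypothetical helper distilled from that diff, not a function added by this commit.

// Condensed sketch mirroring the evaluate() changes in schedule.go below.
// It returns a non-nil error only when the evaluation should be retried.
func retryableEvalError(err error, results eval.Results, retry bool) error {
	if err == nil && !results.HasErrors() {
		return nil // evaluation succeeded
	}
	if !retry {
		return nil // last attempt: no point signalling a retry
	}
	// A non-nil err can only come from the server side expressions pipeline,
	// e.g. a transient network failure, so it is always worth retrying.
	if err != nil {
		return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
	}
	// The pipeline succeeded, but the results contain errors; retry only if
	// none of them stem from an invalid alert definition.
	if !results.HasNonRetryableErrors() {
		return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
	}
	return nil // non-retryable errors: handled as a normal failed evaluation
}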
65 changes: 65 additions & 0 deletions pkg/services/ngalert/eval/eval_test.go
Expand Up @@ -2,6 +2,7 @@ package eval

import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
@@ -769,6 +770,70 @@ func TestEvaluateRaw(t *testing.T) {
})
}

func TestResults_HasNonRetryableErrors(t *testing.T) {
tc := []struct {
name string
eval Results
expected bool
}{
{
name: "with non-retryable errors",
eval: Results{
{
State: Error,
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
},
},
expected: true,
},
{
name: "with retryable errors",
eval: Results{
{
State: Error,
Error: errors.New("some weird error"),
},
},
expected: false,
},
}

for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, tt.eval.HasNonRetryableErrors())
})
}
}

func TestResults_Error(t *testing.T) {
tc := []struct {
name string
eval Results
expected string
}{
{
name: "with non-retryable errors",
eval: Results{
{
State: Error,
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
},
{
State: Error,
Error: errors.New("unable to get a data frame"),
},
},
expected: "invalid format of evaluation results for the alert definition A: unable to get frame row length: weird error\nunable to get a data frame",
},
}

for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, tt.eval.Error().Error())
})
}
}

type fakeExpressionService struct {
hook func(ctx context.Context, now time.Time, pipeline expr.DataPipeline) (*backend.QueryDataResponse, error)
}
93 changes: 64 additions & 29 deletions pkg/services/ngalert/schedule/schedule.go
@@ -34,6 +34,9 @@ type ScheduleService interface {
Run(context.Context) error
}

// retryDelay represents how long to wait after a failed rule evaluation before retrying.
const retryDelay = 1 * time.Second

// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
//
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
@@ -345,6 +348,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
return readyToRun, registeredDefinitions, updatedRules
}

//nolint:gocyclo
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error {
grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key)
logger := sch.log.FromContext(grafanaCtx)
@@ -374,7 +378,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
notify(states)
}

evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span) {
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
start := sch.clock.Now()

@@ -396,18 +400,43 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
evalTotal.Inc()
evalDuration.Observe(dur.Seconds())

if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
span.SetStatus(codes.Error, "rule evaluation cancelled")
logger.Debug("Skip updating the state because the context has been cancelled")
return nil
}

if err != nil || results.HasErrors() {
evalTotalFailures.Inc()

// Only return an error (and thereby retry) if this isn't the last attempt; otherwise skip these early returns.
if retry {
// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
// This includes transport errors such as transient network errors.
if err != nil {
span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
}

// If the pipeline executed successfully but the results contain only retryable errors, we should retry.
if !results.HasNonRetryableErrors() {
span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
}
}

// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
if results == nil {
results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
}

// If err is nil, we assume that the SSE pipeline succeeded and that the error must be embedded in the results.
if err == nil {
for _, result := range results {
if result.Error != nil {
err = errors.Join(err, result.Error)
}
}
err = results.Error()
}

span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
} else {
@@ -416,10 +445,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
attribute.Int64("results", int64(len(results))),
))
}
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
logger.Debug("Skip updating the state because the context has been cancelled")
return
}
start = sch.clock.Now()
processedStates := sch.stateManager.ProcessEvalResults(
ctx,
@@ -440,18 +465,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
sch.alertsSender.Send(ctx, key, alerts)
}
sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
}

retryIfError := func(f func(attempt int64) error) error {
var attempt int64
var err error
for attempt = 0; attempt < sch.maxAttempts; attempt++ {
err = f(attempt)
if err == nil {
return nil
}
}
return err
return nil
}

evalRunning := false
@@ -487,7 +502,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
sch.evalApplied(key, ctx.scheduledAt)
}()

err := retryIfError(func(attempt int64) error {
for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
isPaused := ctx.rule.IsPaused
f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
// Do not clean up state if the eval loop has just started.
@@ -506,7 +521,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
currentFingerprint = f
if isPaused {
logger.Debug("Skip rule evaluation because it is paused")
return nil
return
}

fpStr := currentFingerprint.String()
@@ -518,15 +533,35 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
attribute.String("rule_fingerprint", fpStr),
attribute.String("tick", utcTick),
))
defer span.End()

evaluate(tracingCtx, f, attempt, ctx, span)
return nil
})
if err != nil {
logger.Error("Evaluation failed after all retries", "error", err)
// Check before any execution if the context was cancelled so that we don't do any evaluations.
if tracingCtx.Err() != nil {
span.SetStatus(codes.Error, "rule evaluation cancelled")
span.End()
logger.Error("Skip evaluation and updating the state because the context has been cancelled", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
return
}

retry := attempt < sch.maxAttempts
err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
// Note that evaluate returns nil both on success and when we have exhausted all retry attempts or the
// errors are non-retryable, so a nil error here does not by itself tell us whether the evaluation succeeded.
span.End()
if err == nil {
return
}

logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
select {
case <-tracingCtx.Done():
logger.Error("Context has been cancelled while backing off", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
return
case <-time.After(retryDelay):
continue
}
}
}()

case <-grafanaCtx.Done():
// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
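
Taken together with the retryDelay constant above, the worst-case extra latency per tick for a persistently failing rule is roughly (maxAttempts - 1) × retryDelay of backoff on top of the evaluations themselves; with the maxAttempts = 3 configured in the updated unit test below, that is 2 × 1s = 2s, and it is also why the expected evaluation counters in that test change from 1 to 3.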
35 changes: 18 additions & 17 deletions pkg/services/ngalert/schedule/schedule_unit_test.go
@@ -662,6 +662,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()

sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
sch.maxAttempts = 3
ruleStore.PutRule(context.Background(), rule)

go func() {
@@ -682,28 +683,28 @@
expectedMetric := fmt.Sprintf(
`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 3
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 3
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 3
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
grafana_alerting_rule_evaluations_total{org="%[1]d"} 3
# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1