Skip to content

Commit

Permalink
Alerting: Fix migration edge-case race condition for silences
Browse files Browse the repository at this point in the history
If the db already has an entry in the kvstore for the silences of an
alertmanager before the migration has taken place, then it's possible that the
active alertmanager will overwrite the silence file created by the migration
before it has a chance to load it into memory.

This should not happen normally but is possible in edge-cases.

This change opts to bypass the unnecessary step of writing the silences to disk
during the migration and instead write them directly to the kvstore. This avoids
 the race condition entirely and is more correct as we treat the database as the
  source of truth for AM state.
  • Loading branch information
JacobsonMT committed Jan 24, 2024
1 parent 71e70c4 commit 9502a79
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 107 deletions.
5 changes: 2 additions & 3 deletions pkg/services/ngalert/migration/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func ProvideService(
migrationStore: migrationStore,
encryptionService: encryptionService,
silences: &silenceHandler{
dataPath: cfg.DataPath,
createSilenceFile: openReplace,
persistSilences: migrationStore.SetSilences,
},
}, nil
}
Expand Down Expand Up @@ -491,7 +490,7 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
return err
}

err = ms.silences.createSilences(o.ID, om.log)
err = ms.silences.createSilences(ctx, o.ID, om.log)
if err != nil {
return fmt.Errorf("create silences for org %d: %w", o.ID, err)
}
Expand Down
81 changes: 6 additions & 75 deletions pkg/services/ngalert/migration/silences.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package migration

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strconv"
"time"

"github.com/matttproud/golang_protobuf_extensions/pbutil"
pb "github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/common/model"

Expand All @@ -27,9 +21,7 @@ var TimeNow = time.Now
type silenceHandler struct {
rulesWithErrorSilenceLabels int
rulesWithNoDataSilenceLabels int
createSilenceFile func(filename string) (io.WriteCloser, error)

dataPath string
persistSilences func(context.Context, int64, []*pb.MeshSilence) error
}

// handleSilenceLabels adds labels to the alert rule if the rule requires silence labels for error/nodata keep_state.
Expand All @@ -45,7 +37,7 @@ func (sh *silenceHandler) handleSilenceLabels(ar *models.AlertRule, parsedSettin
}

// createSilences creates silences and writes them to a file.
func (sh *silenceHandler) createSilences(orgID int64, log log.Logger) error {
func (sh *silenceHandler) createSilences(ctx context.Context, orgID int64, log log.Logger) error {
var silences []*pb.MeshSilence
if sh.rulesWithErrorSilenceLabels > 0 {
log.Info("Creating silence for rules with ExecutionErrorState = keep_state", "rules", sh.rulesWithErrorSilenceLabels)
Expand All @@ -56,9 +48,9 @@ func (sh *silenceHandler) createSilences(orgID int64, log log.Logger) error {
silences = append(silences, noDataSilence())
}
if len(silences) > 0 {
log.Debug("Writing silences file", "silences", len(silences))
if err := sh.writeSilencesFile(orgID, silences); err != nil {
return fmt.Errorf("write silence file: %w", err)
log.Debug("Writing silences to kvstore", "silences", len(silences))
if err := sh.persistSilences(ctx, orgID, silences); err != nil {
return fmt.Errorf("write silences to kvstore: %w", err)
}
}
return nil
Expand Down Expand Up @@ -115,64 +107,3 @@ func noDataSilence() *pb.MeshSilence {
ExpiresAt: TimeNow().AddDate(1, 0, 0), // 1 year.
}
}

func (sh *silenceHandler) writeSilencesFile(orgId int64, silences []*pb.MeshSilence) error {
var buf bytes.Buffer
for _, e := range silences {
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
return err
}
}

f, err := sh.createSilenceFile(silencesFileNameForOrg(sh.dataPath, orgId))
if err != nil {
return err
}

if _, err := io.Copy(f, bytes.NewReader(buf.Bytes())); err != nil {
return err
}

return f.Close()
}

func silencesFileNameForOrg(dataPath string, orgID int64) string {
return filepath.Join(dataPath, "alerting", strconv.Itoa(int(orgID)), "silences")
}

// replaceFile wraps a file that is moved to another filename on closing.
type replaceFile struct {
*os.File
filename string
}

func (f *replaceFile) Close() error {
if err := f.File.Sync(); err != nil {
return err
}
if err := f.File.Close(); err != nil {
return err
}
return os.Rename(f.File.Name(), f.filename)
}

// openReplace opens a new temporary file that is moved to filename on closing.
func openReplace(filename string) (io.WriteCloser, error) {
tmpFilename := fmt.Sprintf("%s.%x", filename, uint64(rand.Int63()))

if err := os.MkdirAll(filepath.Dir(tmpFilename), os.ModePerm); err != nil {
return nil, err
}

//nolint:gosec
f, err := os.Create(tmpFilename)
if err != nil {
return nil, err
}

rf := &replaceFile{
File: f,
filename: filename,
}
return rf, nil
}
48 changes: 19 additions & 29 deletions pkg/services/ngalert/migration/silences_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package migration

import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"strings"
"testing"
"time"

Expand All @@ -13,10 +14,11 @@ import (
"github.com/matttproud/golang_protobuf_extensions/pbutil"
pb "github.com/prometheus/alertmanager/silence/silencepb"
"github.com/stretchr/testify/require"
"xorm.io/xorm"

"github.com/grafana/grafana/pkg/infra/db"
legacymodels "github.com/grafana/grafana/pkg/services/alerting/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
)

func TestSilences(t *testing.T) {
Expand Down Expand Up @@ -79,32 +81,13 @@ func TestSilences(t *testing.T) {
_, err = x.Insert(test.alerts)
require.NoError(t, err)

cfg := setting.NewCfg()
cfg.DataPath = "/a/b/c"
service := NewTestMigrationService(t, sqlStore, cfg)

sb := stringsBuilderCloser{
Builder: &strings.Builder{},
}
silenceFileAsString := func(filename string) (io.WriteCloser, error) {
_, err := sb.WriteString(filename)
require.NoError(t, err)
return sb, nil
}
service.silences.createSilenceFile = silenceFileAsString
service := NewTestMigrationService(t, sqlStore, nil)

require.NoError(t, service.migrateAllOrgs(context.Background()))

expectedFilename := ""
if len(test.expectedSilences) > 0 {
expectedFilename = cfg.DataPath + "/alerting/1/silences"
filename := sb.String()[:len(expectedFilename)]
require.Equal(t, expectedFilename, filename)
}
// Get silences from kvstore.
st := getSilenceState(t, x, o.ID)

contents := sb.String()[len(expectedFilename):]
st, err := decodeState(strings.NewReader(contents))
require.NoError(t, err)
require.Len(t, st, len(test.expectedSilences))

silences := make([]*pb.MeshSilence, 0, len(st))
Expand All @@ -125,12 +108,19 @@ func TestSilences(t *testing.T) {
})
}

type stringsBuilderCloser struct {
*strings.Builder
}
// getSilenceState returns the silences state from the kvstore.
func getSilenceState(t *testing.T, x *xorm.Engine, orgId int64) state {
content := ""
_, err := x.Table("kv_store").Where("org_id = ? AND namespace = ? AND key = ?", orgId, notifier.KVNamespace, notifier.SilencesFilename).Cols("value").Get(&content)
require.NoError(t, err)

b, err := base64.StdEncoding.DecodeString(content)
require.NoError(t, err)

st, err := decodeState(bytes.NewReader(b))
require.NoError(t, err)

func (s stringsBuilderCloser) Close() error {
return nil
return st
}

// state copied from prometheus-alertmanager/silence/silence.go.
Expand Down
20 changes: 20 additions & 0 deletions pkg/services/ngalert/migration/store/database.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package store

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand All @@ -11,6 +13,8 @@ import (
"strings"

"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
pb "github.com/prometheus/alertmanager/silence/silencepb"

"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
Expand Down Expand Up @@ -96,6 +100,8 @@ type WriteStore interface {
DeleteFolders(ctx context.Context, orgID int64, uids ...string) error

UpdateRuleLabels(ctx context.Context, key models.AlertRuleKeyWithVersion, labels data.Labels) error

SetSilences(ctx context.Context, orgID int64, silences []*pb.MeshSilence) error
}

type migrationStore struct {
Expand Down Expand Up @@ -273,6 +279,20 @@ func (ms *migrationStore) SetOrgMigrationState(ctx context.Context, orgID int64,
return kv.Set(ctx, stateKey, string(raw))
}

// SetSilences stores the given silences in the kvstore.
func (ms *migrationStore) SetSilences(ctx context.Context, orgID int64, silences []*pb.MeshSilence) error {
kv := kvstore.WithNamespace(ms.kv, orgID, notifier.KVNamespace)

var buf bytes.Buffer
for _, e := range silences {
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
return err
}
}

return kv.Set(ctx, notifier.SilencesFilename, base64.StdEncoding.EncodeToString(buf.Bytes()))
}

// GetAlertRuleTitles returns a map of namespaceUID -> title for all alert rules in the given org and namespace uids.
func (ms *migrationStore) GetAlertRuleTitles(ctx context.Context, orgID int64, namespaceUIDs ...string) (map[string][]string, error) {
res := make(map[string][]string)
Expand Down

0 comments on commit 9502a79

Please sign in to comment.