Skip to content

Commit

Permalink
Remove GC and callback from store.go
Browse files Browse the repository at this point in the history
This commit removes the GC and callback function from store.go
to address a number of data races that have occurred in the past
(prometheus#2040 and prometheus#3648). The store is no longer responsible for removing
resolved alerts after some elapsed period of time, and is instead
deferred to the consumer of the store (as done in prometheus#2040 and prometheus#3648).

Signed-off-by: George Robinson <george.robinson@grafana.com>
  • Loading branch information
grobinson-grafana committed May 26, 2024
1 parent fd37ce9 commit b9ee89e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 149 deletions.
18 changes: 17 additions & 1 deletion inhibit/inhibit.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,24 @@ func (ih *Inhibitor) Run() {
ih.mtx.Unlock()
runCtx, runCancel := context.WithCancel(ctx)

// Run a periodic maintenance for each inhibition rule to remove
// resolved alerts from their local cache.
for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
rule := rule // Create a local rule variable for the goroutine.
g.Add(func() error {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rule.scache.DeleteResolved()
case <-runCtx.Done():
return nil
}
}
}, func(err error) {
runCancel()
})
}

g.Add(func() error {
Expand Down
76 changes: 38 additions & 38 deletions provider/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,46 +106,20 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
a.registerMetrics(r)
}

go a.gcLoop(ctx, intervalGC)

return a, nil
}

func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
a.gc()
go func() {
ticker := time.NewTicker(intervalGC)
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.doMaintenance()
case <-ctx.Done():
return
}
}
}
}

func (a *Alerts) gc() {
a.mtx.Lock()
defer a.mtx.Unlock()

deleted := a.alerts.GC()
for _, alert := range deleted {
// As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
a.marker.Delete(alert.Fingerprint())
a.callback.PostDelete(&alert)
}
}()

for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
return a, nil
}

// Close the alert provider.
Expand Down Expand Up @@ -283,6 +257,32 @@ func (a *Alerts) count(state types.AlertState) int {
return count
}

func (a *Alerts) doMaintenance() {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, alert := range a.alerts.List() {
if alert.Resolved() {
// TODO(grobinson-grafana): See if we can use a single method instead of calling List() and then Delete().
a.alerts.Delete(alert.Fingerprint())
// As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
a.marker.Delete(alert.Fingerprint())
a.callback.PostDelete(alert)
}
}

for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
}

type noopCallback struct{}

func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
Expand Down
80 changes: 21 additions & 59 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
package store

import (
"context"
"errors"
"sync"
"time"

"github.com/prometheus/common/model"

Expand All @@ -28,72 +26,19 @@ import (
var ErrNotFound = errors.New("alert not found")

// Alerts provides lock-coordinated to an in-memory map of alerts, keyed by
// their fingerprint. Resolved alerts are removed from the map based on
// gcInterval. An optional callback can be set which receives a slice of all
// resolved alerts that have been removed.
// their fingerprint.
type Alerts struct {
sync.Mutex
c map[model.Fingerprint]*types.Alert
cb func([]types.Alert)
c map[model.Fingerprint]*types.Alert
}

// NewAlerts returns a new Alerts struct.
func NewAlerts() *Alerts {
a := &Alerts{
c: make(map[model.Fingerprint]*types.Alert),
cb: func(_ []types.Alert) {},
}

return a
}

// SetGCCallback sets a GC callback to be executed after each GC.
func (a *Alerts) SetGCCallback(cb func([]types.Alert)) {
a.Lock()
defer a.Unlock()

a.cb = cb
}

// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic.
func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
a.GC()
}
return &Alerts{
c: make(map[model.Fingerprint]*types.Alert),
}
}

// GC deletes resolved alerts and returns them.
func (a *Alerts) GC() []types.Alert {
a.Lock()
var resolved []types.Alert
for fp, alert := range a.c {
if alert.Resolved() {
delete(a.c, fp)
resolved = append(resolved, types.Alert{
Alert: model.Alert{
Labels: alert.Labels.Clone(),
Annotations: alert.Annotations.Clone(),
StartsAt: alert.StartsAt,
EndsAt: alert.EndsAt,
GeneratorURL: alert.GeneratorURL,
},
UpdatedAt: alert.UpdatedAt,
Timeout: alert.Timeout,
})
}
}
a.Unlock()
a.cb(resolved)
return resolved
}

// Get returns the Alert with the matching fingerprint, or an error if it is
// not found.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
Expand All @@ -116,6 +61,12 @@ func (a *Alerts) Set(alert *types.Alert) error {
return nil
}

func (a *Alerts) Delete(fp model.Fingerprint) {
a.Lock()
defer a.Unlock()
delete(a.c, fp)
}

// DeleteIfNotModified deletes the slice of Alerts from the store if not
// modified.
func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
Expand All @@ -130,6 +81,17 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
return nil
}

// DeleteResolved deletes all resolved alerts.
func (a *Alerts) DeleteResolved() {
a.Lock()
defer a.Unlock()
for fp, alert := range a.c {
if alert.Resolved() {
delete(a.c, fp)
}
}
}

// List returns a slice of Alerts currently held in memory.
func (a *Alerts) List() []*types.Alert {
a.Lock()
Expand Down
83 changes: 32 additions & 51 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package store

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -138,60 +137,42 @@ func TestDeleteIfNotModified(t *testing.T) {
})
}

func TestGC(t *testing.T) {
now := time.Now()
newAlert := func(key string, start, end time.Duration) *types.Alert {
return &types.Alert{
func TestDeleteResolved(t *testing.T) {
t.Run("active alert should not be deleted", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{model.LabelName(key): "b"},
StartsAt: now.Add(start * time.Minute),
EndsAt: now.Add(end * time.Minute),
Labels: model.LabelSet{
"foo": "bar",
},
StartsAt: time.Now(),
EndsAt: time.Now().Add(5 * time.Minute),
},
}
}
active := []*types.Alert{
newAlert("b", 10, 20),
newAlert("c", -10, 10),
}
resolved := []*types.Alert{
newAlert("a", -10, -5),
newAlert("d", -10, -1),
}
s := NewAlerts()
var (
n int
done = make(chan struct{})
ctx, cancel = context.WithCancel(context.Background())
)
s.SetGCCallback(func(a []types.Alert) {
n += len(a)
if n >= len(resolved) {
cancel()
}
require.NoError(t, a.Set(a1))
a.DeleteResolved()
// a1 should not have been deleted.
got, err := a.Get(a1.Fingerprint())
require.NoError(t, err)
require.Equal(t, a1, got)
})
for _, alert := range append(active, resolved...) {
require.NoError(t, s.Set(alert))
}
go func() {
s.Run(ctx, 10*time.Millisecond)
close(done)
}()
select {
case <-done:
break
case <-time.After(1 * time.Second):
t.Fatal("garbage collection didn't complete in time")
}

for _, alert := range active {
if _, err := s.Get(alert.Fingerprint()); err != nil {
t.Errorf("alert %v should not have been gc'd", alert)
}
}
for _, alert := range resolved {
if _, err := s.Get(alert.Fingerprint()); err == nil {
t.Errorf("alert %v should have been gc'd", alert)
t.Run("resolved alert should not be deleted", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
StartsAt: time.Now().Add(-5 * time.Minute),
EndsAt: time.Now().Add(-time.Second),
},
}
}
require.Len(t, resolved, n)
require.NoError(t, a.Set(a1))
a.DeleteResolved()
// a1 should have been deleted.
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err)
require.Nil(t, got)
})
}

0 comments on commit b9ee89e

Please sign in to comment.