diff --git a/pkg/services/annotations/annotationsimpl/cleanup_test.go b/pkg/services/annotations/annotationsimpl/cleanup_test.go
index b072224d4464..372cfeb2c2f2 100644
--- a/pkg/services/annotations/annotationsimpl/cleanup_test.go
+++ b/pkg/services/annotations/annotationsimpl/cleanup_test.go
@@ -2,6 +2,7 @@ package annotationsimpl
 
 import (
     "context"
+    "errors"
     "testing"
     "time"
 
@@ -15,31 +16,30 @@ import (
     "github.com/grafana/grafana/pkg/setting"
 )
 
-func TestAnnotationCleanUp(t *testing.T) {
-    fakeSQL := db.InitTestDB(t)
-
-    t.Cleanup(func() {
-        err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
-            _, err := session.Exec("DELETE FROM annotation")
-            return err
-        })
-        assert.NoError(t, err)
-    })
+func TestIntegrationAnnotationCleanUp(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping integration test")
+    }
 
-    createTestAnnotations(t, fakeSQL, 21, 6)
-    assertAnnotationCount(t, fakeSQL, "", 21)
-    assertAnnotationTagCount(t, fakeSQL, 42)
+    fakeSQL := db.InitTestDB(t)
 
     tests := []struct {
-        name                     string
-        cfg                      *setting.Cfg
-        alertAnnotationCount     int64
-        dashboardAnnotationCount int64
-        APIAnnotationCount       int64
-        affectedAnnotations      int64
+        name                    string
+        createAnnotationsNum    int
+        createOldAnnotationsNum int
+
+        cfg                           *setting.Cfg
+        alertAnnotationCount          int64
+        annotationCleanupJobBatchSize int
+        dashboardAnnotationCount      int64
+        APIAnnotationCount            int64
+        affectedAnnotations           int64
     }{
         {
-            name: "default settings should not delete any annotations",
+            name:                          "default settings should not delete any annotations",
+            createAnnotationsNum:          21,
+            createOldAnnotationsNum:       6,
+            annotationCleanupJobBatchSize: 1,
             cfg: &setting.Cfg{
                 AlertingAnnotationCleanupSetting:   settingsFn(0, 0),
                 DashboardAnnotationCleanupSettings: settingsFn(0, 0),
@@ -51,7 +51,10 @@ func TestAnnotationCleanUp(t *testing.T) {
             affectedAnnotations:      0,
         },
         {
-            name: "should remove annotations created before cut off point",
+            name:                          "should remove annotations created before cut off point",
+            createAnnotationsNum:          21,
+            createOldAnnotationsNum:       6,
+            annotationCleanupJobBatchSize: 1,
             cfg: &setting.Cfg{
                 AlertingAnnotationCleanupSetting:   settingsFn(time.Hour*48, 0),
                 DashboardAnnotationCleanupSettings: settingsFn(time.Hour*48, 0),
@@ -63,7 +66,10 @@ func TestAnnotationCleanUp(t *testing.T) {
             affectedAnnotations:      6,
         },
         {
-            name: "should only keep three annotations",
+            name:                          "should only keep three annotations",
+            createAnnotationsNum:          15,
+            createOldAnnotationsNum:       6,
+            annotationCleanupJobBatchSize: 1,
             cfg: &setting.Cfg{
                 AlertingAnnotationCleanupSetting:   settingsFn(0, 3),
                 DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -75,7 +81,10 @@ func TestAnnotationCleanUp(t *testing.T) {
             affectedAnnotations:      6,
         },
         {
-            name: "running the max count delete again should not remove any annotations",
+            name:                          "running the max count delete again should not remove any annotations",
+            createAnnotationsNum:          9,
+            createOldAnnotationsNum:       6,
+            annotationCleanupJobBatchSize: 1,
             cfg: &setting.Cfg{
                 AlertingAnnotationCleanupSetting:   settingsFn(0, 3),
                 DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -86,12 +95,40 @@ func TestAnnotationCleanUp(t *testing.T) {
             APIAnnotationCount:       3,
             affectedAnnotations:      0,
         },
+        {
+            name:                          "should not fail if batch size is larger than SQLITE_MAX_VARIABLE_NUMBER for SQLite >= 3.32.0",
+            createAnnotationsNum:          40003,
+            createOldAnnotationsNum:       0,
+            annotationCleanupJobBatchSize: 32767,
+            cfg: &setting.Cfg{
+                AlertingAnnotationCleanupSetting:   settingsFn(0, 1),
+                DashboardAnnotationCleanupSettings: settingsFn(0, 1),
+                APIAnnotationCleanupSettings:       settingsFn(0, 1),
+            },
+            alertAnnotationCount:     1,
+            dashboardAnnotationCount: 1,
+            APIAnnotationCount:       1,
+            affectedAnnotations:      40000,
+        },
     }
 
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
+            createTestAnnotations(t, fakeSQL, test.createAnnotationsNum, test.createOldAnnotationsNum)
+            assertAnnotationCount(t, fakeSQL, "", int64(test.createAnnotationsNum))
+            assertAnnotationTagCount(t, fakeSQL, 2*int64(test.createAnnotationsNum))
+
+            t.Cleanup(func() {
+                err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
+                    _, deleteAnnotationErr := session.Exec("DELETE FROM annotation")
+                    _, deleteAnnotationTagErr := session.Exec("DELETE FROM annotation_tag")
+                    return errors.Join(deleteAnnotationErr, deleteAnnotationTagErr)
+                })
+                assert.NoError(t, err)
+            })
+
             cfg := setting.NewCfg()
-            cfg.AnnotationCleanupJobBatchSize = 1
+            cfg.AnnotationCleanupJobBatchSize = int64(test.annotationCleanupJobBatchSize)
             cleaner := ProvideCleanupService(fakeSQL, cfg, featuremgmt.WithFeatures())
             affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg)
             require.NoError(t, err)
@@ -112,7 +149,11 @@ func TestAnnotationCleanUp(t *testing.T) {
     }
 }
 
-func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
+func TestIntegrationOldAnnotationsAreDeletedFirst(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping integration test")
+    }
+
     fakeSQL := db.InitTestDB(t)
 
     t.Cleanup(func() {
@@ -194,8 +235,11 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
 
     cutoffDate := time.Now()
 
+    newAnnotations := make([]*annotations.Item, 0, expectedCount)
+    newAnnotationTags := make([]*annotationTag, 0, 2*expectedCount)
     for i := 0; i < expectedCount; i++ {
         a := &annotations.Item{
+            ID:          int64(i + 1),
             DashboardID: 1,
             OrgID:       1,
             UserID:      1,
@@ -223,22 +267,44 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
             a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond)
         }
 
-        err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
-            _, err := sess.Insert(a)
-            require.NoError(t, err, "should be able to save annotation", err)
-
-            // mimick the SQL annotation Save logic by writing records to the annotation_tag table
-            // we need to ensure they get deleted when we clean up annotations
-            for tagID := range []int{1, 2} {
-                _, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.ID, tagID)
-                require.NoError(t, err, "should be able to save annotation tag ID", err)
-            }
-            return err
-        })
-        require.NoError(t, err)
+        newAnnotations = append(newAnnotations, a)
+        newAnnotationTags = append(newAnnotationTags, &annotationTag{AnnotationID: a.ID, TagID: 1}, &annotationTag{AnnotationID: a.ID, TagID: 2})
     }
+
+    err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
+        batchsize := 500
+        for i := 0; i < len(newAnnotations); i += batchsize {
+            _, err := sess.InsertMulti(newAnnotations[i:min(i+batchsize, len(newAnnotations))])
+            require.NoError(t, err)
+        }
+        return nil
+    })
+    require.NoError(t, err)
+
+    err = store.WithDbSession(context.Background(), func(sess *db.Session) error {
+        batchsize := 500
+        for i := 0; i < len(newAnnotationTags); i += batchsize {
+            _, err := sess.InsertMulti(newAnnotationTags[i:min(i+batchsize, len(newAnnotationTags))])
+            require.NoError(t, err)
+        }
+        return nil
+    })
+    require.NoError(t, err)
 }
 
 func settingsFn(maxAge time.Duration, maxCount int64) setting.AnnotationCleanupSettings {
     return setting.AnnotationCleanupSettings{MaxAge: maxAge, MaxCount: maxCount}
 }
+
+func min(is ...int) int {
+    if len(is) == 0 {
+        return 0
+    }
+    min := is[0]
+    for _, i := range is {
+        if i < min {
+            min = i
+        }
+    }
+    return min
+}
diff --git a/pkg/services/annotations/annotationsimpl/xorm_store.go b/pkg/services/annotations/annotationsimpl/xorm_store.go
index 17b49489e700..385451ff62b4 100644
--- a/pkg/services/annotations/annotationsimpl/xorm_store.go
+++ b/pkg/services/annotations/annotationsimpl/xorm_store.go
@@ -15,6 +15,7 @@ import (
     "github.com/grafana/grafana/pkg/services/dashboards"
     "github.com/grafana/grafana/pkg/services/featuremgmt"
     "github.com/grafana/grafana/pkg/services/sqlstore"
+    "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
     "github.com/grafana/grafana/pkg/services/sqlstore/permissions"
     "github.com/grafana/grafana/pkg/services/sqlstore/searchstore"
     "github.com/grafana/grafana/pkg/services/tag"
@@ -517,11 +518,21 @@ func (r *xormRepositoryImpl) validateTagsLength(item *annotations.Item) error {
 func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.AnnotationCleanupSettings, annotationType string) (int64, error) {
     var totalAffected int64
     if cfg.MaxAge > 0 {
-        cutoffDate := time.Now().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
-        deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
-        sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
+        cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
+        // Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL.
+        // We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch.
+        // This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle.
+        //
+        // We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error.
+        affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
+            cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
+            ids, err := r.fetchIDs(ctx, "annotation", cond)
+            if err != nil {
+                return 0, err
+            }
 
-        affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
+            return r.deleteByIDs(ctx, "annotation", ids)
+        })
         totalAffected += affected
         if err != nil {
             return totalAffected, err
@@ -529,41 +540,105 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A
     }
 
     if cfg.MaxCount > 0 {
-        deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
-        sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
-        affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
+        // Similar strategy as the above cleanup process, to avoid deadlocks.
+        affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
+            cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
+            ids, err := r.fetchIDs(ctx, "annotation", cond)
+            if err != nil {
+                return 0, err
+            }
+
+            return r.deleteByIDs(ctx, "annotation", ids)
+        })
         totalAffected += affected
-        return totalAffected, err
+        if err != nil {
+            return totalAffected, err
+        }
     }
 
     return totalAffected, nil
 }
 
 func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
-    deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
-    sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
-    return r.executeUntilDoneOrCancelled(ctx, sql)
+    return untilDoneOrCancelled(ctx, func() (int64, error) {
+        cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
+        ids, err := r.fetchIDs(ctx, "annotation_tag", cond)
+        if err != nil {
+            return 0, err
+        }
+
+        return r.deleteByIDs(ctx, "annotation_tag", ids)
+    })
+}
+
+func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) {
+    sql := fmt.Sprintf(`SELECT id FROM %s`, table)
+    if condition == "" {
+        return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table")
+    }
+    sql += fmt.Sprintf(` WHERE %s`, condition)
+    ids := make([]int64, 0)
+    err := r.db.WithDbSession(ctx, func(session *db.Session) error {
+        return session.SQL(sql).Find(&ids)
+    })
+    return ids, err
 }
 
-func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
+func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) {
+    if len(ids) == 0 {
+        return 0, nil
+    }
+
+    sql := ""
+    args := make([]any, 0)
+
+    // SQLite has a parameter limit of 999.
+    // If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement.
+    const sqliteParameterLimit = 999
+    if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit {
+        values := fmt.Sprint(ids[0])
+        for _, v := range ids[1:] {
+            values = fmt.Sprintf("%s, %d", values, v)
+        }
+        sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values)
+    } else {
+        placeholders := "?" + strings.Repeat(",?", len(ids)-1)
+        sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders)
+        args = asAny(ids)
+    }
+
+    var affected int64
+    err := r.db.WithDbSession(ctx, func(session *db.Session) error {
+        res, err := session.Exec(append([]any{sql}, args...)...)
+        if err != nil {
+            return err
+        }
+        affected, err = res.RowsAffected()
+        return err
+    })
+    return affected, err
+}
+
+func asAny(vs []int64) []any {
+    r := make([]any, len(vs))
+    for i, v := range vs {
+        r[i] = v
+    }
+    return r
+}
+
+// untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects),
+// a batch produces an error, or the provided context is cancelled.
+// The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors.
+func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
     var totalAffected int64
     for {
         select {
         case <-ctx.Done():
             return totalAffected, ctx.Err()
         default:
-            var affected int64
-            err := r.db.WithDbSession(ctx, func(session *db.Session) error {
-                res, err := session.Exec(sql)
-                if err != nil {
-                    return err
-                }
-
-                affected, err = res.RowsAffected()
-                totalAffected += affected
-
-                return err
-            })
+            affected, err := batchWork()
+            totalAffected += affected
             if err != nil {
                 return totalAffected, err
             }
@@ -574,3 +649,8 @@ func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sq
         }
     }
 }
+
+type annotationTag struct {
+    AnnotationID int64 `xorm:"annotation_id"`
+    TagID        int64 `xorm:"tag_id"`
+}
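As a reviewer aid, here is a minimal, self-contained sketch of how the new batching helper behaves. The untilDoneOrCancelled body mirrors the helper added in the patch above (including the zero-affected termination its doc comment describes); the fake batch in main is a hypothetical stand-in for the real fetchIDs/deleteByIDs pair, and the counts are illustrative only.

package main

import (
    "context"
    "fmt"
)

// untilDoneOrCancelled mirrors the helper added in xorm_store.go: it repeats
// batchWork until a batch affects zero rows (the "done" case described in the
// patch's doc comment), a batch fails, or the context is cancelled.
func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
    var totalAffected int64
    for {
        select {
        case <-ctx.Done():
            return totalAffected, ctx.Err()
        default:
            affected, err := batchWork()
            totalAffected += affected
            if err != nil {
                return totalAffected, err
            }
            if affected == 0 {
                return totalAffected, nil
            }
        }
    }
}

func main() {
    // Hypothetical stand-in for the fetchIDs/deleteByIDs pair: pretend there are
    // 7 rows to clean up and each batch removes at most 3 of them.
    remaining, batchSize := 7, 3

    total, err := untilDoneOrCancelled(context.Background(), func() (int64, error) {
        n := batchSize
        if remaining < n {
            n = remaining
        }
        remaining -= n
        return int64(n), nil
    })
    fmt.Println(total, err) // 7 <nil>
}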