diff --git a/pkg/controller/internal/testutil/backup.go b/pkg/controller/internal/testutil/backup.go
index 7e882eeb4..346162475 100644
--- a/pkg/controller/internal/testutil/backup.go
+++ b/pkg/controller/internal/testutil/backup.go
@@ -67,7 +67,7 @@ func BackupForCluster(cluster *api.MysqlCluster) gomegatypes.GomegaMatcher {
 func BackupWithName(name string) gomegatypes.GomegaMatcher {
 	return MatchFields(IgnoreExtras, Fields{
 		"ObjectMeta": MatchFields(IgnoreExtras, Fields{
-			"ClusterName": Equal(name),
+			"Name": Equal(name),
 		}),
 	})
 }
diff --git a/pkg/controller/mysqlbackupcron/job_backup.go b/pkg/controller/mysqlbackupcron/job_backup.go
index 658f29a91..a5b4bc9f1 100644
--- a/pkg/controller/mysqlbackupcron/job_backup.go
+++ b/pkg/controller/mysqlbackupcron/job_backup.go
@@ -20,133 +20,92 @@ import (
 	"context"
 	"fmt"
 	"sort"
-	"sync"
 	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
 )
 
-var (
-	// polling time for backup to be completed
-	backupPollingTime = 5 * time.Second
-	// time to wait for a backup to be completed
-	backupWatchTimeout = time.Hour
-)
-
 // The job structure contains the context to schedule a backup
 type job struct {
-	Name      string
-	Namespace string
+	ClusterName string
+	Namespace   string
 
-	BackupRunning *bool
-
-	lock *sync.Mutex
-	c    client.Client
+	// kubernetes client
+	c client.Client
 
 	BackupScheduleJobsHistoryLimit *int
 }
 
-func (j job) Run() {
-	backupName := fmt.Sprintf("%s-auto-backup-%s", j.Name, time.Now().Format("2006-01-02t15-04-05"))
-	backupKey := types.NamespacedName{Name: backupName, Namespace: j.Namespace}
-	log.Info("scheduled backup job started", "namespace", j.Namespace, "name", backupName)
+func (j *job) Run() {
+	log.Info("scheduled backup job started", "namespace", j.Namespace, "cluster_name", j.ClusterName)
 
+	// run garbage collector if needed
 	if j.BackupScheduleJobsHistoryLimit != nil {
 		defer j.backupGC()
 	}
 
-	// Wrap backup creation to ensure that lock is released when backup is
-	// created
-
-	created := func() bool {
-		j.lock.Lock()
-		defer j.lock.Unlock()
-
-		if *j.BackupRunning {
-			log.Info("last scheduled backup still running! Can't initiate a new backup",
-				"cluster", fmt.Sprintf("%s/%s", j.Namespace, j.Name))
-			return false
-		}
+	// check if a backup is running
+	if j.scheduledBackupsRunningCount() > 0 {
+		log.V(1).Info("at least one backup is running",
+			"backups_len", j.scheduledBackupsRunningCount())
+		return
+	}
 
-		tries := 0
-		for {
-			var err error
-			cluster := &api.MysqlBackup{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:      backupName,
-					Namespace: j.Namespace,
-					Labels: map[string]string{
-						"recurrent": "true",
-					},
-				},
-				Spec: api.MysqlBackupSpec{
-					ClusterName: j.Name,
-				},
-			}
-			if err = j.c.Create(context.TODO(), cluster); err == nil {
-				break
-			}
+	// create the backup
+	if _, err := j.createBackup(); err != nil {
+		log.Error(err, "failed to create backup")
+	}
+}
 
-			if tries > 5 {
-				log.Error(err, "fail to create backup, max tries exceeded",
-					"cluster", j.Name, "retries", tries, "backup", backupName)
-				return false
-			}
+func (j *job) scheduledBackupsRunningCount() int {
+	backupsList := &api.MysqlBackupList{}
+	// select all backups of this cluster that have the recurrent=true label and are not yet completed
+	selector := j.backupSelector()
+	selector.MatchingField("status.completed", "false")
 
-			log.Info("failed to create backup, retring", "backup", backupName,
-				"error", err, "tries", tries)
+	if err := j.c.List(context.TODO(), selector, backupsList); err != nil {
+		log.Error(err, "failed getting backups", "selector", selector)
+		return 0
+	}
 
-			time.Sleep(5 * time.Second)
-			tries++
-		}
+	return len(backupsList.Items)
+}
 
-		*j.BackupRunning = true
-		return true
-	}()
-	if !created {
-		return
+func (j *job) createBackup() (*api.MysqlBackup, error) {
+	backupName := fmt.Sprintf("%s-auto-%s", j.ClusterName, time.Now().Format("2006-01-02t15-04-05"))
+
+	backup := &api.MysqlBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      backupName,
+			Namespace: j.Namespace,
+			Labels:    j.recurrentBackupLabels(),
+		},
+		Spec: api.MysqlBackupSpec{
+			ClusterName: j.ClusterName,
+		},
 	}
+	return backup, j.c.Create(context.TODO(), backup)
+}
 
-	defer func() {
-		j.lock.Lock()
-		defer j.lock.Unlock()
-		*j.BackupRunning = false
-	}()
-
-	err := wait.PollImmediate(backupPollingTime, backupWatchTimeout, func() (bool, error) {
-		backup := &api.MysqlBackup{}
-		if err := j.c.Get(context.TODO(), backupKey, backup); err != nil {
-			log.Info("failed to get backup", "backup", backupName, "error", err)
-			return false, nil
-		}
-		if backup.Status.Completed {
-			log.Info("backup finished", "backup", backup)
-			return true, nil
-		}
-
-		return false, nil
-	})
+func (j *job) backupSelector() *client.ListOptions {
+	return client.InNamespace(j.Namespace).MatchingLabels(j.recurrentBackupLabels())
+}
 
-	if err != nil {
-		log.Error(err, "waiting for backup to finish, failed",
-			"backup", backupName, "cluster", fmt.Sprintf("%s/%s", j.Namespace, j.Name))
+func (j *job) recurrentBackupLabels() map[string]string {
+	return map[string]string{
+		"recurrent": "true",
+		"cluster":   j.ClusterName,
 	}
 }
 
 func (j *job) backupGC() {
 	var err error
 	backupsList := &api.MysqlBackupList{}
-	selector := &client.ListOptions{}
-	selector = selector.InNamespace(j.Namespace).MatchingLabels(map[string]string{"recurrent": "true"})
-
-	if err = j.c.List(context.TODO(), selector, backupsList); err != nil {
-		log.Error(err, "failed getting backups", "selector", selector)
+	if err = j.c.List(context.TODO(), j.backupSelector(), backupsList); err != nil {
+		log.Error(err, "failed getting backups", "selector", j.backupSelector())
 		return
 	}
 
diff --git a/pkg/controller/mysqlbackupcron/job_backup_test.go b/pkg/controller/mysqlbackupcron/job_backup_test.go
index 89ef2fe56..5b293131e 100644
--- a/pkg/controller/mysqlbackupcron/job_backup_test.go
+++ b/pkg/controller/mysqlbackupcron/job_backup_test.go
@@ -20,7 +20,6 @@ package mysqlbackupcron
 import (
 	"fmt"
 	"math/rand"
-	"sync"
 	"time"
 
 	. "github.com/onsi/ginkgo"
@@ -44,13 +43,32 @@ var _ = Describe("MysqlBackupCron cron job", func() {
 		c client.Client
 		// stop channel for controller manager
 		stop chan struct{}
+
+		clusterName string
+		namespace   string
+		j           *job
 	)
 
 	BeforeEach(func() {
 		mgr, err := manager.New(cfg, manager.Options{})
 		Expect(err).To(Succeed())
 		c = mgr.GetClient()
+
+		// NOTE: field indexer should be added before starting the manager
+		Expect(addBackupFieldIndexers(mgr)).To(Succeed())
+
 		stop = StartTestManager(mgr)
+
+		clusterName = fmt.Sprintf("cl-%d", rand.Int31())
+		namespace = "default"
+
+		limit := 5
+		j = &job{
+			ClusterName:                    clusterName,
+			Namespace:                      namespace,
+			c:                              c,
+			BackupScheduleJobsHistoryLimit: &limit,
+		}
 	})
 	AfterEach(func() {
 		close(stop)
@@ -58,22 +76,18 @@ var _ = Describe("MysqlBackupCron cron job", func() {
 
 	When("more backups are created", func() {
 		var (
-			clusterName string
-			ns          string
-			backups     []api.MysqlBackup
+			backups []api.MysqlBackup
 		)
 
 		BeforeEach(func() {
-			clusterName = fmt.Sprintf("cl-%d", rand.Int31())
-			ns = "default"
-
-			for i := 0; i < 10; i++ {
+			for i := 0; i < (*j.BackupScheduleJobsHistoryLimit + 5); i++ {
 				backup := api.MysqlBackup{
 					ObjectMeta: metav1.ObjectMeta{
 						Name:      fmt.Sprintf("bk-%d", i),
-						Namespace: ns,
+						Namespace: namespace,
 						Labels: map[string]string{
 							"recurrent": "true",
+							"cluster":   clusterName,
 						},
 					},
 					Spec: api.MysqlBackupSpec{
@@ -82,7 +96,7 @@ var _ = Describe("MysqlBackupCron cron job", func() {
 				}
 				Expect(c.Create(context.TODO(), &backup)).To(Succeed())
 				backups = append(backups, backup)
-				time.Sleep(time.Second / 3)
+				time.Sleep(time.Second / 6)
 			}
 		})
 
@@ -93,29 +107,47 @@ var _ = Describe("MysqlBackupCron cron job", func() {
 		})
 
 		It("should delete only older backups", func() {
-			f := false
-			limit := len(backups) - 5
-			j := job{
-				Name:          clusterName,
-				Namespace:     ns,
-				BackupRunning: &f,
-				lock:          &sync.Mutex{},
-				c:             c,
-				BackupScheduleJobsHistoryLimit: &limit,
-			}
+
 			lo := &client.ListOptions{
 				LabelSelector: labels.SelectorFromSet(labels.Set{
 					"recurrent": "true",
+					"cluster":   clusterName,
 				}),
-				Namespace: ns,
+				Namespace: namespace,
 			}
 
 			Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(len(backups)))
 
 			j.backupGC()
-			Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(limit))
+			Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(*j.BackupScheduleJobsHistoryLimit))
 			Eventually(testutil.ListAllBackupsFn(c, lo)).ShouldNot(
 				ContainElement(testutil.BackupWithName("bk-3")))
 		})
 	})
+
+	When("a backup exists", func() {
+		var (
+			backup *api.MysqlBackup
+		)
+
+		BeforeEach(func() {
+			var err error
+			backup, err = j.createBackup()
+			Expect(err).To(Succeed())
+		})
+		AfterEach(func() {
+			c.Delete(context.TODO(), backup)
+		})
+
+		It("should detect the running backup", func() {
+			Eventually(j.scheduledBackupsRunningCount).Should(Equal(1))
+		})
+
+		It("should not detect any running backup", func() {
+			backup.Status.Completed = true
+			Expect(c.Update(context.TODO(), backup)).To(Succeed())
+
+			Eventually(j.scheduledBackupsRunningCount).Should(Equal(0))
+		})
+	})
 })
diff --git a/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go b/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
index d44861213..2f839aed0 100644
--- a/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
+++ b/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
@@ -95,7 +95,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}
 
-	return nil
+	return addBackupFieldIndexers(mgr)
 }
 
 var _ reconcile.Reconciler = &ReconcileMysqlBackup{}
@@ -147,8 +147,8 @@ func (r *ReconcileMysqlBackup) updateClusterSchedule(cluster *mysqlv1alpha1.Mysq
 	defer r.lockJobRegister.Unlock()
 
 	for _, entry := range r.cron.Entries() {
-		j, ok := entry.Job.(job)
-		if ok && j.Name == cluster.Name && j.Namespace == cluster.Namespace {
+		j, ok := entry.Job.(*job)
+		if ok && j.ClusterName == cluster.Name && j.Namespace == cluster.Namespace {
 			log.V(1).Info("cluster already added to cron.", "cluster", cluster.Name)
 
 			// change scheduler for already added crons
@@ -182,12 +182,10 @@ func (r *ReconcileMysqlBackup) updateClusterSchedule(cluster *mysqlv1alpha1.Mysq
 		}
 	}
 
-	r.cron.Schedule(schedule, job{
-		Name:          cluster.Name,
+	r.cron.Schedule(schedule, &job{
+		ClusterName:   cluster.Name,
 		Namespace:     cluster.Namespace,
 		c:             r.Client,
-		BackupRunning: new(bool),
-		lock:          new(sync.Mutex),
 		BackupScheduleJobsHistoryLimit: cluster.Spec.BackupScheduleJobsHistoryLimit,
 	}, cluster.Name)
 
@@ -204,3 +202,13 @@ func (r *ReconcileMysqlBackup) unregisterCluster(clusterKey types.NamespacedName
 
 	return nil
 }
+
+func addBackupFieldIndexers(mgr manager.Manager) error {
+	return mgr.GetFieldIndexer().IndexField(&mysqlv1alpha1.MysqlBackup{}, "status.completed", func(b runtime.Object) []string {
+		completed := "false"
+		if b.(*mysqlv1alpha1.MysqlBackup).Status.Completed {
+			completed = "true"
+		}
+		return []string{completed}
+	})
+}
diff --git a/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller_test.go b/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller_test.go
index 14c66b60f..3d7aca6b1 100644
--- a/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller_test.go
+++ b/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller_test.go
@@ -177,10 +177,10 @@ var _ = Describe("MysqlBackupCron controller", func() {
 
 			// check cron entry to have a single entry
 			Expect(cron.Entries()).To(ContainElement(PointTo(MatchFields(IgnoreExtras, Fields{
-				"Job": MatchFields(IgnoreExtras, Fields{
-					"Name": Equal(cluster.Name),
+				"Job": PointTo(MatchFields(IgnoreExtras, Fields{
+					"ClusterName": Equal(cluster.Name),
 					"BackupScheduleJobsHistoryLimit": PointTo(Equal(limit)),
-				}),
+				})),
 			}))))
 		})
 
@@ -203,7 +203,7 @@ var _ = Describe("MysqlBackupCron controller", func() {
 			}
 		})
 
-		It("should create the mysqlbackup", func() {
+		It("should create the mysqlbackup only once", func() {
 			lo := &client.ListOptions{
 				LabelSelector: labels.SelectorFromSet(labels.Set{
 					"recurrent": "true",
@@ -222,9 +222,9 @@
 
 func haveCronJob(name string, sched cronpkg.Schedule) gomegatypes.GomegaMatcher {
 	return ContainElement(PointTo(MatchFields(IgnoreExtras, Fields{
-		"Job": MatchFields(IgnoreExtras, Fields{
-			"Name": Equal(name),
-		}),
+		"Job": PointTo(MatchFields(IgnoreExtras, Fields{
+			"ClusterName": Equal(name),
+		})),
 		"Schedule": Equal(sched),
 	})))
 }