Skip to content

Commit

Permalink
Merge #111578
Browse files Browse the repository at this point in the history
111578: backupccl: pause scheduled backup if resumed on different cluster r=dt a=msbutler

Previously, after cluster restore or c2c cutover, a backed up/ replicated
backup schedule would begin executing on the new cluster. If the backup /
source cluster was still available and executing the schedule, then the two
schedules would compete to run, as outlined in #108028.

This patch prevents this poor UX by pausing the backup schedule if the schedule
realizes it is running on a new cluster. It is then up to the user to resume
backups on the new cluster and prevent the backup collision problem.

Fixes #108028

Release note (sql change): if a scheduled backup resumes on a new cluster (e.g.
after C2C cutover or a tenant restore), the backup schedule will pause. The
user may resume the schedule without changing it, but should take special care
to ensure no other schedule is backing up to the same collection. The user may
also want to cancel the paused schedule and start a new one.

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
  • Loading branch information
craig[bot] and msbutler committed Oct 5, 2023
2 parents 0ab77b7 + ace81b6 commit 9a1bb22
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 29 deletions.
128 changes: 103 additions & 25 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
"github.com/robfig/cron/v3"
Expand Down Expand Up @@ -120,6 +121,17 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
}
}

func (h *testHelper) setOverrideAsOfClauseKnob(t *testing.T) {
// We'll be manipulating schedule time via th.env, but we can't fool actual
// backup when it comes to AsOf time. So, override AsOf backup clause to be
// the current time.
h.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause, _ time.Time) {
expr, err := tree.MakeDTimestampTZ(h.cfg.DB.KV().Clock().PhysicalTime(), time.Microsecond)
require.NoError(t, err)
clause.Expr = expr
}
}

func (h *testHelper) loadSchedule(t *testing.T, scheduleID int64) *jobs.ScheduledJob {
t.Helper()

Expand Down Expand Up @@ -147,6 +159,26 @@ func (h *testHelper) waitForSuccessfulScheduledJob(t *testing.T, scheduleID int6
})
}

func (h *testHelper) waitForSuccessfulScheduledJobCount(
t *testing.T, scheduleID int64, expectedCount int,
) {
query := "SELECT count(*) FROM " + h.env.SystemJobsTableName() +
" WHERE status=$1 AND created_by_type=$2 AND created_by_id=$3"

testutils.SucceedsSoon(t, func() error {
// Force newly created job to be adopted and verify it succeeds.
h.server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
var count int
err := h.sqlDB.DB.QueryRowContext(context.Background(),
query, jobs.StatusSucceeded, jobs.CreatedByScheduledJobs, scheduleID).Scan(&count)
require.NoError(t, err)
if count != expectedCount {
return errors.Newf("expected %d jobs; found %d", expectedCount, count)
}
return nil
})
}

// createBackupSchedule executes specified "CREATE SCHEDULE FOR BACKUP" query, with
// the provided arguments. Returns the list of created schedules
func (h *testHelper) createBackupSchedule(
Expand Down Expand Up @@ -754,15 +786,7 @@ USE db;
CREATE TABLE t1(a int);
INSERT INTO t1 values (1), (10), (100);
`)

// We'll be manipulating schedule time via th.env, but we can't fool actual
// backup when it comes to AsOf time. So, override AsOf backup clause to be
// the current time.
th.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause, _ time.Time) {
expr, err := tree.MakeDTimestampTZ(th.cfg.DB.KV().Clock().PhysicalTime(), time.Microsecond)
require.NoError(t, err)
clause.Expr = expr
}
th.setOverrideAsOfClauseKnob(t)

checkScheduleDetailsWaitOption := func(schedules []*jobs.ScheduledJob,
expectedFullOption, expectedIncOption jobspb.ScheduleDetails_WaitBehavior) {
Expand Down Expand Up @@ -830,14 +854,7 @@ CREATE TABLE t1(a int);
INSERT INTO t1 values (-1), (10), (-100);
`)

// We'll be manipulating schedule time via th.env, but we can't fool actual
// backup when it comes to AsOf time. So, override AsOf backup clause to be
// the current time.
th.cfg.TestingKnobs.(*jobs.TestingKnobs).OverrideAsOfClause = func(clause *tree.AsOfClause, _ time.Time) {
expr, err := tree.MakeDTimestampTZ(th.cfg.DB.KV().Clock().PhysicalTime(), time.Microsecond)
require.NoError(t, err)
clause.Expr = expr
}
th.setOverrideAsOfClauseKnob(t)

type dbTables struct {
db string
Expand Down Expand Up @@ -1087,12 +1104,8 @@ INSERT INTO t values (1), (10), (100);
// We'll be manipulating schedule time via th.env, but we can't fool actual backup
// when it comes to AsOf time. So, override AsOf backup clause to be the current time.
useRealTimeAOST := func() func() {
th.setOverrideAsOfClauseKnob(t)
knobs := th.cfg.TestingKnobs.(*jobs.TestingKnobs)
knobs.OverrideAsOfClause = func(clause *tree.AsOfClause, _ time.Time) {
expr, err := tree.MakeDTimestampTZ(th.cfg.DB.KV().Clock().PhysicalTime(), time.Microsecond)
require.NoError(t, err)
clause.Expr = expr
}
return func() {
knobs.OverrideAsOfClause = nil
}
Expand Down Expand Up @@ -1146,9 +1159,9 @@ INSERT INTO t values (1), (10), (100);
s := th.loadSchedule(t, id)
s.SetNextRun(s.NextRun().Add(-365 * 24 * time.Hour))
// Set onError policy to the specified value.
s.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{
OnError: onError,
}))
details := s.ScheduleDetails()
details.OnError = onError
s.SetScheduleDetails(*details)
schedules := jobs.ScheduledJobDB(th.internalDB())
require.NoError(t, schedules.Update(context.Background(), s))
}
Expand Down Expand Up @@ -1488,3 +1501,68 @@ WITH SCHEDULE OPTIONS on_execution_failure = 'pause', ignore_existing_backups, f
}
requireRecoveryEvent(t, beforeBackup.UnixNano(), scheduledBackupEventType, expectedScheduledBackup)
}

// TestPauseScheduledBackupOnNewClusterID ensures that a schedule backup pauses
// if it is running on a cluster with a different ID than is stored in its
// details.
func TestPauseScheduledBackupOnNewClusterID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelper(t)
defer cleanup()
th.setOverrideAsOfClauseKnob(t)

schedules, err := th.createBackupSchedule(t,
"CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup' RECURRING '@hourly' FULL BACKUP ALWAYS")
require.NoError(t, err)

full := schedules[0]

// Force the schedule to execute.
th.env.SetTime(full.NextRun().Add(time.Second))
require.NoError(t, th.executeSchedules())
th.waitForSuccessfulScheduledJob(t, full.ScheduleID())

scheduleStorage := jobs.ScheduledJobDB(th.internalDB())

hostClusterID := full.ScheduleDetails().ClusterID
require.NotZero(t, hostClusterID)

updateClusterIDAndExecute := func(clusterID uuid.UUID, scheduleID int64) {
schedule := th.loadSchedule(t, scheduleID)
details := schedule.ScheduleDetails()
details.ClusterID = clusterID
schedule.SetScheduleDetails(*details)
th.env.SetTime(schedule.NextRun().Add(time.Second))
require.NoError(t, scheduleStorage.Update(context.Background(), schedule))
require.NoError(t, th.executeSchedules())
}

t.Run("pause schedule due to different cluster id", func(t *testing.T) {
updateClusterIDAndExecute(jobstest.DummyClusterID, full.ScheduleID())

// Expect the schedule to pause because of the different cluster ID
testutils.SucceedsSoon(t, func() error {
expectPausedSchedule := th.loadSchedule(t, full.ScheduleID())
if !expectPausedSchedule.IsPaused() {
return errors.New("schedule has not paused yet")
}
// The cluster ID should have been reset.
require.Equal(t, hostClusterID, expectPausedSchedule.ScheduleDetails().ClusterID)
return nil
})

// Resume the schedule
th.sqlDB.Exec(t, "RESUME SCHEDULE $1", full.ScheduleID())
resumedSchedule := th.loadSchedule(t, full.ScheduleID())
require.False(t, resumedSchedule.IsPaused())
th.env.SetTime(resumedSchedule.NextRun().Add(time.Second))
require.NoError(t, th.executeSchedules())
th.waitForSuccessfulScheduledJobCount(t, full.ScheduleID(), 2)
})
t.Run("empty cluster id does not affect schedule", func(t *testing.T) {
updateClusterIDAndExecute(uuid.UUID{}, full.ScheduleID())
th.waitForSuccessfulScheduledJobCount(t, full.ScheduleID(), 3)
})
}
24 changes: 21 additions & 3 deletions pkg/ccl/backupccl/schedule_exec.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)
Expand Down Expand Up @@ -99,19 +100,36 @@ func (e *scheduledBackupExecutor) executeBackup(
}
backupStmt.AsOf = tree.AsOfClause{Expr: endTime}

log.Infof(ctx, "Starting scheduled backup %d", sj.ScheduleID())

// Invoke backup plan hook.
hook, cleanup := cfg.PlanHookMaker(ctx, "exec-backup", txn.KV(), sj.Owner())
defer cleanup()

planner := hook.(sql.PlanHookState)
currentClusterID := planner.ExtendedEvalContext().ClusterID
currentDetails := sj.ScheduleDetails()

// If the current cluster ID is different than the schedule's cluster ID,
// pause the schedule. To maintain backward compatability with schedules
// without a clusterID, don't pause schedules without a clusterID.
if !currentDetails.ClusterID.Equal(uuid.Nil) && currentClusterID != currentDetails.ClusterID {
log.Infof(ctx, "schedule %d last run by different cluster %s, pausing until manually resumed",
sj.ScheduleID(),
currentDetails.ClusterID)
currentDetails.ClusterID = currentClusterID
sj.SetScheduleDetails(*currentDetails)
sj.Pause()
return nil
}

log.Infof(ctx, "Starting scheduled backup %d", sj.ScheduleID())

if knobs, ok := cfg.TestingKnobs.(*jobs.TestingKnobs); ok {
if knobs.OverrideAsOfClause != nil {
knobs.OverrideAsOfClause(&backupStmt.AsOf, hook.(sql.PlanHookState).ExtendedEvalContext().StmtTimestamp)
}
}

backupFn, err := planBackup(ctx, hook.(sql.PlanHookState), backupStmt)
backupFn, err := planBackup(ctx, planner, backupStmt)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/job_scheduler_test.go
Expand Up @@ -65,7 +65,9 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) {
t.Run(wait.String(), func(t *testing.T) {
// Create job with the target wait behavior.
j := h.newScheduledJob(t, "j", "j sql")
j.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{Wait: wait}))
details := j.ScheduleDetails()
details.Wait = wait
j.SetScheduleDetails(*details)
require.NoError(t, j.SetSchedule("@hourly"))

require.NoError(t,
Expand Down

0 comments on commit 9a1bb22

Please sign in to comment.