Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
job := &Job{id: id, registry: r}
stateString := *row[1].(*tree.DString)
jobTypeString := *row[2].(*tree.DString)

if err := job.Messages().Record(ctx, txn, "state", string(stateString)); err != nil {
return err
}

switch State(stateString) {
case StatePaused:
if !r.cancelRegisteredJobContext(id) {
Expand Down
74 changes: 74 additions & 0 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
Expand Down Expand Up @@ -847,3 +850,74 @@ func TestStorageRejectsInvalidJobID(t *testing.T) {
sqlDB.CheckQueryResults(t,
"SELECT count(*) FROM system.job_info WHERE job_id = 0", [][]string{{"0"}})
}

func TestJobPauseStateTransitionsRecorded(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

args := base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
}
s, sqlDB, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

sql := sqlutils.MakeSQLRunner(sqlDB)
idb := s.InternalDB().(isql.DB)
r := s.JobRegistry().(*jobs.Registry)

blockCh := make(chan struct{})
defer close(blockCh)

// Register a fake resumer so the job doesn't complete during the test.
cleanup := jobs.TestingRegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-blockCh:
return nil
}
},
}
}, jobs.UsesTenantCostControl)
defer cleanup()

record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
Username: username.TestUserName(),
}
job, err := jobs.TestingCreateAndStartJob(ctx, r, idb, record)
require.NoError(t, err)
jobutils.WaitForJobToRun(t, sql, job.ID())

sql.Exec(t, "PAUSE JOB $1", job.ID())
jobutils.WaitForJobToPause(t, sql, job.ID())

sql.Exec(t, "RESUME JOB $1", job.ID())
jobutils.WaitForJobToRun(t, sql, job.ID())

sql.Exec(t, "CANCEL JOB $1", job.ID())
jobutils.WaitForJobToCancel(t, sql, job.ID())

var messages []jobs.JobMessage
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
messages, err = job.Job.Messages().Fetch(ctx, txn)
return err
}))

var stateMessages []string
for _, msg := range messages {
if msg.Kind == "state" {
stateMessages = append(stateMessages, msg.Message)
}
}

require.Equal(t, []string{"canceled", "reverting", "cancel-requested", "running", "paused", "pause-requested"}, stateMessages)
}