From c5854504da41f893349497b6b04e1759af1a9a35 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 7 Nov 2025 10:40:20 -0500 Subject: [PATCH] jobs: log message on job pause and cancel state change The message logger is supposed to log all job state changes. Release note: none --- pkg/jobs/adopt.go | 5 +++ pkg/jobs/job_info_storage_test.go | 74 +++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 04ef4b3f4414..1aedb8cd4f43 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -501,6 +501,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes job := &Job{id: id, registry: r} stateString := *row[1].(*tree.DString) jobTypeString := *row[2].(*tree.DString) + + if err := job.Messages().Record(ctx, txn, "state", string(stateString)); err != nil { + return err + } + switch State(stateString) { case StatePaused: if !r.cancelRegisteredJobContext(id) { diff --git a/pkg/jobs/job_info_storage_test.go b/pkg/jobs/job_info_storage_test.go index 17719b0c29c3..ccedfc2f67a2 100644 --- a/pkg/jobs/job_info_storage_test.go +++ b/pkg/jobs/job_info_storage_test.go @@ -16,12 +16,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/keyvisualizer" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" @@ -847,3 +850,74 @@ func TestStorageRejectsInvalidJobID(t *testing.T) { sqlDB.CheckQueryResults(t, "SELECT count(*) FROM system.job_info WHERE job_id = 0", [][]string{{"0"}}) } + +func TestJobPauseStateTransitionsRecorded(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + } + s, sqlDB, _ := serverutils.StartServer(t, args) + defer s.Stopper().Stop(ctx) + + sql := sqlutils.MakeSQLRunner(sqlDB) + idb := s.InternalDB().(isql.DB) + r := s.JobRegistry().(*jobs.Registry) + + blockCh := make(chan struct{}) + defer close(blockCh) + + // Register a fake resumer so the job doesn't complete during the test. + cleanup := jobs.TestingRegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return jobstest.FakeResumer{ + OnResume: func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-blockCh: + return nil + } + }, + } + }, jobs.UsesTenantCostControl) + defer cleanup() + + record := jobs.Record{ + Details: jobspb.BackupDetails{}, + Progress: jobspb.BackupProgress{}, + Username: username.TestUserName(), + } + job, err := jobs.TestingCreateAndStartJob(ctx, r, idb, record) + require.NoError(t, err) + jobutils.WaitForJobToRun(t, sql, job.ID()) + + sql.Exec(t, "PAUSE JOB $1", job.ID()) + jobutils.WaitForJobToPause(t, sql, job.ID()) + + sql.Exec(t, "RESUME JOB $1", job.ID()) + jobutils.WaitForJobToRun(t, sql, job.ID()) + + sql.Exec(t, "CANCEL JOB $1", job.ID()) + jobutils.WaitForJobToCancel(t, sql, job.ID()) + + var messages []jobs.JobMessage + require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var err error + messages, err = job.Job.Messages().Fetch(ctx, txn) + return err + })) + + var stateMessages []string + for _, msg := range messages { + if msg.Kind == "state" { + stateMessages = append(stateMessages, msg.Message) + } + } + + require.Equal(t, []string{"canceled", "reverting", "cancel-requested", "running", "paused", "pause-requested"}, stateMessages) +}