Skip to content

Commit

Permalink
jobstest: move FakeResumer to jobstest
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
adityamaru committed Oct 17, 2023
1 parent 060b1bc commit 1b00425
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 81 deletions.
6 changes: 3 additions & 3 deletions pkg/jobs/delegate_control_test.go
Expand Up @@ -159,7 +159,7 @@ func TestJobsControlForSchedules(t *testing.T) {
// As such, the job does not undergo usual job state transitions
// (e.g. pause-request -> paused).
RegisterConstructor(jobspb.TypeImport, func(job *Job, _ *cluster.Settings) Resumer {
return FakeResumer{
return jobstest.FakeResumer{
OnResume: func(_ context.Context) error {
<-blockResume
return nil
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {

// Our resume never completes any jobs, until this test completes.
RegisterConstructor(jobspb.TypeImport, func(job *Job, _ *cluster.Settings) Resumer {
return FakeResumer{
return jobstest.FakeResumer{
OnResume: func(_ context.Context) error {
<-blockResume
return nil
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestJobControlByType(t *testing.T) {
// Make the jobs of each type controllable.
for _, jobType := range allJobTypes {
RegisterConstructor(jobType, func(job *Job, _ *cluster.Settings) Resumer {
return FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
<-ctx.Done()
return nil
Expand Down
53 changes: 0 additions & 53 deletions pkg/jobs/helpers_test.go
Expand Up @@ -19,62 +19,9 @@ import (
"github.com/cockroachdb/errors"
)

// FakeResumer calls optional callbacks during the job lifecycle.
type FakeResumer struct {
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
PauseRequest onPauseRequestFunc
TraceRealSpan bool
}

func (d FakeResumer) ForceRealSpan() bool {
return d.TraceRealSpan
}

func (d FakeResumer) DumpTraceAfterRun() bool {
return true
}

var _ Resumer = FakeResumer{}

func (d FakeResumer) Resume(ctx context.Context, execCtx interface{}) error {
if d.OnResume != nil {
if err := d.OnResume(ctx); err != nil {
return err
}
}
if d.Success != nil {
return d.Success()
}
return nil
}

func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error {
if d.FailOrCancel != nil {
return d.FailOrCancel(ctx)
}
return nil
}

func (d FakeResumer) CollectProfile(_ context.Context, _ interface{}) error {
return nil
}

// OnPauseRequestFunc forwards the definition for use in tests.
type OnPauseRequestFunc = onPauseRequestFunc

var _ PauseRequester = FakeResumer{}

func (d FakeResumer) OnPauseRequest(
ctx context.Context, execCtx interface{}, txn isql.Txn, details *jobspb.Progress,
) error {
if d.PauseRequest == nil {
return nil
}
return d.PauseRequest(ctx, execCtx, txn, details)
}

func (r *Registry) CancelRequested(ctx context.Context, txn isql.Txn, id jobspb.JobID) error {
return r.cancelRequested(ctx, txn, id)
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/jobs/jobs_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
Expand Down Expand Up @@ -207,7 +208,7 @@ func noopPauseRequestFunc(
return nil
}

var _ jobs.TraceableJob = (*jobs.FakeResumer)(nil)
var _ jobs.TraceableJob = (*jobstest.FakeResumer)(nil)

func (rts *registryTestSuite) setUp(t *testing.T) {
rts.ctx = context.Background()
Expand Down Expand Up @@ -261,7 +262,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
rts.onPauseRequest = noopPauseRequestFunc

jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
TraceRealSpan: rts.traceRealSpan,
OnResume: func(ctx context.Context) error {
t.Log("Starting resume")
Expand Down Expand Up @@ -1151,7 +1152,7 @@ func TestRegistryLifecycle(t *testing.T) {
resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
Expand Down Expand Up @@ -2248,7 +2249,7 @@ func TestShowJobWhenComplete(t *testing.T) {
defer close(done)
jobs.RegisterConstructor(
jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -2412,7 +2413,7 @@ func TestJobInTxn(t *testing.T) {
},
)
jobs.RegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
t.Logf("Resuming job: %+v", job.Payload())
atomic.AddInt32(&hasRun, 1)
Expand Down Expand Up @@ -2452,7 +2453,7 @@ func TestJobInTxn(t *testing.T) {
},
)
jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(_ context.Context) error {
return errors.New("RESTORE failed")
},
Expand Down Expand Up @@ -2550,7 +2551,7 @@ func TestStartableJobMixedVersion(t *testing.T) {
require.NoError(t, err)

jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{}
return jobstest.FakeResumer{}
}, jobs.UsesTenantCostControl)
var j *jobs.StartableJob
jobID := jr.MakeJobID()
Expand Down Expand Up @@ -2591,7 +2592,7 @@ func TestStartableJob(t *testing.T) {
return func() { resumeFunc.Store(prev) }
}
jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
return resumeFunc.Load().(func(ctx context.Context) error)(ctx)
},
Expand Down Expand Up @@ -2777,7 +2778,7 @@ func TestStartableJobTxnRetry(t *testing.T) {
defer s.Stopper().Stop(ctx)
jr := s.JobRegistry().(*jobs.Registry)
jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{}
return jobstest.FakeResumer{}
}, jobs.UsesTenantCostControl)
rec := jobs.Record{
Details: jobspb.RestoreDetails{},
Expand Down Expand Up @@ -2819,7 +2820,7 @@ func TestRegistryTestingNudgeAdoptionQueue(t *testing.T) {
defer jobs.ResetConstructors()()
resuming := make(chan struct{})
jobs.RegisterConstructor(jobspb.TypeBackup, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
resuming <- struct{}{}
return nil
Expand Down Expand Up @@ -2905,7 +2906,7 @@ func TestMetrics(t *testing.T) {
fakeBackupMetrics := makeFakeMetrics()
jobs.RegisterConstructor(jobspb.TypeBackup,
func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
defer fakeBackupMetrics.N.Inc(1)
return waitForErr(ctx)
Expand All @@ -2919,7 +2920,7 @@ func TestMetrics(t *testing.T) {
)

jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
return waitForErr(ctx)
},
Expand Down Expand Up @@ -3139,7 +3140,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) {
defer jobs.ResetConstructors()()
resumed := make(chan error, 1)
jobs.RegisterConstructor(jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
defer close(resumed)
_, err := s.InternalExecutor().(isql.Executor).Exec(
Expand Down Expand Up @@ -3209,7 +3210,7 @@ func TestPauseReason(t *testing.T) {
defer close(done)
resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
Expand Down Expand Up @@ -3464,7 +3465,7 @@ func TestPausepoints(t *testing.T) {
defer s.Stopper().Stop(ctx)
idb := s.InternalDB().(isql.DB)
jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
if err := registry.CheckPausepoint("test_pause_foo"); err != nil {
return err
Expand Down Expand Up @@ -3604,7 +3605,7 @@ func TestJobTypeMetrics(t *testing.T) {

for typ := range typeToRecord {
jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/jobstest/BUILD.bazel
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "jobstest",
srcs = [
"logutils.go",
"resumer.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobstest",
Expand All @@ -13,6 +14,7 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/scheduledjobs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/isql",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/util/log",
Expand Down
67 changes: 67 additions & 0 deletions pkg/jobs/jobstest/resumer.go
@@ -0,0 +1,67 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobstest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
)

// FakeResumer calls optional callbacks during the job lifecycle.
type FakeResumer struct {
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
PauseRequest func(ctx context.Context, planHookState interface{}, txn isql.Txn, progress *jobspb.Progress) error
TraceRealSpan bool
}

func (d FakeResumer) ForceRealSpan() bool {
return d.TraceRealSpan
}

func (d FakeResumer) DumpTraceAfterRun() bool {
return true
}

func (d FakeResumer) Resume(ctx context.Context, _ interface{}) error {
if d.OnResume != nil {
if err := d.OnResume(ctx); err != nil {
return err
}
}
if d.Success != nil {
return d.Success()
}
return nil
}

func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error {
if d.FailOrCancel != nil {
return d.FailOrCancel(ctx)
}
return nil
}

func (d FakeResumer) CollectProfile(_ context.Context, _ interface{}) error {
return nil
}

func (d FakeResumer) OnPauseRequest(
ctx context.Context, execCtx interface{}, txn isql.Txn, details *jobspb.Progress,
) error {
if d.PauseRequest == nil {
return nil
}
return d.PauseRequest(ctx, execCtx, txn, details)
}
5 changes: 3 additions & 2 deletions pkg/jobs/registry_external_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -364,7 +365,7 @@ func TestGCDurationControl(t *testing.T) {
}

jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, cs *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{}
return jobstest.FakeResumer{}
}, jobs.UsesTenantCostControl)
s, sqlDB, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)
Expand Down Expand Up @@ -443,7 +444,7 @@ func TestErrorsPopulatedOnRetry(t *testing.T) {
return ctx.Err()
}
}
return jobs.FakeResumer{
return jobstest.FakeResumer{
OnResume: execFn,
FailOrCancel: execFn,
}
Expand Down

0 comments on commit 1b00425

Please sign in to comment.